分布式架构——第11篇:ZooKeeper路由与负载均衡的实现

以前,写过一篇《分布式架构——第3篇:ZooKeeper第三方客户端工具包zkClient》。本文,同样依赖第三方包zkClient.jar,利用ZooKeeper Watcher机制实时监控服务器的接入与宕机情况。服务配置分为三层,最上层为根节点configcenter,用来聚集所有服务节点Server-AServer-B、…、Server-Z等一系列服务,通过根节点可以查询到所有服务。而通过服务名(如Server-B)可以查询到所有服务具体提供者的IP地址(如192.168.1.2),这时候消费者通过负载均衡算法选择其中一个IP地址发起请求。

服务提供者

服务提供者将创建永久节点(createPersistent)根节点configcenter、服务节点Server-B

private void provider() {
boolean rootExists = zkClient.exists(PATH);
if (!rootExists){
zkClient.createPersistent(PATH);
}
boolean serverExists = zkClient.exists(SERVICE_PATH);
if (!serverExists){
zkClient.createPersistent(SERVICE_PATH);
}
}

服务消费者

服务消费者需要实时监听Server-B服务下的具体提供者接入、宕机情况,同时更新当前可用服务器列表。

private void consumer(){
boolean serviceExists = zkClient.exists(SERVICE_PATH);
if (serviceExists){
serverList = zkClient.getChildren(SERVICE_PATH);
System.out.println(serverList);
}else {
throw new RuntimeException("server not exist!");
}
zkClient.subscribeChildChanges(SERVICE_PATH, new IZkChildListener() {

@Override
public void handleChildChange(String arg0, List<String> arg1)
throws Exception {
serverList = arg1;
System.out.println(arg1);
}
});
}

接入一台服务器

这里在Server-B服务名下,接入一台服务器(注册为临时节点createEphemeral,随时准备牺牲),那么消费者会立马收到服务器被增加的通知。

private void providerA() throws Exception{
String ip = InetAddress.getLocalHost().getHostAddress().toString();
zkClient.createEphemeral(SERVICE_PATH+"/"+ip);
}

附录

package org.nt.zkclient;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

public class LoadBalancing {
List<String> serverList = null;
String serverName = "service-B";

String PATH = "/configcenter";
String SERVICE_PATH = "/configcenter/"+serverName;

String zkServerList = "192.168.1.10:2181,192.168.1.11:2181,192.168.12:2181";

ZkClient zkClient = new ZkClient(zkServerList);

private void consumer(){
boolean serviceExists = zkClient.exists(SERVICE_PATH);
if (serviceExists){
serverList = zkClient.getChildren(SERVICE_PATH);
System.out.println(serverList);
}else {
throw new RuntimeException("server not exist!");
}
zkClient.subscribeChildChanges(SERVICE_PATH, new IZkChildListener() {

@Override
public void handleChildChange(String arg0, List<String> arg1)
throws Exception {
serverList = arg1;
System.out.println(arg1);
}
});
}

private void provider() {
boolean rootExists = zkClient.exists(PATH);
if (!rootExists){
zkClient.createPersistent(PATH);
}
boolean serverExists = zkClient.exists(SERVICE_PATH);
if (!serverExists){
zkClient.createPersistent(SERVICE_PATH);
}
}

private void providerA() throws UnknownHostException, InterruptedException{
String ip = InetAddress.getLocalHost().getHostAddress().toString();
zkClient.createEphemeral(SERVICE_PATH+"/"+ip);
}

public static void main(String[] args) throws Exception {
LoadBalancing lb = new LoadBalancing();
lb.provider();
lb.consumer();
lb.providerA();
Thread.sleep(1000);
}

}

References:
[1] 分布式架构——第2篇:ZooKeeper API实战
[2] 分布式架构——第3篇:ZooKeeper第三方客户端工具包zkClient
[3] 大型分布式网站架构设计与实践.陈康贤著
[4] https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java