分布式架构——第3篇:ZooKeeper第三方客户端工具包zkClient

zkClient是GitHub上的开源项目,解决了watcher的一次性注册问题,将znode的事件重新定义为子节点的变化数据的变化连接以及状态的变化三类,由zkClient统一将watcher的WatcherEvent转换到以上三种情况中去处理,watcher执行后重新读取数据的同时,再注册相同的watcher。

生成jar包

zkclient在GitHub上只提供了源码,如果要使用它的话,需要自己将其制作成jar包。

  1. 创建一个java工程;
  2. 将git下来的src->main内容,复制粘贴进项目的src目录中;
  3. 将原来的zkClient项目依赖的lib全部引入进刚新建zkclient的项目;
  4. 生成jar包:Export->Java->JAR file->[命名](假设命名为zkclient.jar)。

zkClient开发

将生成的JAR file名字为zkclient.jar,以及ZooKeeper本来依赖的其他JAR全部导入新项目中,就可以开始开发了。 zkClient主要实现了三个订阅事件:

子节点的变化

zkClient.subscribeChildChanges(PATH, new IZkChildListener() {

@Override
public void handleChildChange(String arg0, List<String> arg1)
throws Exception {
System.out.println("handleChildChange");
}
});

数据的变化

包括数据被删除、数据改变。

zkClient.subscribeDataChanges(PATH, new IZkDataListener() {

@Override
public void handleDataDeleted(String arg0) throws Exception {
System.out.println("handleDataDeleted");
}

@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
System.out.println("handleDataChange");
}
});

连接以及状态的变化

包括状态改变、Session建立出错、新建Session。

zkClient.subscribeStateChanges(new IZkStateListener() {

@Override
public void handleStateChanged(KeeperState arg0) throws Exception {
System.out.println("handleStateChanged");
}

@Override
public void handleSessionEstablishmentError(Throwable arg0)
throws Exception {
System.out.println("handleSessionEstablishmentError");
}

@Override
public void handleNewSession() throws Exception {
System.out.println("handleNewSession");
}
});

附录

package org.byteparallel.zkclient;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class ZooKeeperClient {

private static final String serverList = "192.168.1.102";
private static final String PATH = "/root";

public static void main(String[] args) {
ZkClient zkClient = new ZkClient(serverList);

zkClient.subscribeChildChanges(PATH, new IZkChildListener() {

@Override
public void handleChildChange(String arg0, List<String> arg1)
throws Exception {
System.out.println("handleChildChange");
}
});

zkClient.subscribeDataChanges(PATH, new IZkDataListener() {

@Override
public void handleDataDeleted(String arg0) throws Exception {
System.out.println("handleDataDeleted");
}

@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
System.out.println("handleDataChange");
}
});

zkClient.subscribeStateChanges(new IZkStateListener() {

@Override
public void handleStateChanged(KeeperState arg0) throws Exception {
System.out.println("handleStateChanged");
}

@Override
public void handleSessionEstablishmentError(Throwable arg0)
throws Exception {
System.out.println("handleSessionEstablishmentError");
}

@Override
public void handleNewSession() throws Exception {
System.out.println("handleNewSession");
}
});

if (!zkClient.exists(PATH)){
zkClient.createPersistent(PATH);
}
if (!zkClient.exists(PATH+"/child")){
zkClient.create(PATH+"/child", "child znode", CreateMode.EPHEMERAL);
}
List<String> children = zkClient.getChildren(PATH);
System.out.println(children.toString());
int childCount = zkClient.countChildren(PATH);
System.out.println(childCount);
zkClient.writeData(PATH+"/child", "hello everyone");
Object obj = zkClient.readData(PATH+"/child");
System.out.println(obj);
zkClient.delete(PATH+"/child");
}

}

References: [1] 大型分布式网站架构设计与实践.陈康贤著