Apache Zookeeper 学习:Zookeeper实现统一配置管理中心

Apache Zookeeper 学习:模拟三台节点组成的Zookeeper集群实现的统一配置管理中心

模拟Zookeeper集群

架构图

Zookeeper集群架构

创建zookeeper集群节点

模拟三台节点组成的zookeeper集群,需要在本机zookeeper目录下
创建三个zookeeper集群节点配置文件

1
2
3
4
$ cd conf/
$ cp zoo_sample.cfg zoo1.cfg
$ cp zoo_sample.cfg zoo2.cfg
$ cp zoo_sample.cfg zoo3.cfg

创建所配置的各个文件夹

1
2
3
4
5
6
7
8
9
$ mkdir /tmp/zookeeper1
$ mkdir /tmp/zookeeper1/data
$ mkdir /tmp/zookeeper1/dataLog
$ mkdir /tmp/zookeeper2
$ mkdir /tmp/zookeeper2/data
$ mkdir /tmp/zookeeper2/dataLog
$ mkdir /tmp/zookeeper3
$ mkdir /tmp/zookeeper3/data
$ mkdir /tmp/zookeeper3/dataLog

/tmp/zookeeperX/data文件夹下创建myid文件

1
2
3
$ echo 1 > /tmp/zookeeper1/data/myid
$ echo 2 > /tmp/zookeeper2/data/myid
$ echo 3 > /tmp/zookeeper3/data/myid

配置zookeeper节点信息

zoo1.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper1/data
dataLogDir=/tmp/zookeeper1/dataLog
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

# 格式:server.num=xxxx:port1:port2
# num对应myid中的内容,port1是zookeeper集群中各服务间的通信端口,port2是zookeeper集群选举leader的端口
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877

zoo2.cfg

1
2
3
4
5
6
7
8
9
10
11
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper2/data
dataLogDir=/tmp/zookeeper2/dataLog
# the port at which the clients will connect
clientPort=2182
...
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877

zoo3.cfg

1
2
3
4
5
6
7
8
9
10
11
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper3/data
dataLogDir=/tmp/zookeeper3/dataLog
# the port at which the clients will connect
clientPort=2183
...
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877

配置文件中dataDir,dataLogDir,clientPort,三个zookeeper节点配置信息都不同
搭建zookeeper集群,需要在每个zookeeper安装目录下的data文件中创建名为myid的文件,修改zooX.cfg内容如下:

1
2
3
server.1=xxx:2888:3888
server.2=xxx:2899:3899
server.3=xxx:2877:3877

格式:server.num=xxxx:port1:port2
num对应myid中的内容,port1是zookeeper集群中各服务间的通信端口,port2是zookeeper集群选举leader的端口

启动模拟集群节点

1
2
3
4
5
6
7
8
9
10
11
12
$ ./zkServer.sh start ../conf/zoo1.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo1.cfg
Starting zookeeper ... STARTED
$ ./zkServer.sh start ../conf/zoo2.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo2.cfg
Starting zookeeper ... STARTED
$ ./zkServer.sh start ../conf/zoo3.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo3.cfg
Starting zookeeper ... STARTED

查看集群状态

1
2
3
4
5
6
7
8
9
10
11
12
$ ./zkServer.sh status ../conf/zoo1.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo1.cfg
Mode: follower
$ ./zkServer.sh status ../conf/zoo2.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo2.cfg
Mode: leader
$ ./zkServer.sh status ../conf/zoo3.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo3.cfg
Mode: follower

创建zookeeper集群client

创建监听节点变化的server(zookeeper集群client)

模拟监听节点变化server,启动两个BaseWatcher程序作为监听server(对zookeeper集群来说是client)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class BaseWatcher implements Watcher {

private static ZooKeeper zookeeper;

/**
* 超时时间
*/
private static final int SESSION_TIME_OUT = 2000;

private static CountDownLatch defaultCountDownLatch = new CountDownLatch(1);

private static CountDownLatch childrenCountDownLatch = new CountDownLatch(1);

private static CountDownLatch dataCountDownLatch = new CountDownLatch(1);

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
LOGGER.info("Watch received event");
defaultCountDownLatch.countDown();
}
if (event.getType() == EventType.NodeCreated) {
LOGGER.info("创建节点");
}
if (event.getType() == EventType.NodeDataChanged) {
LOGGER.info("节点改变");
}
if (event.getType() == EventType.NodeChildrenChanged) {
LOGGER.info("子节点节点改变");
}
if (event.getType() == EventType.NodeDeleted) {
LOGGER.info("节点删除");
}
}

/**
* 连接zookeeper
* @param host
* @throws Exception 
*/
public static void connectZookeeper(String host, Watcher defaultWatcher) throws Exception {
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, defaultWatcher);
defaultCountDownLatch.await();
LOGGER.info("zookeeper connection success");
}

/**
* 获取路径下所有子节点
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public static List<String> getChildren(String path, Watcher childrenWatcher) throws KeeperException, InterruptedException {
return zookeeper.getChildren(path, childrenWatcher);
}

/**
* 获取节点上面的数据
* @param path  路径
* @return
* @throws KeeperException
* @throws InterruptedException 
*/
public static String getData(String path, Watcher dataWatcher) throws KeeperException, InterruptedException {
byte[] data = zookeeper.getData(path, dataWatcher, null);
if (data == null) {
return "";
}
return new String(data);
}

/**
* 关闭连接
* @throws InterruptedException
*/
public static void closeConnection() throws InterruptedException {
if (zookeeper != null) {
zookeeper.close();
}
}

private static final String HOST = "localhost:2181,localhost:2182,localhost:2183";

private static final Logger LOGGER = LoggerFactory.getLogger(BaseWatcher.class);

public static void main(String[] args) throws Exception {

/**
* 运行程序之前需要启动zookeeper服务端,{@link BaseWatcher.HOST} 根据zookeeper服务端具体配置去修改
*
* 除了默认watcher外其他watcher一旦触发就会失效,需要充新注册,本示例中因为
* 还未想到比较好的重新注册watcher方式(考虑到如果在Watcher中持有一个zk客户端的
* 实例可能存在循环引用的问题),因此暂不实现watcher失效后重新注册watcher的问题,
* 后续可以查阅curator重新注册watcher的实现方法。
*/

BaseWatcher defaultWatcher = new BaseWatcher();

// 连接zookeeper并设置一个默认的watcher监听zookeeper文件节点的变化
connectZookeeper(HOST, defaultWatcher);
TimeUnit.SECONDS.sleep(1000000);
}

}

创建改变节点数据的server(zookeeper集群client)

如果一个节点向被监听节点中写数据,其他节点就会接受到zookeeper的 NodeDataChanged event

模拟改变数据server,启动一个BaseWatcher程序作为改变数据server(对zookeeper集群来说是client)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {
/**
* 运行程序之前需要启动zookeeper服务端,{@link BaseWatcher.HOST} 根据zookeeper服务端具体配置去修改
*
* 除了默认watcher外其他watcher一旦触发就会失效,需要充新注册,本示例中因为
* 还未想到比较好的重新注册watcher方式(考虑到如果在Watcher中持有一个zk客户端的
* 实例可能存在循环引用的问题),因此暂不实现watcher失效后重新注册watcher的问题,
* 后续可以查阅curator重新注册watcher的实现方法。
*/

BaseWatcher defaultWatcher = new BaseWatcher();

// 连接zookeeper并设置一个默认的watcher监听zookeeper文件节点的变化
connectZookeeper(HOST, defaultWatcher);

// 向/GetChildren节点写数据之前需要先创建此文件夹
// 向/GetChildren节点写数据,则监听程序就会收到zookeeper的 [NodeDataChanged] event
setData("/GetChildren", "8");
TimeUnit.SECONDS.sleep(1000000);
}

运行结果

两个监听程序收到zookeeper的 NodeDataChanged event,log如下

1
2
3
4
11:24:27.295 [main-SendThread(localhost:2183)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x300001251b50003
11:24:27.297 [main-SendThread(localhost:2183)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeDataChanged path:/GetChildren for sessionid 0x300001251b50003
11:24:27.297 [main-EventThread] INFO com.github.xxx.bigdata.demo.zookeeper.BaseWatcher - Watch received event
11:24:27.297 [main-EventThread] INFO com.github.xxx.bigdata.demo.zookeeper.BaseWatcher - 节点改变