1. Zookeeper概述

1.1 简介


Zookeeper是Apache软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。

1.2 架构

通过冗余服务实现高可用性。

1.3 设计目标

将复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

1.4 功能特性

一个典型的分布式数据一致性的解决方案,分布式应用程序可以直接基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等功能。

1.5 CAP理论

对于一个分布式系统来说,以下三点不能同时满足:

  • Consistency:一致性,一个节点在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
  • Availability:可用性,每次请求都能获取到正确的响应,但是不保证获取的数据为最新数据
  • Partion tolerance:分区容错性,分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非整个网络环境都发生了故障。

三大古老的注册中心对比

组件名 语言 CAP 服务健康检查 对外暴露接口 Spring Cloud集成
Eureka Java AP 可配支持 HTTP 已集成
Consul Go CP 支持 HTTP/DNS 已集成
Zookeeper Java CP 支持 客户端 已集成

1.6 BASE理论

BASE理论是以下三个短语的缩写:

  • Basically Available:基本可用,在分布式系统出现故障,允许损失部分可用性(服务降级)
  • Soft-state:软状态,允许分布式系统出现中间状态,而且中间状态不影响系统的可用性,即允许系统不同节点的数据副本之间进行同步的过程存在时延。
  • Eventually Consistent:最终一致性,系统中所有的数据副本,在经过一段时间的同步后,最终能达到一致的状态。

BASE理论是对CAP中一致性和可用性进行一个权衡的结果,核心思想:无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性。

2. Zookeeper安装

注意

3.4版本是基于jdk8构建的,3.5版本之后是基于jdk11构建的。

2.1 Linux安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 下载jdk8
$ yum install -y java-1.8.0-openjdk-devel.x86_64
# 配置环境变量
$ vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
# 使配置生效
source /etc/profile
# 下载源码
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
# 解压安装包
$ tar -zxvf zookeeper-3.4.14.tar.gz
# 修改配置
$ cd conf/ && cp zoo_sample.cfg zoo.cfg
# 启动服务端
$ cd ../bin/ && sh zkServer.sh start

2.2 Docker安装

1
2
3
4
# 拉取镜像
$ docker pull zookeeper:3.4.14
# 启动服务
$ docker run -d -p 2181:2181 -d --name zookeeper zookeeper:3.4.14

3. Zookeeper数据模型

3.1 模型结构


3.2 模型特点

  • 每个子目录如/app1都被称作一个znode(节点),这个znode是被它所在的路径唯一标识
  • znode可以有子节点目录,并且每个znode可以存储数据
  • znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  • znode可以被监控,包括这个目录中存储的数据修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端

3.3 节点分类

(1)持久节点(PERSISTENT)

是指在节点创建后,就一直存在,直到有删除操作来主动删除这个节点——不会因为创建该节点的客户端会话失效而消失。

(2)持久顺序节点(PERSISTENT_SEQUENTIAL)

这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为它的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整形的最大值。

(3)临时节点(EPHEMERAL)

和持久化节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。

(4)临时顺序节点(EPHEMERAL SEQUETIAL)

具有临时节点特点,额外的特性是,每个父节点会为它的第一级子节点维护一份时序,这点和刚才提到的持久顺序节点类似。

4. zookeeper配置文件

4.1 zoo.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60

# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

4.2 参数说明

参数 说明
tickTime 集群服务器节点之间或者服务器与客户端之间维持心跳的时间间隔,即每隔tickTime会发生心跳包,时间单位为ms,并且最小session超时时间为2 * tickTime。
initLimit 初始化集群时Leader与Follower之间的最多心跳数,限定Follower连接到Leader的时限为initLimit * tickTime。
syncLimit 集群运行时Leader与Follower之间同步的最大响应时间单位,即响应时间超过syncLimit * tickTime,Leader认为Follower死亡,从服务器列表删除Follower。
dataDir 数据存储位置。
clientPort 服务监听端口。
maxClientCnxns 最大客户端连接数量。
autopurge.snapRetainCount 快照保存数量,到达这个数量自动进行快照合并。
autopurge.purgeInterval 快照清理频率,单位为小时,即这个时间间隔内快照数量到达上述指定数量,就进行快照合并。

5. ZooKeeper基本指令

进入docker内部客户端

1
2
$ docker exec -it zookeeper bash
$ ./bin/zkCli.sh

5.1 基本指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ ls path                 # 查看特定节点下面的子节点

$ create path data # 创建一个节点,并给节点绑定数据(默认是持久性节点)
$ create -s path data # 创建持久性顺序节点
$ create -e path data # 创建临时性节点(临时性节点不能含有任何子节点)
$ create -e -s path data # 创建临时性顺序节点(临时性节点不能含有任何子节点)

$ stat path # 查看节点状态
$ set path data # 修改节点数据
$ ls2 path # 查看节点下孩子和当前节点的状态
$ history # 查看操作历史
$ get path # 获得节点上绑定的数据信息
$ delete path # 删除节点(删除节点不能含有子节点)
$ rmr path # 递归删除节点(会将当前节点下所有节点删除)
$ quit # 退出当前会话

5.2 stat结果详解

1
2
3
4
5
6
7
8
9
10
11
1) czxid - 创建节点的事务的zxid
2) ctime - 创建节点的毫秒数(从1970年开始)
3) mzxid - 更新节点的事务zxid
4) mtime - 最后更新的毫秒数(从1970年开始)
5) pZxid - 最后更新子节点的事务zxid
6) cversion - 子节点变化号,子节点修改次数
7) dataversion - 数据变化号
8) aclVersion - 访问控制列表的变化号
9) ephemeralOwner - 如果是临时节点,这个是znode拥有者的session id;如果不是临时节点则是0
10) dataLength - 数据长度
11) numChildren - 子节点数量

5.3 节点监听机制

客户端可以监测znode节点的变化,znode节点的变化触发相应的事件,然后清除对该节点的监测。当监测一个znode节点时候,zookeeper会发送通知给监测节点。一个watch事件是一个一次性的触发器,当被设置了watch的数据获取目录发生了改变的时候,则服务器将这个改变发送给设置了watch的客户端以便通知它们。

1
2
$ ls /path true            # 监听节点目录的变化
$ get /path true # 监听节点数据的变化

6. Zookeeper集群搭建

6.1 docker搭建伪集群


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 查看最小容器ip
$ docker network inspect bridge
# 创建三个zk的ip分别在这个最小的ip上加上1,2,3
# zk1: 172.17.0.7
# zk2: 172.17.0.8
# zk3: 172.17.0.9

# 创建集群的挂载目录
$ mkdir -p /opt/zookeeper/cluster/
# 创建配置和数据目录
$ mkdir -p node1/conf node1/data node2/conf node2/data node3/conf node3/data
# 创建配置文件和myid
$ touch node1/conf/zoo.cfg node2/conf/zoo.cfg node3/conf/zoo.cfg node1/data/myid node2/data/myid node3/data/myid
# 编辑配置文件
# 三个文件相同,内容如下
dataDir=/data
dataLogDir=/data/log
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
4lw.commands.whitelist=*
clientPort=2181
server.1=172.17.0.7:2888:3888
server.2=172.17.0.8:2888:3888
server.3=172.17.0.9:2888:3888
# 编辑myid
$ echo "1" >> node1/data/myid
$ echo "2" >> node2/data/myid
$ echo "3" >> node3/data/myid

# 启动三个容器
$ docker run -d -p 2182:2181 -v /opt/zookeeper/cluster/node1/conf:/conf -v /opt/zookeeper/cluster/node1/data:/data --name zk1 zookeeper:3.4.14
$ docker run -d -p 2183:2181 -v /opt/zookeeper/cluster/node2/conf:/conf -v /opt/zookeeper/cluster/node2/data:/data --name zk2 zookeeper:3.4.14
$ docker run -d -p 2184:2181 -v /opt/zookeeper/cluster/node3/conf:/conf -v /opt/zookeeper/cluster/node3/data:/data --name zk3 zookeeper:3.4.14

6.2 查看节点znode状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ docker exec -it zk<i> bash
$ ./bin/zkServer.sh status

# zk1
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
# zk2
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
# zk3
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: leader

可以看到leader结点为zk3,zk1和zk2为follower节点。

7. Java客户端操作

7.1 引入依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>

7.2 日志配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
log4j.rootLogger = INFO,CONSOLE,FILE,HIGHNESS,

# 输出日志到控制台
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss.SSS} HIGHNESS %-5p [%c] [%t] [%F:%L]- %m%n

# 输出日志到文件
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File=log/project.log
log4j.appender.FILE.Append=false
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d HIGHNESS %-5p [%c] - %m%n

# 每日一个日志文件
log4j.appender.HIGHNESS=org.apache.log4j.DailyRollingFileAppender
# 日志最低的输出级别
log4j.appender.HIGHNESS.Threshold=INFO
# 日志日期格式
log4j.appender.HIGHNESS.DatePattern='_'yyyy-MM-dd
# 日志编码格式
log4j.appender.HIGHNESS.encoding=UTF-8
# 有日志时立即输出
log4j.appender.HIGHNESS.ImmediateFlush=true
# 日志文件的保存位置及文件名
log4j.appender.HIGHNESS.File=log/ProjectDaily.log
# 日志文件的最大大小
log4j.appender.HIGHNESS.maxFileSize=10KB
# 日志布局方式
log4j.appender.HIGHNESS.layout=org.apache.log4j.PatternLayout
# 日志文件中日志的格式
log4j.appender.HIGHNESS.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss.SSS} HIGHNESS %-5p [%c]- %m%n

7.3 搭建客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final Logger log = LoggerFactory.getLogger(ZKClient.class);
private ZkClient zkClient;

@Before
public void before() {
// 服务器ip:port
// 会话超时时间
// 连接超时时间
// 序列化方式
zkClient = new ZkClient("192.168.117.155:2181", 6000 * 30, 6000, new SerializableSerializer());
}

@After
public void close() {
zkClient.close();
}

private static class User implements Serializable {
private final Integer id;
private final String name;
public User(Integer id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "User{" + "id=" + id + ", name='" + name + '\'' + '}';
}
}

7.4 创建节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test // 创建节点
public void create() {
// 1. 持久节点
String k1 = zkClient.create("/para1", "k1", CreateMode.PERSISTENT);
log.info("创建持久节点: {}", k1);
// 2. 持久顺序节点
String k2 = zkClient.create("/para2", "k2", CreateMode.PERSISTENT_SEQUENTIAL);
log.info("创建持久顺序节点: {}", k2);
// 3. 临时节点
String k3 = zkClient.create("/para3", "k3", CreateMode.EPHEMERAL);
log.info("创建临时节点: {}", k3);
// 4. 临时顺序节点
String k4 = zkClient.create("/para4", "k4", CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("创建临时顺序节点:{}",k4);
}

7.5 查询节点数据

1
2
3
4
5
@Test // 查询节点数据
public void get() {
Object data = zkClient.readData("/para1");
log.info("读取/para1数据:{}", data);
}

7.6 查询节点状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test // 查询节点状态
public void stat() {
Stat stat = new Stat();
Object o = zkClient.readData("/para1", stat);
log.info("节点/para1状态:{}", stat);
log.info("创建节点的事务zxID:{}", stat.getCzxid());
log.info("创建毫秒数:{}", stat.getCtime());
log.info("更新节点的事务zxID:{}", stat.getMzxid());
log.info("更新毫秒数:{}", stat.getMtime());
log.info("数据长度: {}", stat.getDataLength());
log.info("访问控制列表变化号: {}", stat.getAversion());
log.info("更新子节点的事务zxID:{}", stat.getPzxid());
log.info("子节点的修改次数:{}", stat.getCversion());
log.info("子节点数量:{}", stat.getNumChildren());
}

7.7 修改节点数据

1
2
3
4
5
6
@Test // 修改节点数据
public void set() {
zkClient.writeData("/para1", new User(1, "KHighness"));
User user = zkClient.readData("/para1");
log.info("修改后的/para1: {}", user);
}

7.8 删除节点

1
2
3
4
5
@Test // 删除节点
public void delete() {
boolean delete = zkClient.delete("/para1");
log.info("delete /para1: {}", delete);
}

7.9 监听节点数据的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test // 监听节点数据的变化,非一次性,永久监听
public void getTrue() throws IOException {
zkClient.subscribeDataChanges("/para1", new IZkDataListener() {
// nodeName:当前修改节点的名称,result:节点修改之后的数据
public void handleDataChange(String nodeName, Object result) throws Exception {
log.info("修改节点的名称:{}", nodeName);
log.info("修改后节点数据:{}", result);
}
// nodeName:当前修改节点的名称
public void handleDataDeleted(String nodeName) throws Exception {
log.info("删除节点的名称:{}", nodeName);
}
});
// 阻塞客户端
System.in.read();
}

7.10 监听节点目录的变化

1
2
3
4
5
6
7
8
9
10
11
12
@Test // 监听节点目录的变化,非一次性,永久监听
public void lsTrue() throws IOException {
zkClient.subscribeChildChanges("/para1", new IZkChildListener() {
// nodeName:当前修改节点的名称,list:发生修改的所有子节点名称
public void handleChildChange(String nodeName, List<String> list) throws Exception {
log.info("修改节点的名称:{}", nodeName);
log.info("发生修改的所有子节点名称:{}", list.toString());
}
});
// 阻塞客户端
System.in.read();
}

7.11 操作集群

1
2
3
4
5
6
7
@Test // 集群操作
public void cluster() {
// 可以将所有节点的ip:port都放入构造函数,中间用逗号隔开,不要加空格
ZkClient cluster = new ZkClient("192.168.117.155:2182,192.168.117.155:2183,192.168.117.155:2184");
Object o = zkClient.readData("/para1");
log.info("集群读取/para1: {}", o.toString());
}

暂时完结,后面分布式锁的话,待更~