Zookeeper-java客户端操作
wenking 3/14/2023 zookeeper
# 导入依赖
<project>
<properties>
<curator.framework.version>5.2.0</curator.framework.version>
<curator.recipes.version>5.2.0</curator.recipes.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.framework.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.recipes.version}</version>
</dependency>
</dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- framework为curator的核心框架,包含操作zk的一些基础API封装
- recipes 是基于zookeeper特性封装的一些方法,比如分布式锁、leader选举等
# 连接建立和销毁
public class CuratorAPITest {
public static void main(String[] args) {
CuratorFramework curator = CuratorFrameworkFactory
.builder()
.connectionTimeoutMs(10000) // 这个值要大于一个心跳时间,否则很有可能连接超时
.connectString("test.king.com:2181") // 直接用ip去连接zk会特别慢,原因是zk会用ip换域名;解决方案是在hosts中添加ip映射,然后用域名访问
/**
* RetryNTimes 指定最大重试次数
* RetryUntilElapsed 一直重试,直到达到规定时间
* RetryOneTime 只重试一次
* ExponentialBackoffRetry 指数退避重试 失败n次后 1000 * 2**(n-1) ms 后重试
*/
.retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 指数退避重试: 连接失败后 1000 ms 重试,失败后n次 1000 * 2**(n-1) ms 后重试
.sessionTimeoutMs(150000)
.build();
curator.start();
try {
// api 操作
curatorOperation(curator);
} catch (Exception e) {
e.printStackTrace();
} finally {
curator.close();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 基本操作
# 创建节点
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
// acl 权限配置, 可选项
ArrayList<ACL> acls = Lists.newArrayList(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("admin:123"))));
// 节点创建成功后会返回节点路径 /test
String nodePath = curator.create()
// 如果父节点不存在则创建
.creatingParentsIfNeeded()
// 节点类型
.withMode(CreateMode.PERSISTENT)
.withACL(acls, false)
// 节点路径[和数据]
.forPath("/test", "for test".getBytes(StandardCharsets.UTF_8));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 获取节点数据/信息
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
public static void curatorOperation(CuratorFramework curator) throws Exception {
// zk的授权在会话层面,因此当节点需要认证,在创建会话时就应该指定会话授权信息 .authorization("digest", "admin:123".getBytes(StandardCharsets.UTF_8))
Stat stat = new Stat();
byte[] bytes = curator.getData().storingStatIn(stat).forPath("/zk_test");
String s = new String(bytes);
System.out.println("结果:" + s);
System.out.println("节点信息:" + stat);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 设置节点
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
curator.setData()
// .withVersion() 可以使用乐观锁更新数据
.forPath("/zk_test", "for test2".getBytes(StandardCharsets.UTF_8));
byte[] bytes = curator.getData().forPath("/zk_test");
System.out.println("结果:" + new String(bytes));
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 获取子节点信息
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
List<String> children = curator.getChildren().forPath("/zk_test");
}
}
1
2
3
4
5
2
3
4
5
# 删除节点
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
curator.delete().forPath(/zk_test);
}
}
1
2
3
4
5
2
3
4
5
# 判断节点是否存在
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
curator.checkExists().forPath(/zk_test);
}
}
1
2
3
4
5
2
3
4
5
# 异步操作
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
curator.create().creatingParentsIfNeeded().inBackground((session, event) -> {
String path = event.getPath();
}).forPath("/zk_test");
}
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# Watch监听机制
3.6 版本之前,由于 watcher 只能监听一次变化,因此如果需要重复监听,需要在watcher执行完成后再次进行watcher注册
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) throws Exception {
CuratorWatcher watcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("监听到的事件" + event.toString());
// 3.6之前,由于zk watcher是一次性的,因此监听到数据变化之后需要重新进行注册
curator.checkExists().usingWatcher(this).forPath(event.getPath());
}
};
String path = "/watcher1";
// 创建节点
curator.create().forPath(path, "set date 0".getBytes(StandardCharsets.UTF_8));
// 注册事件监听
curator.getData().usingWatcher(watcher).forPath(path);
// 第一次修改数据
curator.setData().forPath(path, "set date 1".getBytes(StandardCharsets.UTF_8));
// 确保 watcher 执行完成,并且重新注册 watcher 成功
Thread.sleep(1000);
// 第二次修改数据
curator.setData().forPath(path, "set date 2".getBytes(StandardCharsets.UTF_8));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
监听机制的注意事项
使用不同的命令进行监听,可触发的事件类型是不一样的;
- 使用
getData
,checkExists
命令进行 watcher 配置,会监听到:NodeCreated、NodeDeleted、NodeDataChanged事件 - 使用
getChildren
命令进行 watcher 配置,会监听到:NodeChildrenChanged 事件
public class CuratorAPITest {
public static void curatorOperation(CuratorFramework curator) {
CountDownLatch latch = new CountDownLatch(1);
CuratorWatcher watcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println("watcher监听到的事件类型" + watchedEvent.getType());
System.out.println("watcher监听到的节点路径" + watchedEvent.getPath());
System.out.println("监听到的节点状态" + watchedEvent.getState());
latch.countDown();
}
};
// curator.checkExists().usingWatcher(watcher).forPath("/w/asd");
curator.getData().usingWatcher(watcher).forPath("/w/asd");
latch.await();
System.out.println("事件监听结束");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20