Spring-Boot + Zookeeper(Curator)实现分布式锁

Curator简介

Apache Curator是Netflix公司开源的一个Zookeeper客户端,目前已经是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,减少了Zookeeper客户端的开发工作量;它封装了一套高级API,提供了更丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等适用于各种复杂场景的Zookeeper操作。
本文介绍如何通过Zookeeper(Curator)实现一个分布式锁。

代码结构

核心依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
11
server.port=8012
# Number of retries (injected as curator.retryCount and passed to RetryNTimes)
curator.retryCount=5
# Interval between retries, in milliseconds
curator.elapsedTimeMs=5000
# ZooKeeper address (host:port connection string)
curator.connectString=10.101.38.213:8181
# Session timeout, in milliseconds
curator.sessionTimeoutMs=60000
# Connection timeout, in milliseconds
curator.connectionTimeoutMs=5000

初始化ZK-Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.austin.brant.zk.demo.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes the shared Curator (ZooKeeper) client bean.
 *
 * <p>Every connection setting is injected from the {@code curator.*} properties.
 *
 * @author austin-brant
 * @since 2019/7/12 17:03
 */
@Configuration
public class CuratorConfiguration {

    /** ZooKeeper connection string, read from {@code curator.connectString}. */
    @Value("${curator.connectString}")
    private String connectString;

    /** Session timeout in milliseconds, read from {@code curator.sessionTimeoutMs}. */
    @Value("${curator.sessionTimeoutMs}")
    private int sessionTimeoutMs;

    /** Connection timeout in milliseconds, read from {@code curator.connectionTimeoutMs}. */
    @Value("${curator.connectionTimeoutMs}")
    private int connectionTimeoutMs;

    /** Number of retries handed to the retry policy, read from {@code curator.retryCount}. */
    @Value("${curator.retryCount}")
    private int retryCount;

    /** Pause between retries in milliseconds, read from {@code curator.elapsedTimeMs}. */
    @Value("${curator.elapsedTimeMs}")
    private int elapsedTimeMs;

    /**
     * Builds the Curator client from the injected settings.
     *
     * <p>The client is returned un-started; the bean definition declares {@code start} as its
     * init method, so the container starts it.
     *
     * @return a new {@link CuratorFramework} using a {@link RetryNTimes} retry policy
     */
    @Bean(name = "curatorFramework", initMethod = "start")
    public CuratorFramework curatorFramework() {
        final RetryNTimes retryPolicy = new RetryNTimes(retryCount, elapsedTimeMs);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString,
                sessionTimeoutMs,
                connectionTimeoutMs,
                retryPolicy);
        return client;
    }

}

分布式锁实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.austin.brant.zk.demo.utils;

import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

/**
* 基于zk的分布锁实现
*
* @author austin-brant
* @since 2019/7/12 17:17
*/
@Slf4j
@Service
public class DistributedLockByZk implements InitializingBean {

private final static String ROOT_PATH_LOCK = "rootlock";
private CountDownLatch countDownLatch = new CountDownLatch(1);

@Resource(name = "curatorFramework")
private CuratorFramework curatorFramework;

/**
* 获取分布式锁
*/
public void acquireDistributedLock(String path) {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
while (true) {
try {
curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL) // 临时节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
log.info("success to acquire lock for path:{}", keyPath);
break;
} catch (Exception e) {
log.info("failed to acquire lock for path:{}", keyPath);
log.info("while try again .......");
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
try {
// 阻塞等待锁释放,重新获取
countDownLatch.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}

/**
* 释放分布式锁
*/
public boolean releaseDistributedLock(String path) {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
try {
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
log.error("failed to release lock");
return false;
}
return true;
}

/**
* 创建 watcher 事件
*/
private void addWatcher(String path) throws Exception {
String keyPath;
if (path.equals(ROOT_PATH_LOCK)) {
keyPath = "/" + path;
} else {
keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
}

final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
log.info("success to release lock for path:{}", oldPath);
if (oldPath.contains(path)) {
//释放计数器,让当前的请求获取锁
countDownLatch.countDown();
}
}
}
});
}

/**
* 初始化创建永久父节点
*/
@Override
public void afterPropertiesSet() {
curatorFramework = curatorFramework.usingNamespace("lock-namespace");
String path = "/" + ROOT_PATH_LOCK;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
addWatcher(ROOT_PATH_LOCK);
log.info("root path 的 watcher 事件创建成功");
} catch (Exception e) {
log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
}
}
}

完整代码

https://github.com/austin-brant/zookeeper-spring-boot

Curator使用参考

Curator包含了几个包:

  • curator-framework:对zookeeper的底层api的一些封装
  • curator-client:提供一些客户端的操作,例如重试策略等
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

Maven依赖见上文“核心依赖”一节(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x;如果跨版本会有兼容性问题,很有可能导致节点操作失败)。

Curator使用详解可参考《Zookeeper客户端Curator使用详解》。