一、ZooKeeper 简介
ZooKeeper 是一个集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。
ZooKeeper 的主要应用:1、节点选举;2、配置文件统一管理;3、分布式锁;4、发布与订阅(Dubbo);5、集群管理,集群中保证数据的强一致性,下面我们主要讲配置文件统一管理和分布式锁。
Zookeeper文件系统
Zookeeper的每个子目录项如 NameService 都被称作为 znode,和文件系统一样,我们能够自由的增加、删除 znode,在一个 znode 下增加、删除子 znode,唯一的不同在于znode是可以存储数据的。
有四种类型的 znode:
1、PERSISTENT-持久化目录节点
客户端与 zookeeper 断开连接后,该节点依旧存在。
2、PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与 zookeeper 断开连接后,该节点依旧存在,只是 zookeeper 给该节点名称进行顺序编号。
3、EPHEMERAL-临时目录节点
客户端与 zookeeper 断开连接后,该节点被删除。
4、EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与 zookeeper 断开连接后,该节点被删除,只是 zookeeper 给该节点名称进行顺序编号。
二、配置文件统一管理
1、实现思路
假如我们需要修改三(或者更多)台服务器上 redis.conf 的配置信息,如果一台一台的去修改,则会加大出错概率,而且也不实际。这时候,我们需要引入Zookeeper(下面简称zk),我们需要知道,zk 中有个 watcher 事件,包括 :
EventType:NodeCreated //节点创建
EventType:NodeDataChanged //节点的数据变更
EventType:NodeChildrentChanged //子节点下的数据变更
EventType:NodeDeleted // 节点删除
当我们监听了上面的事件时,事件触发就会被告知。以统一更新 redis.conf 配置文件为例,我们可以实现监听某一个节点的数据更新事件,当DBA更改了该节点的值(一般为 json 串,方便程序解析,例:{"type":"update","url":"ftp:192.168.2.10/config/redis.xml"}),此时我们可以根据 type 的值“update”可知,是需要更新 redis.conf 配置文件,然后根据 url 的值,获取最新的 redis.conf 文件所在的服务器地址。此时,我们可以下载最新配置文件,然后删除原来的 redis.conf 配置文件,最后将最新的配置文件添加到项目中,从而通过重启程序就可以读取到最新的配置了。
2、代码实现
这里我们模拟了三个客户端 Client1、Client2、Client3,代码都是一样的。当 zk 节点数据发送变化,就会触发数据更新的事件,从而告知其客户端(必须监听了该事件)。
package com.imooc.curator.operator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.util.StringUtils;
import java.util.concurrent.CountDownLatch;
/**
* 使用 zk 的 watch 事件,实现配置文件的统一配置
* @author K. L. Mao
* @create 2018/9/8
*/
public class Client1 {
public CuratorFramework client;
public static final String zkServerPath = "192.168.174.10:2181,192.168.174.11:2181,192.168.174.12:2181";
public static final String CONFIG_NODE_PATH = "/super/imooc";
public static final String SUB_PATH = "/redis-config";
public static CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 实例化 zk 客户端
*/
public Client1(){
/**
* curator 连接 zk 的策略:RetryNTimes
* n:重试次数
* sleepMsBetweenRetries:每次重试间隔的时间
*/
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
// 启动客户端连接
client.start();
}
/**
* 关闭 zk 客户端
*/
public void closeZKClient() {
if (client != null){
client.close();
}
}
public static void main(String[] args) throws Exception {
// 实例化
Client1 operator = new Client1();
System.out.println("client1 启动成功");
// 创建节点
// byte[] data = "super".getBytes();
// operator.client.create().creatingParentsIfNeeded()
// .withMode(CreateMode.PERSISTENT) // 持久化节点
// .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 全权限ACL
// .forPath(nodePath, data);
// watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
// operator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
// NodeCache:监听 CONFIG_NODE_PATH 下的子包数据节点的变更,会触发事件
final PathChildrenCache nodeCache = new PathChildrenCache(operator.client, CONFIG_NODE_PATH, true);
// 初始化的时候获取 node 的值并且缓存
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// 新增监听器
nodeCache.getListenable().addListener((client, event) -> {
// 监听节点更新
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
String configNodePath = event.getData().getPath();
if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)){
System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
// 读取节点数据
String data = new String(event.getData().getData(), "UTF-8");
System.out.println("节点" + CONFIG_NODE_PATH + "的数据为" + data);
if (!StringUtils.isEmpty(data)){
JSONObject jsonObject = JSON.parseObject(data);
String type = jsonObject.getString("type");
String url = jsonObject.getString("url");
if ("add".equals(type)){
System.out.println("监听到新增的配置,准备下载...");
// ... 连接ftp服务器,根据url找到对应的配置
Thread.sleep(500);
System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功,已经添加到项目中");
}else if ("update".equals(type)){
System.out.println("监听到更新的配置,准备下载...");
// ... 连接ftp服务器,根据url找到对应的配置
Thread.sleep(500);
System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功");
System.out.println("删除项目中原配置文件...");
Thread.sleep(100);
// ... 删除原文件
System.out.println("拷贝配置文件到项目目录...");
// ... 拷贝文件到项目
}else if ("delete".equals(type)){
System.out.println("监听到删除的配置");
System.out.println("删除项目中原配置文件...");
}
}
}
}
});
countDownLatch.await();
operator.closeZKClient();
}
}
三、分布式锁
1、场景
高并场景下,对共享资源的访问,都需要加锁,分布式的环境下,就需要加分布式锁。如下代码,模拟一个分布式的并发操作。
- Service
package com.imooc.curator.service;
import org.springframework.stereotype.Service;
/**
* @author K. L. Mao
* @create 2018/9/9
*/
@Service
public class PayService {
private static int COUNT = 100;
/**
* 高并发下的 count-1
* @return
*/
public int countLock(){
if (COUNT <= 99){
return -1;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
COUNT = COUNT - 1;
return COUNT;
}
}
- Controller
package com.imooc.curator.web;
import com.imooc.curator.service.PayService;
import com.imooc.curator.utils.ZKCurator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author K. L. Mao
* @create 2018/9/9
*/
@RestController
public class PayController {
@Autowired
private PayService payService;
/**
* 模拟客户端1
* @return
*/
@GetMapping("/lock")
public int lock(){
return payService.countLock();
}
/**
* 模拟客户端2
* @return
*/
@GetMapping("/lock2")
public int lock2(){
return payService.countLock();
}
}
当我们在三秒内分别访问 localhost:8080/lock 和 localhost:8080/lock2 时,就会产生2个线程对 COUNT 进行了操作,使其 变为98了。这样是不允许的,那么如何保证这两个线程是依次执行的呢?这时候,我们需要 zk 来实现分布式锁了。
2、实现思路
首先,Zookeeper 的每一个节点,都是一个天然的顺序发号器。
在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一。
比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为“/test/lock/seq-0000000000”,下一个节点则为“/test/lock/seq-0000000001”,依次类推,等等。
其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。
一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。
第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。
每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。
Zookeeper的节点监听机制,可以说能够非常完美的,实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听或者 watch 监视排号在自己前面那个,而且紧挨在自己前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,则获得锁。
3、代码实现
首先定义了一个锁的接口,很简单,一个加锁方法,一个解锁方法。
public interface Lock {
boolean lock() throws Exception;
boolean unlock();
}
- lock 加锁方法
@Override
public boolean lock() {
try {
boolean locked = false;
// 尝试加锁
locked = tryLock();
if (locked) {
return true;
}
while (!locked) {
// 加锁失败,则等待
await();
if (checkLocked()) {
locked=true;
}
}
return true;
} catch (Exception e) {
e.printStackTrace();
unlock();
}
return false;
}
lock
方法的具体逻辑是,首先尝试着去加锁,如果加锁失败就去等待,然后再重复。尝试加锁的tryLock
方法是关键。做了两件重要的事情:
(1)创建临时顺序节点,并且保存自己的节点路径。
(2)判断是否是第一个,如果是第一个,则加锁成功。如果不是,就找到前一个Znode节点,并且保存其路径到 prior_path。
- tryLock 方法:
private boolean tryLock() throws Exception {
//创建临时Znode
List<String> waiters = getWaiters();
locked_path = ZKclient.instance
.createEphemeralSeqNode(LOCK_PREFIX);
if (null == locked_path) {
throw new Exception("zk error");
}
locked_short_path = getShorPath(locked_path);
//获取等待的子节点列表,判断自己是否第一个
if (checkLocked()) {
return true;
}
// 判断自己排第几个
int index = Collections.binarySearch(waiters, locked_short_path);
if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了
throw new Exception("节点没有找到: " + locked_short_path);
}
//如果自己没有获得锁,则要监听前一个节点
prior_path = ZK_PATH + "/" + waiters.get(index - 1);
return false;
}
创建临时顺序节点后,其完整路径存放在locked_path
成员变量中。另外还截取了一个后缀路径,放在 locked_short_path
成员变量中。 这个后缀路径,是一个短路径,只有完整路径的最后一层。在和取到的远程子节点列表中的其他路径进行比较时,需要用到短路径。因为子节点列表的路径,都是短路径,只有最后一层。
然后,调用checkLocked
方法,判断是否是锁定成功。如果是则返回。如果自己没有获得锁,则要监听前一个节点。找出前一个节点的路径,保存在prior_path
成员中,供后面的await
等待方法,去监听使用。
在进入await
等待方法的介绍前,先说下checkLocked
锁定判断方法。
在checkLocked
方法中,判断是否可以持有锁。判断规则很简单:当前创建的节点,是否在上一步获取到的子节点列表的第一个位置:
如果是,说明可以持有锁,返回true,表示加锁成功;
如果不是,说明有其他线程早已先持有了锁,返回false。
- checkLocked 方法
private boolean checkLocked() {
//获取等待的子节点列表
List<String> waiters = getWaiters();
//节点按照编号,升序排列
Collections.sort(waiters);
// 如果是第一个,代表自己已经获得了锁
if (locked_short_path.equals(waiters.get(0))) {
log.info("成功的获取分布式锁,节点为{}", locked_short_path);
return true;
}
return false;
}
等待方法await
,表示在争夺锁失败以后的等待逻辑。那么此处该线程应该做什么呢?
- await 方法
private void await() throws Exception {
if (null == prior_path) {
throw new Exception("prior_path error");
}
final CountDownLatch latch = new CountDownLatch(1);
//订阅比自己次小顺序节点的删除事件
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
log.info("[WatchedEvent]节点删除");
latch.countDown();
}
};
client.getData().usingWatcher(w).forPath(prior_path);
latch.await(WAIT_TIME, TimeUnit.SECONDS);
}
首先添加一个watcher
监听,而监听的地址正是上面一步返回的prior_path
成员。这里,仅仅会监听自己前一个节点的变动,而不是父节点下所有节点的变动。然后,调用latch.await,进入等待状态,等到latch.countDown()
被唤醒。
一旦prior_path
节点发生了变动,那么就将线程从等待状态唤醒,重新一轮的锁的争夺。
至此,关于加锁的算法基本完成。但是,上面还没有实现锁的可重入。
- 可重入加锁方式
#修改前面的lock方法,在前面加上可重入的判断逻辑。代码如下:
public boolean lock() {
synchronized (this) {
if (lockCount.get() == 0) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.incrementAndGet();
return true;
}
}
//...
}
为了变成可重入,在代码中增加了一个加锁的计数器lockCount
,计算重复加锁的次数。如果是同一个线程加锁,只需要增加次数,直接返回,表示加锁成功。
- unlock 方法
释放锁主要有两个工作:
(1)减少重入锁的计数,如果不是0,直接返回,表示成功的释放了一次;
(2)如果计数器为0,移除Watchers
监听器,并且删除创建的 Znode 临时节点;
代码如下:
@Override
public boolean unlock() {
if (!thread.equals(Thread.currentThread())) {
return false;
}
int newLockCount = lockCount.decrementAndGet();
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);
}
if (newLockCount != 0) {
return true;
}
try {
if (ZKclient.instance.isNodeExist(locked_path)) {
client.delete().forPath(locked_path);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
这里,为了尽量保证线程安全,可重入计数器的类型,不是 int 类型,而是Java并发包中的原子类型——AtomicInteger
。
4、分布式锁的应用场景
前面的实现,主要的价值是展示一下分布式锁的基础开发和原理。实际的开发中,如果需要使用到分布式锁,并不需要自己造轮子,可以直接使用curator
客户端中的各种官方实现的分布式锁,比如其中的InterProcessMutex
可重入锁。
- InterProcessMutex 可重入锁的使用实例如下:
@Test
public void testzkMutex() throws InterruptedException {
CuratorFramework client=ZKclient.instance.getClient();
final InterProcessMutex zkMutex =
new InterProcessMutex(client,"/mutex"); ;
for (int i = 0; i < 10; i++) {
FutureTaskScheduler.add(() -> {
try {
zkMutex.acquire();
for (int j = 0; j < 10; j++) {
count++;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("count = " + count);
zkMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(Integer.MAX_VALUE);
}
最后,总结一下 Zookeeper 分布式锁。
利用在同级目录下,不能创建相同的节点特性,可以利用多线程去创建一个节点,但是只能有一个线程可以创建成功,所以该线程得到锁。释放锁:释放锁时,通过删除该节点,来触发刚才没有获取到的线程的监听,让他们再次来竞争获取。这种方案可以达到效果,但是会有一个问题产生,就是如果在并发比较大的情况下,一个临时节点的消失,会造成很多线程同时会试图创建临时节点,这种方式会影响 zk 的稳定性,这个效应称为羊群效应。
Zookeeper分布式锁,能有效的解决分布式问题,不可重入问题,实现起来较为简单。
但是,Zookeeper 实现的分布式锁其实存在一个缺点,那就是性能并不太高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过 Leader 服务器来执行,然后 Leader 服务器还需要将数据同步到所有的 Follower 机器上。所以,在高性能,高并发的场景下,不建议使用Zk的分布式锁,建议使用 Redis。