Zookeeper 实现分布式节点下的配置文件统一管理和分布式锁

一、ZooKeeper 简介

ZooKeeper 是一个集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。
ZooKeeper 的主要应用:1、节点选举;2、配置文件统一管理;3、分布式锁;4、发布与订阅(Dubbo);5、集群管理,集群中保证数据的强一致性,下面我们主要讲配置文件统一管理和分布式锁。

Zookeeper文件系统

Zookeeper的每个子目录项如 NameService 都被称作为 znode,和文件系统一样,我们能够自由的增加、删除 znode,在一个 znode 下增加、删除子 znode,唯一的不同在于znode是可以存储数据的。

有四种类型的 znode:

1、PERSISTENT-持久化目录节点
客户端与 zookeeper 断开连接后,该节点依旧存在。
2、PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与 zookeeper 断开连接后,该节点依旧存在,只是 zookeeper 给该节点名称进行顺序编号。
3、EPHEMERAL-临时目录节点
客户端与 zookeeper 断开连接后,该节点被删除。
4、EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与 zookeeper 断开连接后,该节点被删除,只是 zookeeper 给该节点名称进行顺序编号。

二、配置文件统一管理

1、实现思路

假如我们需要修改三(或者更多)台服务器上 redis.conf 的配置信息,如果一台一台的去修改,则会加大出错概率,而且也不实际。这时候,我们需要引入Zookeeper(下面简称zk),我们需要知道,zk 中有个 watcher 事件,包括 :
EventType:NodeCreated //节点创建
EventType:NodeDataChanged //节点的数据变更
EventType:NodeChildrentChanged //子节点下的数据变更
EventType:NodeDeleted // 节点删除
当我们监听了上面的事件时,事件触发就会被告知。以统一更新 redis.conf 配置文件为例,我们可以实现监听某一个节点的数据更新事件,当DBA更改了该节点的值(一般为 json 串,方便程序解析,例:{"type":"update","url":"ftp:192.168.2.10/config/redis.xml"}),此时我们可以根据 type 的值“update”可知,是需要更新 redis.conf 配置文件,然后根据 url 的值,获取最新的 redis.conf 文件所在的服务器地址。此时,我们可以下载最新配置文件,然后删除原来的 redis.conf 配置文件,最后将最新的配置文件添加到项目中,从而通过重启程序就可以读取到最新的配置了。

2、代码实现

这里我们模拟了三个客户端 Client1、Client2、Client3,代码都是一样的。当 zk 节点数据发送变化,就会触发数据更新的事件,从而告知其客户端(必须监听了该事件)。

package com.imooc.curator.operator;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.util.StringUtils;

import java.util.concurrent.CountDownLatch;

/**
 *  使用 zk 的 watch 事件,实现配置文件的统一配置
 * @author K. L. Mao
 * @create 2018/9/8
 */
public class Client1 {
    public CuratorFramework client;
    public static final String zkServerPath = "192.168.174.10:2181,192.168.174.11:2181,192.168.174.12:2181";

    public static final String CONFIG_NODE_PATH = "/super/imooc";
    public static final String SUB_PATH = "/redis-config";
    public static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 实例化 zk 客户端
     */
    public Client1(){
        /**
         * curator 连接 zk 的策略:RetryNTimes
         * n:重试次数
         * sleepMsBetweenRetries:每次重试间隔的时间
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        // 启动客户端连接
        client.start();
    }

    /**
     * 关闭 zk 客户端
     */
    public void closeZKClient() {
        if (client != null){
            client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 实例化
        Client1 operator = new Client1();
        System.out.println("client1 启动成功");
        // 创建节点
//        byte[] data = "super".getBytes();
//        operator.client.create().creatingParentsIfNeeded()
//                .withMode(CreateMode.PERSISTENT)    // 持久化节点
//                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)   // 全权限ACL
//                .forPath(nodePath, data);

        // watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
//        operator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        // NodeCache:监听 CONFIG_NODE_PATH 下的子包数据节点的变更,会触发事件
        final PathChildrenCache nodeCache = new PathChildrenCache(operator.client, CONFIG_NODE_PATH, true);
        // 初始化的时候获取 node 的值并且缓存
        nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        // 新增监听器
        nodeCache.getListenable().addListener((client, event) -> {
            // 监听节点更新
            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                String configNodePath = event.getData().getPath();
                if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)){
                    System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
                    // 读取节点数据
                    String data = new String(event.getData().getData(), "UTF-8");
                    System.out.println("节点" + CONFIG_NODE_PATH + "的数据为" + data);
                    if (!StringUtils.isEmpty(data)){
                        JSONObject jsonObject = JSON.parseObject(data);
                        String type = jsonObject.getString("type");
                        String url = jsonObject.getString("url");
                        if ("add".equals(type)){
                            System.out.println("监听到新增的配置,准备下载...");
                            // ... 连接ftp服务器,根据url找到对应的配置
                            Thread.sleep(500);
                            System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                            // ... 下载配置到你指定的目录
                            Thread.sleep(1000);
                            System.out.println("下载成功,已经添加到项目中");
                        }else if ("update".equals(type)){
                            System.out.println("监听到更新的配置,准备下载...");
                            // ... 连接ftp服务器,根据url找到对应的配置
                            Thread.sleep(500);
                            System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                            // ... 下载配置到你指定的目录
                            Thread.sleep(1000);
                            System.out.println("下载成功");
                            System.out.println("删除项目中原配置文件...");
                            Thread.sleep(100);
                            // ... 删除原文件
                            System.out.println("拷贝配置文件到项目目录...");
                            // ... 拷贝文件到项目
                        }else if ("delete".equals(type)){
                            System.out.println("监听到删除的配置");
                            System.out.println("删除项目中原配置文件...");
                        }
                    }
                }
            }
        });
        countDownLatch.await();
        operator.closeZKClient();
    }
}

三、分布式锁

1、场景

高并场景下,对共享资源的访问,都需要加锁,分布式的环境下,就需要加分布式锁。如下代码,模拟一个分布式的并发操作。

  • Service
package com.imooc.curator.service;

import org.springframework.stereotype.Service;

/**
 * @author K. L. Mao
 * @create 2018/9/9
 */
@Service
public class PayService {

    private static int COUNT = 100;

    /**
     * 高并发下的 count-1
     * @return
     */
    public int countLock(){
        if (COUNT <= 99){
            return -1;
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        COUNT = COUNT - 1;
        return COUNT;
    }
}
  • Controller
package com.imooc.curator.web;

import com.imooc.curator.service.PayService;
import com.imooc.curator.utils.ZKCurator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author K. L. Mao
 * @create 2018/9/9
 */
@RestController
public class PayController {

    @Autowired
    private PayService payService;

    /**
     * 模拟客户端1
     * @return
     */
    @GetMapping("/lock")
    public int lock(){
       return payService.countLock();
    }

    /**
     * 模拟客户端2
     * @return
     */
    @GetMapping("/lock2")
    public int lock2(){
        return payService.countLock();
    }

}

当我们在三秒内分别访问 localhost:8080/lock 和 localhost:8080/lock2 时,就会产生2个线程对 COUNT 进行了操作,使其 变为98了。这样是不允许的,那么如何保证这两个线程是依次执行的呢?这时候,我们需要 zk 来实现分布式锁了。

2、实现思路

首先,Zookeeper 的每一个节点,都是一个天然的顺序发号器。

在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一。

比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为“/test/lock/seq-0000000000”,下一个节点则为“/test/lock/seq-0000000001”,依次类推,等等。



其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。

一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。

每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

Zookeeper的节点监听机制,可以说能够非常完美的,实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听或者 watch 监视排号在自己前面那个,而且紧挨在自己前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,则获得锁。

3、代码实现

首先定义了一个锁的接口,很简单,一个加锁方法,一个解锁方法。

public interface Lock {

    boolean lock() throws Exception;

    boolean unlock();
}
  • lock 加锁方法
    @Override
    public boolean lock() {

       try {
            boolean locked = false;

            // 尝试加锁
            locked = tryLock();

            if (locked) {
                return true;
            }
            while (!locked) {

                // 加锁失败,则等待
                await();


                if (checkLocked()) {
                    locked=true;
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            unlock();
        }

        return false;
    }

lock方法的具体逻辑是,首先尝试着去加锁,如果加锁失败就去等待,然后再重复。尝试加锁的tryLock方法是关键。做了两件重要的事情:

(1)创建临时顺序节点,并且保存自己的节点路径。

(2)判断是否是第一个,如果是第一个,则加锁成功。如果不是,就找到前一个Znode节点,并且保存其路径到 prior_path。

  • tryLock 方法:
private boolean tryLock() throws Exception {
        //创建临时Znode
        List<String> waiters = getWaiters();
        locked_path = ZKclient.instance
                .createEphemeralSeqNode(LOCK_PREFIX);
        if (null == locked_path) {
            throw new Exception("zk error");
        }
        locked_short_path = getShorPath(locked_path);

        //获取等待的子节点列表,判断自己是否第一个
        if (checkLocked()) {
            return true;
        }

        // 判断自己排第几个
        int index = Collections.binarySearch(waiters, locked_short_path);
        if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了
            throw new Exception("节点没有找到: " + locked_short_path);
        }

        //如果自己没有获得锁,则要监听前一个节点
        prior_path = ZK_PATH + "/" + waiters.get(index - 1);

        return false;
    }

创建临时顺序节点后,其完整路径存放在locked_path成员变量中。另外还截取了一个后缀路径,放在 locked_short_path成员变量中。 这个后缀路径,是一个短路径,只有完整路径的最后一层。在和取到的远程子节点列表中的其他路径进行比较时,需要用到短路径。因为子节点列表的路径,都是短路径,只有最后一层。

然后,调用checkLocked方法,判断是否是锁定成功。如果是则返回。如果自己没有获得锁,则要监听前一个节点。找出前一个节点的路径,保存在prior_path成员中,供后面的await等待方法,去监听使用。

在进入await等待方法的介绍前,先说下checkLocked锁定判断方法。

checkLocked方法中,判断是否可以持有锁。判断规则很简单:当前创建的节点,是否在上一步获取到的子节点列表的第一个位置:

如果是,说明可以持有锁,返回true,表示加锁成功;

如果不是,说明有其他线程早已先持有了锁,返回false。

  • checkLocked 方法
private boolean checkLocked() {
        //获取等待的子节点列表

        List<String> waiters = getWaiters();
        //节点按照编号,升序排列
        Collections.sort(waiters);

        // 如果是第一个,代表自己已经获得了锁
        if (locked_short_path.equals(waiters.get(0))) {
            log.info("成功的获取分布式锁,节点为{}", locked_short_path);
            return true;
        }
        return false;
    }

等待方法await,表示在争夺锁失败以后的等待逻辑。那么此处该线程应该做什么呢?

  • await 方法
private void await() throws Exception {

        if (null == prior_path) {
            throw new Exception("prior_path error");
        }

        final CountDownLatch latch = new CountDownLatch(1);


        //订阅比自己次小顺序节点的删除事件
        Watcher w = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
                log.info("[WatchedEvent]节点删除");

                latch.countDown();
            }
        };

        client.getData().usingWatcher(w).forPath(prior_path);
      
        latch.await(WAIT_TIME, TimeUnit.SECONDS);
    }

首先添加一个watcher监听,而监听的地址正是上面一步返回的prior_path成员。这里,仅仅会监听自己前一个节点的变动,而不是父节点下所有节点的变动。然后,调用latch.await,进入等待状态,等到latch.countDown()被唤醒。

一旦prior_path节点发生了变动,那么就将线程从等待状态唤醒,重新一轮的锁的争夺。

至此,关于加锁的算法基本完成。但是,上面还没有实现锁的可重入。

  • 可重入加锁方式
#修改前面的lock方法,在前面加上可重入的判断逻辑。代码如下:

  public boolean lock() {
     synchronized (this) {
        if (lockCount.get() == 0) {
            thread = Thread.currentThread();
            lockCount.incrementAndGet();
        } else {
            if (!thread.equals(Thread.currentThread())) {
                return false;
            }
            lockCount.incrementAndGet();
            return true;
        }
    }
    
   //...
   }

为了变成可重入,在代码中增加了一个加锁的计数器lockCount,计算重复加锁的次数。如果是同一个线程加锁,只需要增加次数,直接返回,表示加锁成功。

  • unlock 方法

释放锁主要有两个工作:

(1)减少重入锁的计数,如果不是0,直接返回,表示成功的释放了一次;

(2)如果计数器为0,移除Watchers监听器,并且删除创建的 Znode 临时节点;

代码如下:

    @Override
    public boolean unlock() {

        if (!thread.equals(Thread.currentThread())) {
            return false;
        }

        int newLockCount = lockCount.decrementAndGet();

        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);
        }

        if (newLockCount != 0) {
            return true;
        }
        try {
            if (ZKclient.instance.isNodeExist(locked_path)) {
                client.delete().forPath(locked_path);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }

这里,为了尽量保证线程安全,可重入计数器的类型,不是 int 类型,而是Java并发包中的原子类型——AtomicInteger

4、分布式锁的应用场景

前面的实现,主要的价值是展示一下分布式锁的基础开发和原理。实际的开发中,如果需要使用到分布式锁,并不需要自己造轮子,可以直接使用curator客户端中的各种官方实现的分布式锁,比如其中的InterProcessMutex可重入锁。

  • InterProcessMutex 可重入锁的使用实例如下:
@Test
public void testzkMutex() throws InterruptedException {

    CuratorFramework client=ZKclient.instance.getClient();
    final InterProcessMutex zkMutex =
            new InterProcessMutex(client,"/mutex");  ;
    for (int i = 0; i < 10; i++) {
        FutureTaskScheduler.add(() -> {

            try {
                zkMutex.acquire();

                for (int j = 0; j < 10; j++) {

                    count++;
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("count = " + count);
                zkMutex.release();

            } catch (Exception e) {
                e.printStackTrace();
            }

        });
    }

    Thread.sleep(Integer.MAX_VALUE);
}

最后,总结一下 Zookeeper 分布式锁。

利用在同级目录下,不能创建相同的节点特性,可以利用多线程去创建一个节点,但是只能有一个线程可以创建成功,所以该线程得到锁。释放锁:释放锁时,通过删除该节点,来触发刚才没有获取到的线程的监听,让他们再次来竞争获取。这种方案可以达到效果,但是会有一个问题产生,就是如果在并发比较大的情况下,一个临时节点的消失,会造成很多线程同时会试图创建临时节点,这种方式会影响 zk 的稳定性,这个效应称为羊群效应。

Zookeeper分布式锁,能有效的解决分布式问题,不可重入问题,实现起来较为简单。

但是,Zookeeper 实现的分布式锁其实存在一个缺点,那就是性能并不太高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过 Leader 服务器来执行,然后 Leader 服务器还需要将数据同步到所有的 Follower 机器上。所以,在高性能,高并发的场景下,不建议使用Zk的分布式锁,建议使用 Redis。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容