Zookeeper锁

云程序员 2020年10月01日 11次浏览

在Java中,大家都非常熟悉锁。在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常,我们以synchronized 、Lock来使用它。

但是Java中的锁,只能保证在同一个JVM进程内中的安全,分布式系统的锁就需要用到redis、zookeeper、本文只要是使用zookeeper锁来进行讲述

ZooKeeper分布式锁的原理

  1. ZooKeeper的每一个节点,都是一个天然的顺序发号器。

在每一个节点下面创建临时顺序节点(EPHEMERAL_SEQUENTIAL)类型,新的子节点后面,会加上一个次序编号,而这个生成的次序编号,是上一个生成的次序编号加一。

例如,有一个用于发号的节点“/test/lock”为父亲节点,可以在这个父节点下面创建相同前缀的临时顺序子节点,假定相同的前缀为“/test/lock/seq-”。第一个创建的子节点基本上应该为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001

  1. ZooKeeper节点的递增有序性,可以确保锁的公平

一个ZooKeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程,都在这个节点下创建个临时顺序节点。由于ZK节点,是按照创建的次序,依次递增的。

为了确保公平,可以简单的规定:编号最小的那个节点,表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

  1. ZooKeeper的节点监听机制,可以保障占有锁的传递有序而且高效

每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode
的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

ZooKeeper的节点监听机制,能够非常完美地实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听(linsten)或者监视(watch)排号在自己前面那个,而且紧挨在自己前面的那个节点,就能收到其删除事件了。
只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,自己就获得锁。

另外,ZooKeeper的内部优越的机制,能保证由于网络异常或者其他原因,集群中占用锁的客户端失联时,锁能够被有效释放。一旦占用Znode锁的客户端与ZooKeeper集群服务器失去联系,这个临时Znode也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。正是由于这个原因,在创建取号节点的时候,尽量创建临时znode
节点

  1. ZooKeeper的节点监听机制,能避免羊群效应

ZooKeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是一个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反应。

锁类型

排他锁

Zookeeper实现排他锁的过程比较简单,过程大概如下

  1. 所有客户端同时创建一个临时节点,创建成功的获取锁

  2. 创建失败的用户创建监听

  3. 获取到监听变化的用户重新创建锁,直到获取锁

共享锁

共享锁用到的原理是有序节点,根据享锁定义,不同事务都可以同时对同一个对象进行读取,而更新必须在没有任何事务进行读写操作的情况下进行,基于这个原则,大致有下列几个步骤

  1. 获取共享锁的时候,所有客户端会创建一个临时顺序节点,如果是读请求,就创建R-00000000001节点,如果是写请求,就创建W-00000000001节点

  2. 对节点下的所有节点进行监听

  3. 确定自己节点序号的顺序

  4. 对于读请求:

    如果没有比自己序号小的节点,或者所有比自己小的序号都是读请求,那表明自己获得了共享锁,否则进入等待

    对于写请求:

    如果自己不是序号最小的请求,就进入等待

分布式锁优化

在整个分布式锁的竞争过程中,大量的 “watch通知”和“子节点列表获取”两个操作重复运行,只是为了判断自己是否最小节点,看起来不是那么科学,那么我们有什么办法可以解决这个问题吗?

我们再来回顾一下上面的流程,核心逻辑在于:判断自己是否是最小的,那么,每个节点其实只需要关注比自己小的节点的变更即可-而不需要关系全局子节点的变更情况。

改进后的流程

  1. 获取共享锁的时候,所有客户端会创建一个临时顺序节点,如果是读请求,就创建R-00000000001节点,如果是写请求,就创建W-00000000001节点

  2. 获取所有子节点列表,注意这里不做watch。

  3. 如果没办法获取锁,那么就调用exists来对比自己小的节点进行监听。
    读请求:向比自己小的最后写节点进行监听
    写请求: 向比自己小的最后一个节点进行监听

  4. 等待watch通知。

FIFO队列

由于Zookeeper的临时有序性,要实现先入先出也比较简单。和分布式锁的实现也比较相似

  1. 获取所有子节点

  2. 确定自己的顺序

  3. 如果不是最小节点,就进入等待,同时向比自己小的节点进行watch注册

  4. 接收到watch通知后,重复步骤1

Barrier

Barrier原意是障碍物,屏障,而在分布式系统中,特指系统之间的一个协调条件,规定了一个队列的元素必须都聚集后才能统一进行安排,否则一直等待

  1. 获取父节点的数据值 10

  2. 获取子节点数

  3. 如果子节点数少于10,监听,进入等待

  4. 接收到通知后重复第二步

一个简单锁的代码实现

public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk = null;
    // 根节点
    private String ROOT_LOCK = "/locks";
    // 竞争的资源
    private String lockName;
    // 等待的前一个锁
    private String WAIT_LOCK;
    // 当前锁
    private String CURRENT_LOCK;
    // 计数器
    private CountDownLatch countDownLatch;
    private int sessionTimeout = 30000;
    private List<Exception> exceptionList = new ArrayList<Exception>();

    /**
     * 配置分布式锁
     * @param config 连接的url
     * @param lockName 竞争资源
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // 连接zookeeper,注册监听器this
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // 如果根节点不存在,则创建根节点
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // 节点监视器
    public void process(WatchedEvent event) {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    public void lock() {
        if (exceptionList.size() > 0) {
            throw new LockException(exceptionList.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
                return;
            } else {
                // 等待锁
                waitForLock(WAIT_LOCK, sessionTimeout);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new LockException("锁名有误");
            }
            // 创建临时有序节点
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(CURRENT_LOCK + " 已经创建");
            // 取所有子节点
            List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
            // 取出所有lockName的锁
            List<String> lockObjects = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
            // 若当前节点为最小节点,则获取锁成功
            if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }

            // 若不是最小节点,则找到自己的前一个节点
            String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(WAIT_LOCK, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // 等待锁
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
            this.countDownLatch = new CountDownLatch(1);
            // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
            //如果临界点监听回调已经过了,怎么办
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            System.out.println(Thread.currentThread().getName() + " 等到了锁");
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("释放锁 " + CURRENT_LOCK);
            zk.delete(CURRENT_LOCK, -1);
            CURRENT_LOCK = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }


    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}