Zookeeper 從 0 到 1 實現一個分佈式鎖

分佈式鎖,在實際的業務使用場景中算是比較常用的了,而分佈式鎖的實現,常見的除了 redis 之外,就是 zk 的實現了,前面一篇博文介紹了 zk 的基本概念與使用姿勢,那麼如果讓我們來記住 zk 的特性來設計一個分佈式鎖,可以怎麼做呢?

I. 方案設計

1. 創建節點方式實現

zk 有四種節點,一個最容易想到的策略就是創建節點,誰創建成功了,就表示誰持有了這個鎖

這個思路與 redis 的setnx有點相似,因爲 zk 的節點創建,也只會有一個會話會創建成功,其他的則會拋已存在的異常

藉助臨時節點,會話丟掉之後節點刪除,這樣可以避免持有鎖的實例異常而沒有主動釋放導致所有實例都無法持有鎖的問題

如果採用這種方案,如果我想實現阻塞獲取鎖的邏輯,那麼其中一個方案就需要寫一個 while(true) 來不斷重試

while(true) {
    if (tryLock(xxx)) return true;
    else Thread.sleep(1000);
}

另外一個策略則是藉助事件監聽,當節點存在時,註冊一個節點刪除的觸發器,這樣就不需要我自己重試判斷了;充分藉助 zk 的特性來實現異步回調

public void lock() {
  if (tryLock(path,  new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            synchronized (path){
                path.notify();
            }
        }
    })) {
      return true;
  }

  synchronized (path) {
      path.wait();
  }
}

那麼上面這個實現有什麼問題呢?

每次節點的變更,那麼所有的都會監聽到變動,好處是非公平鎖的支持;缺點就是剩下這些喚醒的實例中也只會有一個搶佔到鎖,無意義的喚醒浪費性能

2. 臨時順序節點方式

接下來這種方案更加常見,晚上大部分的教程也是這種 case,主要思路就是創建臨時順序節點

只有序號最小的節點,才表示搶佔鎖成功;如果不是最小的節點,那麼就監聽它前面一個節點的刪除事件,前面節點刪除了,一種可能是他放棄搶鎖,一種是他釋放自己持有的鎖,不論哪種情況,對我而言,我都需要撈一下所有的節點,要麼拿鎖成功;要麼換一個前置節點

II. 分佈式鎖實現

接下來我們來一步步看下,基於臨時順序節點,可以怎麼實現分佈式鎖

對於 zk,我們依然採用 apache 的提供的包 zookeeper來操作;後續提供Curator的分佈式鎖實例

1. 依賴

核心依賴

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

版本說明:

2. 簡單的分佈式鎖

第一步,都是實例創建

public class ZkLock implements Watcher {

    private ZooKeeper zooKeeper;
    // 創建一個持久的節點,作爲分佈式鎖的根目錄
    private String root;

    public ZkLock(String root) throws IOException {
        try {
            this.root = root;
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this);
            Stat stat = zooKeeper.exists(root, false);
            if (stat == null) {
                // 不存在則創建
                createNode(root, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    // 簡單的封裝節點創建,這裏只考慮持久 + 臨時順序
    private String createNode(String path, boolean persistent) throws Exception {
        return zooKeeper.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL_SEQUENTIAL);
    }
}

在我們的這個設計中,我們需要持有當前節點和監聽前一個節點的變更,所以我們在 ZkLock 實例中,添加兩個成員

/**
 * 當前節點
 */
private String current;

/**
 * 前一個節點
 */
private String pre;

接下來就是嘗試獲取鎖的邏輯

/**
 * 嘗試獲取鎖,創建順序臨時節點,若數據最小,則表示搶佔鎖成功;否則失敗
 *
 * @return
 */
public boolean tryLock() {
    try {
        String path = root + "/";
        if (current == null) {
            // 創建臨時順序節點
            current = createNode(path, false);
        }
        List<String> list = zooKeeper.getChildren(root, false);
        Collections.sort(list);

        if (current.equalsIgnoreCase(path + list.get(0))) {
            // 獲取鎖成功
            return true;
        } else {
            // 獲取鎖失敗,找到前一個節點
            int index = Collections.binarySearch(list, current.substring(path.length()));
            // 查詢當前節點前面的那個
            pre = path + list.get(index - 1);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

請注意上面的實現,這裏並沒有去監聽前一個節點的變更,在設計tryLock,因爲是立馬返回成功 or 失敗,所以使用這個接口的,不需要註冊監聽

我們的監聽邏輯,放在 lock() 同步阻塞裏面

public boolean lock() {
    if (tryLock()) {
        return true;
    }

    try {
        // 監聽前一個節點的刪除事件
        Stat state = zooKeeper.exists(pre, true);
        if (state != null) {
            synchronized (pre) {
                // 阻塞等待前面的節點釋放
                pre.wait();
                // 這裏不直接返回true,因爲前面的一個節點刪除,可能並不是因爲它持有鎖並釋放鎖,如果是因爲這個會話中斷導致臨時節點刪除,這個時候需要做的是換一下監聽的 preNode
                return lock();
            }
        } else {
          // 不存在,則再次嘗試拿鎖
          return lock();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

注意:

爲啥不是直接返回 true? 而是需要重新競爭呢?

最後別忘了釋放鎖

public void unlock() {
    try {
        zooKeeper.delete(current, -1);
        current = null;
        zooKeeper.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

到此,我們的分佈式鎖就完成了,接下來我們覆盤下實現過程

這個實現,支持了鎖的重入(why? 因爲鎖未釋放時,我們保存了 current,當前節點存在時則直接判斷是不是最小的;而不是重新創建)

3. 測試

最後寫一個測試 case,來看下

@SpringBootApplication
public class Application {

    private void tryLock(long time) {
        ZkLock zkLock = null;
        try {
            zkLock = new ZkLock("/lock");
            System.out.println("嘗試獲取鎖: " + Thread.currentThread() + " at: " + LocalDateTime.now());
            boolean ans = zkLock.lock();
            System.out.println("執行業務邏輯:" + Thread.currentThread() + " at:" + LocalDateTime.now());
            Thread.sleep(time);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (zkLock != null) {
                zkLock.unlock();
            }
        }
    }

    public Application() throws IOException, InterruptedException {
        new Thread(() -> tryLock(10_000)).start();

        Thread.sleep(1000);
        // 獲取鎖到執行鎖會有10s的間隔,因爲上面的線程搶佔到鎖,並持有了10s
        new Thread(() -> tryLock(1_000)).start();
        System.out.println("---------over------------");

        Scanner scanner = new Scanner(System.in);
        String ans = scanner.next();
        System.out.println("---> over --->" + ans);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

輸出結果如下

II. 其他

0. 項目

1. 一灰灰 Blog

盡信書則不如,以上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現 bug 或者有更好的建議,歡迎批評指正,不吝感激

下面一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛

一灰灰 blog

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/za20FhZNL6zEhRu2deIdcQ