ZooKeeper 實現分佈式鎖

基礎

ZooKeeper 的 4 個節點

ZK 分佈式鎖相關基礎知識

實現原理

核心思想

核心思想就是基於 臨時順序節點 和 Watcher(事件監聽器) 實現的。

當客戶端要獲取鎖,則創建節點,使用完鎖,則刪除該節點。

  1. 客戶端獲取鎖時,在 lock 節點下創建臨時順序節點。

  2. 臨時是防止客戶端宕機後,無法正常刪除鎖的情況

  3. 使用順序節點,是因爲所有嘗試獲取鎖的客戶端都會對持有鎖的子節點加監聽器。當該鎖被釋放之後,勢必會造成所有嘗試獲取鎖的客戶端來爭奪鎖,這樣對性能不友好。使用順序節點之後,只需要監聽前一個節點就好了,對性能更友好

  4. 然後獲取 lock 下面的所有子節點,客戶端獲取到所有的子節點之後,如果發現自己創建的子節點序號最小,那麼就認爲該客戶端獲取到了鎖。使用完鎖後,將該節點刪除。

  5. 如果發現自己創建的節點並非 lock 所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,同時對其註冊事件監聽器,監聽刪除事件。

  6. 如果發現比自己小的那個節點被刪除,則客戶端的 Watcher 會收到相應通知,此時再次判斷自己創建的節點是否是 lock 子節點中序號最小的,如果是則獲取到了鎖,如果不是則重複以上步驟繼續獲取到比自己小的一個節點並註冊監聽。

獲取鎖步驟:

  1. 在 /lock 節點下創建一個有序臨時節點 (EPHEMERAL_SEQUENTIAL)。

  2. 判斷創建的節點序號是否最小,如果是最小則獲取鎖成功。不是則取鎖失敗,然後 watch 序號比本身小的前一個節點。

  3. 當取鎖失敗,設置 watch 後則等待 watch 事件到來後,再次判斷是否序號最小。

  4. 取鎖成功則執行代碼,最後釋放鎖(刪除該節點)

釋放鎖步驟:

  1. 成功獲取鎖的客戶端在執行完業務流程之後,會將對應的子節點刪除。

  2. 成功獲取鎖的客戶端在出現故障之後,對應的子節點由於是臨時順序節點,也會被自動刪除,避免了鎖無法被釋放。

  3. 事件監聽器其實監聽的就是這個子節點刪除事件,子節點刪除就意味着鎖被釋放。

羊羣效應和解決方法

實現

實際項目中,推薦使用 Curator 來實現 ZooKeeper 分佈式鎖。Curator 是 Netflix 公司開源的一套 ZooKeeper Java 客戶端框架,相比於 ZooKeeper 自帶的客戶端 zookeeper 來說,Curator 的封裝更加完善,各種 API 都可以比較方便地使用。

原生 API 實現

/**
 * 自己本身就是一個 watcher,可以得到通知
 * AutoCloseable 實現自動關閉,資源不使用的時候
 */
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {

    private ZooKeeper zooKeeper;

    /**
     * 記錄當前鎖的名字
     */
    private String znode;

    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost:2181",
                10000,this);
    }

    public boolean getLock(String businessCode) {
        try {
            //創建業務 根節點
            Stat stat = zooKeeper.exists("/" + businessCode, false);
            if (stat==null){
                zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }

            //創建瞬時有序節點  /order/order_00000001
            znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            //獲取業務節點下 所有的子節點
            List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
            //獲取序號最小的(第一個)子節點
            Collections.sort(childrenNodes);
            String firstNode = childrenNodes.get(0);
            //如果創建的節點是第一個子節點,則獲得鎖
            if (znode.endsWith(firstNode)){
                return true;
            }
            //如果不是第一個子節點,則監聽前一個節點
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (znode.endsWith(node)){
                    zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode,-1);
        zooKeeper.close();
        log.info("我已經釋放了鎖!");
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
}

Curator 實現

Curator 有五種鎖:

CuratorFramework client = ZKUtils.getClient();
client.start();
// 分佈式可重入排它鎖
InterProcessLock lock1 = new InterProcessMutex(client, lockPath1);
// 分佈式不可重入排它鎖
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2);
// 將多個鎖作爲一個整體
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

if (!lock.acquire(10, TimeUnit.SECONDS)) {
   throw new IllegalStateException("不能獲取多鎖");
}
System.out.println("已獲取多鎖");
System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
try {
    // 資源操作
    resource.use();
} finally {
    System.out.println("釋放多個鎖");
    lock.release();
}
System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
client.close();

Curator 實現可重入鎖

當調用 InterProcessMutex#acquire 方法獲取鎖的時候,會調用 InterProcessMutex#internalLock 方法。

// 獲取可重入互斥鎖,直到獲取成功爲止
@Override
public void acquire() throws Exception {
  if (!internalLock(-1, null)) {
    throw new IOException("Lost connection while trying to acquire lock: " + basePath);
  }
}

internalLock 方法會先獲取當前請求鎖的線程,然後從 threadData(ConcurrentMap<Thread, LockData> 類型) 中獲取當前線程對應的 lockData 。 lockData 包含鎖的信息和加鎖的次數,是實現可重入鎖的關鍵。

第一次獲取鎖的時候,lockData 爲 null。獲取鎖成功之後,會將當前線程和對應的 lockData 放到 threadData 中

private boolean internalLock(long time, TimeUnit unit) throws Exception {
  // 獲取當前請求鎖的線程
  Thread currentThread = Thread.currentThread();
  // 拿對應的 lockData
  LockData lockData = threadData.get(currentThread);
  // 第一次獲取鎖的話,lockData 爲 null
  if (lockData != null) {
    // 當前線程獲取過一次鎖之後
    // 因爲當前線程的鎖存在, lockCount 自增後返回,實現鎖重入.
    lockData.lockCount.incrementAndGet();
    return true;
  }
  // 嘗試獲取鎖
  String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  if (lockPath != null) {
    LockData newLockData = new LockData(currentThread, lockPath);
     // 獲取鎖成功之後,將當前線程和對應的 lockData 放到 threadData 中
    threadData.put(currentThread, newLockData);
    return true;
  }

  return false;
}

LockData 是 InterProcessMutex 中的一個靜態內部類。

private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

private static class LockData
{
    // 當前持有鎖的線程
    final Thread owningThread;
    // 鎖對應的子節點
    final String lockPath;
    // 加鎖的次數
    final AtomicInteger lockCount = new AtomicInteger(1);

    private LockData(Thread owningThread, String lockPath)
    {
      this.owningThread = owningThread;
      this.lockPath = lockPath;
    }
}

如果已經獲取過一次鎖,後面再來獲取鎖的話,直接就會在 if (lockData != null) 這裏被攔下了,然後就會執行 lockData.lockCount.incrementAndGet(); 將加鎖次數加 1。

整個可重入鎖的實現邏輯非常簡單,直接在客戶端判斷當前線程有沒有獲取鎖,有的話直接將加鎖次數加 1 就可以了。

案例 - 模擬 12306 售票

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class Ticket12306 implements Runnable{

    private int tickets = 10;//數據庫的票數

    private InterProcessMutex lock ;

    public Ticket12306(){
        //重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //2.第二種方式
        //CuratorFrameworkFactory.builder();
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();

        //開啓連接
        client.start();

        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while(true){
            //獲取鎖
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //釋放鎖
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

測試方法:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class LockTest {

    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();

        //創建客戶端
        Thread t1 = new Thread(ticket12306,"去哪兒");
        Thread t2 = new Thread(ticket12306,"飛豬");

        t1.start();
        t2.start();
    }
}

優缺點

優點:

  1. 可靠性:ZooKeeper 是一個高可用的分佈式協調服務,基於它的分佈式鎖具有較高的可靠性和穩定性。

  2. 順序性: ZooKeeper 的有序臨時節點保證了鎖的獲取順序,避免了死鎖和競爭問題。

  3. 避免死鎖:在鎖的持有者釋放鎖之前,其他節點無法獲取鎖,從而避免了死鎖問題。

  4. 容錯性:即使部分節點發生故障,其他節點仍然可以正常獲取鎖,保證了系統的穩定性。

缺點:

  1. 性能:ZooKeeper 是一箇中心化的協調服務,可能在高併發場景下成爲性能瓶頸。

  2. 複雜性:ZooKeeper 的部署和維護相對複雜,需要一定的運維工作。

  3. 單點故障:儘管 ZooKeeper 本身是高可用的,但如果 ZooKeeper 集羣出現問題,可能會影響到基於它的分佈式鎖。

有序臨時節點的機制確保了獲取鎖的順序,避免了循環等待,從而有效地避免了死鎖問題。因爲任何一個客戶端在釋放鎖之前都會刪除自己的節點,從而觸發下一個等待的客戶端獲取鎖。

需要注意的是,這種機制雖然能夠有效避免死鎖,但也可能帶來性能問題。當某個客戶端釋放鎖時,需要觸發所有等待的客戶端獲取鎖,可能會導致較多的網絡通信和監聽事件。因此,在高併發情況下,需要綜合考慮性能和鎖的可靠性。

總的來說,基於 ZooKeeper 的分佈式鎖能夠確保數據一致性和鎖的可靠性,但需要權衡性能和複雜性。在選擇時,需要根據具體場景來決定是否使用該種鎖機制。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/aJm9COa4ZRfv9fUpnTvTtg