Golang 使用 Zookeeper 實現分佈式鎖

什麼是分佈式鎖?

分佈式鎖是一種在分佈式系統中用於控制併發訪問的機制。在分佈式系統中,多個客戶端可能會同時對同一個資源進行訪問,這可能導致數據不一致的問題。分佈式鎖的作用是確保同一時刻只有一個客戶端能夠對某個資源進行訪問,從而避免數據不一致的問題。

分佈式鎖的實現通常依賴於一些具有分佈式特性的技術,如 ZooKeeperRedis、數據庫等。這些技術提供了在分佈式環境中實現互斥訪問的機制,使得多個客戶端在競爭同一個資源時能夠有序地進行訪問。

通過使用分佈式鎖,可以確保分佈式系統中的數據一致性和併發訪問的有序性,從而提高系統的可靠性和穩定性。

Zookeeper 與 Redis 的分佈式鎖對比

ZooKeeperRedis 都是常用的實現分佈式鎖的工具,但它們在實現方式、特性、適用場景等方面有一些區別。以下是 ZooKeeper 分佈式鎖與 Redis 分佈式鎖的比較:

實現方式

特性

適用場景

可靠性

綜上所述,ZooKeeper 分佈式鎖和 Redis 分佈式鎖各有優缺點,具體選擇哪種方式取決於實際業務場景和需求。在需要保證順序性和公平性的場景下,ZooKeeper 分佈式鎖可能更適合;而在需要高性能和快速響應的場景下,Redis 分佈式鎖可能更合適。

爲什麼 Zookeeper 可以實現分佈式鎖

ZooKeeper 可以實現分佈式鎖,主要得益於其以下幾個特性:

  1. 臨時節點:ZooKeeper 支持創建臨時節點,這些節點在創建它們的客戶端會話結束時會被自動刪除。這種特性使得 ZooKeeper 的節點具有生命週期,可以隨着客戶端的存活而存在,客戶端斷開連接後自動消失,非常適合作爲鎖的標識。

  2. 順序節點:ZooKeeper 的另一個重要特性是支持創建順序節點。在創建節點時,ZooKeeper 會在節點名稱後自動添加一個自增的數字,確保節點在 ZNode 中的順序性。這個特性使得 ZooKeeper 可以實現分佈式鎖中的公平鎖,按照請求的順序分配鎖。

  3. Watcher 機制:ZooKeeper 還提供了 Watcher 機制,允許客戶端在指定的節點上註冊監聽事件。當這些事件觸發時,ZooKeeper 服務端會將事件通知到感興趣的客戶端,從而允許客戶端做出相應的措施。這種機制使得 ZooKeeper 的分佈式鎖可以實現阻塞鎖,即當客戶端嘗試獲取已經被其他客戶端持有的鎖時,它可以等待鎖被釋放。

基於以上特性,ZooKeeper 可以實現分佈式鎖。具體實現流程如下:

  1. 客戶端需要獲取鎖時,在 ZooKeeper 中創建一個臨時順序節點作爲鎖標識。

  2. 客戶端判斷自己創建的節點是否是所有臨時順序節點中序號最小的。如果是,則客戶端獲得鎖;如果不是,則客戶端監聽序號比它小的那個節點。

  3. 當被監聽的節點被刪除時(即持有鎖的客戶端釋放鎖),監聽者會收到通知,然後重新判斷自己是否獲得鎖。

  4. 當客戶端釋放鎖時,只需要將會話關閉,臨時節點就會被自動刪除,從而釋放了鎖。

因此,ZooKeeper 通過其臨時節點、順序節點和 Watcher 機制等特性,實現了分佈式鎖的功能。

使用 Golang 實現 Zookeeper 分佈式鎖

下面我們通過一個簡單的例子來演示如何使用 Golang 實現 ZooKeeper 分佈式鎖。

創建 zookeeper 客戶端連接

import "github.com/go-zookeeper/zk"

func client() *zk.Conn {
    // 默認端口 2181
 c, _, err := zk.Connect([]string{"192.168.2.168"}, time.Second)
 if err != nil {
  panic(err)
 }
 return c
}

創建父節點 - /lock

我們可以在獲取鎖之前,先創建一個父節點,用於存放鎖節點。

type Lock struct {
 c *zk.Conn
}

// 父節點 /lock 不存在的時候進行創建
func NewLock() *Lock {
 c := client()
 e, _, err := c.Exists("/lock")
 if err != nil {
  panic(err)
 }
 if !e {
  _, err := c.Create("/lock"[]byte(""), 0, zk.WorldACL(zk.PermAll))
  if err != nil {
   panic(err)
  }
 }

 return &Lock{c: c}
}

獲取鎖

在 Zookeeper 分佈式鎖實現中,獲取鎖的過程實際上就是創建一個臨時順序節點,並判斷自己是否是所有臨時順序節點中序號最小的。

獲取鎖的關鍵是:

  1. 創建的需要是臨時節點

  2. 創建的需要是順序節點

具體創建代碼如下:

p, err := l.c.Create("/lock/lock"[]byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))

其中 zk.FlagEphemeral 表示創建的是臨時節點,zk.FlagSequence 表示創建的是順序節點。

判斷當前創建的節點是否是最小節點

具體步驟如下:

  1. 通過 l.c.Children("/lock") 獲取 /lock 下的所有子節點

  2. 對所有子節點進行排序

  3. 判斷當前創建的節點是否是最小節點

  4. 如果是最小節點,則獲取到鎖,函數調用返回;如果不是,則監聽前一個節點(這會導致函數調用阻塞)

childs, _, err := l.c.Children("/lock")
if err != nil {
    return "", err
}

// childs 是無序的,所以需要排序,以便找到當前節點的前一個節點,然後監聽前一個節點
sort.Strings(childs)

// 成功獲取到鎖
p1 := strings.Replace(p, "/lock/""", 1)
if childs[0] == p1 {
    return p, nil
}

不是最小節點,監聽前一個節點

具體步驟如下:

  1. 通過 sort.SearchStrings 找到當前節點在所有子節點中的位置

  2. 調用 l.c.ExistsW 判斷前一個節點是否依然存在(鎖有可能在調用 ExistsW 之前已經被釋放了),如果不存在則獲取到鎖

  3. 如果前一個節點依然存在,則阻塞等待前一個節點被刪除

// 監聽鎖,等待鎖釋放
// 也就是說,如果當前節點不是最小的節點,那麼就監聽前一個節點
// 一旦前一個節點被刪除,那麼就可以獲取到鎖
index := sort.SearchStrings(childs, p1)
b, _, ev, err := l.c.ExistsW("/lock/" + childs[index-1])
if err != nil {
    return "", err
}

// 在調用 ExistsW 之後,前一個節點已經被刪除
if !b {
    return p, nil
}

// 等待前一個節點被刪除
<-ev

return p, nil

在調用 ExistsW 的時候,如果前一個節點已經被刪除,那麼 ExistsW 會立即返回 false,否則我們可以通過 ExistsW 返回的第三個參數 ev 來等待前一個節點被刪除。

<-ev 處,我們通過 <-ev 來等待前一個節點被刪除,一旦前一個節點被刪除,ev 會收到一個事件,這個時候我們就可以獲取到鎖了。

釋放鎖

如果調用 Lock 可以成功獲取到鎖,我們會返回當前創建的節點的路徑,我們可以通過這個路徑來釋放鎖。

func (l *Lock) Unlock(p string) error {
 return l.c.Delete(p, -1)
}

完整代碼

package main

import (
 "github.com/go-zookeeper/zk"
 "sort"
 "strings"
 "time"
)

func client() *zk.Conn {
 c, _, err := zk.Connect([]string{"192.168.2.168"}, time.Second) //*10)
 if err != nil {
  panic(err)
 }
 return c
}

type Lock struct {
 c *zk.Conn
}

func NewLock() *Lock {
 c := client()
 e, _, err := c.Exists("/lock")
 if err != nil {
  panic(err)
 }
 if !e {
  _, err := c.Create("/lock"[]byte(""), 0, zk.WorldACL(zk.PermAll))
  if err != nil {
   panic(err)
  }
 }

 return &Lock{c: c}
}

func (l *Lock) Lock() (string, error) {
 p, err := l.c.Create("/lock/lock"[]byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
 if err != nil {
  return "", err
 }
 childs, _, err := l.c.Children("/lock")
 if err != nil {
  return "", err
 }

 // childs 是無序的,所以需要排序,以便找到當前節點的前一個節點,然後監聽前一個節點
 sort.Strings(childs)

 // 成功獲取到鎖
 p1 := strings.Replace(p, "/lock/""", 1)
 if childs[0] == p1 {
  return p, nil
 }

 // 監聽鎖,等待鎖釋放
 // 也就是說,如果當前節點不是最小的節點,那麼就監聽前一個節點
 // 一旦前一個節點被刪除,那麼就可以獲取到鎖
 index := sort.SearchStrings(childs, p1)
 b, _, ev, err := l.c.ExistsW("/lock/" + childs[index-1])
 if err != nil {
  return "", err
 }

 // 在調用 ExistsW 之後,前一個節點已經被刪除
 if !b {
  return p, nil
 }

 // 等待前一個節點被刪除
 <-ev

 return p, nil
}

func (l *Lock) Unlock(p string) error {
 return l.c.Delete(p, -1)
}

測試代碼

下面這個例子模擬了分佈式的 counter 操作,我們通過 ZooKeeper 分佈式鎖來保證 counter 的原子性。

當然這個例子只是爲了說明 ZooKeeper 分佈式鎖的使用,實際上下面的功能通過 redis 自身提供的 incr 就可以實現,不需要這麼複雜。

package main

import (
 "context"
 "fmt"
 "github.com/redis/go-redis/v9"
 "sync"
)

func main() {
 var count = 1000
 var wg sync.WaitGroup
 wg.Add(count)

 l := NewLock()
    // 創建 redis 客戶端連接
 redisClient = redis.NewClient(&redis.Options{
  Addr:     "192.168.2.168:6379",
  Password: "", // no password set
  DB:       0,  // use default DB
 })

 for i := 0; i < count; i++ {
  go func(i1 int) {
   defer wg.Done()

    // 獲取 Zookeeper 分佈式鎖
   p, err := l.Lock()
   if err != nil {
    return
   }
   // 成功獲取到了分佈式鎖:
   // 1. 從 redis 獲取 zk_counter 的值
   // 2. 然後對 zk_counter 進行 +1 操作
   // 3. 最後將 zk_counter 的值寫回 redis
   cmd := redisClient.Get(context.Background()"zk_counter")
   i2, _ := cmd.Int()
   i2++
   redisClient.Set(context.Background()"zk_counter", i2, 0)
   // 釋放分佈式鎖
   err = l.Unlock(p)
   if err != nil {
    println(fmt.Errorf("unlock error: %v", err))
    return
   }
  }(i)
 }

 wg.Wait()

 l.c.Close()
}

我們需要將測試程序放到不同的機器上運行,這樣才能模擬分佈式環境。

總結

最後,再來回顧一下本文內容:

  1. sync.Mutex 這種鎖只能保證單進程內的併發安全,無法保證分佈式環境下的併發安全。

  2. 使用 ZookeeperRedis 都能實現分佈式鎖,但是 Zookeeper 可以保證順序性和公平性,而 Redis 可以保證高性能。

  3. Zookeeper 通過其臨時節點、順序節點和 Watcher 機制等特性,實現了分佈式鎖的功能。


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wcca3nuok-0tkKHrtCoqgw