etcd 實戰基礎篇: 實現分佈式鎖以及分佈式隊列

上一篇 etcd 實戰基礎篇 (一) 我們主要介紹了 etcd 使用場景以及最基礎性的一些操作 (put、get、watch)。 這一篇我們接着實戰 etcd 其他業務場景。

基於 etcd 的分佈式鎖

基於 etcd 實現一個分佈式鎖特別簡單。etcd 提供了開箱即用的包 concurrency,幾行代碼就實現一個分佈式鎖。

package src

import (
  "context"
  "flag"
  "fmt"
  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/clientv3/concurrency"
  "log"
  "strings"
  "time"
)

var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,編程[]string
  endpoints := strings.Split(*addr, ",")
  // 創建一個 etcd 的客戶端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    fmt.Printf("初始化客戶端失敗:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func Lock(id int, lockName string) {
  client := initEtcdClient()
  defer client.Close()

  // 創建一個 session,如果程序宕機奔潰,etcd可以知道
  s, err := concurrency.NewSession(client)
  if err != nil {
    log.Fatal(err)
  }
  defer s.Close()

  // 創建一個etcd locker
  locker := concurrency.NewLocker(s, lockName)

  log.Printf("id:%v 嘗試獲取鎖%v", id, lockName)
  locker.Lock()
  log.Printf("id:%v取得鎖%v", id, lockName)

  // 模擬業務耗時
  time.Sleep(time.Millisecond * 300)

  locker.Unlock()
  log.Printf("id:%v釋放鎖%v", id, lockName)
}

我們再寫個腳本運行,看看結果。

package main

import (
  "etcd-test/src"
  "sync"
)

func main() {
  var lockName = "locker-test"
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      src.Lock(item, lockName)
    }(i)
  }
  wg.Wait()
}

我們發起了 10 個併發搶同一個 key 鎖的命令。運行結果如下, 

set key value [EX seconds] [PX milliseconds] [NX|XX]

這其中,介紹兩個關鍵的屬性:

我們在使用 redis 做分佈式鎖的時候會這麼寫。(代碼用了包 https://github.com/go-redis/redis)

func RedisLock(item int) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result()
  if !res {
    fmt.Printf("item:%v 嘗試獲取鎖失敗\\n", item)
    return
  }

  fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String())
  time.Sleep(1 * time.Second) //模擬業務耗時
  fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}
rdb.SetNX(ctx, "key", "value", 2*time.Second)

我們規定鎖的過期時間是 2 秒,下面有一句 time.Sleep(1 * time.Second) 用來模擬處理業務的耗時。業務處理結束,我們刪除 key rdb.Del(ctx, "key") 。

我們寫個簡單的腳本,

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      RedisLock(item)
    }(i)
  }
  wg.Wait()
}

我們開啓十個 G 併發的調用 RedisLock 函數。每次調用,函數內部都會新建一個 redis 客戶端,本質上是 10 個客戶端。

運行這段程序,

從圖中看出,同一時刻只有一個客戶端獲取到鎖,並且在一秒的任務處理後,釋放了鎖,好像沒太大的問題。

那麼,我再寫一個簡單的例子。

import (
  "context"
  "fmt"
  "github.com/go-redis/redis/v8"
  "sync"
  "time"
)

var ctx = context.Background()
var rdb *redis.Client

func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go func() {
    defer wg.Done()
    ExampleLock(1, 0)
  }()

  go func() {
    defer wg.Done()
    ExampleLock(2, 5)
  }()
  wg.Wait()
}


func ExampleLock(item int, timeSleep time.Duration) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  if timeSleep > 0 {
    time.Sleep(time.Second * timeSleep)
  }
  fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result()
  if !res {
    fmt.Printf("item:嘗試獲取鎖失敗:%v\\n", item)
    return
  }

  fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String())
  time.Sleep(7 * time.Second)
  fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}

我們設置鎖的過期時間是 3 秒,而獲取鎖之後的任務處理時間爲 7 秒。

然後我們開啓兩個 G。

ExampleLock(1, 0)
ExampleLock(2, 5)

其中第二行數字 5,從代碼中可以看出,是指啓動 G 後過 5 秒去獲取鎖。

這段代碼整體流程是這樣的: G(1) 獲取到鎖後,設置的鎖持有時間是 3 秒,由於任務執行需要 7 秒的時間,因此在 3 秒過後鎖會自動釋放。G(2) 可以在第 5 秒的時候獲取到鎖,然後它執行任務也得 7 秒。

最後,G(1) 在獲取鎖後 7 秒執行釋放鎖的操作,G(2) 同理。

發現問題了嗎?

G(1) 的鎖在 3 秒後已經自動釋放了。但是在任務處理結束後又執行了解鎖的操作, 可此時這個鎖是 G(2) 的呀。

那麼接下來由於 G(1) 誤解了 G(2) 的鎖,如果此時有其他的 G,那麼就可以獲取到鎖。

等 G(2) 任務執行結束,同理又會誤解其他 G 的鎖,這是一個惡性循環。 這也是掘金一篇由 redis 分佈式鎖造成茅臺超賣重大事故的原因之一。

至於其他的,可以自行查看這篇文章 [1]。

基於 etcd 的分佈式隊列

對隊列更多的理論知識就不加以介紹了。我們都知道,隊列是一種先進先出的數據結構,一般也只有入隊和出隊兩種操作。 我們常常在單機的應用中使用到隊列。

那麼,如何實現一個分佈式的隊列呢?。

我們可以使用 etcd 開箱即用的工具,在 etcd 底層 recipe 包裏結構 Queue,實現了一個多讀多寫的分佈式隊列。

type Queue struct {
  client *v3.Client
  ctx context.Context

  keyPrefix string
}
func NewQueue(client *v3.Client, keyPrefix string) *Queue
func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string)

我們基於此包可以很方便的實現。

package src

import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,編程[]string
  endpoints := strings.Split(*addr, ",")
  // 創建一個 etcd 的客戶端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    log.Printf("初始化客戶端失敗:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func Push(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  var wg sync.WaitGroup

  for i := 0; i < 3; i++ {
    for j := 0; j < 10; j++ {
      wg.Add(1)
      go func(item int) {
        defer wg.Done()
        err := q.Enqueue(strconv.Itoa(item))
        if err != nil {
          log.Printf("push err:%v\\n", err)
        }
      }(j)
    }
    time.Sleep(2 * time.Second)
  }
  wg.Wait()
}

func Pop(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  for {
    res, err := q.Dequeue()
    if err != nil {
      log.Fatal(err)
      return
    }
    log.Printf("接收值:%v\\n", res)
  }
}

在 push 中,我們開啓 3 輪發送值入隊,每次發送 10 個,發送一輪休息 2 秒。 在 pop 中,通過死循環獲取隊列中的值。

運行腳本程序如下。

package main

import (
  "etcd-test/src"
  "time"
)

func main() {
  key := "test-queue"
  go src.Pop(key)
  time.Sleep(1 * time.Second)
  go src.Push(key)
  time.Sleep(20 * time.Second)
}

我們使用兩個 G 代表 分別運行 push 和 pop 操作。 同時爲了達到運行效果,我們先運行 pop 等待有入隊的元素。 運行結果動畫如下,

etcd 還提供了優先級的分佈式的隊列。和上面的用法相似。只是在入隊的時候,不僅僅需要提供一個值,還需要提供一個整數,來表示當前 push 值的優先級。數值越小,優先級越高。

我們改動一下上述的代碼。

package src

import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,編程[]string
  endpoints := strings.Split(*addr, ",")
  // 創建一個 etcd 的客戶端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    log.Printf("初始化客戶端失敗:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func PriorityPush(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  var wg sync.WaitGroup

  for j := 0; j < 10; j++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      err := q.Enqueue(strconv.Itoa(item), uint16(item))
      if err != nil {
        log.Printf("push err:%v\\n", err)
      }
    }(j)
  }
  wg.Wait()
}

func PriorityPop(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  for {
    res, err := q.Dequeue()
    if err != nil {
      log.Fatal(err)
      return
    }
    log.Printf("接收值:%v\\n", res)
  }
}

然後以下是我們的測試代碼:

package main

import (
  "etcd-test/src"
  "sync"
  "time"
)
func main() {
  key := "test-queue"
  var wg sync.WaitGroup
  wg.Add(1)
  go func() {
    defer wg.Done()
    src.PriorityPush(key)
  }()
  wg.Wait()
  go src.PriorityPop(key)
  time.Sleep(20 * time.Second)
}

我們把 0 到 9 的數併發的 push 到隊列中,對應的優先級整數值就是它本身,push 完畢,我們運行 PriorityPop 函數,看最終結果顯示就是從 0 到 9。

總結

這篇文章主要介紹瞭如何使用 etcd 實現分佈式鎖以及分佈式隊列。其他 etcd 的場景,可以自行實踐。

附錄

[1]https://juejin.cn/post/6854573212831842311

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VCIPmAD5YSWz0DAuFqfwyg