etcd 實戰基礎篇: 實現分佈式鎖以及分佈式隊列
上一篇 etcd 實戰基礎篇 (一) 我們主要介紹了 etcd 使用場景以及最基礎性的一些操作 (put、get、watch)。 這一篇我們接着實戰 etcd 其他業務場景。
基於 etcd 的分佈式鎖
基於 etcd 實現一個分佈式鎖特別簡單。etcd 提供了開箱即用的包 concurrency,幾行代碼就實現一個分佈式鎖。
package src
import (
"context"
"flag"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"strings"
"time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
var client *clientv3.Client
var err error
// 解析etcd的地址,編程[]string
endpoints := strings.Split(*addr, ",")
// 創建一個 etcd 的客戶端
client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})
if err != nil {
fmt.Printf("初始化客戶端失敗:%v\\n", err)
log.Fatal(err)
}
return client
}
func Lock(id int, lockName string) {
client := initEtcdClient()
defer client.Close()
// 創建一個 session,如果程序宕機奔潰,etcd可以知道
s, err := concurrency.NewSession(client)
if err != nil {
log.Fatal(err)
}
defer s.Close()
// 創建一個etcd locker
locker := concurrency.NewLocker(s, lockName)
log.Printf("id:%v 嘗試獲取鎖%v", id, lockName)
locker.Lock()
log.Printf("id:%v取得鎖%v", id, lockName)
// 模擬業務耗時
time.Sleep(time.Millisecond * 300)
locker.Unlock()
log.Printf("id:%v釋放鎖%v", id, lockName)
}
我們再寫個腳本運行,看看結果。
package main
import (
"etcd-test/src"
"sync"
)
func main() {
var lockName = "locker-test"
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(item int) {
defer wg.Done()
src.Lock(item, lockName)
}(i)
}
wg.Wait()
}
我們發起了 10 個併發搶同一個 key 鎖的命令。運行結果如下,
set key value [EX seconds] [PX milliseconds] [NX|XX]
這其中,介紹兩個關鍵的屬性:
-
EX 標示設置過期時間,單位是秒。
-
NX 表示 當對應的 key 不存在時,才創建。
我們在使用 redis 做分佈式鎖的時候會這麼寫。(代碼用了包 https://github.com/go-redis/redis
)
func RedisLock(item int) {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String())
res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result()
if !res {
fmt.Printf("item:%v 嘗試獲取鎖失敗\\n", item)
return
}
fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String())
time.Sleep(1 * time.Second) //模擬業務耗時
fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String())
rdb.Del(ctx, "key")
}
rdb.SetNX(ctx, "key", "value", 2*time.Second)
我們規定鎖的過期時間是 2 秒,下面有一句 time.Sleep(1 * time.Second)
用來模擬處理業務的耗時。業務處理結束,我們刪除 key rdb.Del(ctx, "key")
。
我們寫個簡單的腳本,
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(item int) {
defer wg.Done()
RedisLock(item)
}(i)
}
wg.Wait()
}
我們開啓十個 G 併發的調用 RedisLock
函數。每次調用,函數內部都會新建一個 redis 客戶端,本質上是 10 個客戶端。
運行這段程序,
從圖中看出,同一時刻只有一個客戶端獲取到鎖,並且在一秒的任務處理後,釋放了鎖,好像沒太大的問題。
那麼,我再寫一個簡單的例子。
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"sync"
"time"
)
var ctx = context.Background()
var rdb *redis.Client
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
ExampleLock(1, 0)
}()
go func() {
defer wg.Done()
ExampleLock(2, 5)
}()
wg.Wait()
}
func ExampleLock(item int, timeSleep time.Duration) {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
if timeSleep > 0 {
time.Sleep(time.Second * timeSleep)
}
fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String())
res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result()
if !res {
fmt.Printf("item:嘗試獲取鎖失敗:%v\\n", item)
return
}
fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String())
time.Sleep(7 * time.Second)
fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String())
rdb.Del(ctx, "key")
}
我們設置鎖的過期時間是 3 秒,而獲取鎖之後的任務處理時間爲 7 秒。
然後我們開啓兩個 G。
ExampleLock(1, 0)
ExampleLock(2, 5)
其中第二行數字 5,從代碼中可以看出,是指啓動 G 後過 5 秒去獲取鎖。
這段代碼整體流程是這樣的: G(1) 獲取到鎖後,設置的鎖持有時間是 3 秒,由於任務執行需要 7 秒的時間,因此在 3 秒過後鎖會自動釋放。G(2) 可以在第 5 秒的時候獲取到鎖,然後它執行任務也得 7 秒。
最後,G(1) 在獲取鎖後 7 秒執行釋放鎖的操作,G(2) 同理。
發現問題了嗎?
G(1) 的鎖在 3 秒後已經自動釋放了。但是在任務處理結束後又執行了解鎖的操作, 可此時這個鎖是 G(2) 的呀。
那麼接下來由於 G(1) 誤解了 G(2) 的鎖,如果此時有其他的 G,那麼就可以獲取到鎖。
等 G(2) 任務執行結束,同理又會誤解其他 G 的鎖,這是一個惡性循環。 這也是掘金一篇由 redis 分佈式鎖造成茅臺超賣重大事故的原因之一。
至於其他的,可以自行查看這篇文章 [1]。
基於 etcd 的分佈式隊列
對隊列更多的理論知識就不加以介紹了。我們都知道,隊列是一種先進先出的數據結構,一般也只有入隊和出隊兩種操作。 我們常常在單機的應用中使用到隊列。
那麼,如何實現一個分佈式的隊列呢?。
我們可以使用 etcd 開箱即用的工具,在 etcd
底層 recipe
包裏結構 Queue
,實現了一個多讀多寫的分佈式隊列。
type Queue struct {
client *v3.Client
ctx context.Context
keyPrefix string
}
func NewQueue(client *v3.Client, keyPrefix string) *Queue
func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string)
我們基於此包可以很方便的實現。
package src
import (
"github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes"
"log"
"strconv"
"strings"
"sync"
"time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
var client *clientv3.Client
var err error
// 解析etcd的地址,編程[]string
endpoints := strings.Split(*addr, ",")
// 創建一個 etcd 的客戶端
client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})
if err != nil {
log.Printf("初始化客戶端失敗:%v\\n", err)
log.Fatal(err)
}
return client
}
func Push(keyName string) {
client := initEtcdClient()
defer client.Close()
q := recipe.NewQueue(client, keyName)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
for j := 0; j < 10; j++ {
wg.Add(1)
go func(item int) {
defer wg.Done()
err := q.Enqueue(strconv.Itoa(item))
if err != nil {
log.Printf("push err:%v\\n", err)
}
}(j)
}
time.Sleep(2 * time.Second)
}
wg.Wait()
}
func Pop(keyName string) {
client := initEtcdClient()
defer client.Close()
q := recipe.NewQueue(client, keyName)
for {
res, err := q.Dequeue()
if err != nil {
log.Fatal(err)
return
}
log.Printf("接收值:%v\\n", res)
}
}
在 push
中,我們開啓 3 輪發送值入隊,每次發送 10 個,發送一輪休息 2 秒。 在 pop
中,通過死循環獲取隊列中的值。
運行腳本程序如下。
package main
import (
"etcd-test/src"
"time"
)
func main() {
key := "test-queue"
go src.Pop(key)
time.Sleep(1 * time.Second)
go src.Push(key)
time.Sleep(20 * time.Second)
}
我們使用兩個 G
代表 分別運行 push
和 pop
操作。 同時爲了達到運行效果,我們先運行 pop 等待有入隊的元素。 運行結果動畫如下,
etcd
還提供了優先級的分佈式的隊列。和上面的用法相似。只是在入隊的時候,不僅僅需要提供一個值,還需要提供一個整數,來表示當前 push
值的優先級。數值越小,優先級越高。
我們改動一下上述的代碼。
package src
import (
"github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes"
"log"
"strconv"
"strings"
"sync"
"time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
var client *clientv3.Client
var err error
// 解析etcd的地址,編程[]string
endpoints := strings.Split(*addr, ",")
// 創建一個 etcd 的客戶端
client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})
if err != nil {
log.Printf("初始化客戶端失敗:%v\\n", err)
log.Fatal(err)
}
return client
}
func PriorityPush(keyName string) {
client := initEtcdClient()
defer client.Close()
q := recipe.NewPriorityQueue(client, keyName)
var wg sync.WaitGroup
for j := 0; j < 10; j++ {
wg.Add(1)
go func(item int) {
defer wg.Done()
err := q.Enqueue(strconv.Itoa(item), uint16(item))
if err != nil {
log.Printf("push err:%v\\n", err)
}
}(j)
}
wg.Wait()
}
func PriorityPop(keyName string) {
client := initEtcdClient()
defer client.Close()
q := recipe.NewPriorityQueue(client, keyName)
for {
res, err := q.Dequeue()
if err != nil {
log.Fatal(err)
return
}
log.Printf("接收值:%v\\n", res)
}
}
然後以下是我們的測試代碼:
package main
import (
"etcd-test/src"
"sync"
"time"
)
func main() {
key := "test-queue"
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
src.PriorityPush(key)
}()
wg.Wait()
go src.PriorityPop(key)
time.Sleep(20 * time.Second)
}
我們把 0 到 9 的數併發的 push
到隊列中,對應的優先級整數值就是它本身,push
完畢,我們運行 PriorityPop
函數,看最終結果顯示就是從 0 到 9。
總結
這篇文章主要介紹瞭如何使用 etcd 實現分佈式鎖以及分佈式隊列。其他 etcd 的場景,可以自行實踐。
附錄
[1]https://juejin.cn/post/6854573212831842311
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/VCIPmAD5YSWz0DAuFqfwyg