在 Go 中如何使用分佈式鎖解決併發問題?
在分佈式系統中,協調多個服務實例之間的共享資源訪問是一個經典的挑戰。傳統的單機鎖(如 sync.Mutex
)無法實現跨進程工作,此時就需要用到分佈式鎖了。本文將介紹 Go 語言生態中基於 Redis 實現的分佈式鎖庫 redsync
,並探討其使用方法和實現原理。
分佈式鎖
首先我們來探討下爲什麼需要分佈式鎖?當我們編寫的程序出現資源競爭的時候,就需要使用互斥鎖來保證併發安全。而我們的服務很有可能不會單機部署,而是採用多副本的集羣部署方案。無論哪種方案運行程序,我們都需要合適的工具來解決併發問題。在解決單個進程間多個協程之間的併發資源搶佔問題時,我們往往採用 sync.Mutex
。而在解決多個進程間的併發資源搶佔問題時,就需要採用分佈式鎖了,這就引出了我們今天要講解的 redsync
。
爲什麼是 redsync
在 Go 中分佈式鎖的開源實現有很多,爲什麼選擇介紹和使用 redsync
呢?簡單一句話:redsync
是 Redis 官方 唯一推薦的 Go Redis 分佈式鎖解決方案,遵循 Redlock 算法。它允許在多個獨立 Redis 節點上創建高可用的鎖,適用於需要強一致性的分佈式場景。
我們可以對比下 sync.Mutex
和 redsync
之間的區別,讓你有個感性的認識。
二者分別適用於不同的併發場景,選擇時需要根據實際需求(單機還是分佈式)來決定。
redsync
快速上手
redsync
雖然內部實現上比較複雜,但別被嚇到,它的用法超級簡單。
示例代碼如下:
package main
import (
"context"
"github.com/go-redsync/redsync/v4" // 引入 redsync 庫,用於實現基於 Redis 的分佈式鎖
"github.com/go-redsync/redsync/v4/redis/goredis/v9"// 引入 redsync 的 goredis 連接池
goredislib "github.com/redis/go-redis/v9" // 引入 go-redis 庫,用於與 Redis 服務器通信
)
func main() {
// 創建一個 Redis 客戶端
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:36379", // Redis 服務器地址
Password: "nightwatch",
})
// 使用 go-redis 客戶端創建一個 redsync 連接池
pool := goredis.NewPool(client)
// 創建一個 redsync 實例,用於管理分佈式鎖
rs := redsync.New(pool)
// 創建一個名爲 "test-redsync" 的互斥鎖(Mutex)
mutex := rs.NewMutex("test-redsync")
// 創建一個上下文(context),一般用於控制鎖的超時和取消
ctx := context.Background()
// 獲取鎖,如果獲取失敗(例如鎖已被其他進程持有),會返回錯誤
if err := mutex.LockContext(ctx); err != nil {
panic(err) // 如果獲取鎖失敗,程序會 panic
}
// TODO 執行業務邏輯
// ...
// 釋放鎖,如果釋放失敗(例如鎖已過期或不屬於當前進程),會返回錯誤
if _, err := mutex.UnlockContext(ctx); err != nil {
panic(err) // 如果釋放鎖失敗,程序會 panic
}
}
因爲 redsync
依賴 Redis,所以我們首先需要創建一個 Redis 客戶端對象 client
,調用 goredis.NewPool(client)
會基於這個 client
創建一個 redsync
的連接池,有了這個連接池 pool
就可以調用 redsync.New(pool)
創建一個 redsync
實例來申請分佈式鎖了。
redsync
提供了 NewMutex
方法可以創建一個分佈式鎖,它接收一個 name
參數作爲鎖的名字,這個名字會作爲 Redis 中的 key
。
拿到鎖對象 mutex
以後,調用 mutex.LockContext(ctx)
就可以加鎖,加鎖後便可以訪問競態資源了,資源訪問完成後,調用 mutex.UnlockContext(ctx)
便可以釋放鎖。
可以發現,redsync
用法和 sync.Mutex
非常相似,核心就是 Lock/Unlock
兩個操作。redsync 的使用無非多了一步連接 Redis 的過程。
配置選項
不知道你有沒有想過一個問題,我們在使用 sync.Mutex
時,如果某個 gorutine 加鎖後不釋放掉,那麼其他 gorutine 就無法獲取鎖,而在分佈式場景中,如果一個進程獲取了 Redis 分佈式鎖,然後在未釋放鎖之前進程掛掉了,其他進程要如何獲取鎖呢,難道要一直等待下去嗎?
這裏就要引出一個使用分佈式鎖很重要的問題,那就是一定要設置一個過期時間,這樣才能保證即使拿到鎖的進程掛掉了,只要鎖的過期時間已到,鎖也一定會被自動釋放掉,只有這樣,其他進程纔有機會獲取鎖。
而我們上面的示例中,之所以可以不設置鎖的過期時間,原因是 redsync
內部設置了默認值。以下是 redsync
中 NewMutex
方法的源碼:
// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
if m.shuffle {
randomPools(m.pools)
}
return m
}
這裏 Mutex
對象的第二個字段 expiry
就是分佈式鎖的過期時間,這裏默認爲設爲 8 秒。tries
字段是獲取鎖的重試次數,即嘗試獲取鎖失敗 32 次以後,纔會返回加鎖失敗,因爲分佈式場景下失敗是很正常的情況,所以 32 次並不是一個很誇張的值。delayFunc
字段是每次失敗後重試的間隔時間。其他字段我就不一一講解了,絕大多數我們都用不到。
根據代碼我們很容易想到這幾個字段是通過選項模式來設置的。
-
WithExpiry(time.Duration)
:設置鎖的自動過期時間(建議大於業務執行時間)。 -
WithTries(int)
:設置最大重試次數。 -
WithRetryDelay(time.Duration)
:設置重試間隔。
使用示例:
mutex := rs.NewMutex("test-redsync",
redsync.WithExpiry(30*time.Second),
redsync.WithTries(3),
redsync.WithRetryDelay(500*time.Millisecond),
)
看門狗
我們現在知道使用分佈式鎖一定要設置一個過期時間了,但是這會帶來另外一個問題:如果我們的業務代碼還沒執行完,鎖就過期自動釋放了,那麼此時另外一個進程成功拿到這把鎖,也來訪問競態資源,那分佈式鎖不就失去意義了嗎?
這就引出了使用分佈式鎖的另一個重要問題,鎖自動續期。我舉一個代碼示例,你就懂了:
package main
import (
"context"
"log/slog"
"time"
"github.com/go-redsync/redsync/v4" // 引入 redsync 庫,用於實現基於 Redis 的分佈式鎖
"github.com/go-redsync/redsync/v4/redis/goredis/v9"// 引入 redsync 的 goredis 連接池
goredislib "github.com/redis/go-redis/v9" // 引入 go-redis 庫,用於與 Redis 服務器通信
)
func main() {
// 創建一個 Redis 客戶端
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:36379", // Redis 服務器地址
Password: "nightwatch",
})
// 使用 go-redis 客戶端創建一個 redsync 連接池
pool := goredis.NewPool(client)
// 創建一個 redsync 實例,用於管理分佈式鎖
rs := redsync.New(pool)
// 創建一個名爲 "test-redsync" 的互斥鎖(Mutex)
mutex := rs.NewMutex("test-redsync", redsync.WithExpiry(5*time.Second))
// 創建一個上下文(context),一般用於控制鎖的超時和取消
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 獲取鎖,如果獲取失敗(例如鎖已被其他進程持有),會返回錯誤
if err := mutex.LockContext(ctx); err != nil {
panic(err) // 如果獲取鎖失敗,程序會 panic
}
// 看門狗,實現鎖自動續約
stopCh := make(chanstruct{})
ticker := time.NewTicker(2 * time.Second) // 每隔 2s 續約一次
defer ticker.Stop()
gofunc() {
for {
select {
case <-ticker.C:
// 續約,延長鎖的過期時間
if ok, err := mutex.ExtendContext(ctx); !ok || err != nil {
slog.Error("Failed to extend mutex", "err", err, "status", ok)
} else {
slog.Info("Successfully extend mutex")
}
case <-stopCh:
slog.Info("Exiting mutex watchdog")
return
}
}
}()
// 執行業務邏輯
time.Sleep(6 * time.Second)
// 通知看門狗停止自動續期
stopCh <- struct{}{}
// 釋放鎖,如果釋放失敗(例如鎖已過期或不屬於當前進程),會返回錯誤
if _, err := mutex.UnlockContext(ctx); err != nil {
panic(err) // 如果釋放鎖失敗,程序會 panic
}
}
這個示例延續了前文中的示例代碼,你需要重點關注的是如下這部分邏輯:
// 看門狗,實現鎖自動續約
stopCh := make(chanstruct{})
ticker := time.NewTicker(2 * time.Second) // 每隔 2s 續約一次
defer ticker.Stop()
gofunc() {
for {
select {
case <-ticker.C:
// 續約,延長鎖的過期時間
if ok, err := mutex.ExtendContext(ctx); !ok || err != nil {
slog.Error("Failed to extend mutex", "err", err, "status", ok)
} else {
slog.Info("Successfully extend mutex")
}
case <-stopCh:
slog.Info("Exiting mutex watchdog")
return
}
}
}()
redsync
提供了 mutex.ExtendContext(ctx)
方法可以延長鎖的過期時間。假設我們申請的分佈式鎖過期時間是 5 秒,而業務代碼執行時間是未知的,那麼我們在拿到鎖以後,可以單獨開啓一個 goroutine 來定時延長鎖的過期時間,當業務代碼執行完成以後,主 goroutine 通過 stopCh <- struct{}{}
向子 goroutine 發送停止信號,那麼子 goroutine 中的 <-stopCh
case 就會收到通知,子 goroutine 便會退出,也就停止了鎖自動續期。
通過爲分佈式鎖設置過期時間,再配合子 goroutine 自動續期的功能,我們就能保證,持有鎖的進程掛掉時不會影響其他進程獲取鎖,並且還能實現業務執行完成後才釋放鎖。而這個實現分佈式鎖自動續期的程序,我們通常把它叫做 “看門狗”。
我再額外囉嗦一句,關於分佈式鎖的續期時常和間隔週期的問題,一般來說,續期的時間可以設置爲等於過期時間,即鎖的過期時間設爲 5 秒,那麼每次也只續期 5 秒,redsync
內部也是這麼做的,至於間隔多久續期一次,這個時間肯定是要小於過期時間 5 秒的,通常設爲鎖過期時間的 1/3 或 1/2 都可以。
redsync
原理
我上面講解的 redsync
用法基本上能覆蓋業務開發中的大部分場景了,對於 redsync
更多的功能我就不過多介紹了,有了現有的知識,你遇到了問題也可以自己查閱文檔學習。
下面我想講點更有價值的東西,我們自己來實現一個微型的 Redis 分佈式鎖,以此來加深你對 redsync
的理解。
如何實現一個 Redis 分佈式鎖
要基於 Redis 實現一個最小化的分佈式鎖,我們可以定義一個結構體 MiniRedisMutex
作爲鎖對象:
type MiniRedisMutex struct {
name string // 會作爲分佈式鎖在 Redis 中的 key
expiry time.Duration // 鎖過期時間
conn redis.Cmdable // Redis Client
}
它僅包含必要的字段,name
是鎖的名稱,expiry
是分佈式鎖必須要有的過期時間,conn
用來存儲 Redis 客戶端連接。
我們可以定義一個構造函數 NewMutex
來創建分佈式鎖對象:
func NewMutex(name string, expiry time.Duration, conn redis.Cmdable) *MiniRedisMutex {
return &MiniRedisMutex{name, expiry, conn}
}
接下來就要實現加鎖和解鎖這兩個功能。
加鎖方法 Lock
實現如下:
func (m *MiniRedisMutex) Lock(ctx context.Context, value string) (bool, error) {
reply, err := m.conn.SetNX(ctx, m.name, value, m.expiry).Result()
if err != nil {
return false, err
}
return reply, nil
}
Lock
方法接收兩個參數,ctx
用來控制取消,value
則會作爲鎖的值。
Lock
方法內部邏輯非常簡單,直接調用 Redis 的 SetNX
命令來排他的設置一個鍵值對,鎖名稱 name
作爲 Redis 的 key
,鎖的值 value
作爲 Redis 的 value
,並指定過期時間爲 expiry
,這就是分佈式鎖的加鎖原理。
這裏有兩個關鍵點需要你注意:
-
使用
SetNX
命令:這裏之所以使用SetNX
命令而不是普通的Set
命令,是因爲加鎖操作需要排他性。我們知道,SetNX
命令的全稱是SET if Not eXists
,即通過SetNX
命令設置鍵值對時,如果key
不存在,設置其value
,若key
已存在,則不執行任何操作。這剛好符合互斥性,是實現分佈式互斥鎖的關鍵所在。 -
value
唯一性:雖然SetNX
命令能夠實現互斥,但是 Redis 的value
還是要保證唯一性。這一點我們接着往下看你就明白了。
釋放鎖方法 Unlock
實現如下:
// 釋放鎖的 lua 腳本,保證併發安全
var deleteScript = `
local val = redis.call("GET", KEYS[1])
if val == ARGV[1] then
return redis.call("DEL", KEYS[1])
elseif val == false then
return -1
else
return 0
end
`
// Unlock 釋放鎖
func (m *MiniRedisMutex) Unlock(ctx context.Context, value string) (bool, error) {
// 執行 lua 腳本,Redis 會保證其併發安全
status, err := m.conn.Eval(ctx, deleteScript, []string{m.name}, value).Result()
if err != nil {
returnfalse, err
}
if status == int64(-1) {
returnfalse, ErrLockAlreadyExpired
}
return status != int64(0), nil
}
在釋放鎖的邏輯中,我們不是簡單的將指定的 Redis 鍵值對刪除即可,而是調用 m.conn.Eval
方法執行了一段 lua 腳本的方式來釋放鎖。
在這段 lua 腳本中,我們先是從 Redis 中獲取指定 key
爲 m.name
的鍵值對,然後判斷其 value
是否等於 Unlock
方法傳入的 value
參數值,如果相等,則從 Redis 中刪除指定的鍵值對,表示釋放鎖,否則什麼也不做。
之所以要對 value
進行判斷,是因爲我們要保證這把鎖是當前進程所持有的鎖,而不是其他進程持有的鎖。那麼以什麼爲依據來說明這把鎖是當前進程持有的呢?這就是我們要保證 value
唯一的原因,每個進程在加鎖的時候,需要生成一個隨機的 value
作爲自己的鎖的標識,那麼釋放時,就可以通過這個 value
來判斷是否是自己持有的鎖。而這樣做的目的,是爲了避免一個進程搶到鎖後,還在執行業務邏輯時,鎖被另外一個進程給釋放了。
遺憾的是,這段釋放鎖的邏輯,Redis 沒有提供像 SetNX
一樣的快捷命令,所以我們只能將其放在 lua 腳本中執行,才能保證併發安全。
至此,一個微型的 Redis 分佈式鎖的核心功能咱們就講解完成了。
以下是 MiniRedisMutex
分佈式鎖完整的代碼實現:
package miniredislock
import (
"context"
"errors"
"time"
"github.com/redis/go-redis/v9"
)
var ErrLockAlreadyExpired = errors.New("miniredislock: failed to unlock, lock was already expired")
// MiniRedisMutex 一個微型的 Redis 分佈式鎖
type MiniRedisMutex struct {
name string // 會作爲分佈式鎖在 Redis 中的 key
expiry time.Duration // 鎖過期時間
conn redis.Cmdable // Redis Client
}
// NewMutex 創建 Redis 分佈式鎖
func NewMutex(name string, expiry time.Duration, conn redis.Cmdable) *MiniRedisMutex {
return &MiniRedisMutex{name, expiry, conn}
}
// Lock 加鎖
func (m *MiniRedisMutex) Lock(ctx context.Context, value string) (bool, error) {
reply, err := m.conn.SetNX(ctx, m.name, value, m.expiry).Result()
if err != nil {
returnfalse, err
}
return reply, nil
}
// 釋放鎖的 lua 腳本,保證併發安全
var deleteScript = `
local val = redis.call("GET", KEYS[1])
if val == ARGV[1] then
return redis.call("DEL", KEYS[1])
elseif val == false then
return -1
else
return 0
end
`
// Unlock 釋放鎖
func (m *MiniRedisMutex) Unlock(ctx context.Context, value string) (bool, error) {
// 執行 lua 腳本,Redis 會保證其併發安全
status, err := m.conn.Eval(ctx, deleteScript, []string{m.name}, value).Result()
if err != nil {
returnfalse, err
}
if status == int64(-1) {
returnfalse, ErrLockAlreadyExpired
}
return status != int64(0), nil
}
其實,這段代碼的主要邏輯,都是我從 redsync
源碼中提取出來。所以 redsync
其實也是這樣實現的,只不過它內部增加了很多可靠性和邊緣場景等邏輯代碼,最核心的加鎖和解鎖邏輯是一樣的。
微型分佈式鎖使用
下面我們來寫一個示例程序,演示下如何使用這個微型的分佈式鎖:
package main
import (
"fmt"
"time"
goredislib "github.com/redis/go-redis/v9"
"golang.org/x/net/context"
"github.com/jianghushinian/blog-go-example/redsync/miniredislock"
)
func main() {
// 創建一個 Redis 客戶端
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:36379", // Redis 服務器地址
Password: "nightwatch",
})
defer client.Close()
// 創建一個名爲 "test-miniredislock" 的互斥鎖
mutex := miniredislock.NewMutex("test-miniredislock", 5*time.Second, client)
ctx := context.Background()
// 互斥鎖的值應該是一個隨機值
value := "random-string"
// 獲取鎖
_, err := mutex.Lock(ctx, value)
if err != nil {
panic(err)
}
// 執行業務邏輯
fmt.Println("do something...")
time.Sleep(3 * time.Second)
// 釋放自己持有的鎖
_, err = mutex.Unlock(ctx, value)
if err != nil {
panic(err)
}
}
這個示例的具體邏輯我就不逐行講解了,相信你一看便懂。也希望你能夠自己在本機上跑起來這段代碼,真正用一下分佈式鎖,以此加深理解。
最後我再留一個作業,你可以嘗試一下實現鎖的續期方法 Extend
。
總結
分佈式鎖可以確保分佈式系統中併發安全的訪問競態資源,redsync
作爲 Go 中最流行的 Redis 分佈式鎖方案,非常值得我們學習和使用。
redsync
的用法非常簡單,加鎖和解鎖操作與 sync.Mutex
也非常類似,沒有太多的學習成本。不過,爲了避免持有鎖的進程掛掉時,其他進程還有機會獲取鎖,我們需要實現看門狗的功能。
我還帶你從零實現了一個微型的 Redis 分佈式鎖,希望你不僅會用 redsync 分佈式鎖,還能理解其原理,這樣在自己的業務開發中,如果遇到問題,我們才能更加得心應手。
本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。
希望此文能對你有所啓發。
延伸閱讀
-
Distributed Locks with Redis:https://redis.io/docs/latest/develop/use/patterns/distributed-locks/
-
redsync Documentation:https://pkg.go.dev/github.com/go-redsync/redsync
-
redsync GitHub 源碼:https://github.com/go-redsync/redsync
-
go-redis GitHub 源碼:https://github.com/redis/go-redis
-
本文 GitHub 示例代碼:https://github.com/jianghushinian/blog-go-example/tree/main/redsync
聯繫我
-
公衆號:Go 編程世界
-
微信:jianghushinian
-
郵箱:jianghushinian007@outlook.com
-
博客:https://jianghushinian.cn
-
GitHub:https://github.com/jianghushinian
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/8HJXRcCoyZqeC_b6l-TD0w