Golang 分佈式鎖技術攻略
0 前言
幾個月前和大家分享過一篇——Golang 單機鎖實現原理,恰逢最近在研究 etcd 的 watch 機制,這是一項可以應用於實現分佈式鎖的核心能力,於是擇日不日撞日,接下來就和大家一起來聊聊單機鎖的升級版本——Golang 分佈式鎖技術攻略介紹
本文內容分爲兩部分:第一部分聊聊如何通過 redis 實現主動輪詢模型下的分佈式鎖;第二部分和大家一起探討如何使用 etcd 實現 watch 回調模型下的分佈式鎖. 兩個部分均會通過原理闡述結合源碼剖析的方式爲大家作具體的內容介紹
1 分佈式鎖
1.1 使用背景
在併發場景中,爲了保證臨界資源的數據一致性,我們會經常使用到 “鎖” 這個工具對臨界資源進行保護,讓混亂的併發訪問行爲退化爲秩序的串行訪問行爲.
在本地環境中,由於多線程之間能夠共享進程的數據,因此可以比較簡單地實現進程內的互斥鎖;然而在分佈式場景中,有時我們需要跨域多個物理節點執行加鎖操作,因此我們就需要依賴到類似於 redis、mysql 這樣的狀態存儲組件,在此基礎之上實現所謂的 “分佈式鎖” 技術.
1.2 核心性質
分佈式鎖應當具備如下幾項核心性質:
-
• 獨佔性:對於同一把鎖,在同一時刻只能被一個取鎖方佔有,這是作爲 “鎖” 工具最基礎的一項性質
-
• 健壯性:即不能產生死鎖(dead lock). 假如某個佔有鎖的使用方因爲宕機而無法主動執行解鎖動作,鎖也應該能夠被正常傳承下去,被其他使用方所延續使用
-
• 對稱性:加鎖和解鎖的使用方必須爲同一身份. 不允許非法釋放他人持有的分佈式鎖
-
• 高可用:當提供分佈式鎖服務的基礎組件中存在少量節點發生故障時,不應該影響到分佈式鎖服務的穩定性
1.3 實現類型
分佈式鎖根據其實現模型,可以被劃分爲兩大類:
-
• 主動輪詢型:該模型類似於單機鎖中的主動輪詢 + cas 樂觀鎖模型,取鎖方會持續對分佈式鎖發出嘗試獲取動作,如果鎖已被佔用則會不斷髮起重試,直到取鎖成功爲止
-
• watch 回調型:在取鎖方發現鎖已被他人佔用時,會創建 watcher 監視器訂閱鎖的釋放事件,隨後不再發起主動取鎖的嘗試;當鎖被釋放後,取鎖方能通過之前創建的 watcher 感知到這一變化,然後再重新發起取鎖的嘗試動作
1.4 一些個人理解
在單機環境中,主動輪詢和 watch 回調兩種鎖模型各有優劣,所謂的”優 “和” 劣“也是相對而言,需要對 cpu 空轉以及阻塞協程兩種行爲的損耗做出權衡. (大家對這部分概念如果不清晰,可以閱讀一下我之前發表的文章——”Golang 單機鎖實現原理“).
然而,在分佈式場景中,我個人覺得優勢的天平在略微朝着 watch 回調型的實現策略傾斜. 這是因爲分佈式場景中” 輪詢 “這一動作的成本相比於單機鎖而言要高很多,背後存在的行爲可能是一次甚至多次網絡 IO 請求. 這種情況下,取鎖方基於 watch 回調的方式,在確保鎖被釋放、自身有機會取鎖的情況下,纔會重新發出嘗試取鎖的請求,這樣能在很大程度上避免無意義的輪詢損耗.
當然,主動輪詢型的分佈式鎖能夠保證使用方始終佔據流程的主動權,整個流程可以更加輕便靈活;此外,watch 機制在實現過程中需要建立長連接完成 watch 監聽動作,也會存在一定的資源損耗. 因此這個問題沒有標準答案,應該結合實際的需求背景採取不同的應對策略:在併發激烈程度較高時傾向於 watch 回調型分佈式鎖;反之,主動輪詢型分佈式鎖可能會是更好的選擇.
除此之外,基於 watch 回調模型實現的分佈式鎖背後可能還存在其他的問題,比如:當有多個嘗試取鎖的使用方 watch 監聽同一把鎖時,一次鎖的釋放事件可能會引發 “驚羣效應”. 這個問題以及對應的解決方案將會在本文第 4 章中進行探討.
2 主動輪詢型
2.1 實現思路
主動輪詢型分佈式鎖的實現思路爲:
-
• 針對於同一把分佈式鎖,使用同一條數據進行標識(以 redis 爲例,則爲同一個 key 對應的 kv 數據記錄)
-
• 假如在存儲介質成功插入了該條數據(要求之前該 key 對應的數據不存在),則被認定爲加鎖成功
-
• 把從存儲介質中刪除該條數據這一行爲理解爲釋放鎖操作
-
• 倘若在插入該條數據時,發現數據已經存在(鎖已被他人持有),則持續輪詢,直到數據被他人刪除(他人釋放鎖),並由自身完成數據插入動作爲止(取鎖成功)
-
• 由於是併發場景,需要保證【 (1)檢查數據是否已被插入(2)數據不存在則插入數據 】這兩個步驟之間是原子化不可拆分的(在 redis 中是 set only if not exist —— SETNX 操作)
2.2 技術選型
實現主動輪詢型分佈式鎖時,我們常用的組件包括 redis 和 mysql.
(1)redis
在實現主動輪詢型分佈式鎖時,redis 算得上是大家最常用的組件. 在第 3 章中,本文會以 redis 爲例,進行主動輪詢型分佈式鎖的實踐介紹.
redis 官方文檔:https://redis.io/
redis 基於內存實現數據的存儲,因此足夠高輕便高效. 此外,redis 基於單線程模型完成數據處理工作,支持 SETNX 原子指令(set only if not exist),能夠很方便地支持分佈式鎖的加鎖操作.
setnx 使用文檔:https://redis.io/commands/setnx/ (事實上,在 redis 2.6.12 版本之後,setnx 操作已經被棄置,官方推薦大家使用 set 指令並附加 nx 參數來實現與 setnx 指令相同的效果)
此外,redis 還支持使用 lua 腳本自定義組裝同一個 redis 節點下的多筆操作形成一個具備原子性的事務.
redis lua 腳本使用文檔:https://redis.io/docs/manual/programmability/eval-intro/
在通過 redis 實現分佈式鎖時,我們可以通過將 key 對應的 value 設置爲使用方的身份標識. 在解鎖流程中,通過 lua 腳本組裝步驟:【(1)檢查釋放鎖動作執行者的身份;(2)身份合法時才進行解鎖】. 如此一來,分佈式鎖的對稱性也就得以保證了.
(2)mysql
mysql 官方文檔:https://www.mysql.com/
通過經典的關係型數據庫 mysql 關也能實現和 redis 類似的效果.
-
• 建立一張用於存儲分佈式鎖記錄的數據表
-
• 以分佈式鎖的標識鍵作爲表中的唯一鍵(類比於 redis 中的 key)
-
• 基於唯一鍵的特性,同一把鎖只能被插入一條數據,因此也就只能由一個使用方持有鎖
-
• 當鎖被佔有時,其他取鎖方嘗試插入數據時,會被 mysql 表的唯一鍵所攔截報錯,進而感知到鎖已被佔用這一情報
-
• 在表中可以新增一個字段標識使用方的身份. 完整的解鎖動作可以基於 mysql 事務(使用 innodb 引擎)保證原子性:【(1)檢查釋放鎖動作執行者的身份;(2)身份合法時才進行解鎖】. 基於此,分佈式鎖的對稱性性質能夠得到保證.
2.3 死鎖問題
下一個問題是,我們在設計主動輪詢型分佈式鎖時,如何避免出現死鎖問題而導致分佈式鎖不可能用呢?
這項能力在 mysql 中顯得捉襟見肘,不過在使用 redis 時,我們可以通過過期時間 expire time 機制得以保證. 我們通常會在插入分佈式鎖對應的 kv 數據時設置一個過期時間 expire time,這樣即便使用方因爲異常原因導致無法正常解鎖,鎖對應的數據項也會在達到過期時間閾值後被自動刪除,實現釋放分佈式鎖的效果.
值得一提的是,這種過期機制的引入也帶來了新的問題:因爲鎖的持有者並不能精確預判到自己持鎖後處理業務邏輯的實際耗時,因此此處設置的過期時間只能是一個偏向於保守的經驗值,假如因爲一些異常情況導致佔有鎖的使用方在業務處理流程中的耗時超過了設置的過期時間閾值,就會導致鎖被提前釋放,其他取鎖方可能取鎖成功,最終引起數據不一致的併發問題.
針對於這個問題,在分佈式鎖工具 redisson 中給出瞭解決方案——看門狗策略(watch dog strategy):在鎖的持有方未完成業務邏輯的處理時,會持續對分佈式鎖的過期閾值進行延期操作. 這部分內容不屬於本文的討論範疇,後續我們找機會單開一篇,連帶着 redis 中的 redlock 策略一起展開聊聊.
2.4 弱一致性問題
回顧 redis 的設計思路,爲避免單點故障問題,redis 會基於主從複製的方式實現數據備份. (以哨兵機制爲例,哨兵會持續監聽 master 節點的健康狀況,倘若 master 節點發生故障,哨兵會負責扶持 slave 節點上位,以保證整個集羣能夠正常對外提供服務). 此外,在 CAP 體系中,redis 走的是 AP 路線,爲保證服務的吞吐性能,主從節點之間的數據同步是異步延遲進行的.
到這裏問題就來了,試想一種場景:倘若 使用方 A 在 redis master 節點加鎖成功,但是對應的 kv 記錄在同步到 slave 之前,master 節點就宕機了. 此時未同步到這項數據的 slave 節點升爲 master,這樣分佈式鎖被 A 持有的 “憑證” 就這樣憑空消失了. 於是不知情的使用方 B C D 都可能加鎖成功,於是就出現了一把鎖被多方同時持有的問題,導致分佈式鎖最基本的獨佔性遭到破壞.
關於這個問題,一個比較經典的解決方案是:redis 紅鎖(redlock,全稱 redis distribution lock),本文僅僅拋出一個引子,具體內容我們後續單開一篇再聊.
3 redis 分佈式鎖
3.1 sdk 介紹
首先,本文使用到基於 golang 編寫的 redis 客戶端 sdk:redigo,用於和 redis 組件進行交互.
redigo 開源地址:https://github.com/gomodule/redigo
本文使用到的 redigo 源碼版本爲 v1.8.9
在 redigo 基礎之上,我個人編寫了一款基於 redis 實現的分佈式鎖 sdk,支持阻塞和非阻塞兩種模式進行取鎖. 項目已於 github 開源:https://github.com/xiaoxuxiansheng/redis_lock
由於個人水平有限,如有不當之處,敬請諒解,並歡迎大家批評指正.
3.2 源碼介紹
(1)redis 客戶端
-
• 在 redigo 的基礎之上,我封裝實現了一個 redis 客戶端 Client,內置了一個連接池 redis.pool 進行 redis 連接的複用
-
• 客戶端 Client 對外暴露了 SetNEX 方法,語義是 set with expire time only if key not exist. 用於支持分佈式鎖的加鎖操作
-
• 客戶端 Client 對外暴露了 Eval 方法,用以執行 lua 腳本,後續用來支持分佈式鎖的解鎖操作
package redis_lock
import (
"context"
"errors"
"time"
"github.com/gomodule/redigo/redis"
)
// Client Redis 客戶端.
type Client struct {
ClientOptions
pool *redis.Pool
}
func NewClient(network, address, password string, opts ...ClientOption) *Client {
c := Client{
ClientOptions: ClientOptions{
network: network,
address: address,
password: password,
},
}
for _, opt := range opts {
opt(&c.ClientOptions)
}
repairClient(&c.ClientOptions)
pool := c.getRedisPool()
return &Client{
pool: pool,
}
}
func (c *Client) getRedisPool() *redis.Pool {
return &redis.Pool{
MaxIdle: c.maxIdle,
IdleTimeout: time.Duration(c.idleTimeoutSeconds) * time.Second,
Dial: func() (redis.Conn, error) {
c, err := c.getRedisConn()
if err != nil {
return nil, err
}
return c, nil
},
MaxActive: c.maxActive,
Wait: c.wait,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func (c *Client) getRedisConn() (redis.Conn, error) {
if c.address == "" {
panic("Cannot get redis address from config")
}
var dialOpts []redis.DialOption
if len(c.password) > 0 {
dialOpts = append(dialOpts, redis.DialPassword(c.password))
}
conn, err := redis.DialContext(context.Background(),
c.network, c.address, dialOpts...)
if err != nil {
return nil, err
}
return conn, nil
}
func (c *Client) GetConn(ctx context.Context) (redis.Conn, error) {
return c.pool.GetContext(ctx)
}
// 只有 key 不存在時,能夠 set 成功. set 時攜帶上超時時間,單位秒.
func (c *Client) SetNEX(ctx context.Context, key, value string, expireSeconds int64) (int64, error) {
if key == "" || value == "" {
return -1, errors.New("redis SET keyNX or value can't be empty")
}
conn, err := c.pool.GetContext(ctx)
if err != nil {
return -1, err
}
defer conn.Close()
reply, err := conn.Do("SET", key, value, "EX", expireSeconds, "NX")
if err != nil {
return -1, nil
}
r, _ := reply.(int64)
return r, nil
}
// Eval 支持使用 lua 腳本.
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {
args := make([]interface{}, 2+len(keysAndArgs))
args[0] = src
args[1] = keyCount
copy(args[2:], keysAndArgs)
conn, err := c.pool.GetContext(ctx)
if err != nil {
return -1, err
}
defer conn.Close()
return conn.Do("EVAL", args...)
}
(2)redis 分佈式鎖
-
• 定義了 redis 分佈式鎖的類型:RedisLock
-
• 鎖 RedisLock 中需要內置一個 redis 客戶端 Client,用於後續的請求交互
-
• 鎖實例被創建時,需要顯式指定鎖的標識鍵 key
-
• 鎖被創建時,會取創建者的進程 id + 協程 id,拼接生成 token,作爲使用方的身份標識
-
• 用戶可以使用 option 配置項,聲明創建的鎖是否是阻塞模式,鎖對應的過期時間閾值以及等鎖超時閾值等配置
package redis_lock
import (
"context"
"errors"
"fmt"
"time"
"github.com/xiaoxuxiansheng/redis_lock/utils"
)
var ErrLockAcquiredByOthers = errors.New("lock is acquired by others")
func IsRetryableErr(err error) bool {
return errors.Is(err, ErrLockAcquiredByOthers)
}
// 基於 redis 實現的分佈式鎖,不可重入,但保證了對稱性
type RedisLock struct {
LockOptions
key string
token string
client *Client
}
func NewRedisLock(key string, client *Client, opts ...LockOption) *RedisLock {
r := RedisLock{
key: key,
token: utils.GetProcessAndGoroutineIDStr(),
client: client,
}
for _, opt := range opts {
opt(&r.LockOptions)
}
repairLock(&r.LockOptions)
return &r
}
package utils
import (
"fmt"
"os"
"runtime"
"strconv"
"strings"
)
func GetCurrentProcessID() string {
return strconv.Itoa(os.Getpid())
}
// GetCurrentGoroutineID 獲取當前的協程ID
func GetCurrentGoroutineID() string {
buf := make([]byte, 128)
buf = buf[:runtime.Stack(buf, false)]
stackInfo := string(buf)
return strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1])
}
func GetProcessAndGoroutineIDStr() string {
return fmt.Sprintf("%s_%s", GetCurrentProcessID(), GetCurrentGoroutineID())
}
package redis_lock
const (
// 默認連接池超過 10 s 釋放連接
DefaultIdleTimeoutSeconds = 10
// 默認最大激活連接數
DefaultMaxActive = 100
// 默認最大空閒連接數
DefaultMaxIdle = 20
)
type ClientOptions struct {
maxIdle int
idleTimeoutSeconds int
maxActive int
wait bool
// 必填參數
network string
address string
password string
}
type ClientOption func(c *ClientOptions)
func WithMaxIdle(maxIdle int) ClientOption {
return func(c *ClientOptions) {
c.maxIdle = maxIdle
}
}
func WithIdleTimeoutSeconds(idleTimeoutSeconds int) ClientOption {
return func(c *ClientOptions) {
c.idleTimeoutSeconds = idleTimeoutSeconds
}
}
func WithMaxActive(maxActive int) ClientOption {
return func(c *ClientOptions) {
c.maxActive = maxActive
}
}
func WithWaitMode() ClientOption {
return func(c *ClientOptions) {
c.wait = true
}
}
func repairClient(c *ClientOptions) {
if c.maxIdle < 0 {
c.maxIdle = DefaultMaxIdle
}
if c.idleTimeoutSeconds < 0 {
c.idleTimeoutSeconds = DefaultIdleTimeoutSeconds
}
if c.maxActive < 0 {
c.maxActive = DefaultMaxActive
}
}
type LockOption func(*LockOptions)
func WithBlock() LockOption {
return func(o *LockOptions) {
o.isBlock = true
}
}
func WithBlockWaitingSeconds(waitingSeconds int64) LockOption {
return func(o *LockOptions) {
o.blockWaitingSeconds = waitingSeconds
}
}
func WithExpireSeconds(expireSeeconds int64) LockOption {
return func(o *LockOptions) {
o.expireSeconds = expireSeeconds
}
}
func repairLock(o *LockOptions) {
if o.isBlock && o.blockWaitingSeconds <= 0 {
// 默認阻塞等待時間上限爲 5 秒
o.blockWaitingSeconds = 5
}
// 分佈式鎖默認超時時間爲 30 秒
if o.expireSeconds <= 0 {
o.expireSeconds = 30
}
}
type LockOptions struct {
isBlock bool
blockWaitingSeconds int64
expireSeconds int64
}
(3)非阻塞模式加鎖
-
• 倘若鎖處於非阻塞模式,則只會執行一次 tryLock 方法進行嘗試加鎖動作,倘若失敗,就直接返回錯誤
-
• tryLock 操作基於 redis 的 setNEX 操作實現,即基於原子操作實現 set with expire time only if key not exist 的語義
const RedisLockKeyPrefix = "REDIS_LOCK_PREFIX_"
// Lock 加鎖.
func (r *RedisLock) Lock(ctx context.Context) error {
// 不管是不是阻塞模式,都要先獲取一次鎖
err := r.tryLock(ctx)
if err == nil {
return nil
}
// 非阻塞模式加鎖失敗直接返回錯誤
if !r.isBlock {
return err
}
// 判斷錯誤是否可以允許重試,不可允許的類型則直接返回錯誤
if !IsRetryableErr(err) {
return err
}
// 基於阻塞模式持續輪詢取鎖
return r.blockingLock(ctx)
}
func (r *RedisLock) tryLock(ctx context.Context) error {
// 首先查詢鎖是否屬於自己
reply, err := r.client.SetNEX(ctx, r.getLockKey(), r.token, r.expireSeconds)
if err != nil {
return err
}
if reply != 1 {
return fmt.Errorf("reply: %d, err: %w", reply, ErrLockAcquiredByOthers)
}
return nil
}
func (r *RedisLock) getLockKey() string {
return RedisLockKeyPrefix + r.key
}
(4)阻塞模式加鎖
-
• 當鎖處在阻塞模式下,會通過 ticker,每隔 50 ms 執行一次嘗試取鎖的請求(tryLock:setNEX)
-
• 倘若某次請求取鎖成功,則直接返回
-
• 倘若達到等鎖超時閾值或者中途發生了預期之外的錯誤,則會終止流程
func (r *RedisLock) blockingLock(ctx context.Context) error {
// 阻塞模式等鎖時間上限
timeoutCh := time.After(time.Duration(r.blockWaitingSeconds) * time.Second)
// 輪詢 ticker,每隔 50 ms 嘗試取鎖一次
ticker := time.NewTicker(time.Duration(50) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
select {
// ctx 終止了
case <-ctx.Done():
return fmt.Errorf("lock failed, ctx timeout, err: %w", ctx.Err())
// 阻塞等鎖達到上限時間
case <-timeoutCh:
return fmt.Errorf("block waiting time out, err: %w", ErrLockAcquiredByOthers)
// 放行
default:
}
// 嘗試取鎖
err := r.tryLock(ctx)
if err == nil {
// 加鎖成功,返回結果
return nil
}
// 不可重試類型的錯誤,直接返回
if !IsRetryableErr(err) {
return err
}
}
return nil
}
(5)解鎖
-
• 解鎖動作基於 lua 腳本執行
-
• lua 腳本執行內容分爲兩部分:【(1)校驗當前操作者是否擁有鎖的所有權(2)倘若是,則釋放鎖】
// Unlock 解鎖. 基於 lua 腳本實現操作原子性.
func (r *RedisLock) Unlock(ctx context.Context) error {
keysAndArgs := []interface{}{r.getLockKey(), r.token}
reply, err := r.client.Eval(ctx, LuaCheckAndDeleteDistributionLock, 1, keysAndArgs)
if err != nil {
return err
}
if ret, _ := reply.(int64); ret != 1 {
return errors.New("can not unlock without ownership of lock)
}
return nil
}
// LuaCheckAndDeleteDistributionLock 判斷是否擁有分佈式鎖的歸屬權,是則刪除
const LuaCheckAndDeleteDistributionLock = `
local lockerKey = KEYS[1]
local targetToken = ARGV[1]
local getToken = redis.call('get',lockerKey)
if (not getToken or getToken ~= targetToken) then
return 0
else
return redis.call('del',lockerKey)
end
`
4 watch 回調型
4.1 實現思路
對於實現 watch 回調型分佈式鎖,一些基本要點和 2.1 小節中聊到的主動輪詢型分佈式鎖類似:
-
• 針對於同一把分佈式鎖,使用一條相同的數據進行標識(唯一、明確的 key)
-
• 倘若在存儲介質內成功插入該條數據(要求 key 對應的數據不存在),則這一行爲被認定爲加鎖成功
-
• 把從存儲介質中刪除該條數據這行爲理解爲解鎖操作
與主動輪詢型分佈式鎖不同的是,在取鎖失敗時,watch 回調型分佈式鎖不會持續輪詢,而是會 watch 監聽鎖的刪除事件:
- • 倘若在插入數據時,發現該條記錄已經存在,說明鎖已被他人持有,此時選擇監聽這條數據記錄的刪除事件,當對應事件發生時說明鎖被釋放了,此時才繼續嘗試取鎖
4.2 技術選型
在實現上,我們需要依賴於提供了 watch 機制的狀態存儲組件,不僅能支持數據的存儲和去重,還需要利用到其中的 watch 監聽回調功能進行鎖釋放事件的訂閱感知.
爲滿足上述訴求,我們常用的技術組件包括 etcd 和 zookeeper.
(1)etcd
etcd 官方文檔:https://etcd.io/
etcd 是一款適合用於共享配置和服務發現的分佈式 kv 存儲組件,底層基於分佈式共識算法 raft 協議保證了存儲服務的強一致和高可用.
在 etcd 中提供了 watch 監聽器的功能,即針對於指定範圍的數據,通過與 etcd 服務端節點創建 grpc 長連接的方式持續監聽變更事件. 關於 watch 機制的詳細介紹,可以參見我上一週發表的兩篇文章—— etcd watch 機制源碼解析——客戶端篇 / 服務端篇.
此外,etcd 中寫入數據時,還支持通過版本 revision 機制進行取鎖秩序的統籌協調,是一款很適合用於實現分佈式鎖的組件.
etcd 是本文在介紹 watch 回調型分佈式鎖時選取的工程實踐案例,在本文第 5 章會結合實現源碼展開介紹.
(2)zookeeper
zookeeper 官方文檔:https://zookeeper.apache.org/
ZooKeeper 是一款開源的分佈式應用協調服務,底層基於分佈式共識算法 zab 協議保證了數據的強一致性和高可用性.
zookeeper 中提供了臨時順序節點(EPHEMERAL_SEQUENTIAL)類型以及 watch 監聽器機制,能夠滿足實現 watch 回調型分佈式鎖所需要具備的一切核心能力.
不過在本文中,zk 部分我們不多作展開,介紹內容以 etcd 爲核心.
4.3 死鎖問題
爲避免死鎖問題的產生,etcd 中提供了租約 lease 機制. 租約,顧名思義,是一份具有時效性的協議,一旦達到租約上規定的截止時間,租約就會失去效力. 同時,etcd 中還提供了續約機制(keepAlive),用戶可以通過續約操作來延遲租約的過期時間.
那麼,我們如何來利用租約 lease 機制解決分佈式鎖中可能存在的死鎖問題呢?實現思路如下:
-
• 用戶可以先申請一份租約,設定好租約的截止時間
-
• 異步啓動一個續約協程,負責在業務邏輯處理完成前,按照一定的時間節奏持續進行續約操作
-
• 在執行取鎖動作,將對應於鎖的 kv 數據和租約進行關聯綁定,使得鎖數據和租約擁有相同的過期時間屬性
在這樣的設定之下,倘若分佈式鎖的持有者出現異常狀況導致無法正常解鎖,則可以通過租約的過期機制完成對分佈式鎖的釋放,死鎖問題因此得以規避. 此外,鎖的使用方可以將租約的初始過期時間設定爲一個偏小的值,並通過續約機制來對租約的生效週期進行動態延長. 可以看到,此處 etcd 中的租約及續約機制,實現了與 redisson 中 watch dog 機制類似的效果.
4.4 驚羣效應
驚羣效應又稱爲羊羣效應:羊羣是一種紀律性很差的組織,平時就處在一種散漫無秩序地移動模式之下. 需要注意的是,在羊羣中一旦有某隻羊出現異動,其他的羊也會不假思索地一哄而上跑動起來,全然不估計附近可能有狼或者何處有更好的草源等客觀問題.
在 watch 回調型分佈式鎖的實現過程中,可能也會存在類似於驚羣效應的問題. 這裏指的是:倘若一把分佈式鎖的競爭比較激烈,那麼鎖的釋放事件可能同時被多個的取鎖方所監聽,一旦鎖真的被釋放了,所有的取鎖方都會一擁而上嘗試取鎖,然而我們知道,一個輪次中真正能夠取鎖成功的只會有一名角色,因此這個過程中會存在大量無意義的性能損耗,且釋放鎖時刻瞬間激增的請求流量也可能會對系統穩定性產生負面效應.
爲規避驚羣效應,etcd 中提供了前綴 prefix 機制以及版本 revision 機制,和 zookeeper 的臨時順序節點功能有些類似:
-
• 對於同一把分佈式鎖,鎖記錄數據的 key 擁有共同的前綴 prefix,作爲鎖的標識
-
• 每個取鎖方取鎖時,會以鎖前綴 prefix 拼接上自身的身份標識(租約 id),生成完整的 lock key. 因此各取鎖方完整的 lock key 都是互不相同的(只是有着相同的前綴),理論上所有取鎖方都能成功把鎖記錄數據插入到 etcd 中
-
• 每個取鎖方插入鎖記錄數據時,會獲得自身 lock key 處在鎖前綴 prefix 範圍下唯一且遞增的版本號 revision
-
• 取鎖方插入加鎖記錄數據不意味着加鎖成功,而是需要在插入數據後查詢一次鎖前綴 prefix 下的記錄列表,判定自身 lock key 對應的 revision 是不是其中最小的,如果是的話,才表示加鎖成功
-
• 如果鎖被他人佔用,取鎖方會 watch 監聽 revision 小於自己但最接近自己的那個 lock key 的刪除事件.
這樣所有的取鎖方就會在 revision 機制的協調下,根據取鎖序號(revision)的先後順序排成一條隊列,每當鎖被釋放,只會驚動到下一順位的取鎖方,驚羣問題得以避免.
5 etcd 分佈式鎖
5.1 sdk 介紹
etcd 開源地址:https://github.com/etcd-io/etcd
本文使用到的 etcd 源碼版本爲 v3.5.8.
etcd 作者在 etcd 的 concurrency 包下,基於 watch 機制結合 revision 機制實現了一款通用的 etcd 分佈式鎖,因此這部分代碼我不再手寫,而是會基於官方的實現示範進行源碼講解.
5.2 實現源碼
(1)數據結構
I Session
session 指的是一次訪問會話,背後對應的是一筆租約 lease. 用戶調用 NewSession 方法構造 session 實例時,執行的步驟包括:
-
• 通過 client.Grant 方法申請到一個 lease id
-
• 調用 client.KeepAlive 方法持續對租約進行續期
-
• 構造一個會話 session 實例
-
• 異步開啓一個守護協程,進行租約續期響應參數的處理(keepAlive)
const defaultSessionTTL = 60
// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
type Session struct {
client *v3.Client
opts *sessionOptions
id v3.LeaseID
cancel context.CancelFunc
donec <-chan struct{}
}
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
lg := client.GetLogger()
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
for _, opt := range opts {
opt(ops, lg)
}
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = resp.ID
}
ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}
donec := make(chan struct{})
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
// keep the lease alive until client error or cancelled context
go func() {
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
}()
return s,nil
}
假如用戶處理完成業務邏輯之後,可以通過 session.Close 方法完成會話的關閉,在方法中會通過 context 的 cancel 動作,停止對租約的續期行爲.
// Close orphans the session and revokes the session lease.
func (s *Session) Close() error {
s.Orphan()
// if revoke takes longer than the ttl, lease is expired anyway
ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
_, err := s.client.Revoke(ctx, s.id)
cancel()
return err
}
// Orphan ends the refresh for the session lease. This is useful
// in case the state of the client connection is indeterminate (revoke
// would fail) or when transferring lease ownership.
func (s *Session) Orphan() {
s.cancel()
<-s.donec
}
II Mutex
Mutex 是 etcd 分佈式鎖的類型,其中核心字段包括:
-
• s:內置了一個會話 session
-
• pfx:分佈式鎖的公共前綴
-
• myKey:當前鎖使用方完整的 lock key,由 pfx 和 lease id 兩部分拼接而成
-
• myRev:當前鎖使用方 lock key 在公共鎖前綴 pfx 下對應的版本 revision
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session
pfx string
myKey string
myRev int64
hdr *pb.ResponseHeader
}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}
(2)方法鏈路
TryLock
Mutex.TryLock 方法會執行一次嘗試加鎖的動作,倘若鎖已經被其他人佔有,則會直接返回錯誤,不會阻塞:
-
• 調用 Mutex.tryAcquire 方法插入 my key(已存在則查詢),獲取到 my key 對應的 revision 以及當前鎖的實際持有者
-
• 倘若鎖 pfx 從未被佔用過,或者鎖 pfx 下存在的 revision 中,自身的 revision 是其中最小的一個,則說明自己加鎖成功
-
• 倘若鎖已經被其他人佔用,則刪除自己加鎖時創建的 kv 對記錄,然後返回鎖已被他人佔用的錯誤
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// Cannot lock, so delete the key
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return ErrLocked
}
Lock
Mutex.Lock 方法採用的是阻塞加鎖的處理模式,倘若分佈式鎖已經被其他人佔用,則會持續阻塞等待時機,直到自己取鎖成功:
-
• 調用 Mutex.tryAcquire 方法插入 my key(已存在則查詢),獲取到 my key 對應的 revision 以及當前鎖的實際持有者
-
• 倘若鎖 pfx 從未被佔用過,或者鎖 pfx 下存在的 revision 中,自身的 revision 是其中最小的一個,則說明自己加鎖成功
-
• 倘若鎖已被他人佔用,調用 waitDeletes 方法,watch 監聽 revision 小於自己且最接近於自己的鎖記錄數據的刪除事件
-
• 當接收到解鎖事件後,會再檢查一下自身的租約有沒有過期,如果沒有,則說明加鎖成功
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// wait for deletion revisions prior to myKey
// TODO: early termination if the session key is deleted before other session keys with smaller revisions.
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
// make sure the session is not expired, and the owner key still exists.
gresp, werr := client.Get(ctx, m.myKey)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
if len(gresp.Kvs) == 0 { // is the session key lost?
return ErrSessionExpired
}
m.hdr = gresp.Header
return nil
}
tryAcquire
Mutex.tryAcquire 方法,使用方會完成鎖數據的插入以及 revision 的獲取:
-
• 基於 etcd 的事務操作,判定假如當前 my key 還沒創建過鎖的 kv 記錄,則創建 kv 記錄並執行 getOwner 操作獲取當前鎖的持有者;倘若已經創建過,則查詢對應的 kv 記錄,並調用 getOwner 獲取當前鎖的持有者
-
• 返回 my key 對應的 revision 和當前鎖的 owner(鎖 pfx 中最小 revision 的歸屬方),供上層的 Lock 或者 TryLock 方法使用
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
waitDeletes
-
• 基於一個 for 循環實現自旋
-
• 每輪處理中,會獲取 revision 小於自己且最接近於自己的取鎖方的 key
-
• 倘若 key 不存在,則說明自己的 revision 已經是最小的,直接取鎖成功
-
• 倘若 key 存在,則調用 waitDelete 方法阻塞監聽這個 key 的刪除事件
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision are deleted.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wr v3.WatchResponse
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return errors.New("lost watcher waiting for delete")
}
unlock
解鎖時直接刪除自己的 kv 對記錄即可,假如自己是持有鎖的角色,那麼刪除 kv 對記錄就是真正意義上的解鎖動作;即便自己並無持有鎖,刪除 kv 對就代表自己退出了搶鎖流程,也不會對流程產生負面影響.
這裏大家可能會存在一個疑問,就是假如執行 unlock 操作的角色本身只是處在等鎖隊列中,並未真正持有鎖,那麼執行刪除 kv 對記錄時是否會誤將隊列中的下一個取鎖方誤喚醒,引起秩序混亂?
答案是不會的,大家可以回過頭觀察 waitDeletes 方法的實現邏輯,取鎖方在從 waitDelete 方法中接收到前一筆 kv 記錄的刪除事件而被喚醒後,它會接着查詢一輪比它小且最接近的 revision 對應的 kv 對記錄,如果存在則繼續進行監聽,直到這樣的 kv 數據不存在時纔會取鎖成功(my revision 已經是鎖 pfx 下最小的 revision).
func (m *Mutex) Unlock(ctx context.Context) error {
if m.myKey == "" || m.myRev <= 0 || m.myKey == "\x00" {
return ErrLockReleased
}
if !strings.HasPrefix(m.myKey, m.pfx) {
return fmt.Errorf("invalid key %q, it should have prefix %q", m.myKey, m.pfx)
}
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
6 總結
本篇和大家一起探討了如何基於 Golang 實現主動輪詢和 watch 回調兩種模式的分佈式鎖. 本文分別以 redis 和 etcd 兩個組件爲例進行了分佈式鎖的原理介紹及源碼展示.
redis 可以算是我們最常用於實現分佈式鎖的組件,但是由於其中缺少續約機制以及存在數據弱一致性的問題,導致分佈式鎖的獨佔性並不能夠得到保證. 後續我會單獨開一個篇章,和大家一起聊聊如何通過 watch dog 和 redlock 機制解決 redis 分佈式鎖可能存在的安全隱患.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/KYiZvFRX0CddJVCwyfkLfQ