Golang 分佈式鎖技術攻略

0 前言

幾個月前和大家分享過一篇——Golang 單機鎖實現原理,恰逢最近在研究 etcd 的 watch 機制,這是一項可以應用於實現分佈式鎖的核心能力,於是擇日不日撞日,接下來就和大家一起來聊聊單機鎖的升級版本——Golang 分佈式鎖技術攻略介紹

本文內容分爲兩部分:第一部分聊聊如何通過 redis 實現主動輪詢模型下的分佈式鎖;第二部分和大家一起探討如何使用 etcd 實現 watch 回調模型下的分佈式鎖. 兩個部分均會通過原理闡述結合源碼剖析的方式爲大家作具體的內容介紹

1 分佈式鎖

1.1 使用背景

在併發場景中,爲了保證臨界資源的數據一致性,我們會經常使用到 “鎖” 這個工具對臨界資源進行保護,讓混亂的併發訪問行爲退化爲秩序的串行訪問行爲.

在本地環境中,由於多線程之間能夠共享進程的數據,因此可以比較簡單地實現進程內的互斥鎖;然而在分佈式場景中,有時我們需要跨域多個物理節點執行加鎖操作,因此我們就需要依賴到類似於 redis、mysql 這樣的狀態存儲組件,在此基礎之上實現所謂的 “分佈式鎖” 技術.

1.2 核心性質

分佈式鎖應當具備如下幾項核心性質:

1.3 實現類型

分佈式鎖根據其實現模型,可以被劃分爲兩大類:

1.4 一些個人理解

在單機環境中,主動輪詢和 watch 回調兩種鎖模型各有優劣,所謂的”優 “和” 劣“也是相對而言,需要對 cpu 空轉以及阻塞協程兩種行爲的損耗做出權衡. (大家對這部分概念如果不清晰,可以閱讀一下我之前發表的文章——”Golang 單機鎖實現原理“).

然而,在分佈式場景中,我個人覺得優勢的天平在略微朝着 watch 回調型的實現策略傾斜. 這是因爲分佈式場景中” 輪詢 “這一動作的成本相比於單機鎖而言要高很多,背後存在的行爲可能是一次甚至多次網絡 IO 請求. 這種情況下,取鎖方基於 watch 回調的方式,在確保鎖被釋放、自身有機會取鎖的情況下,纔會重新發出嘗試取鎖的請求,這樣能在很大程度上避免無意義的輪詢損耗.

當然,主動輪詢型的分佈式鎖能夠保證使用方始終佔據流程的主動權,整個流程可以更加輕便靈活;此外,watch 機制在實現過程中需要建立長連接完成 watch 監聽動作,也會存在一定的資源損耗. 因此這個問題沒有標準答案,應該結合實際的需求背景採取不同的應對策略:在併發激烈程度較高時傾向於 watch 回調型分佈式鎖;反之,主動輪詢型分佈式鎖可能會是更好的選擇.

除此之外,基於 watch 回調模型實現的分佈式鎖背後可能還存在其他的問題,比如:當有多個嘗試取鎖的使用方 watch 監聽同一把鎖時,一次鎖的釋放事件可能會引發 “驚羣效應”. 這個問題以及對應的解決方案將會在本文第 4 章中進行探討.

2 主動輪詢型

2.1 實現思路

主動輪詢型分佈式鎖的實現思路爲:

2.2 技術選型

實現主動輪詢型分佈式鎖時,我們常用的組件包括 redis 和 mysql.

(1)redis

在實現主動輪詢型分佈式鎖時,redis 算得上是大家最常用的組件. 在第 3 章中,本文會以 redis 爲例,進行主動輪詢型分佈式鎖的實踐介紹.

redis 官方文檔:https://redis.io/

redis 基於內存實現數據的存儲,因此足夠高輕便高效. 此外,redis 基於單線程模型完成數據處理工作,支持 SETNX 原子指令(set only if not exist),能夠很方便地支持分佈式鎖的加鎖操作.

setnx 使用文檔:https://redis.io/commands/setnx/ (事實上,在 redis 2.6.12 版本之後,setnx 操作已經被棄置,官方推薦大家使用 set 指令並附加 nx 參數來實現與 setnx 指令相同的效果)

此外,redis 還支持使用 lua 腳本自定義組裝同一個 redis 節點下的多筆操作形成一個具備原子性的事務.

redis lua 腳本使用文檔:https://redis.io/docs/manual/programmability/eval-intro/

在通過 redis 實現分佈式鎖時,我們可以通過將 key 對應的 value 設置爲使用方的身份標識. 在解鎖流程中,通過 lua 腳本組裝步驟:【(1)檢查釋放鎖動作執行者的身份;(2)身份合法時才進行解鎖】. 如此一來,分佈式鎖的對稱性也就得以保證了.

(2)mysql

mysql 官方文檔:https://www.mysql.com/

通過經典的關係型數據庫 mysql 關也能實現和 redis 類似的效果.

2.3 死鎖問題

下一個問題是,我們在設計主動輪詢型分佈式鎖時,如何避免出現死鎖問題而導致分佈式鎖不可能用呢?

這項能力在 mysql 中顯得捉襟見肘,不過在使用 redis 時,我們可以通過過期時間 expire time 機制得以保證. 我們通常會在插入分佈式鎖對應的 kv 數據時設置一個過期時間 expire time,這樣即便使用方因爲異常原因導致無法正常解鎖,鎖對應的數據項也會在達到過期時間閾值後被自動刪除,實現釋放分佈式鎖的效果.

值得一提的是,這種過期機制的引入也帶來了新的問題:因爲鎖的持有者並不能精確預判到自己持鎖後處理業務邏輯的實際耗時,因此此處設置的過期時間只能是一個偏向於保守的經驗值,假如因爲一些異常情況導致佔有鎖的使用方在業務處理流程中的耗時超過了設置的過期時間閾值,就會導致鎖被提前釋放,其他取鎖方可能取鎖成功,最終引起數據不一致的併發問題.

針對於這個問題,在分佈式鎖工具 redisson 中給出瞭解決方案——看門狗策略(watch dog strategy):在鎖的持有方未完成業務邏輯的處理時,會持續對分佈式鎖的過期閾值進行延期操作. 這部分內容不屬於本文的討論範疇,後續我們找機會單開一篇,連帶着 redis 中的 redlock 策略一起展開聊聊.

2.4 弱一致性問題

回顧 redis 的設計思路,爲避免單點故障問題,redis 會基於主從複製的方式實現數據備份. (以哨兵機制爲例,哨兵會持續監聽 master 節點的健康狀況,倘若 master 節點發生故障,哨兵會負責扶持 slave 節點上位,以保證整個集羣能夠正常對外提供服務). 此外,在 CAP 體系中,redis 走的是 AP 路線,爲保證服務的吞吐性能,主從節點之間的數據同步是異步延遲進行的.

到這裏問題就來了,試想一種場景:倘若 使用方 A 在 redis master 節點加鎖成功,但是對應的 kv 記錄在同步到 slave 之前,master 節點就宕機了. 此時未同步到這項數據的 slave 節點升爲 master,這樣分佈式鎖被 A 持有的 “憑證” 就這樣憑空消失了. 於是不知情的使用方 B C D 都可能加鎖成功,於是就出現了一把鎖被多方同時持有的問題,導致分佈式鎖最基本的獨佔性遭到破壞.

關於這個問題,一個比較經典的解決方案是:redis 紅鎖(redlock,全稱 redis distribution lock),本文僅僅拋出一個引子,具體內容我們後續單開一篇再聊.

3 redis 分佈式鎖

3.1 sdk 介紹

首先,本文使用到基於 golang 編寫的 redis 客戶端 sdk:redigo,用於和 redis 組件進行交互.

redigo 開源地址:https://github.com/gomodule/redigo

本文使用到的 redigo 源碼版本爲 v1.8.9

在 redigo 基礎之上,我個人編寫了一款基於 redis 實現的分佈式鎖 sdk,支持阻塞和非阻塞兩種模式進行取鎖. 項目已於 github 開源:https://github.com/xiaoxuxiansheng/redis_lock

由於個人水平有限,如有不當之處,敬請諒解,並歡迎大家批評指正.

3.2 源碼介紹

(1)redis 客戶端

package redis_lock


import (
    "context"
    "errors"
    "time"


    "github.com/gomodule/redigo/redis"
)


// Client Redis 客戶端.
type Client struct {
    ClientOptions
    pool *redis.Pool
}


func NewClient(network, address, password string, opts ...ClientOption) *Client {
    c := Client{
        ClientOptions: ClientOptions{
            network:  network,
            address:  address,
            password: password,
        },
    }


    for _, opt := range opts {
        opt(&c.ClientOptions)
    }


    repairClient(&c.ClientOptions)


    pool := c.getRedisPool()
    return &Client{
        pool: pool,
    }
}


func (c *Client) getRedisPool() *redis.Pool {
    return &redis.Pool{
        MaxIdle:     c.maxIdle,
        IdleTimeout: time.Duration(c.idleTimeoutSeconds) * time.Second,
        Dial: func() (redis.Conn, error) {
            c, err := c.getRedisConn()
            if err != nil {
                return nil, err
            }
            return c, nil
        },
        MaxActive: c.maxActive,
        Wait:      c.wait,
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        },
    }
}


func (c *Client) getRedisConn() (redis.Conn, error) {
    if c.address == "" {
        panic("Cannot get redis address from config")
    }


    var dialOpts []redis.DialOption
    if len(c.password) > 0 {
        dialOpts = append(dialOpts, redis.DialPassword(c.password))
    }
    conn, err := redis.DialContext(context.Background(),
        c.network, c.address, dialOpts...)
    if err != nil {
        return nil, err
    }
    return conn, nil
}


func (c *Client) GetConn(ctx context.Context) (redis.Conn, error) {
    return c.pool.GetContext(ctx)
}


// 只有 key 不存在時,能夠 set 成功. set 時攜帶上超時時間,單位秒.
func (c *Client) SetNEX(ctx context.Context, key, value string, expireSeconds int64) (int64, error) {
    if key == "" || value == "" {
        return -1, errors.New("redis SET keyNX or value can't be empty")
    }


    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return -1, err
    }
    defer conn.Close()


    reply, err := conn.Do("SET", key, value, "EX", expireSeconds, "NX")
    if err != nil {
        return -1, nil
    }


    r, _ := reply.(int64)
    return r, nil
}


// Eval 支持使用 lua 腳本.
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {
    args := make([]interface{}, 2+len(keysAndArgs))
    args[0] = src
    args[1] = keyCount
    copy(args[2:], keysAndArgs)


    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return -1, err
    }
    defer conn.Close()


    return conn.Do("EVAL", args...)
}

(2)redis 分佈式鎖

package redis_lock

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/xiaoxuxiansheng/redis_lock/utils"
)

var ErrLockAcquiredByOthers = errors.New("lock is acquired by others")

func IsRetryableErr(err error) bool {
    return errors.Is(err, ErrLockAcquiredByOthers)
}

// 基於 redis 實現的分佈式鎖,不可重入,但保證了對稱性
type RedisLock struct {
    LockOptions
    key    string
    token  string
    client *Client
}

func NewRedisLock(key string, client *Client, opts ...LockOption) *RedisLock {
    r := RedisLock{
        key:    key,
        token:  utils.GetProcessAndGoroutineIDStr(),
        client: client,
    }

    for _, opt := range opts {
        opt(&r.LockOptions)
    }

    repairLock(&r.LockOptions)
    return &r
}
package utils

import (
    "fmt"
    "os"
    "runtime"
    "strconv"
    "strings"
)

func GetCurrentProcessID() string {
    return strconv.Itoa(os.Getpid())
}

// GetCurrentGoroutineID 獲取當前的協程ID
func GetCurrentGoroutineID() string {
    buf := make([]byte, 128)
    buf = buf[:runtime.Stack(buf, false)]
    stackInfo := string(buf)
    return strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0]"goroutine")[1])
}

func GetProcessAndGoroutineIDStr() string {
    return fmt.Sprintf("%s_%s", GetCurrentProcessID(), GetCurrentGoroutineID())
}
package redis_lock

const (
    // 默認連接池超過 10 s 釋放連接
    DefaultIdleTimeoutSeconds = 10
    // 默認最大激活連接數
    DefaultMaxActive = 100
    // 默認最大空閒連接數
    DefaultMaxIdle = 20
)

type ClientOptions struct {
    maxIdle            int
    idleTimeoutSeconds int
    maxActive          int
    wait               bool
    // 必填參數
    network  string
    address  string
    password string
}

type ClientOption func(c *ClientOptions)

func WithMaxIdle(maxIdle int) ClientOption {
    return func(c *ClientOptions) {
        c.maxIdle = maxIdle
    }
}

func WithIdleTimeoutSeconds(idleTimeoutSeconds int) ClientOption {
    return func(c *ClientOptions) {
        c.idleTimeoutSeconds = idleTimeoutSeconds
    }
}

func WithMaxActive(maxActive int) ClientOption {
    return func(c *ClientOptions) {
        c.maxActive = maxActive
    }
}

func WithWaitMode() ClientOption {
    return func(c *ClientOptions) {
        c.wait = true
    }
}

func repairClient(c *ClientOptions) {
    if c.maxIdle < 0 {
        c.maxIdle = DefaultMaxIdle
    }

    if c.idleTimeoutSeconds < 0 {
        c.idleTimeoutSeconds = DefaultIdleTimeoutSeconds
    }

    if c.maxActive < 0 {
        c.maxActive = DefaultMaxActive
    }
}

type LockOption func(*LockOptions)

func WithBlock() LockOption {
    return func(o *LockOptions) {
        o.isBlock = true
    }
}

func WithBlockWaitingSeconds(waitingSeconds int64) LockOption {
    return func(o *LockOptions) {
        o.blockWaitingSeconds = waitingSeconds
    }
}

func WithExpireSeconds(expireSeeconds int64) LockOption {
    return func(o *LockOptions) {
        o.expireSeconds = expireSeeconds
    }
}

func repairLock(o *LockOptions) {
    if o.isBlock && o.blockWaitingSeconds <= 0 {
        // 默認阻塞等待時間上限爲 5 秒
        o.blockWaitingSeconds = 5
    }

    // 分佈式鎖默認超時時間爲 30 秒
    if o.expireSeconds <= 0 {
        o.expireSeconds = 30
    }
}

type LockOptions struct {
    isBlock             bool
    blockWaitingSeconds int64
    expireSeconds       int64
}

(3)非阻塞模式加鎖

const RedisLockKeyPrefix = "REDIS_LOCK_PREFIX_"

// Lock 加鎖.
func (r *RedisLock) Lock(ctx context.Context) error {
    // 不管是不是阻塞模式,都要先獲取一次鎖
    err := r.tryLock(ctx)
    if err == nil {
        return nil
    }

    // 非阻塞模式加鎖失敗直接返回錯誤
    if !r.isBlock {
        return err
    }

    // 判斷錯誤是否可以允許重試,不可允許的類型則直接返回錯誤
    if !IsRetryableErr(err) {
        return err
    }

    // 基於阻塞模式持續輪詢取鎖
    return r.blockingLock(ctx)
}

func (r *RedisLock) tryLock(ctx context.Context) error {
    // 首先查詢鎖是否屬於自己
    reply, err := r.client.SetNEX(ctx, r.getLockKey(), r.token, r.expireSeconds)
    if err != nil {
        return err
    }
    if reply != 1 {
        return fmt.Errorf("reply: %d, err: %w", reply, ErrLockAcquiredByOthers)
    }
    return nil
}

func (r *RedisLock) getLockKey() string {
    return RedisLockKeyPrefix + r.key
}

(4)阻塞模式加鎖

func (r *RedisLock) blockingLock(ctx context.Context) error {
    // 阻塞模式等鎖時間上限
    timeoutCh := time.After(time.Duration(r.blockWaitingSeconds) * time.Second)
    // 輪詢 ticker,每隔 50 ms 嘗試取鎖一次
    ticker := time.NewTicker(time.Duration(50) * time.Millisecond)
    defer ticker.Stop()

    for range ticker.C {
        select {
        // ctx 終止了
        case <-ctx.Done():
            return fmt.Errorf("lock failed, ctx timeout, err: %w", ctx.Err())
            // 阻塞等鎖達到上限時間
        case <-timeoutCh:
            return fmt.Errorf("block waiting time out, err: %w", ErrLockAcquiredByOthers)
        // 放行
        default:
        }

        // 嘗試取鎖
        err := r.tryLock(ctx)
        if err == nil {
            // 加鎖成功,返回結果
            return nil
        }

        // 不可重試類型的錯誤,直接返回
        if !IsRetryableErr(err) {
            return err
        }
    }

    return nil
}

(5)解鎖

// Unlock 解鎖. 基於 lua 腳本實現操作原子性.
func (r *RedisLock) Unlock(ctx context.Context) error {
    keysAndArgs := []interface{}{r.getLockKey(), r.token}
    reply, err := r.client.Eval(ctx, LuaCheckAndDeleteDistributionLock, 1, keysAndArgs)
    if err != nil {
        return err
    }

    if ret, _ := reply.(int64); ret != 1 {
        return errors.New("can not unlock without ownership of lock)
    }
    return nil
}
// LuaCheckAndDeleteDistributionLock 判斷是否擁有分佈式鎖的歸屬權,是則刪除
const LuaCheckAndDeleteDistributionLock = `
  local lockerKey = KEYS[1]
  local targetToken = ARGV[1]
  local getToken = redis.call('get',lockerKey)
  if (not getToken or getToken ~= targetToken) then
    return 0
  else
    return redis.call('del',lockerKey)
  end
`

4 watch 回調型

4.1 實現思路

對於實現 watch 回調型分佈式鎖,一些基本要點和 2.1 小節中聊到的主動輪詢型分佈式鎖類似:

與主動輪詢型分佈式鎖不同的是,在取鎖失敗時,watch 回調型分佈式鎖不會持續輪詢,而是會 watch 監聽鎖的刪除事件:

4.2 技術選型

在實現上,我們需要依賴於提供了 watch 機制的狀態存儲組件,不僅能支持數據的存儲和去重,還需要利用到其中的 watch 監聽回調功能進行鎖釋放事件的訂閱感知.

爲滿足上述訴求,我們常用的技術組件包括 etcd 和 zookeeper.

(1)etcd

etcd 官方文檔:https://etcd.io/

etcd 是一款適合用於共享配置和服務發現的分佈式 kv 存儲組件,底層基於分佈式共識算法 raft 協議保證了存儲服務的強一致和高可用.

在 etcd 中提供了 watch 監聽器的功能,即針對於指定範圍的數據,通過與 etcd 服務端節點創建 grpc 長連接的方式持續監聽變更事件. 關於 watch 機制的詳細介紹,可以參見我上一週發表的兩篇文章—— etcd watch 機制源碼解析——客戶端篇 / 服務端篇.

此外,etcd 中寫入數據時,還支持通過版本 revision 機制進行取鎖秩序的統籌協調,是一款很適合用於實現分佈式鎖的組件.

etcd 是本文在介紹 watch 回調型分佈式鎖時選取的工程實踐案例,在本文第 5 章會結合實現源碼展開介紹.

(2)zookeeper

zookeeper 官方文檔:https://zookeeper.apache.org/

ZooKeeper 是一款開源的分佈式應用協調服務,底層基於分佈式共識算法 zab 協議保證了數據的強一致性和高可用性.

zookeeper 中提供了臨時順序節點(EPHEMERAL_SEQUENTIAL)類型以及 watch 監聽器機制,能夠滿足實現 watch 回調型分佈式鎖所需要具備的一切核心能力.

不過在本文中,zk 部分我們不多作展開,介紹內容以 etcd 爲核心.

4.3 死鎖問題

爲避免死鎖問題的產生,etcd 中提供了租約 lease 機制. 租約,顧名思義,是一份具有時效性的協議,一旦達到租約上規定的截止時間,租約就會失去效力. 同時,etcd 中還提供了續約機制(keepAlive),用戶可以通過續約操作來延遲租約的過期時間.

那麼,我們如何來利用租約 lease 機制解決分佈式鎖中可能存在的死鎖問題呢?實現思路如下:

在這樣的設定之下,倘若分佈式鎖的持有者出現異常狀況導致無法正常解鎖,則可以通過租約的過期機制完成對分佈式鎖的釋放,死鎖問題因此得以規避. 此外,鎖的使用方可以將租約的初始過期時間設定爲一個偏小的值,並通過續約機制來對租約的生效週期進行動態延長. 可以看到,此處 etcd 中的租約及續約機制,實現了與 redisson 中 watch dog 機制類似的效果.

4.4 驚羣效應

驚羣效應又稱爲羊羣效應:羊羣是一種紀律性很差的組織,平時就處在一種散漫無秩序地移動模式之下. 需要注意的是,在羊羣中一旦有某隻羊出現異動,其他的羊也會不假思索地一哄而上跑動起來,全然不估計附近可能有狼或者何處有更好的草源等客觀問題.

在 watch 回調型分佈式鎖的實現過程中,可能也會存在類似於驚羣效應的問題. 這裏指的是:倘若一把分佈式鎖的競爭比較激烈,那麼鎖的釋放事件可能同時被多個的取鎖方所監聽,一旦鎖真的被釋放了,所有的取鎖方都會一擁而上嘗試取鎖,然而我們知道,一個輪次中真正能夠取鎖成功的只會有一名角色,因此這個過程中會存在大量無意義的性能損耗,且釋放鎖時刻瞬間激增的請求流量也可能會對系統穩定性產生負面效應.

爲規避驚羣效應,etcd 中提供了前綴 prefix 機制以及版本 revision 機制,和 zookeeper 的臨時順序節點功能有些類似:

這樣所有的取鎖方就會在 revision 機制的協調下,根據取鎖序號(revision)的先後順序排成一條隊列,每當鎖被釋放,只會驚動到下一順位的取鎖方,驚羣問題得以避免.

5 etcd 分佈式鎖

5.1 sdk 介紹

etcd 開源地址:https://github.com/etcd-io/etcd

本文使用到的 etcd 源碼版本爲 v3.5.8.

etcd 作者在 etcd 的 concurrency 包下,基於 watch 機制結合 revision 機制實現了一款通用的 etcd 分佈式鎖,因此這部分代碼我不再手寫,而是會基於官方的實現示範進行源碼講解.

5.2 實現源碼

(1)數據結構

I Session

session 指的是一次訪問會話,背後對應的是一筆租約 lease. 用戶調用 NewSession 方法構造 session 實例時,執行的步驟包括:

const defaultSessionTTL = 60

// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
type Session struct {
    client *v3.Client
    opts   *sessionOptions
    id     v3.LeaseID

    cancel context.CancelFunc
    donec  <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    lg := client.GetLogger()
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops, lg)
    }

    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = resp.ID
    }

    ctx, cancel := context.WithCancel(ops.ctx)
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive {
            // eat messages until keep alive channel closes
        }
    }()

    return s,nil
}

假如用戶處理完成業務邏輯之後,可以通過 session.Close 方法完成會話的關閉,在方法中會通過 context 的 cancel 動作,停止對租約的續期行爲.

// Close orphans the session and revokes the session lease.
func (s *Session) Close() error {
    s.Orphan()
    // if revoke takes longer than the ttl, lease is expired anyway
    ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
    _, err := s.client.Revoke(ctx, s.id)
    cancel()
    return err
}

// Orphan ends the refresh for the session lease. This is useful
// in case the state of the client connection is indeterminate (revoke
// would fail) or when transferring lease ownership.
func (s *Session) Orphan() {
    s.cancel()
    <-s.donec
}

II Mutex

Mutex 是 etcd 分佈式鎖的類型,其中核心字段包括:

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
    s *Session
    
    pfx   string
    myKey string
    myRev int64
    hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
    return &Mutex{s, pfx + "/""", -1, nil}
}

(2)方法鏈路

TryLock

Mutex.TryLock 方法會執行一次嘗試加鎖的動作,倘若鎖已經被其他人佔有,則會直接返回錯誤,不會阻塞:

// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
    resp, err := m.tryAcquire(ctx)
    if err != nil {
        return err
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    client := m.s.Client()
    // Cannot lock, so delete the key
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return ErrLocked
}

Lock

Mutex.Lock 方法採用的是阻塞加鎖的處理模式,倘若分佈式鎖已經被其他人佔用,則會持續阻塞等待時機,直到自己取鎖成功:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    resp, err := m.tryAcquire(ctx)
    if err != nil {
        return err
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    client := m.s.Client()
    // wait for deletion revisions prior to myKey
    // TODO: early termination if the session key is deleted before other session keys with smaller revisions.
    _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
        return werr
    }


    // make sure the session is not expired, and the owner key still exists.
    gresp, werr := client.Get(ctx, m.myKey)
    if werr != nil {
        m.Unlock(client.Ctx())
        return werr
    }


    if len(gresp.Kvs) == 0 { // is the session key lost?
        return ErrSessionExpired
    }
    m.hdr = gresp.Header


    return nil
}

tryAcquire

Mutex.tryAcquire 方法,使用方會完成鎖數據的插入以及 revision 的獲取:

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
    s := m.s
    client := m.s.Client()


    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey)"=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return nil, err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    return resp, nil
}

waitDeletes

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision are deleted.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()


    var wr v3.WatchResponse
    wch := client.Watch(cctx, key, v3.WithRev(rev))
    for wr = range wch {
        for _, ev := range wr.Events {
            if ev.Type == mvccpb.DELETE {
                return nil
            }
        }
    }
    if err := wr.Err(); err != nil {
        return err
    }
    if err := ctx.Err(); err != nil {
        return err
    }
    return errors.New("lost watcher waiting for delete")
}

unlock

解鎖時直接刪除自己的 kv 對記錄即可,假如自己是持有鎖的角色,那麼刪除 kv 對記錄就是真正意義上的解鎖動作;即便自己並無持有鎖,刪除 kv 對就代表自己退出了搶鎖流程,也不會對流程產生負面影響.

這裏大家可能會存在一個疑問,就是假如執行 unlock 操作的角色本身只是處在等鎖隊列中,並未真正持有鎖,那麼執行刪除 kv 對記錄時是否會誤將隊列中的下一個取鎖方誤喚醒,引起秩序混亂?

答案是不會的,大家可以回過頭觀察 waitDeletes 方法的實現邏輯,取鎖方在從 waitDelete 方法中接收到前一筆 kv 記錄的刪除事件而被喚醒後,它會接着查詢一輪比它小且最接近的 revision 對應的 kv 對記錄,如果存在則繼續進行監聽,直到這樣的 kv 數據不存在時纔會取鎖成功(my revision 已經是鎖 pfx 下最小的 revision).

func (m *Mutex) Unlock(ctx context.Context) error {
    if m.myKey == "" || m.myRev <= 0 || m.myKey == "\x00" {
        return ErrLockReleased
    }


    if !strings.HasPrefix(m.myKey, m.pfx) {
        return fmt.Errorf("invalid key %q, it should have prefix %q", m.myKey, m.pfx)
    }


    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil

6 總結

本篇和大家一起探討了如何基於 Golang 實現主動輪詢和 watch 回調兩種模式的分佈式鎖. 本文分別以 redis 和 etcd 兩個組件爲例進行了分佈式鎖的原理介紹及源碼展示.

redis 可以算是我們最常用於實現分佈式鎖的組件,但是由於其中缺少續約機制以及存在數據弱一致性的問題,導致分佈式鎖的獨佔性並不能夠得到保證. 後續我會單獨開一個篇章,和大家一起聊聊如何通過 watch dog 和 redlock 機制解決 redis 分佈式鎖可能存在的安全隱患.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KYiZvFRX0CddJVCwyfkLfQ