在 Go 中如何使用分佈式鎖解決併發問題?

在分佈式系統中,協調多個服務實例之間的共享資源訪問是一個經典的挑戰。傳統的單機鎖(如 sync.Mutex)無法實現跨進程工作,此時就需要用到分佈式鎖了。本文將介紹 Go 語言生態中基於 Redis 實現的分佈式鎖庫 redsync,並探討其使用方法和實現原理。

分佈式鎖

首先我們來探討下爲什麼需要分佈式鎖?當我們編寫的程序出現資源競爭的時候,就需要使用互斥鎖來保證併發安全。而我們的服務很有可能不會單機部署,而是採用多副本的集羣部署方案。無論哪種方案運行程序,我們都需要合適的工具來解決併發問題。在解決單個進程間多個協程之間的併發資源搶佔問題時,我們往往採用 sync.Mutex。而在解決多個進程間的併發資源搶佔問題時,就需要採用分佈式鎖了,這就引出了我們今天要講解的 redsync

爲什麼是 redsync

在 Go 中分佈式鎖的開源實現有很多,爲什麼選擇介紹和使用 redsync 呢?簡單一句話:redsync 是 Redis 官方 唯一推薦的 Go Redis 分佈式鎖解決方案,遵循 Redlock 算法。它允許在多個獨立 Redis 節點上創建高可用的鎖,適用於需要強一致性的分佈式場景。

我們可以對比下 sync.Mutex 和 redsync 之間的區別,讓你有個感性的認識。

KONW16

二者分別適用於不同的併發場景,選擇時需要根據實際需求(單機還是分佈式)來決定。

redsync 快速上手

redsync 雖然內部實現上比較複雜,但別被嚇到,它的用法超級簡單。

示例代碼如下:

package main

import (
    "context"

    "github.com/go-redsync/redsync/v4"                  // 引入 redsync 庫,用於實現基於 Redis 的分佈式鎖
    "github.com/go-redsync/redsync/v4/redis/goredis/v9"// 引入 redsync 的 goredis 連接池
    goredislib "github.com/redis/go-redis/v9"           // 引入 go-redis 庫,用於與 Redis 服務器通信
)

func main() {
    // 創建一個 Redis 客戶端
    client := goredislib.NewClient(&goredislib.Options{
        Addr:     "localhost:36379", // Redis 服務器地址
        Password: "nightwatch",
    })

    // 使用 go-redis 客戶端創建一個 redsync 連接池
    pool := goredis.NewPool(client)

    // 創建一個 redsync 實例,用於管理分佈式鎖
    rs := redsync.New(pool)

    // 創建一個名爲 "test-redsync" 的互斥鎖(Mutex)
    mutex := rs.NewMutex("test-redsync")

    // 創建一個上下文(context),一般用於控制鎖的超時和取消
    ctx := context.Background()

    // 獲取鎖,如果獲取失敗(例如鎖已被其他進程持有),會返回錯誤
    if err := mutex.LockContext(ctx); err != nil {
        panic(err) // 如果獲取鎖失敗,程序會 panic
    }

    // TODO 執行業務邏輯
    // ...

    // 釋放鎖,如果釋放失敗(例如鎖已過期或不屬於當前進程),會返回錯誤
    if _, err := mutex.UnlockContext(ctx); err != nil {
        panic(err) // 如果釋放鎖失敗,程序會 panic
    }
}

因爲 redsync 依賴 Redis,所以我們首先需要創建一個 Redis 客戶端對象 client,調用 goredis.NewPool(client) 會基於這個 client 創建一個 redsync 的連接池,有了這個連接池 pool 就可以調用 redsync.New(pool) 創建一個 redsync 實例來申請分佈式鎖了。

redsync 提供了 NewMutex 方法可以創建一個分佈式鎖,它接收一個 name 參數作爲鎖的名字,這個名字會作爲 Redis 中的 key

拿到鎖對象 mutex 以後,調用 mutex.LockContext(ctx) 就可以加鎖,加鎖後便可以訪問競態資源了,資源訪問完成後,調用 mutex.UnlockContext(ctx) 便可以釋放鎖。

可以發現,redsync 用法和 sync.Mutex 非常相似,核心就是 Lock/Unlock 兩個操作。redsync 的使用無非多了一步連接 Redis 的過程。

配置選項

不知道你有沒有想過一個問題,我們在使用 sync.Mutex 時,如果某個 gorutine 加鎖後不釋放掉,那麼其他 gorutine 就無法獲取鎖,而在分佈式場景中,如果一個進程獲取了 Redis 分佈式鎖,然後在未釋放鎖之前進程掛掉了,其他進程要如何獲取鎖呢,難道要一直等待下去嗎?

這裏就要引出一個使用分佈式鎖很重要的問題,那就是一定要設置一個過期時間,這樣才能保證即使拿到鎖的進程掛掉了,只要鎖的過期時間已到,鎖也一定會被自動釋放掉,只有這樣,其他進程纔有機會獲取鎖。

而我們上面的示例中,之所以可以不設置鎖的過期時間,原因是 redsync 內部設置了默認值。以下是 redsync 中 NewMutex 方法的源碼:

// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
    m := &Mutex{
        name:   name,
        expiry: 8 * time.Second,
        tries:  32,
        delayFunc: func(tries int) time.Duration {
            return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
        },
        genValueFunc:  genValue,
        driftFactor:   0.01,
        timeoutFactor: 0.05,
        quorum:        len(r.pools)/2 + 1,
        pools:         r.pools,
    }
    for _, o := range options {
        o.Apply(m)
    }
    if m.shuffle {
        randomPools(m.pools)
    }
    return m
}

這裏 Mutex 對象的第二個字段 expiry 就是分佈式鎖的過期時間,這裏默認爲設爲 8 秒。tries 字段是獲取鎖的重試次數,即嘗試獲取鎖失敗 32 次以後,纔會返回加鎖失敗,因爲分佈式場景下失敗是很正常的情況,所以 32 次並不是一個很誇張的值。delayFunc 字段是每次失敗後重試的間隔時間。其他字段我就不一一講解了,絕大多數我們都用不到。

根據代碼我們很容易想到這幾個字段是通過選項模式來設置的。

使用示例:

mutex := rs.NewMutex("test-redsync",
    redsync.WithExpiry(30*time.Second),
    redsync.WithTries(3),
    redsync.WithRetryDelay(500*time.Millisecond),
)

看門狗

我們現在知道使用分佈式鎖一定要設置一個過期時間了,但是這會帶來另外一個問題:如果我們的業務代碼還沒執行完,鎖就過期自動釋放了,那麼此時另外一個進程成功拿到這把鎖,也來訪問競態資源,那分佈式鎖不就失去意義了嗎?

這就引出了使用分佈式鎖的另一個重要問題,鎖自動續期。我舉一個代碼示例,你就懂了:

package main

import (
    "context"
    "log/slog"
    "time"

    "github.com/go-redsync/redsync/v4"                  // 引入 redsync 庫,用於實現基於 Redis 的分佈式鎖
    "github.com/go-redsync/redsync/v4/redis/goredis/v9"// 引入 redsync 的 goredis 連接池
    goredislib "github.com/redis/go-redis/v9"           // 引入 go-redis 庫,用於與 Redis 服務器通信
)

func main() {
    // 創建一個 Redis 客戶端
    client := goredislib.NewClient(&goredislib.Options{
        Addr:     "localhost:36379", // Redis 服務器地址
        Password: "nightwatch",
    })

    // 使用 go-redis 客戶端創建一個 redsync 連接池
    pool := goredis.NewPool(client)

    // 創建一個 redsync 實例,用於管理分佈式鎖
    rs := redsync.New(pool)

    // 創建一個名爲 "test-redsync" 的互斥鎖(Mutex)
    mutex := rs.NewMutex("test-redsync", redsync.WithExpiry(5*time.Second))

    // 創建一個上下文(context),一般用於控制鎖的超時和取消
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 獲取鎖,如果獲取失敗(例如鎖已被其他進程持有),會返回錯誤
    if err := mutex.LockContext(ctx); err != nil {
        panic(err) // 如果獲取鎖失敗,程序會 panic
    }

    // 看門狗,實現鎖自動續約
    stopCh := make(chanstruct{})
    ticker := time.NewTicker(2 * time.Second) // 每隔 2s 續約一次
    defer ticker.Stop()
    gofunc() {
        for {
            select {
            case <-ticker.C:
                // 續約,延長鎖的過期時間
                if ok, err := mutex.ExtendContext(ctx); !ok || err != nil {
                    slog.Error("Failed to extend mutex", "err", err, "status", ok)
                } else {
                    slog.Info("Successfully extend mutex")
                }
            case <-stopCh:
                slog.Info("Exiting mutex watchdog")
                return
            }
        }
    }()

    // 執行業務邏輯
    time.Sleep(6 * time.Second)

    // 通知看門狗停止自動續期
    stopCh <- struct{}{}

    // 釋放鎖,如果釋放失敗(例如鎖已過期或不屬於當前進程),會返回錯誤
    if _, err := mutex.UnlockContext(ctx); err != nil {
        panic(err) // 如果釋放鎖失敗,程序會 panic
    }
}

這個示例延續了前文中的示例代碼,你需要重點關注的是如下這部分邏輯:

// 看門狗,實現鎖自動續約
stopCh := make(chanstruct{})
ticker := time.NewTicker(2 * time.Second) // 每隔 2s 續約一次
defer ticker.Stop()
gofunc() {
    for {
        select {
        case <-ticker.C:
            // 續約,延長鎖的過期時間
            if ok, err := mutex.ExtendContext(ctx); !ok || err != nil {
                slog.Error("Failed to extend mutex", "err", err, "status", ok)
            } else {
                slog.Info("Successfully extend mutex")
            }
        case <-stopCh:
            slog.Info("Exiting mutex watchdog")
            return
        }
    }
}()

redsync 提供了 mutex.ExtendContext(ctx) 方法可以延長鎖的過期時間。假設我們申請的分佈式鎖過期時間是 5 秒,而業務代碼執行時間是未知的,那麼我們在拿到鎖以後,可以單獨開啓一個 goroutine 來定時延長鎖的過期時間,當業務代碼執行完成以後,主 goroutine 通過 stopCh <- struct{}{} 向子 goroutine 發送停止信號,那麼子 goroutine 中的 <-stopCh case 就會收到通知,子 goroutine 便會退出,也就停止了鎖自動續期。

通過爲分佈式鎖設置過期時間,再配合子 goroutine 自動續期的功能,我們就能保證,持有鎖的進程掛掉時不會影響其他進程獲取鎖,並且還能實現業務執行完成後才釋放鎖。而這個實現分佈式鎖自動續期的程序,我們通常把它叫做 “看門狗”。

我再額外囉嗦一句,關於分佈式鎖的續期時常和間隔週期的問題,一般來說,續期的時間可以設置爲等於過期時間,即鎖的過期時間設爲 5 秒,那麼每次也只續期 5 秒,redsync 內部也是這麼做的,至於間隔多久續期一次,這個時間肯定是要小於過期時間 5 秒的,通常設爲鎖過期時間的 1/3 或 1/2 都可以。

redsync 原理

我上面講解的 redsync 用法基本上能覆蓋業務開發中的大部分場景了,對於 redsync 更多的功能我就不過多介紹了,有了現有的知識,你遇到了問題也可以自己查閱文檔學習。
下面我想講點更有價值的東西,我們自己來實現一個微型的 Redis 分佈式鎖,以此來加深你對 redsync 的理解。

如何實現一個 Redis 分佈式鎖

要基於 Redis 實現一個最小化的分佈式鎖,我們可以定義一個結構體 MiniRedisMutex 作爲鎖對象:

type MiniRedisMutex struct {
    name   string        // 會作爲分佈式鎖在 Redis 中的 key
    expiry time.Duration // 鎖過期時間
    conn   redis.Cmdable // Redis Client
}

它僅包含必要的字段,name 是鎖的名稱,expiry 是分佈式鎖必須要有的過期時間,conn 用來存儲 Redis 客戶端連接。

我們可以定義一個構造函數 NewMutex 來創建分佈式鎖對象:

func NewMutex(name string, expiry time.Duration, conn redis.Cmdable) *MiniRedisMutex {
    return &MiniRedisMutex{name, expiry, conn}
}

接下來就要實現加鎖和解鎖這兩個功能。

加鎖方法 Lock 實現如下:

func (m *MiniRedisMutex) Lock(ctx context.Context, value string) (bool, error) {
    reply, err := m.conn.SetNX(ctx, m.name, value, m.expiry).Result()
    if err != nil {
        return false, err
    }
    return reply, nil
}

Lock 方法接收兩個參數,ctx 用來控制取消,value 則會作爲鎖的值。

Lock 方法內部邏輯非常簡單,直接調用 Redis 的 SetNX 命令來排他的設置一個鍵值對,鎖名稱 name 作爲 Redis 的 key,鎖的值 value 作爲 Redis 的 value,並指定過期時間爲 expiry,這就是分佈式鎖的加鎖原理。

這裏有兩個關鍵點需要你注意:

釋放鎖方法 Unlock 實現如下:

// 釋放鎖的 lua 腳本,保證併發安全
var deleteScript = `
    local val = redis.call("GET", KEYS[1])
    if val == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    elseif val == false then
        return -1
    else
        return 0
    end
`

// Unlock 釋放鎖
func (m *MiniRedisMutex) Unlock(ctx context.Context, value string) (bool, error) {
    // 執行 lua 腳本,Redis 會保證其併發安全
    status, err := m.conn.Eval(ctx, deleteScript, []string{m.name}, value).Result()
    if err != nil {
        returnfalse, err
    }
    if status == int64(-1) {
        returnfalse, ErrLockAlreadyExpired
    }
    return status != int64(0), nil
}

在釋放鎖的邏輯中,我們不是簡單的將指定的 Redis 鍵值對刪除即可,而是調用 m.conn.Eval 方法執行了一段 lua 腳本的方式來釋放鎖。

在這段 lua 腳本中,我們先是從 Redis 中獲取指定 key 爲 m.name 的鍵值對,然後判斷其 value 是否等於 Unlock 方法傳入的 value 參數值,如果相等,則從 Redis 中刪除指定的鍵值對,表示釋放鎖,否則什麼也不做。

之所以要對 value 進行判斷,是因爲我們要保證這把鎖是當前進程所持有的鎖,而不是其他進程持有的鎖。那麼以什麼爲依據來說明這把鎖是當前進程持有的呢?這就是我們要保證 value 唯一的原因,每個進程在加鎖的時候,需要生成一個隨機的 value 作爲自己的鎖的標識,那麼釋放時,就可以通過這個 value 來判斷是否是自己持有的鎖。而這樣做的目的,是爲了避免一個進程搶到鎖後,還在執行業務邏輯時,鎖被另外一個進程給釋放了。

遺憾的是,這段釋放鎖的邏輯,Redis 沒有提供像 SetNX 一樣的快捷命令,所以我們只能將其放在 lua 腳本中執行,才能保證併發安全。

至此,一個微型的 Redis 分佈式鎖的核心功能咱們就講解完成了。

以下是 MiniRedisMutex 分佈式鎖完整的代碼實現:

package miniredislock

import (
    "context"
    "errors"
    "time"

    "github.com/redis/go-redis/v9"
)

var ErrLockAlreadyExpired = errors.New("miniredislock: failed to unlock, lock was already expired")

// MiniRedisMutex 一個微型的 Redis 分佈式鎖
type MiniRedisMutex struct {
    name   string        // 會作爲分佈式鎖在 Redis 中的 key
    expiry time.Duration // 鎖過期時間
    conn   redis.Cmdable // Redis Client
}

// NewMutex 創建 Redis 分佈式鎖
func NewMutex(name string, expiry time.Duration, conn redis.Cmdable) *MiniRedisMutex {
    return &MiniRedisMutex{name, expiry, conn}
}

// Lock 加鎖
func (m *MiniRedisMutex) Lock(ctx context.Context, value string) (bool, error) {
    reply, err := m.conn.SetNX(ctx, m.name, value, m.expiry).Result()
    if err != nil {
        returnfalse, err
    }
    return reply, nil
}

// 釋放鎖的 lua 腳本,保證併發安全
var deleteScript = `
    local val = redis.call("GET", KEYS[1])
    if val == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    elseif val == false then
        return -1
    else
        return 0
    end
`

// Unlock 釋放鎖
func (m *MiniRedisMutex) Unlock(ctx context.Context, value string) (bool, error) {
    // 執行 lua 腳本,Redis 會保證其併發安全
    status, err := m.conn.Eval(ctx, deleteScript, []string{m.name}, value).Result()
    if err != nil {
        returnfalse, err
    }
    if status == int64(-1) {
        returnfalse, ErrLockAlreadyExpired
    }
    return status != int64(0), nil
}

其實,這段代碼的主要邏輯,都是我從 redsync 源碼中提取出來。所以 redsync 其實也是這樣實現的,只不過它內部增加了很多可靠性和邊緣場景等邏輯代碼,最核心的加鎖和解鎖邏輯是一樣的。

微型分佈式鎖使用

下面我們來寫一個示例程序,演示下如何使用這個微型的分佈式鎖:

package main

import (
    "fmt"
    "time"

    goredislib "github.com/redis/go-redis/v9"
    "golang.org/x/net/context"

    "github.com/jianghushinian/blog-go-example/redsync/miniredislock"
)

func main() {
    // 創建一個 Redis 客戶端
    client := goredislib.NewClient(&goredislib.Options{
        Addr:     "localhost:36379", // Redis 服務器地址
        Password: "nightwatch",
    })
    defer client.Close()

    // 創建一個名爲 "test-miniredislock" 的互斥鎖
    mutex := miniredislock.NewMutex("test-miniredislock", 5*time.Second, client)

    ctx := context.Background()
    // 互斥鎖的值應該是一個隨機值
    value := "random-string"

    // 獲取鎖
    _, err := mutex.Lock(ctx, value)
    if err != nil {
        panic(err)
    }

    // 執行業務邏輯
    fmt.Println("do something...")
    time.Sleep(3 * time.Second)

    // 釋放自己持有的鎖
    _, err = mutex.Unlock(ctx, value)
    if err != nil {
        panic(err)
    }
}

這個示例的具體邏輯我就不逐行講解了,相信你一看便懂。也希望你能夠自己在本機上跑起來這段代碼,真正用一下分佈式鎖,以此加深理解。

最後我再留一個作業,你可以嘗試一下實現鎖的續期方法 Extend

總結

分佈式鎖可以確保分佈式系統中併發安全的訪問競態資源,redsync 作爲 Go 中最流行的 Redis 分佈式鎖方案,非常值得我們學習和使用。

redsync 的用法非常簡單,加鎖和解鎖操作與 sync.Mutex 也非常類似,沒有太多的學習成本。不過,爲了避免持有鎖的進程掛掉時,其他進程還有機會獲取鎖,我們需要實現看門狗的功能。

我還帶你從零實現了一個微型的 Redis 分佈式鎖,希望你不僅會用 redsync 分佈式鎖,還能理解其原理,這樣在自己的業務開發中,如果遇到問題,我們才能更加得心應手。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

延伸閱讀

聯繫我

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/8HJXRcCoyZqeC_b6l-TD0w