用 Go 語言 - Redis 實現分佈式鎖,我還是第一次

一 爲什麼需要分佈式鎖

共享資源訪問控制: 當多個節點需要同時訪問共享資源時,爲了避免併發寫入導致數據不一致,需要使用分佈式鎖確保同時只有一個節點可以寫入或修改共享資源。

避免重複執行: 在分佈式系統中,某些操作可能需要在整個系統中只執行一次,比如定時任務、數據初始化等。爲了避免多個節點同時執行這些操作,需要使用分佈式鎖來確保只有一個節點可以執行。

任務協調: 在分佈式任務隊列中,多個節點競爭執行任務時,可能需要對任務進行加鎖,以確保每個任務只被一個節點執行,避免重複執行或者操作衝突。

防止死鎖: 在分佈式系統中,由於網絡延遲、節點故障等原因,可能會導致死鎖情況的發生。分佈式鎖可以用來避免死鎖的發生,通過設置合理的超時時間和重試機制,確保鎖在一定時間內被釋放。

分佈式系統中共享同一個資源時,就需要分佈式鎖來確保變更資源的一致性。這就是爲什麼要用到分佈式鎖的原因咯。

二、分佈式鎖需要具備特性

1 互斥性(Mutual Exclusion): 在任何時刻,只能有一個客戶端持有鎖,其他客戶端不能同時持有該鎖。這是最基本的鎖特性,確保在同一時間只有一個客戶端能夠訪問共享資源。

2 安全性(Safety): 在鎖被釋放之前,任何其他客戶端都不能獲得該鎖。即使是在網絡分區、節點故障等異常情況下,也要確保鎖的安全性,避免數據不一致或者操作衝突。

3 活性(Liveness): 鎖應該能夠在合理的時間內被獲取,避免長時間的等待導致死鎖或者無法響應其他客戶端請求。活性也包括在鎖被釋放後,其他客戶端能夠儘快地獲取到該鎖。

4 容錯性(Fault Tolerance): 分佈式系統中可能會發生網絡分區、節點故障等異常情況,分佈式鎖需要具備容錯性,能夠在這些異常情況下正確地工作。比如,鎖的實現應該能夠處理網絡分區導致的消息丟失或者超時等情況。

5 性能(Performance): 鎖的實現應該儘可能地減少鎖競爭和通信開銷,提高系統的性能。例如,可以使用高效的算法和數據結構來減少鎖的持有時間和等待時間,或者採用緩存和批處理等技術來減少通信開銷。

6 可擴展性(Scalability): 鎖的實現應該能夠隨着系統規模的增長而擴展,確保在高併發和大規模的分佈式環境下仍然能夠保持良好的性能和可用性。

三、實現 Redis 鎖應先掌握的知識點

set 命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]

Redis.lua 腳本

我們可以使用 redis lua 腳本,將一系列命令操作封裝成 pipline,實現整體操作的原子性。

加鎖的整個流程,詳細原理說明看註釋

-- Lua 腳本實現 Redis 分佈式鎖

-- 生成唯一標識
local requestId = ARGV[1]

-- 嘗試獲取鎖
local lockKey = KEYS[1]
local lockValue = requestId
local lockExpireTime = tonumber(ARGV[2])

local result = redis.call('SET', lockKey, lockValue, 'NX''PX', lockExpireTime)

-- 判斷獲取鎖的結果
if result == 'OK' then
    -- 獲取鎖成功,設置鎖的過期時間
    return 'OK'
else
    -- 獲取鎖失敗
    return 'FAIL'
end

1 生成唯一標識: 首先,在客戶端生成一個唯一的標識,可以是 UUID、Snowflake 算法生成的分佈式 ID 等。

2 嘗試獲取鎖: 客戶端將生成的唯一標識作爲參數,調用 Redis 的 SET 命令嘗試獲取鎖。可以使用 NX(如果鍵不存在則設置)和 PX(設置鍵的過期時間)選項,確保只有一個客戶端能夠成功獲取到鎖。

3 判斷獲取鎖的結果: 如果獲取鎖成功,SET 命令會返回 OK,表示當前客戶端成功獲取了鎖。如果獲取鎖失敗,說明已經有其他客戶端持有了鎖,此時客戶端需要進行等待或者返回失敗。

4 設置鎖的過期時間: 在成功獲取鎖之後,客戶端需要設置鎖的過期時間,以防止因爲客戶端崩潰或者其他原因導致鎖一直佔用,造成死鎖。

5 返回獲取鎖的結果: 根據 SET 命令的返回值,客戶端判斷是否成功獲取到了鎖,並將結果返回給調用方。

加鎖流程圖

解鎖流程

if redis.call("get", KEYS[1]) == ARGV[1] then

    return redis.call("del", KEYS[1])

else

    return 0

end

1 使用 KEYS[1] 獲取傳入的鎖鍵名。

2 使用 ARGV[1] 獲取傳入的鎖值(即加鎖時設置的唯一標識)。

3 判斷當前鎖是否存在且鎖值與傳入的鎖值相同,若是,則調用 DEL 命令刪除該鎖,並返回 1 表示解鎖成功。

4 若鎖不存在或鎖值不匹配,則返回 0 表示解鎖失敗。

解鎖的流程圖

源碼解析

package redis

import (
    "math/rand"
    "strconv"
    "sync/atomic"
    "time"

    red "github.com/go-redis/redis"
    "github.com/tal-tech/go-zero/core/logx"
)

const (
    letters  = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    randomLen = 16

    // 默認超時時間,用來防止死鎖
    tolerance       = 300 // milliseconds
    millisPerSecond = 800

    lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    redis.call("SET", KEYS[1], ARGV[1]"PX", ARGV[2])
    return "OK"
else
    return redis.call("SET", KEYS[1], ARGV[1]"NX""PX", ARGV[2])
end`

    delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end`

)

type redisLock struct {
    // redis客戶端
    store *Redis
    // 超時時間
    seconds uint32
    // 鎖key
    keys string
    // 鎖value,防止鎖被別人獲取到
    value string
}

func init() {
    rand.Seed(time.Now().UnixNano())
}

// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, keys string) *RedisLock {
    return &RedisLock{
        store: store,
        keys:   keys,
        // 獲取鎖時,鎖的值通過隨機字符串生成
        // 實際上go-zero提供更加高效的隨機字符串生成方式
        // 見core/stringx/random.go:Randn
        value:    randomStr(randomLen),
    }
}

// Acquire acquires the lock.
// 加鎖
func (rl *RedisLock) Acquire() (bool, error) {
    // 獲取過期時間
    seconds := atomic.LoadUint32(&rl.seconds)
    // 默認鎖過期時間爲500ms,防止死鎖
    resp, err := rl.store.Eval(lockCommand, []string{rl.keys}[]string{
        rl.value, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
    })
    if err == red.Nil {
        return false, nil
    } else if err != nil {
        logx.Errorf("Error on lock for %s, %s", rl.key, err.Error())
        return false, err
    } else if resp == nil {
        return false, nil
    }

    reply, ok := resp.(string)
    if ok && reply == "OK" {
        return true, nil
    }

    logx.Errorf("Unknown reply lock for %s: %v", rl.keys, resp)
    return false, nil
}

// Release releases the lock.
// 釋放鎖
func (rl *RedisLock) Release() (bool, error) {
    resp, err := rl.store.Eval(delCommand, []string{rl.keys}[]string{rl.value})
    if err != nil {
        return false, err
    }

    reply, ok := resp.(int64)
    if !ok {
        return false, nil
    }

    return reply == 1, nil
}

func randomStr(n int) string {
    b := make([]byte, n)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}

// SetExpire sets the expire.
// 需要注意的是需要在Acquire()之前調用
// 不然默認爲300ms自動釋放
func (rl *RedisLock) SetExpire(seconds int) {
    atomic.StoreUint32(&rl.seconds, uint32(seconds))
}

這個詳細源碼根據自己的業務需要,可以利用。

文章首發:https://learnku.com/articles/64413

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZKILIAUIwzddM_sAAvjVew