用 Go - Redis 實現分佈式鎖
爲什麼需要分佈式鎖
- 用戶下單
鎖住 uid,防止重複下單。
- 庫存扣減
鎖住庫存,防止超賣。
- 餘額扣減
鎖住賬戶,防止併發操作。分佈式系統中共享同一個資源時往往需要分佈式鎖來保證變更資源一致性。
分佈式鎖需要具備特性
- 排他性
鎖的基本特性,並且只能被第一個持有者持有。
- 防死鎖
高併發場景下臨界資源一旦發生死鎖非常難以排查,通常可以通過設置超時時間到期自動釋放鎖來規避。
- 可重入
鎖持有者支持可重入,防止鎖持有者再次重入時鎖被超時釋放。
- 高性能高可用
鎖是代碼運行的關鍵前置節點,一旦不可用則業務直接就報故障了。高併發場景下,高性能高可用是基本要求。
實現 Redis 鎖應先掌握哪些知識點
- set 命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
-
EX
second :設置鍵的過期時間爲 second 秒。SET key value EX second 效果等同於 SETEX key second value 。 -
PX
millisecond :設置鍵的過期時間爲 millisecond 毫秒。SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。 -
NX
:只在鍵不存在時,纔對鍵進行設置操作。SET key value NX 效果等同於 SETNX key value 。 -
XX
:只在鍵已經存在時,纔對鍵進行設置操作。
- Redis.lua 腳本
使用 redis lua 腳本能將一系列命令操作封裝成 pipline 實現整體操作的原子性。
go-zero 分佈式鎖 RedisLock 源碼分析
core/stores/redis/redislock.go
- 加鎖流程
-- KEYS[1]: 鎖key
-- ARGV[1]: 鎖value,隨機字符串
-- ARGV[2]: 過期時間
-- 判斷鎖key持有的value是否等於傳入的value
-- 如果相等說明是再次獲取鎖並更新獲取時間,防止重入時過期
-- 這裏說明是“可重入鎖”
if redis.call("GET", KEYS[1]) == ARGV[1] then
-- 設置
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
-- 鎖key.value不等於傳入的value則說明是第一次獲取鎖
-- SET key value NX PX timeout : 當key不存在時才設置key的值
-- 設置成功會自動返回“OK”,設置失敗返回“NULL Bulk Reply”
-- 爲什麼這裏要加“NX”呢,因爲需要防止把別人的鎖給覆蓋了
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end
- 解鎖流程
-- 釋放鎖
-- 不可以釋放別人的鎖
if redis.call("GET", KEYS[1]) == ARGV[1] then
-- 執行成功返回“1”
return redis.call("DEL", KEYS[1])
else
return 0
end
- 源碼解析
package redis
import (
"math/rand"
"strconv"
"sync/atomic"
"time"
red "github.com/go-redis/redis"
"github.com/tal-tech/go-zero/core/logx"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end`
randomLen = 16
// 默認超時時間,防止死鎖
tolerance = 500 // milliseconds
millisPerSecond = 1000
)
// A RedisLock is a redis lock.
type RedisLock struct {
// redis客戶端
store *Redis
// 超時時間
seconds uint32
// 鎖key
key string
// 鎖value,防止鎖被別人獲取到
id string
}
func init() {
rand.Seed(time.Now().UnixNano())
}
// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, key string) *RedisLock {
return &RedisLock{
store: store,
key: key,
// 獲取鎖時,鎖的值通過隨機字符串生成
// 實際上go-zero提供更加高效的隨機字符串生成方式
// 見core/stringx/random.go:Randn
id: randomStr(randomLen),
}
}
// Acquire acquires the lock.
// 加鎖
func (rl *RedisLock) Acquire() (bool, error) {
// 獲取過期時間
seconds := atomic.LoadUint32(&rl.seconds)
// 默認鎖過期時間爲500ms,防止死鎖
resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{
rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
})
if err == red.Nil {
return false, nil
} else if err != nil {
logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())
return false, err
} else if resp == nil {
return false, nil
}
reply, ok := resp.(string)
if ok && reply == "OK" {
return true, nil
}
logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp)
return false, nil
}
// Release releases the lock.
// 釋放鎖
func (rl *RedisLock) Release() (bool, error) {
resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})
if err != nil {
return false, err
}
reply, ok := resp.(int64)
if !ok {
return false, nil
}
return reply == 1, nil
}
// SetExpire sets the expire.
// 需要注意的是需要在Acquire()之前調用
// 不然默認爲500ms自動釋放
func (rl *RedisLock) SetExpire(seconds int) {
atomic.StoreUint32(&rl.seconds, uint32(seconds))
}
func randomStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
關於分佈式鎖還有哪些實現方案
-
etcd
-
redis redlock
項目地址
https://github.com/zeromicro/go-zero
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ESljL5Hoo962aId0REa3VQ