使用 Redis 和 Golang 解決併發問題
在構建分佈式系統和數據庫(如 Redis)時,併發問題可能會出現。本文將通過一個股票交易的例子,展示如何使用 Redis 和 Golang 來解決這些問題。
問題定義
場景: 構建一個股票交易應用,多個用戶可以同時購買不同公司的股票。每個公司都有一個剩餘的股票數量,用戶只能購買剩餘的股票。
代碼:
type Repository struct {
client goRedis.Client
}
func NewRepository(address string) Repository {
return Repository{
client: *goRedis.NewClient(&goRedis.Options{
Addr: address,
}),
}
}
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// 獲取當前剩餘股票數量
currentShares, err := r.client.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// 驗證剩餘股票數量是否足夠
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
currentShares -= numShares
// 更新公司剩餘股票數量
r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0)
return nil
}
func main() {
repository := redis.NewRepository(fmt.Sprintf("XXXXXX:XXXXX", config.Redis.Host, config.Redis.Port), config.Redis.Pass)
// 運行併發客戶端
companyId := "TestCompanySL"
var wg sync.WaitGroup
wg.Add(total_clients)
for idx := 1; idx <= total_clients; idx++ {
userId := fmt.Sprintf("user%d", idx)
go repository.BuyShares(context.Background(), userId, companyId, 100, &wg)
}
wg.Wait()
// 獲取公司剩餘股票數量
shares, err := repository.GetCompanyShares(context.Background(), companyId)
if err != nil {
panic(err)
}
fmt.Printf("the number of free shares the company %s has is: %d", companyId, shares)
}
問題: 當多個用戶同時購買股票時,由於多個 goroutine 同時讀取和更新 currentShares,導致最終的剩餘股票數量不正確。
解決方法
1. 原子操作
思路: 使用 Redis 原子操作 IncrBy 來更新 currentShares,避免多個 goroutine 同時修改。
代碼:
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// 獲取當前剩餘股票數量
currentShares, err := tx.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// 驗證剩餘股票數量是否足夠
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
// 使用 IncrBy 原子操作更新剩餘股票數量
r.client.IncrBy(ctx, BuildCompanySharesKey(companyId), -1*int64(numShares))
return nil
}
問題: 該方法仍然存在問題。因爲在執行 IncrBy 操作之前,多個 goroutine 已經讀取了 currentShares,導致驗證邏輯失效。
2. 事務
思路: 使用 Redis 事務來確保數據的一致性。事務可以將多個操作打包在一起,要麼全部執行,要麼全部不執行。
代碼:
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
companySharesKey := BuildCompanySharesKey(companyId)
err := r.client.Watch(ctx, func(tx *goredislib.Tx) error {
// 獲取當前剩餘股票數量
currentShares, err := tx.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(fmt.Errorf("error getting value %v", err.Error()))
return err
}
// 驗證剩餘股票數量是否足夠
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
currentShares -= numShares
// 更新公司剩餘股票數量
_, err = tx.TxPipelined(ctx, func(pipe goredislib.Pipeliner) error {
// pipe handles the error case
pipe.Pipeline().Set(ctx, companySharesKey, currentShares, 0)
return nil
})
if err != nil {
fmt.Println(fmt.Errorf("error in pipeline %v", err.Error()))
return err
}
return nil
}, companySharesKey)
return err
}
問題: 該方法仍然存在問題。因爲多個 goroutine 同時進入事務,導致驗證邏輯失效。
3. LUA 腳本
思路: 使用 Redis LUA 腳本,將讀取、驗證和更新操作封裝在一個腳本中,確保原子性。
代碼:
var BuyShares = goRedis.NewScript(`
local sharesKey = KEYS[1]
local requestedShares = ARGV[1]
local currentShares = redis.call("GET", sharesKey)
if currentShares < requestedShares then
return {err = "error: company does not have enough shares"}
end
currentShares = currentShares - requestedShares
redis.call("SET", sharesKey, currentShares)
`)
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
keys := []string{BuildCompanySharesKey(companyId)}
err := BuyShares.Run(ctx, r.client, keys, numShares).Err()
if err != nil {
fmt.Println(err.Error())
}
return err
}
優點:
-
確保原子性,解決併發問題。
-
提高性能,因爲腳本在 Redis 服務器上執行。
缺點:
-
需要學習 LUA 語言。
-
腳本執行期間會阻塞其他 Redis 操作。
4. Redis 鎖
思路: 使用 Redis 鎖來控制對 currentShares 的訪問,確保只有一個 goroutine 可以訪問。
代碼:
func NewRepository(address, password string) Repository {
client := goredislib.NewClient(&goredislib.Options{
Addr: address,
Password: password,
})
pool := goredis.NewPool(client)
rs := redsync.New(pool)
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)
return Repository{
client: client,
mutex: mutex,
}
}
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// 獲取鎖
if err := r.mutex.Lock(); err != nil {
fmt.Printf("error during lock: %v \n", err)
return err
}
defer func() {
if ok, err := r.mutex.Unlock(); !ok || err != nil {
fmt.Printf("error during unlock: %v \n", err)
}
}()
// 獲取當前剩餘股票數量
currentShares, err := r.client.Get(ctx, BuildCompanySharesKey(companyId)).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// 驗證剩餘股票數量是否足夠
if currentShares < numShares {
fmt.Print("error: company does not have enough shares \n")
return errors.New("error: company does not have enough shares")
}
currentShares -= numShares
// 更新公司剩餘股票數量
r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0)
return nil
}
優點:
-
簡單易用。
-
適用於各種場景。
缺點:
- 性能可能不如 LUA 腳本。
總結
Redis 提供了多種方法來解決併發問題,包括原子操作、事務、LUA 腳本和 Redis 鎖。選擇哪種方法取決於具體的場景和需求。
擴展
-
可以使用 Redis 發佈 / 訂閱功能來實現實時股票價格更新。
-
可以使用 Redis 流來記錄股票交易歷史。
-
可以使用 Redis 集羣來提高性能和可用性。
希望本文能夠幫助您瞭解如何使用 Redis 和 Golang 解決併發問題。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/9OFD5PywddLkm3X37q-Owg