使用 Redis 和 Golang 解決併發問題

在構建分佈式系統和數據庫(如 Redis)時,併發問題可能會出現。本文將通過一個股票交易的例子,展示如何使用 Redis 和 Golang 來解決這些問題。

問題定義

場景: 構建一個股票交易應用,多個用戶可以同時購買不同公司的股票。每個公司都有一個剩餘的股票數量,用戶只能購買剩餘的股票。

代碼:

type Repository struct {
  client goRedis.Client
}

func NewRepository(address string) Repository {
  return Repository{
    client: *goRedis.NewClient(&goRedis.Options{
      Addr:     address,
    }),
  }
}

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
  defer wg.Done()

  // 獲取當前剩餘股票數量
  currentShares, err := r.client.Get(ctx, companySharesKey).Int()
  if err != nil {
    fmt.Print(err.Error())
    return err
  }

  // 驗證剩餘股票數量是否足夠
  if currentShares < numShares {
    fmt.Print("error: company does not have enough shares \n")
    return errors.New("error: company does not have enough shares")
  }
  currentShares -= numShares

  // 更新公司剩餘股票數量
  r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0)
  return nil
}

func main() {
  repository := redis.NewRepository(fmt.Sprintf("XXXXXX:XXXXX", config.Redis.Host, config.Redis.Port), config.Redis.Pass)

  // 運行併發客戶端
  companyId := "TestCompanySL"
  var wg sync.WaitGroup
  wg.Add(total_clients)

  for idx := 1; idx <= total_clients; idx++ {
    userId := fmt.Sprintf("user%d", idx)
    go repository.BuyShares(context.Background(), userId, companyId, 100, &wg)
  }
  wg.Wait()

  // 獲取公司剩餘股票數量
  shares, err := repository.GetCompanyShares(context.Background(), companyId)
  if err != nil {
    panic(err)
  }
  fmt.Printf("the number of free shares the company %s has is: %d", companyId, shares)
}

問題: 當多個用戶同時購買股票時,由於多個 goroutine 同時讀取和更新 currentShares,導致最終的剩餘股票數量不正確。

解決方法

1. 原子操作

思路: 使用 Redis 原子操作 IncrBy 來更新 currentShares,避免多個 goroutine 同時修改。

代碼:

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
  defer wg.Done()

  // 獲取當前剩餘股票數量
  currentShares, err := tx.Get(ctx, companySharesKey).Int()
  if err != nil {
    fmt.Print(err.Error())
    return err
  }

  // 驗證剩餘股票數量是否足夠
  if currentShares < numShares {
    fmt.Print("error: company does not have enough shares \n")
    return errors.New("error: company does not have enough shares")
  }

  // 使用 IncrBy 原子操作更新剩餘股票數量
  r.client.IncrBy(ctx, BuildCompanySharesKey(companyId), -1*int64(numShares))
  return nil
}

問題: 該方法仍然存在問題。因爲在執行 IncrBy 操作之前,多個 goroutine 已經讀取了 currentShares,導致驗證邏輯失效。

2. 事務

思路: 使用 Redis 事務來確保數據的一致性。事務可以將多個操作打包在一起,要麼全部執行,要麼全部不執行。

代碼:

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
  defer wg.Done()

  companySharesKey := BuildCompanySharesKey(companyId)

  err := r.client.Watch(ctx, func(tx *goredislib.Tx) error {
    // 獲取當前剩餘股票數量
    currentShares, err := tx.Get(ctx, companySharesKey).Int()
    if err != nil {
      fmt.Print(fmt.Errorf("error getting value %v", err.Error()))
      return err
    }

    // 驗證剩餘股票數量是否足夠
    if currentShares < numShares {
      fmt.Print("error: company does not have enough shares \n")
      return errors.New("error: company does not have enough shares")
    }
    currentShares -= numShares

    // 更新公司剩餘股票數量
    _, err = tx.TxPipelined(ctx, func(pipe goredislib.Pipeliner) error {
      // pipe handles the error case
      pipe.Pipeline().Set(ctx, companySharesKey, currentShares, 0)
      return nil
    })
    if err != nil {
      fmt.Println(fmt.Errorf("error in pipeline %v", err.Error()))
      return err
    }
    return nil
  }, companySharesKey)
  return err
}

問題: 該方法仍然存在問題。因爲多個 goroutine 同時進入事務,導致驗證邏輯失效。

3. LUA 腳本

思路: 使用 Redis LUA 腳本,將讀取、驗證和更新操作封裝在一個腳本中,確保原子性。

代碼:

var BuyShares = goRedis.NewScript(`
  local sharesKey = KEYS[1]
  local requestedShares = ARGV[1]
  local currentShares = redis.call("GET", sharesKey)
  if currentShares < requestedShares then
    return {err = "error: company does not have enough shares"}
  end

  currentShares = currentShares - requestedShares
  redis.call("SET", sharesKey, currentShares)
`)

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
  defer wg.Done()

  keys := []string{BuildCompanySharesKey(companyId)}
  err := BuyShares.Run(ctx, r.client, keys, numShares).Err()
  if err != nil {
    fmt.Println(err.Error())
  }
  return err
}

優點:

缺點:

4. Redis 鎖

思路: 使用 Redis 鎖來控制對 currentShares 的訪問,確保只有一個 goroutine 可以訪問。

代碼:

func NewRepository(address, password string) Repository {
  client := goredislib.NewClient(&goredislib.Options{
    Addr:     address,
    Password: password,
  })
  pool := goredis.NewPool(client)
  rs := redsync.New(pool)
  mutexname := "my-global-mutex"
  mutex := rs.NewMutex(mutexname)

  return Repository{
    client: client,
    mutex: mutex,
  }
}

func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
  defer wg.Done()

  // 獲取鎖
  if err := r.mutex.Lock(); err != nil {
    fmt.Printf("error during lock: %v \n", err)
    return err
  }
  defer func() {
    if ok, err := r.mutex.Unlock(); !ok || err != nil {
      fmt.Printf("error during unlock: %v \n", err)
    }
  }()

  // 獲取當前剩餘股票數量
  currentShares, err := r.client.Get(ctx, BuildCompanySharesKey(companyId)).Int()
  if err != nil {
    fmt.Print(err.Error())
    return err
  }

  // 驗證剩餘股票數量是否足夠
  if currentShares < numShares {
    fmt.Print("error: company does not have enough shares \n")
    return errors.New("error: company does not have enough shares")
  }
  currentShares -= numShares

  // 更新公司剩餘股票數量
  r.client.Set(ctx, BuildCompanySharesKey(companyId), currentShares, 0)

  return nil
}

優點:

缺點:

總結

Redis 提供了多種方法來解決併發問題,包括原子操作、事務、LUA 腳本和 Redis 鎖。選擇哪種方法取決於具體的場景和需求。

擴展

希望本文能夠幫助您瞭解如何使用 Redis 和 Golang 解決併發問題。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9OFD5PywddLkm3X37q-Owg