golang 源碼分析:redsync

        https://github.com/go-redsync/redsync 是 golang 實現的一個 redis 分佈式鎖,支持 quorum 機制,內部通過委託模式支持 https://github.com/redis/go-redis 客戶端和 https://github.com/gomodule/redigo 客戶端。首先看下如何使用,然後分析下它的源碼具體實現。

package main
import (
  goredislib "github.com/go-redis/redis/v8"
  "github.com/go-redsync/redsync/v4"
  "github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
  // 創建一個redis的客戶端連接
  client := goredislib.NewClient(&goredislib.Options{
    Addr: "localhost:6379",
  })
  /*
    // 創建一個redis集羣模式的客戶端連接
    client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
        Addr: []string{"localhost:6379"},
    })
  */
  // 創建redsync的客戶端連接池
  pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
  // 創建redsync實例
  rs := redsync.New(pool)
  // 通過相同的key值名獲取同一個互斥鎖.
  mutexname := "my-global-mutex"
  //創建基於key的互斥鎖
  mutex := rs.NewMutex(mutexname)
  // 對key進行加鎖
  if err := mutex.Lock(); err != nil {
    panic(err)
  }
  // 鎖續期
  if ok, err := mutex.Extend(); err != nil || !ok {
    panic(err)
  }
  // 釋放互斥鎖
  if ok, err := mutex.Unlock(); !ok || err != nil {
    panic("unlock failed")
  }
}

        可以看到,它的核心分爲下面幾步:

1,獲取 redis 連接池

2,創建 redsync 的客戶端連接池

3, 創建 redsync 實例

4,創建基於 key 的互斥鎖

5,對 key 進行加鎖

6,鎖續期

7,釋放互斥鎖

        下面我們依次按照上述幾步分析下它的源碼:

首先就是普通的 redis-client 創建連接池

// 創建redsync的客戶端連接池
  pool := goredis.NewPool(client)

然後就是基於委託模式 redis/goredis/v8/goredis.go 對連接池進行封裝

// NewPool returns a Goredis-based pool implementation.
func NewPool(delegate redis.UniversalClient) redsyncredis.Pool {
  return &pool{delegate}
}
type pool struct {
  delegate redis.UniversalClient
}

創建 redsync 實例源碼位於 redsync.go,傳入多個 redis 連接池實例就實現了紅鎖。

func New(pools ...redis.Pool) *Redsync {
  return &Redsync{
    pools: pools,
  }
}
// Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.
type Redsync struct {
  pools []redis.Pool
}

創建基於 key 的互斥鎖,就是初始化鎖需要的參數,源碼位於 redsync.go

// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
  m := &Mutex{
    name:   name,
    expiry: 8 * time.Second,
    tries:  32,
    delayFunc: func(tries int) time.Duration {
      return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
    },
    genValueFunc:  genValue,
    driftFactor:   0.01,
    timeoutFactor: 0.05,
    quorum:        len(r.pools)/2 + 1,
    pools:         r.pools,
  }
  for _, o := range options {
    o.Apply(m)
  }
  return m
}
// A Mutex is a distributed mutual exclusion lock.
type Mutex struct {
  name   string
  expiry time.Duration
  tries     int
  delayFunc DelayFunc
  driftFactor   float64
  timeoutFactor float64
  quorum int
  genValueFunc func() (string, error)
  value        string
  until        time.Time
  pools []redis.Pool
}

加鎖的源碼位於 mutex.go,所有的操作函數都封裝了兩個版本,帶 context 和不帶 context 的

func (m *Mutex) Lock() error {
  return m.LockContext(nil)
}

加鎖過程是先獲取鎖,如果超過一半節點上獲取成功,則認爲加鎖成功,否則釋放已經加鎖的節點:

func (m *Mutex) LockContext(ctx context.Context) error {
    value, err := m.genValueFunc()
    case <-time.After(m.delayFunc(i)):
        n, err := func() (int, error) {
      ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
      defer cancel()
      return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
        return m.acquire(ctx, pool, value)
      })
    }()
    now := time.Now()
    until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
    if n >= m.quorum && now.Before(until) {
      m.value = value
      m.until = until
      return nil
    }
    func() (int, error) {
      ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
      defer cancel()
      return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
        return m.release(ctx, pool, value)
      })
    }()

如果加鎖失敗,會進行隨機重試。從各個節點獲取執行結果的邏輯抽象出了一個函數: 起多個 goroutine 發起請求,然後通過一個阻塞的 chan 來收集結果,返回上游供決策:

func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
  ch := make(chan result)
  for node, pool := range m.pools {
    go func(node int, pool redis.Pool) {
      r := result{Node: node}
      r.Status, r.Err = actFn(pool)
      ch <- r
    }(node, pool)
  }
  for range m.pools {
    r := <-ch
    if r.Status {
      n++
    } else if r.Err != nil {
      err = multierror.Append(err, &RedisError{Node: r.Node, Err: r.Err})
    } else {
      taken = append(taken, r.Node)
      err = multierror.Append(err, &ErrNodeTaken{Node: r.Node})
    }
  }
  if len(taken) >= m.quorum {
    return n, &ErrTaken{Nodes: taken}
  }
  return n, err

具體執行單節點加鎖的邏輯位於

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    conn, err := pool.Get(ctx)
    reply, err := conn.SetNX(m.name, value, m.expiry)

redis/goredis/goredis.go

func (c *conn) SetNX(name string, value string, expiry time.Duration) (bool, error) {
  ok, err := c.delegate.SetNX(name, value, expiry).Result()
  return ok, noErrNil(err)
}

釋放鎖的操作就是執行 lua 腳本,先判斷鎖是不是自己加的,如果是就釋放

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
        status, err := conn.Eval(deleteScript, m.name, value)
var deleteScript = redis.NewScript(1, `
  if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
  else
    return 0
  end
`)

釋放互斥鎖的邏輯和加鎖類似,底層函數是一樣的

func (m *Mutex) Unlock() (bool, error) {
  return m.UnlockContext(nil)
}
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
        // UnlockContext unlocks m and returns the status of unlock.
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
  n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
    return m.release(ctx, pool, m.value)
  })
  if n < m.quorum {
    return false, err
  }
  return true, nil
}

最後看下續期操作,如果本地事務耗時特別長,鎖過期時間內完不成操作就需要鎖續期 mutex.go

// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() (bool, error) {
  return m.ExtendContext(nil)
}
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
          n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
    return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
  })
          now := time.Now()
  until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
  if now.Before(until) {
    m.until = until
    return true, nil
  }

也是執行 lua 腳本,先看看是不是自己加的鎖,如果是則修改過期時間

func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
        status, err := conn.Eval(touchScript, m.name, value, expiry)
var touchScript = redis.NewScript(1, `
  if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("PEXPIRE", KEYS[1], ARGV[2])
  else
    return 0
  end
`)

除此之外,還有一個函數,判斷當前持有鎖是否有效,能獲取到值,和我們的 value 相等,說明有效。

func (m *Mutex) Valid() (bool, error) {
  return m.ValidContext(nil)
}
func (m *Mutex) ValidContext(ctx context.Context) (bool, error) {
  n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
    return m.valid(ctx, pool)
  })
  return n >= m.quorum, err
}
func (m *Mutex) valid(ctx context.Context, pool redis.Pool) (bool, error) {
        reply, err := conn.Get(m.name)
  if err != nil {
    return false, err
  }
  return m.value == reply, nil

        其實,到底需不需要紅鎖,我們需要判斷我們的業務場景和我們資源配置,資源允許、可用性要求很高,那麼可以使用紅鎖。那麼紅鎖真的萬無一失嗎?其實不然,首先我們的 value 是一個隨機值,既然是隨機的,就有可能相同,相同了,必然鎖失效。紅鎖是通過過半機制提升鎖的可用性,防止單節點掛掉。如果過半節點都掛了,鎖還可用嗎?

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9SntfLZXENaUVe54NxN0Zg