用 Redis 做一個可靠的延遲隊列

我們先看看以下業務場景:

  1. 當訂單一直處於未支付狀態時,如何及時的關閉訂單,並退還庫存?

  2. 新創建店鋪,N 天內沒有上傳商品,系統如何知道該信息,併發送激活短信?

上述場景最簡單直接的解決方案是定時掃表。我們假設 10 分鐘未支付則關閉訂單、定時任務設置爲 5 分鐘一次,那麼一個訂單最晚會在 15 分鐘關閉。高達 5 分鐘的誤差是業務難以接受的。另一方面頻繁的掃表可能消耗過多數據庫資源,影響線上交易吞吐量。

此外還有朋友使用 Redis 的過期通知、時間輪、Java 的 DelayQueue 等方式實現延時任務。我們在之前的文章中討論過他們的缺陷:比如使用 Redis 過期通知不保證準時、發送即忘不保證送達,時間輪缺乏持久化機制容易丟失等。

總結一下,我們對於延時隊列的要求有下列幾條(從重要到不重要排列):

  1. 持久化: 服務重啓或崩潰不能丟失任務

  2. 確認重試機制: 任務處理失敗或超時應該有重試

  3. 定時儘量精確

最合適的解決方案是使用 Pulsa、RocketMQ 等專業消息隊列的延時投遞功能。不過引入新的中間件通常存在各種非技術方面的麻煩。Redis 作爲廣泛使用的中間件,何不用 Redis 來製作延時隊列呢?

使用有序集合結構實現延時隊列的方法已經廣爲人知,無非是將消息作爲有序集合的 member 投遞時間戳作爲 score,使用 zrangebyscore 命令搜索已到投遞時間的消息然後將其發給消費者。

除了基本的延時投遞之外我們的消息隊列具有下列優勢:

  1. 提供 ACK 和重試機制

  2. 只需要 Redis 和消費者即可運行,無需其它組件

  3. 提供 At-Least-One 投遞語義、並保證消息不會併發消費

本文的完整代碼實現在 hdt3213/delayqueue,可以直接 go get github.com/hdt3213/delayqueue 完成安裝。

具體使用也非常簡單,只需要註冊處理消息的回調函數並調用 start() 即可:

package main
import (
  "github.com/go-redis/redis/v8"
  "github.com/hdt3213/delayqueue"
  "strconv"
  "time"
)
func main() {
  redisCli := redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
  })
  queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
    // 註冊處理消息的回調函數
        // 返回 true 表示已成功消費,返回 false 消息隊列會重新投遞次消息
    return true
  })
  // 發送延時消息
  for i := 0; i < 10; i++ {
    err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
    if err != nil {
      panic(err)
    }
  }
  // 啓動消費協程
  done := queue.StartConsume()
  // 阻塞等待消費協程退出
  <-done
}

由於數據存儲在 redis 中所以我們最多能保證在 redis 無故障且消息隊列相關 key 未被外部篡改的情況下不會丟失消息。

原理詳解

消息隊列涉及幾個關鍵的 redis 數據結構:

流程如下圖所示:

由於我們允許分佈式地部署多個消費者,每個消費者都在定時執行 lua 腳本,所以多個消費者可能處於上述流程中不同狀態,我們無法預知(或控制)上圖中五個操作發生的先後順序,也無法控制有多少實例正在執行同一個操作。

因此我們需要保證上圖中五個操作滿足三個條件:

  1. 都是原子性的

  2. 不會重複處理同一條消息

  3. 操作前後消息隊列始終處於正確的狀態

只要滿足這三個條件,我們就可以部署多個實例且不需要使用分佈式鎖等技術來進行狀態同步。

是不是聽起來有點嚇人?😂 其實簡單的很,讓我們一起來詳細看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的消息 ID 並把它們移動到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中找出已到投遞時間的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中
for _,v in ipairs(msgs) do
  table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中刪除已投遞的消息

ready2UnackScript

ready2UnackScript 從 ready 或者 retry 中取出一條消息發送給消費者並放入 unack 中,類似於 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 從 retry 中找出所有已到重試時間的消息並把它們移動到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 找到已到重試時間的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩餘重試次數
for i,v in ipairs(retryCounts) do
  local k = msgs[i]
  if tonumber(v) > 0 then -- 剩餘次數大於 0
    redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩餘重試次數
    redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
  else -- 剩餘重試次數爲 0
    redis.call("HDel", KEYS[2], k) -- 刪除重試次數記錄
    redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待後續刪除
  end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 將已處理的消息從 unack key 中刪除
因爲 redis 要求 lua 腳本必須在執行前在 KEYS 參數中聲明自己要訪問的 key, 而我們將每個 msg 有一個獨立的 key,我們在執行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要刪除的消息記在 garbage key 中,腳本執行完後再通過 del 命令將他們刪除:
func (q *DelayQueue) garbageCollect() error {
  ctx := context.Background()
  msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
  if err != nil {
    return fmt.Errorf("smembers failed: %v", err)
  }
  if len(msgIds) == 0 {
    return nil
  }
  // allow concurrent clean
  msgKeys := make([]string, 0, len(msgIds))
  for _, idStr := range msgIds {
    msgKeys = append(msgKeys, q.genMsgKey(idStr))
  }
  err = q.redisCli.Del(ctx, msgKeys...).Err()
  if err != nil && err != redis.Nil {
    return fmt.Errorf("del msgs failed: %v", err)
  }
  err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
  if err != nil && err != redis.Nil {
    return fmt.Errorf("remove from garbage key failed: %v", err)
  }
  return nil
}
之前提到的 lua 腳本都是原子性執行的,不會有其它命令插入其中。gc 函數由 3 條 redis 命令組成,在執行過程中可能會有其它命令插入執行過程中,不過考慮到一條消息進入垃圾回收流程之後不會復活所以不需要保證 3 條命令原子性。

ack

ack 只需要將消息徹底刪除即可:

func (q *DelayQueue) ack(idStr string) error {
  ctx := context.Background()
  err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
  if err != nil {
    return fmt.Errorf("remove from unack failed: %v", err)
  }
  // msg key has ttl, ignore result of delete
  _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
  q.redisCli.HDel(ctx, q.retryCountKey, idStr)
  return nil
}

否定確認只需要將 unack key 中消息的重試時間改爲現在,隨後執行的 unack2RetryScript 會立即將它移動到 retry key

func (q *DelayQueue) nack(idStr string) error {
  ctx := context.Background()
  // update retry time as now, unack2Retry will move it to retry immediately
  err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
    Member: idStr,
    Score:  float64(time.Now().Unix()),
  }).Err()
  if err != nil {
    return fmt.Errorf("negative ack failed: %v", err)
  }
  return nil
}

consume

消息隊列的核心邏輯是每秒執行一次的 consume 函數,它負責調用上述腳本將消息轉移到正確的集合中並回調 consumer 來消費消息:

func (q *DelayQueue) consume() error {
  // 執行 pending2ready,將已到時間的消息轉移到 ready
  err := q.pending2Ready()
  if err != nil {
    return err
  }
  // 循環調用 ready2Unack 拉取消息進行消費
  var fetchCount uint
  for {
    idStr, err := q.ready2Unack()
    if err == redis.Nil { // consumed all
      break
    }
    if err != nil {
      return err
    }
    fetchCount++
    ack, err := q.callback(idStr)
    if err != nil {
      return err
    }
    if ack {
      err = q.ack(idStr)
    } else {
      err = q.nack(idStr)
    }
    if err != nil {
      return err
    }
    if fetchCount >= q.fetchLimit {
      break
    }
  }
  // 將 nack 或超時的消息放入重試隊列
  err = q.unack2Retry()
  if err != nil {
    return err
  }
    // 清理已達到最大重試次數的消息
  err = q.garbageCollect()
  if err != nil {
    return err
  }
  // 消費重試隊列
  fetchCount = 0
  for {
    idStr, err := q.retry2Unack()
    if err == redis.Nil { // consumed all
      break
    }
    if err != nil {
      return err
    }
    fetchCount++
    ack, err := q.callback(idStr)
    if err != nil {
      return err
    }
    if ack {
      err = q.ack(idStr)
    } else {
      err = q.nack(idStr)
    }
    if err != nil {
      return err
    }
    if fetchCount >= q.fetchLimit {
      break
    }
  }
  return nil
}

至此一個簡單可靠的延時隊列就做好了,何不趕緊開始試用呢😘😋?

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rZIIL1TtAgwliYI0GpVMMg