高可用延遲隊列設計與實現

延遲隊列:一種帶有 延遲功能 的消息隊列

  1. 延時 → 未來一個不確定的時間

  2. mq → 消費行爲具有順序性

這樣解釋,整個設計就清楚了。你的目的是 延時,承載容器是 mq。

背景

列舉一下我日常業務中可能存在的場景:

  1. 建立延時日程,需要提醒老師上課

  2. 延時推送 → 推送老師需要的公告以及作業

爲了解決以上問題,最簡單直接的辦法就是定時去掃表:

服務啓動時,開啓一個異步協程 → 定時掃描 msg table,到了事件觸發事件,調用對應的 handler

幾個缺點:

  1. 每一個需要定時 / 延時任務的服務,都需要一個 msg table 做額外存儲 → 存儲與業務耦合

  2. 定時掃描 → 時間不好控制,可能會錯過觸發時間

  3. 對 msg table instance 是一個負擔。反覆有一個服務不斷對數據庫產生持續不斷的壓力

最大問題其實是什麼?

調度模型基本統一,不要做重複的業務邏輯

我們可以考慮將邏輯從具體的業務邏輯裏面抽出來,變成一個公共的部分。

而這個調度模型,就是 延時隊列

其實說白了:

延時隊列模型,就是將未來執行的事件提前存儲好,然後不斷掃描這個存儲,觸發執行時間則執行對應的任務邏輯。

那麼開源界是否已有現成的方案呢?答案是肯定的。Beanstalk (https://github.com/beanstalkd/beanstalkd) 它基本上已經滿足以上需求

設計目的

  1. 消費行爲 at least

  2. 高可用

  3. 實時性

  4. 支持消息刪除

一次說說上述這些目的的設計方向:

消費行爲

這個概念取自 mq 。mq 中提供了消費投遞的幾個方向:

exactly once 儘可能是 producer + consumer 兩端都保證。當 producer 沒辦法保證是,那 consumer 需要在消費前做一個去重,達到消費過一次不會重複消費,這個在延遲隊列內部直接保證。

最簡單:使用 redis 的 setNX 達到 job id 的唯一消費

高可用

支持多實例部署。掛掉一個實例後,還有後備實例繼續提供服務。

這個對外提供的 API 使用 cluster 模型,內部將多個 node 封裝起來,多個 node 之間冗餘存儲。

爲什麼不使用 Kafka?

考慮過類似基於 kafka/rocketmq 等消息隊列作爲存儲的方案,最後從存儲設計模型放棄了這類選擇。

舉個例子,假設以 Kafka 這種消息隊列存儲來實現延時功能,每個隊列的時間都需要創建一個單獨的 topic(如: Q1-1s, Q1-2s..)。這種設計在延時時間比較固定的場景下問題不太大,但如果是延時時間變化比較大會導致 topic 數目過多,會把磁盤從順序讀寫會變成隨機讀寫從導致性能衰減,同時也會帶來其他類似重啓或者恢復時間過長的問題。

  1. topic 過多 → 存儲壓力

  2. topic 存儲的是現實時間,在調度時對不同時間 (topic) 的讀取,順序讀 → 隨機讀

  3. 同理,寫入的時候順序寫 → 隨機寫

架構設計

API 設計

producer

  1. producer.At(msg []byte, at time.Time)

  2. producer.Delay(body []byte, delay time.Duration)

  3. producer.Revoke(ids string)

consumer

  1. consumer.Consume(consume handler)

使用延時隊列後,服務整體結構如下,以及隊列中 job 的狀態變遷:

  1. service → producer.At(msg []byte, at time.Time) → 插入延時 job 到 tube 中

  2. 定時觸發 → job 狀態更新爲 ready

  3. consumer 獲取到 ready job → 取出 job,開始消費;並更改狀態爲 reserved

  4. 執行傳入 consumer 中的 handler 邏輯處理函數

生產實踐

主要介紹一下在日常開發,我們使用到延時隊列的哪些具體功能。

生產端

  1. 開發中生產延時任務,只需確定任務執行時間

  2. 傳入 At()  producer.At(msg []byte, at time.Time)

  3. 內部會自行計算時間差值,插入 tube

  4. 如果出現任務時間的修改,以及任務內容的修改

  5. 在生產時可能需要額外建立一個 logic_id → job_id 的關係表

  6. 查詢到 job_id  → producer.Revoke(ids string) ,對其刪除,然後重新插入

消費端

首先,框架層面保證了消費行爲的 exactly once ,但是上層業務邏輯消費失敗或者是出現網絡問題,亦或者是各種各樣的問題,導致消費失敗,兜底交給業務開發做。這樣做的原因:

  1. 框架以及基礎組件只保證 job 狀態的流轉正確性

  2. 框架消費端只保證消費行爲的統一

  3. 延時任務在不同業務中行爲不統一

  4. 強調任務的必達性,則消費失敗時需要不斷重試直到任務成功

  5. 強調任務的準時性,則消費失敗時,對業務不敏感則可以選擇丟棄

這裏描述一下框架消費端是怎麼保證消費行爲的統一:

分爲 cluster 和 node。cluster

https://github.com/tal-tech/go-queue/blob/master/dq/consumer.go#L45

  1. cluster 內部將 consume handler 做了一層再封裝

  2. 對 consume body 做 hash,並使用此 hash 作爲 redis 去重的 key

  3. 如果存在,則不做處理,丟棄

node

https://github.com/tal-tech/go-queue/blob/master/dq/consumernode.go#L36

  1. 消費 node 獲取到 ready job;先執行 Reserve(TTR),預訂此 job,將執行該 job 進行邏輯處理

  2. 在 node 中 delete(job);然後再進行消費

  3. 如果失敗,則上拋給業務層,做相應的兜底重試

所以對於消費端,開發者需要自己實現消費的冪等性。

項目地址

go-queue 是基於 go-zero 實現的,go-zero 在 github 上 Used by 有 300+,開源一年獲得 11k+ stars.

歡迎使用並 star 支持我們!

微信交流羣

關注『微服務實踐』公衆號並點擊 交流羣 獲取社區羣二維碼。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/jcfCe2gBa3_F6T_7Cpk9jg