一種低延遲的超時中心實現方式

一  背景

在很多產品中都存在生命週期相關的設計,時間節點到了之後需要做對應的事情。

超時中心(TimeOutCenter,TOC)負責存儲和調度生命週期節點上面的超時任務,當超時任務設置的超時時間到期後,超時中心需要立即調度處理這些超時任務。對於一些需要低延遲的超時場景,超時中心調度延遲會給產品帶來不可估量的影響。

因此本文提出一種低延遲的超時中心實現方式,首先介紹傳統的超時中心的實現方案,以及傳統方案中的缺點,然後介紹低延遲的方案,說明如何解決傳統方案中的延遲問題。

二  傳統高延遲方案

1  整體框架

傳統的超時中心整體框架如下所示,任務輸入後存儲在超時任務庫中,定時器觸發運行數據庫掃描器,數據庫掃描器從超時任務庫中掃描已經到達超時時間的任務,已經到達超時時間的任務存儲在機器的內存隊列中,等待交給業務處理器進行處理,業務處理器處理完成後更新任務狀態。

在大數據時代,超時任務數量肯定是很大的,傳統的超時中心通過分庫分表支持存儲海量的超時任務,定時器觸發也需要做相應的改變,需要充分利用集羣的能力,下面分別從超時任務庫和定時器觸發兩方面詳細介紹。

2  任務庫設計

任務庫數據模型如下所示,採用分庫分表存儲,一般可設計爲 8 個庫 1024 個表,具體可以根據業務需求調整。biz_id 爲分表鍵,job_id 爲全局唯一的任務 ID,status 爲超時任務的狀態,action_time 爲任務的執行時間,attribute 存儲額外的數據。只有當 action_time 小於當前時間且 status 爲待處理時,任務才能被掃描器加載到內存隊列。任務被處理完成後,任務的狀態被更新成已處理。

job_id                        bigint unsigned      超時任務的ID,全局唯一
gmt_create                    datetime             創建時間
gmt_modified                  datetime             修改時間
biz_id                        bigint unsigned      業務id,一般爲關聯的主訂單或子訂單id
biz_type                      bigint unsigned      業務類型
status                        tinyint              超時任務狀態(0待處理,2已處理,3取消)
action_time                   datetime             超時任務執行時間
attribute                     varchar              額外數據

3  定時調度設計

定時調度流程圖如下所示,定時器每間隔 10 秒觸發一次調度,從集羣 configserver 中獲取集羣 ip 列表併爲當前機器編號,然後給所有 ip 分配表。分配表時需要考慮好幾件事:一張表只屬於一臺機器,不會出現重複掃描;機器上線下線需要重新分配表。當前機器從所分配的表中掃描出所有狀態爲待處理的超時任務,遍歷掃描出的待處理超時任務。對於每個超時任務,當內存隊列不存在該任務且內存隊列未滿時,超時任務才加入內存隊列,否則循環檢查等待。

4  缺點

三  低延遲方案

1  整體框架

任務輸入後分爲兩個步驟。第一個步驟是將任務存儲到任務庫,本方案的任務庫模型設計和上面方案中的任務庫模型設計一樣;第二步驟是任務定時,將任務的 jobId 和 actionTime 以一定方式設置到 Redis 集羣中,當定時任務的超時時間到了之後,從 Redis 集羣 pop 超時任務的 jobId,根據 jobId 從任務庫中查詢詳細的任務信息交給業務處理器進行處理,最後更新任務庫中任務的狀態。

本方案與上述方案最大的不同點就是超時任務的獲取部分,上述方案採用定時調度掃描任務庫,本方案採用基於 Redis 的任務定時系統,接下來將具體講解任務定時的設計。

2  Redis 存儲設計

Topic 的設計

Topic 的定義有三部分組成,topic 表示主題名稱,slotAmount 表示消息存儲劃分的槽數量,topicType 表示消息的類型。主題名稱是一個 Topic 的唯一標示,相同主題名稱 Topic 的 slotAmount 和 topicType 一定是一樣的。消息存儲採用 Redis 的 Sorted Set 結構,爲了支持大量消息的堆積,需要把消息分散存儲到很多個槽中,slotAmount 表示該 Topic 消息存儲共使用的槽數量,槽數量一定需要是 2 的 n 次冪。在消息存儲的時候,採用對指定數據或者消息體哈希求餘得到槽位置。

StoreQueue 的設計

上圖中 topic 劃分了 8 個槽位,編號 0-7。計算消息體對應的 CRC32 值,CRC32 值對槽數量進行取模得到槽序號,SlotKey 設計爲#{topic}_#{index}(也即 Redis 的鍵),其中#{} 表示佔位符。

StoreQueue 結構採用 Redis 的 Sorted Set,Redis 的 Sorted Set 中的數據按照分數排序,實現定時消息的關鍵就在於如何利用分數、如何添加消息到 Sorted Set、如何從 Sorted Set 中彈出消息。定時消息將時間戳作爲分數,消費時每次彈出分數小於當前時間戳的一個消息。

PrepareQueue 的設計

爲了保障每條消息至少消費一次,消費者不是直接 pop 有序集合中的元素,而是將元素從 StoreQueue 移動到 PrepareQueue 並返回消息給消費者,等消費成功後再從 PrepareQueue 從刪除,或者消費失敗後從 PreapreQueue 重新移動到 StoreQueue,這便是根據二階段提交的思想實現的二階段消費。

在後面將會詳細介紹二階段消費的實現思路,這裏重點介紹下 PrepareQueue 的存儲設計。StoreQueue 中每一個 Slot 對應 PrepareQueue 中的 Slot,PrepareQueue 的 SlotKey 設計爲 prepare_{#{topic}#{index}}。PrepareQueue 採用 Sorted Set 作爲存儲,消息移動到 PrepareQueue 時刻對應的(秒級時間戳 * 1000 + 重試次數)作爲分數,字符串存儲的是消息體內容。這裏分數的設計與重試次數的設計密切相關,所以在重試次數設計章節詳細介紹。

PrepareQueue 的 SlotKey 設計中需要注意的一點,由於消息從 StoreQueue 移動到 PrepareQueue 是通過 Lua 腳本操作的,因此需要保證 Lua 腳本操作的 Slot 在同一個 Redis 節點上,如何保證 PrepareQueue 的 SlotKey 和對應的 StoreQueue 的 SlotKey 被 hash 到同一個 Redis 槽中呢。Redis 的 hash tag 功能可以指定 SlotKey 中只有某一部分參與計算 hash,這一部分採用 {} 包括,因此 PrepareQueue 的 SlotKey 中採用 {} 包括了 StoreQueue 的 SlotKey。

DeadQueue 的設計

消息重試消費 16 次後,消息將進入 DeadQueue。DeadQueue 的 SlotKey 設計爲 prepare{#{topic}#{index}},這裏同樣採用 hash tag 功能保證 DeadQueue 的 SlotKey 與對應 StoreQueue 的 SlotKey 存儲在同一 Redis 節點。

定時消息生產

生產者的任務就是將消息添加到 StoreQueue 中。首先,需要計算出消息添加到 Redis 的 SlotKey,如果發送方指定了消息的 slotBasis(否則採用 content 代替),則計算 slotBasis 的 CRC32 值,CRC32 值對槽數量進行取模得到槽序號,SlotKey 設計爲#{topic}_#{index},其中 #{} 表示佔位符。發送定時消息時需要設置 actionTime,actionTime 必須大於當前時間,表示消費時間戳,當前時間大於該消費時間戳的時候,消息纔會被消費。因此在存儲該類型消息的時候,採用 actionTime 作爲分數,採用命令 zadd 添加到 Redis。

超時消息消費

每臺機器將啓動多個 Woker 進行超時消息消費,Woker 即表示線程,定時消息被存儲到 Redis 的多個 Slot 中,因此需要 zookeeper 維護集羣中 Woker 與 slot 的關係,一個 Slot 只分配給一個 Woker 進行消費,一個 Woker 可以消費多個 Slot。Woker 與 Slot 的關係在每臺機器啓動與停止時重新分配,超時消息消費集羣監聽了 zookeeper 節點的變化。

Woker 與 Slot 關係確定後,Woker 則循環不斷地從 Redis 拉取訂閱的 Slot 中的超時消息。在 StoreQueue 存儲設計中說明了定時消息存儲時採用 Sorted Set 結構,採用定時時間 actionTime 作爲分數,因此定時消息按照時間大小存儲在 Sorted Set 中。因此在拉取超時消息進行只需採用 Redis 命令 ZRANGEBYSCORE 彈出分數小於當前時間戳的一條消息。

爲了保證系統的可用性,還需要考慮保證定時消息至少被消費一次以及消費的重試次數,下面將具體介紹如何保證至少消費一次和消費重試次數控制。

至少消費一次

至少消費一次的問題比較類似銀行轉賬問題,A 向 B 賬戶轉賬 100 元,如何保障 A 賬戶扣減 100 同時 B 賬戶增加 100,因此我們可以想到二階段提交的思想。第一個準備階段,A、B 分別進行資源凍結並持久化 undo 和 redo 日誌,A、B 分別告訴協調者已經準備好;第二個提交階段,協調者告訴 A、B 進行提交,A、B 分別提交事務。本方案基於二階段提交的思想來實現至少消費一次。

Redis 存儲設計中 PrepareQueue 的作用就是用來凍結資源並記錄事務日誌,消費者端即是參與者也是協調者。第一個準備階段,消費者端通過執行 Lua 腳本從 StoreQueue 中 Pop 消息並存儲到 PrepareQueue,同時消息傳輸到消費者端,消費者端消費該消息;第二個提交階段,消費者端根據消費結果是否成功協調消息隊列服務是提交還是回滾,如果消費成功則提交事務,該消息從 PrepareQueue 中刪除,如果消費失敗則回滾事務,消費者端將該消息從 PrepareQueue 移動到 StoreQueue,如果因爲各種異常導致 PrepareQueue 中消息滯留超時,超時後將自動執行回滾操作。二階段消費的流程圖如下所示。

消費重試次數控制

採用二階段消費方式,需要將消息在 StoreQueue 和 PrepareQueue 之間移動,如何實現重試次數控制呢,其關鍵在 StoreQueue 和 PrepareQueue 的分數設計。

PrepareQueue 的分數需要與時間相關,正常情況下,消費者不管消費失敗還是消費成功,都會從 PrepareQueue 刪除消息,當消費者系統發生異常或者宕機的時候,消息就無法從 PrepareQueue 中刪除,我們也不知道消費者是否消費成功,爲保障消息至少被消費一次,我們需要做到超時回滾,因此分數需要與消費時間相關。當 PrepareQueue 中的消息發生超時的時候,將消息從 PrepareQueue 移動到 StoreQueue。

因此 PrepareQueue 的分數設計爲:秒級時間戳 * 1000 + 重試次數。定時消息首次存儲到 StoreQueue 中的分數表示消費時間戳,如果消息消費失敗,消息從 PrepareQueue 回滾到 StoreQueue,定時消息存儲時的分數都表示剩餘重試次數,剩餘重試次數從 16 次不斷降低最後爲 0,消息進入死信隊列。消息在 StoreQueue 和 PrepareQueue 之間移動流程如下:

5  優點

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/RqErpEqwyT6T15ONtkmQVA