萬字長文講透 RocketMQ 的消費邏輯
這篇文章,筆者梳理了 RocketMQ 的消費邏輯,希望對大家有所啓發。
1 架構概覽
在展開集羣消費邏輯細節前,我們先對 RocketMQ 4.9.X 架構做一個概覽。
整體架構中包含四種角色 :
1、NameServer
名字服務是是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。它是一個非常簡單的 Topic 路由註冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態註冊與發現。
2、BrokerServer
Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證 。
3、Producer
消息發佈的角色,Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集羣隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。
4、Consumer
消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。
RocketMQ 集羣工作流程:
1、啓動 NameServer,NameServer 起來後監聽端口,等待 Broker、Producer 、Consumer 連上來,相當於一個路由控制中心。
2、Broker 啓動,跟所有的 NameServer 保持長連接,定時發送心跳包。心跳包中包含當前 Broker 信息 (IP + 端口等) 以及存儲所有 Topic 信息。註冊成功後,NameServer 集羣中就有 Topic 跟 Broker 的映射關係。
3、收發消息前,先創建 Topic,創建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動創建 Topic。
4、Producer 發送消息,啓動時先跟 NameServer 集羣中的其中一臺建立長連接,並從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然後與隊列所在的 Broker 建立長連接從而向 Broker 發消息。
5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連接,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接通道,開始消費消息。
2 發佈訂閱
RocketMQ 的傳輸模型是:發佈訂閱模型 。
發佈訂閱模型具有如下特點:
-
消費獨立
相比隊列模型的匿名消費方式,發佈訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關係),不同訂閱組之間相互獨立不會相互影響。
-
一對多通信
基於獨立身份的設計,同一個主題內的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發佈訂閱模型可以實現一對多通信。
RocketMQ 支持兩種消息模式:集羣消費( Clustering )和廣播消費( Broadcasting )。
集羣消費:同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。
廣播消費:當使用廣播消費模式時,每條消息推送給集羣內所有的消費者,保證消息至少被每個消費者消費一次。
爲了實現這種發佈訂閱模型 , RocketMQ 精心設計了它的存儲模型。先進入 Broker 的文件存儲目錄。
RocketMQ 採用的是混合型的存儲結構。
1、Broker 單個實例下所有的隊列共用一個數據文件(commitlog)來存儲
生產者發送消息至 Broker 端,然後 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那麼生產者發送的消息就不會丟失。
單個文件大小默認 1G , 文件名長度爲 20 位,左邊補零,剩餘爲起始偏移量,比如 00000000000000000000 代表了第一個文件,起始偏移量爲 0 ,文件大小爲 1 G = 1073741824 。
這種設計有兩個優點:
-
充分利用順序寫,大大提升寫入數據的吞吐量;
-
快讀定位消息。
因爲消息是一條一條寫入到 commitlog 文件 ,寫入完成後,我們可以得到這條消息的物理偏移量。
每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據消息的物理偏移量通過二分查找,定位消息位於那個文件中,並獲取到消息實體數據。
2、Broker 端的後臺服務線程會不停地分發請求並異步構建 consumequeue(消費文件)和 indexfile(索引文件)
進入索引文件存儲目錄 :
1、消費文件按照主題存儲,每個主題下有不同的隊列,圖中主題 my-mac-topic 有 16 個隊列 (0 到 15) ;
2、每個隊列目錄下 ,存儲 consumequeue 文件,每個 consumequeue 文件也是順序寫入,數據格式見下圖。
每個 consumequeue 文件包含 30 萬個條目,每個條目大小是 20 個字節,每個文件的大小是 30 萬 * 20 = 60 萬字節,每個文件大小約 5.72M 。
和 commitlog 文件類似,consumequeue 文件的名稱也是以偏移量來命名的,可以通過消息的邏輯偏移量定位消息位於哪一個文件裏。
消費文件按照主題 - 隊列來保存 ,這種方式特別適配發佈訂閱模型。
消費者從 Broker 獲取訂閱消息數據時,不用遍歷整個 commitlog 文件,只需要根據邏輯偏移量從 consumequeue 文件查詢消息偏移量 , 最後通過定位到 commitlog 文件, 獲取真正的消息數據。
要實現發佈訂閱模型,還需要一個重要文件:消費進度文件。原因有兩點:
-
不同消費組之間相互獨立,不會相互影響 ;
-
消費者下次拉取數據時,需要知道從哪個進度開始拉取 ,就像我們小時候玩單機遊戲存盤一樣。
因此消費進度文件需要保存消費組所訂閱主題的消費進度。
我們瀏覽下集羣消費場景下的 Broker 端的消費進度文件 consumerOffset.json 。
在進度文件 consumerOffset.json 裏,數據以 key-value 的結構存儲,key 表示:主題 @消費者組 , value 是 consumequeue 中每個隊列對應的邏輯偏移量 。
寫到這裏,我們粗糙模擬下 RocketMQ 存儲模型如何滿足發佈訂閱模型(集羣模式) 。
1、發送消息:生產者發送消息到 Broker ;
2、保存消息:Broker 將消息存儲到 commitlog 文件 ,異步線程會構建消費文件 consumequeue ;
3、消費流程:消費者啓動後,會通過負載均衡分配對應的隊列,然後向 Broker 發送拉取消息請求。Broker 收到消費者拉取請求之後,根據訂閱組,消費者編號,主題,隊列名,邏輯偏移量等參數 ,從該主題下的 consumequeue 文件查詢消息消費條目,然後從 commitlog 文件中獲取消息實體。消費者在收到消息數據之後,執行消費監聽器,消費完消息;
4、保存進度:消費者將消費進度提交到 Broker ,Broker 會將該消費組的消費進度存儲在進度文件裏。
3 消費流程
我們重點講解下集羣消費的消費流程 ,因爲集羣消費是使用最普遍的消費模式,理解了集羣消費,廣播消費也就能順理成章的掌握了。
集羣消費示例代碼裏,啓動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、消息監聽器,最後調用 start 方法啓動。
消費者啓動後,我們可以將整個流程簡化成:
4 負載均衡
消費端的負載均衡是指將 Broker 端中多個隊列按照某種算法分配給同一個消費組中的不同消費者,負載均衡是客戶端開始消費的起點。
RocketMQ 負載均衡的核心設計理念是
-
消費隊列在同一時間只允許被同一消費組內的一個消費者消費
-
一個消費者能同時消費多個消息隊列
負載均衡是每個客戶端獨立進行計算,那麼何時觸發呢 ?
-
消費端啓動時,立即進行負載均衡;
-
消費端定時任務每隔 20 秒觸發負載均衡;
-
消費者上下線,Broker 端通知消費者觸發負載均衡。
負載均衡流程如下:
1、發送心跳
消費者啓動後,它就會通過定時任務不斷地向 RocketMQ 集羣中的所有 Broker 實例發送心跳包(消息消費分組名稱、訂閱關係集合、消息通信模式和客戶端實例編號等信息)。
Broker 端在收到消費者的心跳消息後,會將它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時並將封裝後的客戶端網絡通道信息保存在本地緩存變量 channelInfoTable 中,爲之後做消費端的負載均衡提供可以依據的元數據信息。
2、啓動負載均衡服務
負載均衡服務會根據消費模式爲”廣播模式”還是 “集羣模式” 做不同的邏輯處理,這裏主要來看下集羣模式下的主要處理流程:
(1) 獲取該主題下的消息消費隊列集合;
(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;
(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然後用消息隊列分配策略算法(默認爲:消息隊列的平均分配算法),計算出待拉取的消息隊列;
這裏的平均分配算法,類似於分頁的算法,將所有 MessageQueue 排好序類似於記錄,將所有消費端排好序類似頁數,並求出每一頁需要包含的平均 size 和每個頁面記錄的範圍 range ,最後遍歷整個 range 而計算出當前消費端應該分配到的記錄。
(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作。
消費者實例內 ,processQueueTable 對象存儲着當前負載均衡的隊列 ,以及該隊列的處理隊列 processQueue (消費快照)。
-
標紅的 Entry 部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性爲 true , 然後從 processQueueTable 對象中移除。
-
綠色的 Entry 部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。
-
黃色的 Entry 部分表示這些隊列需要添加到 processQueueTable 對象中,爲每個分配的新隊列創建一個消息拉取請求
pullRequest
, 在消息拉取請求中保存一個處理隊列processQueue
(隊列消費快照),內部是紅黑樹(TreeMap
),用來保存拉取到的消息。
最後創建拉取消息請求列表,並將請求分發到消息拉取服務,進入拉取消息環節。
5 長輪詢
在負載均衡這一小節,我們已經知道負載均衡觸發了拉取消息的流程。
消費者啓動的時候,會創建一個拉取消息服務 PullMessageService ,它是一個單線程的服務。
核心流程如下:
1、負載均衡服務將消息拉取請求放入到拉取請求隊列 pullRequestQueue , 拉取消息服務從隊列中獲取拉取消息請求 ;
2、拉取消息服務向 Brorker 服務發送拉取請求 ,拉取請求的通訊模式是異步回調模式 ;
消費者的拉取消息服務本身就是一個單線程,使用異步回調模式,發送拉取消息請求到 Broker 後,拉取消息線程並不會阻塞 ,可以繼續處理隊列 pullRequestQueue 中的其他拉取任務。
3、Broker 收到消費者拉取消息請求後,從存儲中查詢出消息數據,然後返回給消費者;
4、消費者的網絡通訊層會執行拉取回調函數相關邏輯,首先會將消息數據存儲在隊列消費快照 processQueue 裏;
消費快照使用紅黑樹 msgTreeMap 存儲拉取服務拉取到的消息 。
5、回調函數將消費請求提交到消息消費服務 ,而消息消費服務會異步的消費這些消息;
6、回調函數會將處理中隊列的拉取請放入到定時任務中;
7、定時任務再次將消息拉取請求放入到隊列 pullRequestQueue 中,形成了閉環:負載均衡後的隊列總會有任務執行拉取消息請求,不會中斷。
細心的同學肯定有疑問:既然消費端是拉取消息,爲什麼是長輪詢呢 ?
雖然拉模式的主動權在消費者這一側,但是缺點很明顯。
因爲消費者並不知曉 Broker 端什麼時候有新的消息 ,所以會不停地去 Broker 端拉取消息,但拉取頻率過高, Broker 端壓力就會很大,頻率過低則會導致消息延遲。
所以要想消費消息的延遲低,服務端的推送必不可少。
下圖展示了 RocketMQ 如何通過長輪詢減小拉取消息的延遲。
核心流程如下:
1、Broker 端接收到消費者的拉取消息請求後,拉取消息處理器開始處理請求,根據拉取請求查詢消息存儲 ;
2、從消息存儲中獲取消息數據 ,若存在新消息 ,則將消息數據通過網絡返回給消費者。若無新消息,則將拉取請求放入到拉取請求表 pullRequestTable 。
3、長輪詢請求管理服務 pullRequestHoldService 每隔 5 秒從拉取請求表中判斷拉取消息請求的隊列是否有新的消息。
判定標準是:拉取消息請求的偏移量是否小於當前消費隊列最大偏移量,如果條件成立則說明有新消息了。
若存在新的消息 , 長輪詢請求管理服務會觸發拉取消息處理器重新處理該拉取消息請求。
4、當 commitlog 中新增了新的消息,消息分發服務會構建消費文件和索引文件,並且會通知長輪詢請求管理服務,觸發拉取消息處理器重新處理該拉取消息請求。
6 消費消息
在拉取消息的流程裏, Broker 端返回消息數據,消費者的通訊框架層會執行回調函數。
回調線程會將數據存儲在隊列消費快照 processQueue(內部使用紅黑樹 msgTreeMap)裏,然後將消息提交到消費消息服務,消費消息服務會異步消費這些消息。
消息消費服務有兩種類型:併發消費服務和順序消費服務 。
6.1 併發消費
併發消費是指消費者將併發消費消息,消費的時候可能是無序的。
消費消息併發服務啓動後,會初始化三個組件:消費線程池、清理過期消息定時任務、處理失敗消息定時任務。
核心流程如下:
0、通訊框架回調線程會將數據存儲在消費快照裏,然後將消息列表 msgList 提交到消費消息服務
1、 消息列表 msgList 組裝成消費對象
2、將消費對象提交到消費線程池
我們看到 10 條消息被組裝成三個消費請求對象,不同的消費線程會執行不同的消費請求對象。
3、消費線程執行消息監聽器
執行完消費監聽器,會返回消費結果。
4、處理異常消息
當消費異常時,異常消息將重新發回 Broker 端的重試隊列( RocketMQ 會爲每個 topic 創建一個重試隊列,以 %RETRY% 開頭),達到重試時間後將消息投遞到重試隊列中進行消費重試。
我們將在重試機制這一節重點講解 RocketMQ 如何實現延遲消費功能 。
假如異常的消息發送到 Broker 端失敗,則重新將這些失敗消息通過處理失敗消息定時任務重新提交到消息消費服務。
5、更新本地消費進度
消費者消費一批消息完成之後,需要保存消費進度到進度管理器的本地內存。
首先我們會從隊列消費快照 processQueue 中移除消息,返回消費快照 msgTreeMap 第一個偏移量 ,然後調用消費消息進度管理器 offsetStore 更新消費進度。
待更新的偏移量是如何計算的呢?
-
場景 1:快照中 1001(消息 1)到 1010(消息 10)消費了,快照中沒有了消息,返回已消費的消息最大偏移量 + 1 也就是 1011。
-
場景 2:快照中 1001(消息 1)到 1008(消息 8)消費了,快照中只剩下兩條消息了,返回最小的偏移量 1009。
- 場景 3:1001(消息 1)在消費對象中因爲某種原因一直沒有被消費,即使後面的消息 1005-1010 都消費完成了,返回的最小偏移量是 1001。
在場景 3,RocketMQ 爲了保證消息肯定被消費成功,消費進度只能維持在 1001(消息 1),直到 1001 也被消費完,本地的消費進度纔會一下子更新到 1011。
假設 1001(消息 1)還沒有消費完成,消費者實例突然退出(機器斷電,或者被 kill ),就存在重複消費的風險。
因爲隊列的消費進度還是維持在 1001,當隊列重新被分配給新的消費者實例的時候,新的實例從 Broker 上拿到的消費進度還是維持在 1001,這時候就會又從 1001 開始消費,1001-1010 這批消息實際上已經被消費過還是會投遞一次。
所以業務必須要保證消息消費的冪等性。
寫到這裏,我們會有一個疑問:假設 1001(消息 1)因爲加鎖或者消費監聽器邏輯非常耗時,導致極長時間沒有消費完成,那麼消費進度就會一直卡住 ,怎麼解決呢 ?
RocketMQ 提供兩種方式一起配合解決:
-
拉取服務根據併發消費間隔配置限流
拉取消息服務在拉取消息時候,會判斷當前隊列的 processQueue 消費快照裏消息的最大偏移量 - 消息的最小偏移量大於消費併發間隔(2000)的時候 , 就會觸發流控 , 這樣就可以避免消費者無限循環的拉取新的消息。
-
清理過期消息
消費消息併發服務啓動後,會定期掃描所有消費的消息,若當前時間減去開始消費的時間大於消費超時時間,首先會將過期消息發送 sendMessageBack 命令發送到 Broker ,然後從快照中刪除該消息。
6.2 順序消費
順序消息是指對於一個指定的 Topic ,消息嚴格按照先進先出(FIFO)的原則進行消息發佈和消費,即先發布的消息先消費,後發佈的消息後消費。
順序消息分爲分區順序消息和全局順序消息。
1、分區順序消息
對於指定的一個 Topic ,所有消息根據 Sharding Key 進行區塊分區,同一個分區內的消息按照嚴格的先進先出(FIFO)原則進行發佈和消費。同一分區內的消息保證順序,不同分區之間的消息順序不做要求。
-
適用場景:適用於性能要求高,以 Sharding Key 作爲分區字段,在同一個區塊中嚴格地按照先進先出(FIFO)原則進行消息發佈和消費的場景。
-
示例:電商的訂單創建,以訂單 ID 作爲 Sharding Key ,那麼同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發布的先後順序來消費。
2、全局順序消息
對於指定的一個 Topic ,所有消息按照嚴格的先入先出(FIFO)的順序來發布和消費。
-
適用場景:適用於性能要求不高,所有的消息嚴格按照 FIFO 原則來發布和消費的場景。
-
示例:在證券處理中,以人民幣兌換美元爲 Topic,在價格相同的情況下,先出價者優先處理,則可以按照 FIFO 的方式發佈和消費全局順序消息。
全局順序消息實際上是一種特殊的分區順序消息,即 Topic 中只有一個分區,因此全局順序和分區順序的實現原理相同。
因爲分區順序消息有多個分區,所以分區順序消息比全局順序消息的併發度和性能更高。
消息的順序需要由兩個階段保證:
-
消息發送
如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產生的順序,業務上要求同一訂單的消息保持順序,例如訂單 A 的消息發送和消費都按照 A1、A2、A3 的順序。
如果是普通消息,訂單 A 的消息可能會被輪詢發送到不同的隊列中,不同隊列的消息將無法保持順序,而順序消息發送時 RocketMQ 支持將 Sharding Key 相同(例如同一訂單號)的消息序路由到同一個隊列中。
下圖是生產者發送順序消息的封裝,原理是發送消息時,實現 MessageQueueSelector 接口, 根據 Sharding Key 使用 Hash 取模法來選擇待發送的隊列。
生產者順序發送消息封裝 -
消息消費
消費者消費消息時,需要保證單線程消費每個隊列的消息數據,從而實現消費順序和發佈順序的一致。
順序消費服務的類是 ConsumeMessageOrderlyService ,在負載均衡階段,併發消費和順序消費並沒有什麼大的差別。
最大的差別在於:順序消費會向 Borker 申請鎖 。消費者根據分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔 20 秒會重新嘗試。
順序消費核心流程如下:
1、 組裝成消費對象
2、 將請求對象提交到消費線程池
和併發消費不同的是,這裏的消費請求包含消費快照 processQueue ,消息隊列 messageQueue 兩個對象,並不對消息列表做任何處理。
3、 消費線程內,對消費隊列加鎖
順序消費也是通過線程池消費的,synchronized 鎖用來保證同一時刻對於同一個隊列只有一個線程去消費它
4、 從消費快照中取得待消費的消息列表
消費快照 processQueue 對象裏,創建了一個紅黑樹對象 consumingMsgOrderlyTreeMap 用於臨時存儲的待消費的消息。
5、 執行消息監聽器
消費快照的消費鎖 consumeLock 的作用是:防止負載均衡線程把當前消費的 MessageQueue 對象移除掉。
6、 處理消費結果
消費成功時,首先計算需要提交的偏移量,然後更新本地消費進度。
消費失敗時,分兩種場景:
-
假如已消費次數小於最大重試次數,則將對象 consumingMsgOrderlyTreeMap 中臨時存儲待消費的消息,重新加入到消費快照紅黑樹 msgTreeMap 中,然後使用定時任務嘗試重新消費。
-
假如已消費次數大於等於最大重試次數,則將失敗消息發送到 Broker ,Broker 接收到消息後,會加入到死信隊列裏 , 最後計算需要提交的偏移量,然後更新本地消費進度。
我們做一個關於順序消費的總結 :
-
順序消費需要由兩個階段消息發送和消息消費協同配合,底層支撐依靠的是 RocketMQ 的存儲模型;
-
順序消費服務啓動後,隊列的數據都會被消費者實例單線程的執行消費;
-
假如消費者擴容,消費者重啓,或者 Broker 宕機 ,順序消費也會有一定幾率較短時間內亂序,所以消費者的業務邏輯還是要保障冪等。
7 保存進度
RocketMQ 消費者消費完一批數據後, 會將隊列的進度保存在本地內存,但還需要將隊列的消費進度持久化。
1、 集羣模式
集羣模式下,分兩種場景:
-
拉取消息服務會在拉取消息時,攜帶該隊列的消費進度,提交給 Broker 的拉取消息處理器。
-
消費者定時任務,每隔 5 秒將本地緩存中的消費進度提交到 Broker 的消費者管理處理器。
Broker 的這兩個處理器都調用消費者進度管理器 consumerOffsetManager 的 commitOffset 方法,定時任務異步將消費進度持久化到消費進度文件 consumerOffset.json 中。
2、 廣播模式
廣播模式消費進度存儲在消費者本地,定時任務每隔 5 秒通過 LocalFileOffsetStore 持久化到本地文件offsets.json
,數據格式爲 MessageQueue:Offset
。
廣播模式下,消費進度和消費組沒有關係,本地文件 offsets.json
存儲在配置的目錄,文件中包含訂閱主題中所有的隊列以及隊列的消費進度。
8 重試機制
集羣消費下,重試機制的本質是 RocketMQ 的延遲消息功能。
消費消息失敗後,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。
Broker 端會爲每個 topic 創建一個重試隊列 ,隊列名稱是:%RETRY% + 消費者組名 ,達到重試時間後將消息投遞到重試隊列中進行消費重試(消費者組會自動訂閱重試 Topic)。最多重試消費 16 次,重試的時間間隔逐漸變長,若達到最大重試次數後消息還沒有成功被消費,則消息將被投遞至死信隊列。
開源 RocketMQ 4.X 支持延遲消息,默認支持 18 個 level 的延遲消息,這是通過 broker 端的 messageDelayLevel 配置項確定的,如下:
Broker 在啓動時,內部會創建一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲 level 的個數,創建對應數量的隊列,也就是說 18 個 level 對應了 18 個隊列。
我們先梳理下延遲消息的實現機制。
1、生產者發送延遲消息
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設置延遲level爲5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
2、Broker 端存儲延遲消息
延遲消息在 RocketMQ Broker 端的流轉如下圖所示:
第一步:修改消息 Topic 名稱和隊列信息
Broker 端接收到生產者的寫入消息請求後,首先都會將消息寫到 commitlog 中。假如是正常非延遲消息,MessageStore 會根據消息中的 Topic 信息和隊列信息,將其轉發到目標 Topic 的指定隊列 consumequeue 中。
但由於消息一旦存儲到 consumequeue 中,消費者就能消費到,而延遲消息不能被立即消費,所以 RocketMQ 將 Topic 的名稱修改爲 SCHEDULE_TOPIC_XXXX,並根據延遲級別確定要投遞到哪個隊列下。
同時,還會將消息原來要發送到的目標 Topic 和隊列信息存儲到消息的屬性中。
第二步:構建 consumequeue 文件時,計算並存儲投遞時間
上圖是 consumequeue 文件一條消息的格式,最後 8 個字節存儲 Tag 的哈希值,此時存儲消息的投遞時間。
第三步:定時調度服務啓動
ScheduleMessageService 類是一個定時調度服務,讀取 SCHEDULE_TOPIC_XXXX 隊列的消息,並將消息投遞到目標 Topic 中。
定時調度服務啓動時,創建一個定時調度線程池 ,並根據延遲級別的個數,啓動對應數量的 HandlePutResultTask ,每個 HandlePutResultTask 負責一個延遲級別的消費與投遞。
第四步:投遞時間到了,將消息數據重新寫入到 commitlog
消息到期後,需要投遞到目標 Topic 。第一步已經記錄了原來的 Topic 和隊列信息,這裏需要重新設置,再存儲到 commitlog 中。
第五步:將消息投遞到目標 Topic 中
Broker 端的後臺服務線程會不停地分發請求並異步構建 consumequeue(消費文件)和 indexfile(索引文件)。因此消息會直接投遞到目標 Topic 的 consumequeue 中,之後消費者就可以消費到這條消息。
回顧了延遲消息的機制,消費消息失敗後,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。
Broker 端 SendMessageProcessor 處理器會調用 asyncConsumerSendMsgBack 方法。
首先判斷消息的當前重試次數是否大於等於最大重試次數,如果達到最大重試次數,或者配置的重試級別小於 0,則重新創建 Topic ,規則是 %DLQ% + consumerGroup,後續處理消息發送到死信隊列。
正常的消息會進入 else 分支,對於首次重試的消息,默認的 delayLevel 是 0 ,RocketMQ 會將 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的配置延時級別,消息消費重試首次,是延遲了第三個級別發起的重試,也就是距離首次發送 10s 後重試,其主題的默認規則是 %RETRY% + consumerGroup。
當延時級別設置完成,刷新消息的重試次數爲當前次數加 1 ,Broker 端將該消息刷盤,邏輯如下:
延遲消息寫入到 commitlog 裏 ,這裏其實和延遲消息機制的第一步類似,後面按照延遲消息機制的流程執行即可(第二步到第六步)。
9 總結
下圖展示了集羣模式下消費者併發消費流程 :
核心流程如下:
-
消費者啓動後,觸發負載均衡服務 ,負載均衡服務爲消費者實例分配對應的隊列 ;
-
分配完隊列後,負載均衡服務會爲每個分配的新隊列創建一個消息拉取請求
pullRequest
, 拉取請求保存一個處理隊列processQueue
,內部是紅黑樹(TreeMap
),用來保存拉取到的消息 ; -
拉取消息服務單線程從拉取請求隊列
pullRequestQueue
中彈出拉取消息,執行拉取任務 ,拉取請求是異步回調模式,將拉取到的消息放入到處理隊列; -
拉取請求在一次拉取消息完成之後會複用,重新被放入拉取請求隊列
pullRequestQueue
中 ; -
拉取完成後,調用消費消息服務
consumeMessageService
的submitConsumeRequest
方法 ,消費消息服務內部有一個消費線程池; -
消費線程池的消費線程從消費任務隊列中獲取消費請求,執行消費監聽器
listener.consumeMessage
; -
消費完成後,若消費成功,則更新偏移量
updateOffset
,先更新到內存offsetTable
,定時上報到 Broker ;若消費失敗,則將失敗消費發送到 Broker 。 -
Broker 端接收到請求後, 調用消費進度管理器的
commitOffset
方法修改內存的消費進度,定時刷盤到consumerOffset.json
。
RocketMQ 4.X 的消費邏輯有兩個非常明顯的特點:
-
客戶端代碼邏輯較重。假如要支持一種新的編程語言,那麼客戶端就必須實現完整的負載均衡邏輯,此外還需要實現拉消息、位點管理、消費失敗後將消息發回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
-
保證冪等非常重要。當客戶端升級或者下線時,或者 Broker 宕機,都要進行負載均衡操作,可能造成消息堆積,同時有一定幾率造成重複消費。
參考資料:
1、RocketMQ 4.9.4 Github 文檔
https://github.com/apache/rocketmq/tree/rocketmq-all-4.9.4/docs
2、RocketMQ 技術內幕
3、消息隊列核心知識點
https://mp.weixin.qq.com/s/v7_ih9X5mG3X4E4ecfgYXA
4、消息 ACK 機制及消費進度管理
https://zhuanlan.zhihu.com/p/25265380
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/mlqhXCHfhEht7je8n0rArA