生產環境 MQ 集羣一個非常詭異的消費延遲排查
大家好,我是威哥,《RocketMQ 技術內幕》作者、RocketMQ 社區首席佈道師、極客時間《中間件核心技術與實戰》專欄作者、中通快遞基礎架構資深架構師,越努力越幸運,唯有堅持不懈,與大家共勉。
1、問題現象
某一天,項目組一個同事向我反饋,他們使用公司的數據同步產品將 MySQL 數據同步到 MQ 集羣,然後使用消費者將數據再同步到 ES,反饋數據同步延遲嚴重,但對應的消費組確沒有積壓,但最近最近幾分鐘的數據都沒有同步過來。
那問題來了,消費端沒有消費積壓,而且通過查看數據同步平臺該通過任務的同步狀態,同樣顯示沒有積壓,那是爲什麼呢?
遇到這個問題,我們應該冷靜下來,分析一下其大概的數據流向圖,梳理後如下圖所示:
通過初步的診斷,從數據同步產品查看 Binlog 同步無延遲、MQ 消費無積壓,那爲什麼最終 Es 集羣中的數據與 MySQL 有高達幾分鐘的延遲呢?
2、問題排查
根據上圖幾個關鍵組件數據同步延遲的檢測,基本就排除了數據同步組件、MQ 消費端本身消費的問題,問題的癥結應該就是數據同步組件成功將數據寫入到 MQ 集羣,並且 MQ 集羣返回了寫入成功,但消費端並沒有及時感知這個消息,也就是說消息雖然寫入到 MQ 集羣,但並沒有達到消費隊列。
因爲如果數據同步組件如果沒有寫入成功,則 MySQL Binlog 日誌就會出現延遲。但如果是 MQ 消費端的問題,則 MQ 平臺也會顯示消費組積壓。
那爲什麼消息服務器寫入成功,但消費組爲什麼感知不到呢?
首先爲了驗證上述結論是否正確,我還特意去看了一下主題的詳細信息:
查看主題的統計信息時發現當前系統的時間爲 19:01 分, 但主題最新的寫入時間纔是 18:50,兩者之間相差將近 10 分鐘。
備註:上述界面是我們公司內部的消息運營管理平臺,其實底層是調用了 RocketMQ 提供的 topicStatus 命令。
那這又是怎麼造成的呢?
在這裏我假設大家對 RocketMQ 底層的實現原理還不是特別熟悉,在這樣的情況下,我覺得我們應該首先摸清楚 topicStatus 這個命令返回的 minOffset、maxOffset 以及 lastUpdate 這些是的具體獲取邏輯,只有瞭解了這些,我們才能尋根究底,最終找到解決辦法。
2.1 問題探究與原理剖析
在這個場景中,我們可以通過對 topicStatus 命令進行解析,從而探究其背後的實現原理。
當我們在命令行中輸入 sh ./mqadmin topicStatus 命令時,最終是調用 defaultMQAdminExtImpl 的 examineTopicStats 方法,最終在服務端的處理邏輯定義在 AdminBrokerProcessor 的 getTopicStatsInfo 方法中,核心代碼如下:
這裏的實現要點:
-
通過 MessageStore 的 getMinOffsetInQueue 獲取最小偏移量。
-
通過 MessageStore 的 getMaxOffsetInQueue 獲取最大偏移量。
-
最新更新時間爲最大偏移量減去一**(表示最新一條消息)的存儲時間**
故要弄清隊列最大、最小偏移量,關鍵是要看懂 getMaxOffsetInQueue 或者 getMinOffsetInQueue 的計算邏輯。
我也注意到分析源碼雖然能直抵真相,但閱讀起來太粗糙,所以我接下來的文章會盡量避免通篇的源碼解讀,取而代之的是隻點出源碼的入口處,其旁支細節將通過時序圖獲流程圖,方便感興趣的讀者朋友去探究,我重點進行知識點的提煉,降低大家的學習成本。
如果大家想成體系的研究 RocketMQ,想將消息中間件當成自己職業的閃光點,強烈建議購買我的兩本關於 RocketMQ 的數據:《RocketMQ 技術內幕》與《RocketMQ 實戰》。
MessageStore 的 getMaxOffsetInQueue 的時序圖如下所示:
從上述時序圖我們可以得知,調用 DefaultMessageStore 的 getMaxOffsetInQueue 方法,首先是根據主題、隊列 ID 獲取 ConsumeQueue 對象 (在 RocketMQ 中一個主題的一個隊列會對應一個 ConsumeQueue,代表一個消費隊列),也就是這裏獲取的偏移量指的是消費者隊列中的偏移量,而不是 Commitlog 文件的偏移量。
如果是找最大偏移量,就從該隊列中的找到最後一個文件,去獲取器最大的有效偏移量,也就是等於文件的起始偏移量 (fileFromOffset) 加上該文件當前最大可讀的偏移量(readPosition),故引起這張時序圖一個非常關鍵的點,就是如何獲取消費隊列最大的可讀偏移量,代碼見 MappedFile 的 getReadPosition:
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
由於 ConsumeQueue 並沒有 transientStorePoolEnable 機制,數據直接寫入到 FlieChannel 中,故這裏的 writeBuffer 爲空,取的是 wrotePosition 的值,那 ConsumeQueue 文件的 wrotePosition 值在什麼地方更新呢?
這個可以通過查看 MappedFile 中修改 wrotePosition 的方法 appendMessage 方法的調用,如下圖所示:
與 ConsumeQueue 對應的入口主要有兩個:
-
ReputMessageService#doReput Commitlog 異步轉發線程, 通過該線程異步構建 Consumequeue、Index 等文件
-
Commitlog#recoverAbnormally RocketMQ 啓動時根據 Commitlog 文件自動恢復 Consumequeue 文件
今天的主角當然不讓非 ReputMessageService 莫屬**,這裏先和大家普及一下一個最基本的知識:RocketMQ 爲了追求極致的順序寫,會將所有主題的消息順序寫入到一個文件 (Commitlog 文件),然後異步轉發到 ConsumeQueue(消費隊列文件)、IndexFile(索引文件)。**
其轉發服務就是通過 ReputMessageService 來實現的。
在深入介紹 Commitlog 文件的轉發機制之前,我在這裏先問大家一個問題:消息是寫入到內存就轉發給 ConsumeQueue,亦或是刷寫到磁盤後再轉發呢?
爲了方便大家對這個問題的探究,其代碼的核心入口如下圖所示:
這裏的關鍵實現要點如下:
-
判斷是否轉發關鍵條件在於 isCommitlogAvailable() 方法返回 true
-
根據轉發位點 reputFromOffset,從 Commitlog 文件中獲取消息的物理偏移量、消息大小,tags 等信息轉發到消息消費隊列、索引文件。
那 isCommitlogAvailable 的核心如下所示:
故轉發的關鍵就在於 Commitlog 的 maxOffset 的獲取邏輯了,其實現時序圖如下所示:
這裏核心重點是 getReadPosition 方法的實現,在 RocketMQ 寫 Commitlog 文件,爲了提升寫入性能,引入了內存級讀寫分離機制,具體的實現原理如下圖所示:
具體在實現層面,就是如果 transientStorePoolEnable=true,數據寫入到堆外內存 (writeBuffer) 中,然後再提交到 FileChannel, 提交的位置(commitedPosition 來表示)。
大家可以分別看一下改變 wrotePosition 與 committedPposition 的調用鏈。
其中 wrotePosition 的調用鏈如下所示:
可以得知:wrotePosition 是消息寫入到內存 (pagecache 或者堆外內存) 都會更新,但一旦開啓了堆外內存機制,並不會取該值,所以我們可以理解爲當消息寫入到 Pagecache 中時,就可以被轉發到消息消費隊列。
緊接着我們再看一下 committedPosition 的調用鏈,如下所示:
原來在 RocketMQ 中,如果開啓了 transientStorePoolEnable 機制,消息先寫入到堆外內存,然後就會向消息發送者返回發送成功,然後會有一個異步線程 (CommitRealTimeService)定時將消息 (默認 200ms 一次循環) 提交到 FileChannel,即更新 committedPosition 的值,消息就會轉發給消費隊列,從而消費者就可以進行消費。
2.2 問題原因提煉
經過上面的解析,問題應該有所眉目了。
由於我們公司爲了提高 RocketMQ 的資源利用率,提升 RocketMQ 的寫入性能,我們開啓了 transientStorePoolEnable 機制,消息發送端寫入到堆外內存,就會返回寫入成功,這樣 MySQL Binlog 數據同步並不會產生延遲,那這裏的問題,無非就 2 個:
-
CommitRealTimeService 線程並沒有及時將堆外內存中的數據提交到 FileChannel
-
ReputMessageService 線程沒有及時將數據轉發到消費隊列
由於目前我暫時對底層存儲寫入的原理還認識不夠深入,對相關係統採集指標不夠敏感,當時主要分析了一下線程棧,發現 ReputMessageService 線程一直在工作,推測可能是轉發不及時,這塊我還需要更加深入去研究,如果大家對這塊有其實理解,歡迎留言,我也會在後續工作中提升這塊的技能,更加深入去理解底層的原理。
也就是目前知道了問題的表象原因,雖然底層原理還未通透,但目前足以指導我們更好的處理問題:將集羣內消息寫入大的主題,遷移到其他負載較低的集羣,從而降低該集羣的寫入壓力,當遷移了幾個主題後,果不其然,消息到達消費隊列接近實時,集羣得以恢復。
溫馨提示:中通內部的消息運維平臺,可以隨時將主題遷移到其他集羣,而發送方、消費方應用無需重啓應用即可感知。開源地址:https://github.com/ZTO-Express/zms 歡迎大家 star
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rRscaC4Qm706d9X7mcXSRQ