生產環境 MQ 集羣一個非常詭異的消費延遲排查

大家好,我是威哥,《RocketMQ 技術內幕》作者、RocketMQ 社區首席佈道師、極客時間《中間件核心技術與實戰》專欄作者、中通快遞基礎架構資深架構師,越努力越幸運,唯有堅持不懈,與大家共勉。

1、問題現象

某一天,項目組一個同事向我反饋,他們使用公司的數據同步產品將 MySQL 數據同步到 MQ 集羣,然後使用消費者將數據再同步到 ES,反饋數據同步延遲嚴重,但對應的消費組確沒有積壓,但最近最近幾分鐘的數據都沒有同步過來。

那問題來了,消費端沒有消費積壓,而且通過查看數據同步平臺該通過任務的同步狀態,同樣顯示沒有積壓,那是爲什麼呢?

遇到這個問題,我們應該冷靜下來,分析一下其大概的數據流向圖,梳理後如下圖所示:

通過初步的診斷,從數據同步產品查看 Binlog 同步無延遲、MQ 消費無積壓,那爲什麼最終 Es 集羣中的數據與 MySQL 有高達幾分鐘的延遲呢?

2、問題排查

根據上圖幾個關鍵組件數據同步延遲的檢測,基本就排除了數據同步組件、MQ 消費端本身消費的問題,問題的癥結應該就是數據同步組件成功將數據寫入到 MQ 集羣,並且 MQ 集羣返回了寫入成功,但消費端並沒有及時感知這個消息,也就是說消息雖然寫入到 MQ 集羣,但並沒有達到消費隊列。

因爲如果數據同步組件如果沒有寫入成功,則 MySQL Binlog 日誌就會出現延遲。但如果是 MQ 消費端的問題,則 MQ 平臺也會顯示消費組積壓。

那爲什麼消息服務器寫入成功,但消費組爲什麼感知不到呢?

首先爲了驗證上述結論是否正確,我還特意去看了一下主題的詳細信息:

查看主題的統計信息時發現當前系統的時間爲 19:01 分, 但主題最新的寫入時間纔是 18:50,兩者之間相差將近 10 分鐘。

備註:上述界面是我們公司內部的消息運營管理平臺,其實底層是調用了 RocketMQ 提供的 topicStatus 命令。

那這又是怎麼造成的呢?

在這裏我假設大家對 RocketMQ 底層的實現原理還不是特別熟悉,在這樣的情況下,我覺得我們應該首先摸清楚 topicStatus 這個命令返回的 minOffset、maxOffset 以及 lastUpdate 這些是的具體獲取邏輯,只有瞭解了這些,我們才能尋根究底,最終找到解決辦法。

2.1 問題探究與原理剖析

在這個場景中,我們可以通過對 topicStatus 命令進行解析,從而探究其背後的實現原理。

當我們在命令行中輸入 sh ./mqadmin topicStatus 命令時,最終是調用 defaultMQAdminExtImpl 的 examineTopicStats 方法,最終在服務端的處理邏輯定義在 AdminBrokerProcessor 的 getTopicStatsInfo 方法中,核心代碼如下:

這裏的實現要點:

故要弄清隊列最大、最小偏移量,關鍵是要看懂 getMaxOffsetInQueue 或者 getMinOffsetInQueue 的計算邏輯。

我也注意到分析源碼雖然能直抵真相,但閱讀起來太粗糙,所以我接下來的文章會盡量避免通篇的源碼解讀,取而代之的是隻點出源碼的入口處,其旁支細節將通過時序圖獲流程圖,方便感興趣的讀者朋友去探究,我重點進行知識點的提煉,降低大家的學習成本。

如果大家想成體系的研究 RocketMQ,想將消息中間件當成自己職業的閃光點,強烈建議購買我的兩本關於 RocketMQ 的數據:《RocketMQ 技術內幕》與《RocketMQ 實戰》。

MessageStore 的 getMaxOffsetInQueue 的時序圖如下所示:

從上述時序圖我們可以得知,調用 DefaultMessageStore 的 getMaxOffsetInQueue 方法,首先是根據主題、隊列 ID 獲取 ConsumeQueue 對象 (在 RocketMQ 中一個主題的一個隊列會對應一個 ConsumeQueue,代表一個消費隊列),也就是這裏獲取的偏移量指的是消費者隊列中的偏移量,而不是 Commitlog 文件的偏移量。

如果是找最大偏移量,就從該隊列中的找到最後一個文件,去獲取器最大的有效偏移量,也就是等於文件的起始偏移量 (fileFromOffset) 加上該文件當前最大可讀的偏移量(readPosition),故引起這張時序圖一個非常關鍵的點,就是如何獲取消費隊列最大的可讀偏移量,代碼見 MappedFile 的 getReadPosition:

public int getReadPosition() {
   return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

由於 ConsumeQueue 並沒有 transientStorePoolEnable 機制,數據直接寫入到 FlieChannel 中,故這裏的 writeBuffer 爲空,取的是 wrotePosition 的值,那 ConsumeQueue 文件的 wrotePosition 值在什麼地方更新呢?

這個可以通過查看 MappedFile 中修改 wrotePosition 的方法 appendMessage 方法的調用,如下圖所示:

與 ConsumeQueue 對應的入口主要有兩個:

今天的主角當然不讓非 ReputMessageService 莫屬**,這裏先和大家普及一下一個最基本的知識:RocketMQ 爲了追求極致的順序寫,會將所有主題的消息順序寫入到一個文件 (Commitlog 文件),然後異步轉發到 ConsumeQueue(消費隊列文件)、IndexFile(索引文件)。**

其轉發服務就是通過 ReputMessageService 來實現的。

在深入介紹 Commitlog 文件的轉發機制之前,我在這裏先問大家一個問題:消息是寫入到內存就轉發給 ConsumeQueue,亦或是刷寫到磁盤後再轉發呢?

爲了方便大家對這個問題的探究,其代碼的核心入口如下圖所示:

這裏的關鍵實現要點如下:

那 isCommitlogAvailable 的核心如下所示:

故轉發的關鍵就在於 Commitlog 的 maxOffset 的獲取邏輯了,其實現時序圖如下所示:

這裏核心重點是 getReadPosition 方法的實現,在 RocketMQ 寫 Commitlog 文件,爲了提升寫入性能,引入了內存級讀寫分離機制,具體的實現原理如下圖所示:

具體在實現層面,就是如果 transientStorePoolEnable=true,數據寫入到堆外內存 (writeBuffer) 中,然後再提交到 FileChannel, 提交的位置(commitedPosition 來表示)。

大家可以分別看一下改變 wrotePosition 與 committedPposition 的調用鏈。

其中 wrotePosition 的調用鏈如下所示:

可以得知:wrotePosition 是消息寫入到內存 (pagecache 或者堆外內存) 都會更新,但一旦開啓了堆外內存機制,並不會取該值,所以我們可以理解爲當消息寫入到 Pagecache 中時,就可以被轉發到消息消費隊列。

緊接着我們再看一下 committedPosition 的調用鏈,如下所示:

原來在 RocketMQ 中,如果開啓了 transientStorePoolEnable 機制,消息先寫入到堆外內存,然後就會向消息發送者返回發送成功,然後會有一個異步線程 (CommitRealTimeService)定時將消息 (默認 200ms 一次循環) 提交到 FileChannel,即更新 committedPosition 的值,消息就會轉發給消費隊列,從而消費者就可以進行消費。

2.2 問題原因提煉

經過上面的解析,問題應該有所眉目了。

由於我們公司爲了提高 RocketMQ 的資源利用率,提升 RocketMQ 的寫入性能,我們開啓了 transientStorePoolEnable 機制,消息發送端寫入到堆外內存,就會返回寫入成功,這樣 MySQL Binlog 數據同步並不會產生延遲,那這裏的問題,無非就 2 個:

由於目前我暫時對底層存儲寫入的原理還認識不夠深入,對相關係統採集指標不夠敏感,當時主要分析了一下線程棧,發現 ReputMessageService 線程一直在工作,推測可能是轉發不及時,這塊我還需要更加深入去研究,如果大家對這塊有其實理解,歡迎留言,我也會在後續工作中提升這塊的技能,更加深入去理解底層的原理。

也就是目前知道了問題的表象原因,雖然底層原理還未通透,但目前足以指導我們更好的處理問題:將集羣內消息寫入大的主題,遷移到其他負載較低的集羣,從而降低該集羣的寫入壓力,當遷移了幾個主題後,果不其然,消息到達消費隊列接近實時,集羣得以恢復。

溫馨提示:中通內部的消息運維平臺,可以隨時將主題遷移到其他集羣,而發送方、消費方應用無需重啓應用即可感知。開源地址:https://github.com/ZTO-Express/zms 歡迎大家 star

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rRscaC4Qm706d9X7mcXSRQ