RocketMQ 基礎概念剖析 - 源碼解析

Topic

Topic 是一類消息的集合,是一種邏輯上的分區。爲什麼說是邏輯分區呢?因爲最終數據是存儲到 Broker 上的,而且爲了滿足高可用,採用了分佈式的存儲。

這和 Kafka 中的實現如出一轍,Kafka 的 Topic 也是一種邏輯概念,每個 Topic 的數據會分成很多份,然後存儲在不同的 Broker 上,這個「份」叫 Partition。而在 RocketMQ 中,Topic 的數據也會分佈式的存儲,這個「份」叫 MessageQueue

其分佈可以用下圖來表示。

這樣一來,如果某個 Broker 所在的機器意外宕機,而且剛好 MessageQueue 中的數據還沒有持久化到磁盤,那麼該 Topic 下的這部分消息就會完全丟失。此時如果有備份的話,MQ 就可以繼續對外提供服務。

爲什麼還會出現沒有持久化到磁盤的情況呢?現在的 OS 當中,程序寫入數據到文件之後,並不會立馬寫入到磁盤,因爲磁盤 I/O 是非常耗時的操作,在計算機來看是非常慢的一種操作。所以寫入文件的數據會先寫入到 OS 自己的緩存中去,然後擇機異步的將 Buffer 中的數據刷入磁盤。

通過多副本冗餘的機制,使得 RocketMQ 具有了高可用的特性。除此之外,分佈式存儲能夠應對後期業務大量的數據存儲。如果不使用分佈式進行存儲,那麼隨着後期業務發展,消息量越來越大,單機是無論如何也滿足不了 RocketMQ 消息的存儲需求的。如果不做處理,那麼一臺機器的磁盤總有被塞滿的時候,此時的系統就不具備可伸縮的特性,也無法滿足業務的使用要求了。

但是這裏的可伸縮,和微服務中的服務可伸縮還不太一樣。因爲在微服務中,各個服務是無狀態的。而 Broker 是有狀態的,每個 Broker 上存儲的數據都不太一樣,因爲 Producer 在發送消息的時候會通過指定的算法,從 Message Queue 列表中選出一個 MessageQueue 發送消息。

如果不是很理解這個橫向擴展,那麼可以把它當成 Redis 的 Cluster,通過一致性哈希,選擇到 Redis Cluster 中的具體某個節點,然後將數據寫入 Redis Master 中去。如果此時想要擴容很方便,只需要往 Redis Cluster 中新增 Master 節點就好了。

所以,數據分佈式的存儲本質上是一種數據分片的機制。在此基礎上,通過冗餘多副本,達成了高可用。

Broker

Broker 可以理解爲我們微服務中的一個服務的某個實例,因爲微服務中我們的服務一般來說都會多實例部署,而 RocketMQ 也同理,多實例部署可以幫助系統扛住更多的流量,也從某種方面提高了系統的健壯性

在 RocketMQ4.5 之前,它使用主從架構,每一個 Master Broker 都有一個自己的 Slave Broker。

那 RocketMQ 的主從 Broker 是如何進行數據同步的呢?

Broker 啓動的時候,會啓動一個定時任務,定期的從 Master Broker 同步全量的數據。

這塊可以先不用糾結,後面我們會通過源碼來驗證這個主從同步邏輯。

上面提到了 Broker 會部署很多個實例,那麼既然多實例部署,那必然會存在一個問題,客戶端是如何得知自己是連接的哪個服務器?如何得知對應的 Broker 的 IP 地址和端口?如果某個 Broker 突然掛了怎麼辦?

NameServer

這就需要 NameServer 了,NameServer 是什麼?

這裏先拿 Spring Cloud 舉例子——Spring Cloud 中服務啓動的時候會將自己註冊到 Eureka 註冊中心上。當服務實例啓動的時候,會從 Eureka 拉取全量的註冊表,並且之後定期的從 Eureka 增量同步,並且每隔 30 秒發送心跳到 Eureka 去續約。如果 Eureka 檢測到某個服務超過了 90 秒沒有發送心跳,那麼就會該服務宕機,就會將其從註冊表中移除。

RocketMQ 中,NameServer 充當的也是類似的角色。兩者從功能上也有一定的區別

Broker 在啓動的時候會向 NameServer 註冊自己,並且每隔 30 秒向 NameServerv 發送心跳。如果某個 Broker 超過了 120 秒沒有發送心跳,那麼就會認爲該 Broker 宕機,就會將其從維護的信息中移除。這塊後面也會從源碼層面驗證。

當然 NameServer 不僅僅是存儲了各個 Broker 的 IP 地址和端口,還存儲了對應的 Topic 的路由數據。什麼是路由數據呢?那就是某個 Topic 下的哪個 Message Queue 在哪臺 Broker 上。

Producer

總體流程

接下來,我們來看看 Producer 發送一條消息到 Broker 的時候會做什麼事情,整體的流程如下。

檢查消息合法性

整體來看,其實是個很簡單的操作,跟我們平時寫代碼是一樣的,來請求了先校驗請求是否合法。Producer 啓動這裏會去校驗當前 Topic 數據的合法性。

都是些很常規的操作,和我們平時寫的 checker 都差不多。

獲取 Topic 的詳情

當通過了消息的合法性校驗之後,就需要繼續往下走。此時的關注點就應該從消息是否合法轉移到我要發消息給誰

此時就需要通過當前消息所屬的 Topic 拿到 Topic 的詳細數據。

獲取 Topic 的方法源碼在上面已經給出來了,首先會從內存中維護的一份 Map 中獲取數據。順帶一提,這裏的 Map 是 ConcurrentHashMap,是線程安全的,和 Golang 中的 Sync.Map 類似。

當然,首次發送的話,這個 Map 肯定是空的,此時會調用 NameServer 的接口,通過 Topic 去獲取詳情的 Topic 數據,此時會在上面的方法中將其加入到 Map 中去,這樣一來下次再往該 Topic 發送消息就能夠直接從內存中獲取。這裏就是簡單的實現的緩存機制

從方法名稱來看,是通過 Topic 獲取路由數據。實際上該方法,通過調用 NameServer 提供的 API,更新了兩部分數據,分別是:

而這兩部分數據都來源於同一個結構體 TopicRouteData。其結構如下。

通過源碼可以看到,就包含了該 Topic 下所有 Broker 下的 Message Queue 相關的數據、所有 Broker 的地址信息。

發送的具體 Queue

此時我們獲取到了需要發送到的 Broker 詳情,包括地址和 MessageQueue,那麼此時問題的關注點又該從「消息發送給誰」轉移到「消息具體發送到哪兒」。

什麼叫發送到哪兒?

開篇提到過一個 Topic 下會被分爲很多個 MessageQueue,「發送到哪兒」指的就是具體發送到哪一個 Message Queue 中去。

Message Queue 選擇機制

核心的選擇邏輯

還是先給出流程圖

核心邏輯,用大白話講就是將一個隨機數Message Queue 的容量取模。這個隨機數存儲在 Thread Local 中,首次計算的時候,會直接隨機一個數。

此後,都直接從 ThreadLocal 中取出該值,並且 + 1 返回,拿到了 MessageQueue 的數量和隨機數兩個關鍵的參數之後,就會執行最終的計算邏輯。

接下來,我們來看看選擇 Message Queue 的方法 SelectOneMessageQueue 都做了什麼操作吧。

可以看到,主邏輯被變量 sendLatencyFaultEnable 分爲了兩部分。

容錯機制下的選擇邏輯

該變量表意爲發送延遲故障。本質上是一種容錯的策略,在原有的 MessageQueue 選擇基礎上,再過濾掉不可用的 Broker,對之前失敗的 Broker,按一定的時間做退避。

可以看到,如果調用 Broker 信息發生了異常,那麼就會調用 updateFault 這個方法,來更新 Broker 的 Aviable 情況。注意這個參數 isolation 的值爲 true。接下來我們從源碼級別來驗證上面說的退避 3000ms 的事實。

可以看到,isolation 值是 true,則 duration 通過三元運算符計算出來結果爲 30000,也就是 30 秒。所以我們可以得出結論,如果發送消息拋出了異常,那麼直接會將該 Broker 設置爲 30 秒內不可用。

而如果只是發送延遲較高,則會根據如下的 map,根據延遲的具體時間,來判斷該設置多少時間的不可用。

例如,如果上次請求的 latency 超過 550ms,就退避 3000ms;超過 1000,就退避 60000;

正常情況下的選擇邏輯

而正常情況下,如果當前發送故障延遲沒有啓用,則會走常規邏輯,同樣的會去 for 循環計算,循環中取到了 MessageQueue 之後會去判斷是否和上次選擇的 MessageQueue 屬於同一個 Broker,如果是同一個 Broker,則會重新選擇,直到選擇到不屬於同一個 Broker 的 MessageQueue,或者直到循環結束。這也是爲了將消息均勻的分發存儲,防止數據傾斜

發送消息

選到了具體的 Message Queue 之後就會開始執行發送消息的邏輯,就會調用底層 Netty 的接口給發送出去,這塊暫時沒啥可看的。

Broker 的啓動流程

主從同步

在上面提到過,RocketMQ 有自己的主從同步,但是有兩個不同的版本,版本的分水嶺是在 4.5 版本。這兩個版本區別是什麼呢?

下圖是 Broker 啓動代碼中的源碼。

可以看到判斷了是否開啓了 Dleger,默認是不開啓的。所以就會執行其中的邏輯。

剛好我們就看到了,裏面有 Rocket 主從同步數據的相關代碼。

如果當前 Broker 節點的角色是 Slave,則會啓動一個週期性的定時任務,定期(也就是 10 秒)去 Master Broker 同步全量的數據。同步的數據包括:

註冊 Broker

完成了主動同步定時任務的啓動之後,就會去調用 registerBrokerAll 去註冊 Broker。可能這裏會有點疑問,我這裏是 Broker 啓動,只有當前一個 Broker 實例,那這個 All 是什麼意思呢?

All 是指所有的 NameServer,Broker 啓動的時候會將自己註冊到每一個 NameServer 上去。爲什麼不只註冊到一個 NameServer 就完事了呢?這樣一來還可以提高效率。歸根結底還是高可用的問題。

如果 Broker 只註冊到了一臺 NameServer 上,萬一這臺 NameServer 掛了呢?這個 Broker 對所有客戶端就都不可見了。實際上 Broker 還在正常的運行。

進到 registerBrokerAll 中去。

可以看到,這裏會判斷是否需要進行註冊。通過上面的截圖可以看到,此時 forceRegister 的值爲 true,而是否要註冊,決定權就交給了 needRegister

爲什麼需要判斷是否需要註冊呢?因爲 Broker 一旦註冊到了 NameServer 之後,由於 Producer 不停的在寫入數據,Consumer 也在不停的消費數據,Broker 也可能因爲故障導致某些 Topic 下的 Message Queue 等關鍵的路由信息發生變動。

這樣一來,NameServer 中的數據和 Broker 中的數據就會不一致

如何判斷是否需要註冊

大致的思路是,Broker 會從每一個 NameServer 中獲取到當前 Broker 的數據,並和當前 Broker 節點中的數據做對比。但凡有一臺 NameServer 數據和當前 Broker 不一致,都會進行註冊操作。

接下來,我們從源碼層面驗證這個邏輯。關鍵的邏輯我在圖中也標註了出來。

可以看到, 就是通過對比 Broker 中的數據版本和 NameServer 中的數據版本來實現的。這個版本,註冊的時候會寫到註冊的數據中存入 NameServer 中。

這裏由於是有多個,所以 RocketMQ 用線程池來實現了多線程操作,並且用 CountDownLatch 來等待所有的返回結果。經典的用空間換時間,Golang 裏面也有類似的操作,那就是 sync.waitGroup。

關於任何一個數據不匹配,都會進行重新註冊的事實,我們也從源碼層面來驗證一下。

可以看到,如果任何一臺 NameServer 的數據發生了 Change,都會 break,返回 true。

這裏的結果列表使用的是 CopyOnWriteList 來實現的。

因爲這裏是多線程去執行的判斷邏輯,而正常的列表不是線程安全的。CopyOnWriteArrayList 之所以是線程安全的,這歸功於 COW(Copy On Write),讀請求時共用同一個 List,涉及到寫請求時,會複製出一個 List,並在寫入數據的時候加入獨佔鎖。比起直接對所有操作加鎖,讀寫鎖的形式分離了讀、寫請求,使其互不影響,只對寫請求加鎖,降低了加鎖的次數、減少了加鎖的消耗,提升了整體操作的併發。

執行註冊邏輯

這塊就是構建數據,然後多線程併發的去發送請求,用 CopyOnWriteArrayList 來保存結果。不過,上面我們提到過,Broker 註冊的時候,會把數據版本發送到 NameServer 並且存儲起來,這塊我們可以看看發送到 NameServer 的數據結構。

可以看到,Topic 的數據分爲了兩部分,一部分是核心的邏輯,另一部分是 DataVersion,也就是我們剛剛一直提到的數據版本。

Broker 如何存儲數據

剛剛在聊 Producer 最後提到的是,發送消息到 Broker 就完了。不知道大家有沒有想過 Broker 是如何存儲消息的?

Commit log

先給出流程圖

然後給出結論,Producer 發送的消息是存儲在一種叫 commit log 的文件中的,Producer 端每次寫入的消息是不等長的,當該 CommitLog 文件寫入滿 1G,就會新建另一個新的 CommitLog,繼續寫入。此次採取的是順序寫入。

那麼問題來了,Consumer 來消費的時候,Broker 是如何快速找到對應的消息的呢?我們首先排除遍歷文件查找的方法, 因爲 RocketMQ 是以高吞吐高性能著稱的,肯定不可能採取這種對於很慢的操作。那 RocketMQ 是如何做的呢?

答案是 ConsumerQueue

ConsumerQueue

ConsumerQueue 是什麼?是文件。引入的目的是什麼呢?提高消費的性能

Broker 在收到一條消息的時候,寫入 Commit Log 的同時,還會將當前這條消息在 commit log 中的 offset消息的 size 和對應的 Tag 的 Hash 寫入到 consumer queue 文件中去。

每個 MessageQueue 都會有對應的 ConsumerQueue 文件存儲在磁盤上,每個 ConsumerQueue 文件包含了 30W 條消息,每條消息的 size 大小爲 20 字節,包含了 8 字節 CommitLog 的 Offset、4 字節的消息長度、8 字節的 Tag 的哈希值。這樣一來,每個 ConsumerQueue 的文件大小就約爲 5.72M。

當該 ConsumerQueue 文件寫滿了之後,就會再新建一個 ConsumerQueue 文件,繼續寫入。

所以,ConsumerQueue 文件可以看成是 CommitLog 文件的索引

負載均衡

什麼意思呢?假設我們總共有 6 個 MessageQueue,然後此時分佈在了 3 臺 Broker 上,每個 Broker 上包含了兩個 queue。此時 Consumer 有 3 臺,我們可以大致的認爲每個 Consumer 負責 2 個 MessageQueue 的消費。但是這裏有一個原則,那就是一個 MessageQueue 只能被一臺 Consumer 消費,而一臺 Consumer 可以消費多個 MessageQueue。

爲什麼?道理很簡單,RocketMQ 支持的順序消費,是指的分區順序性,也就是在單個 MessageQueue 中,消息是具有順序性的,而如果多臺 Consumer 去消費同一個 MessageQueue,就很難去保證順序消費了。

由於有很多個 Consumer 在消費多個 MessageQueue,所以爲了不出現數據傾斜,也爲了資源的合理分配利用,在 Producer 發送消息的時候,需要儘可能的將消息均勻的分發給多個 MessageQueue。

同時,上面那種一個 Consumer 消費了 2 個 MessageQueue 的情況,萬一這臺 Consumer 掛了呢?這兩個 MessageQueue 不就沒人消費了?

以上兩種情況分別是 Producer 端的負載均衡Consumer 端的負載均衡

Producer 端負載均衡

關於 Producer 端上面的負載均衡,上面的流程圖已經給了出來,並且給出了源碼的驗證。首先是容錯策略,會去避開一段時間有問題的 Broker,並且加上如果選擇了上次的 Broker,就會重新進行選擇。

Consumer 端負載均衡

首先 Consumer 端的負責均衡可以由兩個對象觸發:

Consumer 也會向所有的 Broker 發送心跳,將消息的消費組名稱訂閱關係集合消息的通信模式客戶端的 ID 等等。Broker 收到了 Consumer 的心跳之後,會將其存在 Broker 維護的一個 Manager 中,名字叫 ConsumerManager。當 Broker 監聽到了 Consumer 數量發生了變動,就會通知 Consumer 進行 Rebalance。

但是如果 Broker 通知 Consumer 進行 Rebalance 的消息丟了呢?這也就是爲什麼需要第 Consumer 自身進行觸發的原因。Consumer 會在啓動的時候啓動定時任務,週期性的執行 rebalance 操作。

默認是 20 秒執行一次。具體的代碼如下。

具體流程

首先,Consumer 的 Rebalance 會獲取到本地緩存的 Topic 的全部數據,然後向 Broker 發起請求,拉取該 Topic 和 ConsumerGroup 下的所有的消費者信息。此處的 Broker 數據來源就是 Consumer 之前的心跳發送過去的數據。然後會對 Topic 中 MessageQueue 和消費者 ID 進行排序,然後用消息隊列默認分配算法來進行分配,這裏的默認分配策略是平均分配

首先會均勻的按照類似分頁的思想,將 MessageQueue 分配給 Consumer,如果分配的不均勻,則會依次的將剩下的 MessageQueue 按照排序的順序,從上往下的分配。所以在這裏 Consumer 1 被分配到了 4 個 MessageQueue,而 Consumer 2 被分配到了 3 個 MessageQueue。

Rebalance 完了之後,會將結果和 Consumer 緩存的數據做對比,移除不在 ReBalance 結果中的 MessageQueue,將原本沒有的 MessageQueue 給新增到緩存中。

觸發時機

換一個角度來分析,其實就是兩個方面,一個是隊列信息發生了變化,另一種是消費者發生了變化

源碼驗證

然後給出核心的代碼驗證,獲取數據的邏輯如下

驗證了我們剛剛說的獲取了本地的 Topic 數據緩存,和從 Broker 端拉取所有的 ConsumerID。

接下來是驗證剛說的排序邏輯。

接下來是看判斷結果是否發生了變化的源碼。

可以看到,Consumer 通知 Broker 策略,其本質上就是發送心跳,將更新後的數據通過心跳發送給所有的 Broker。

Consumer 更多的細節

可能關於 Consumer,我們使用的更多一點。例如我們知道我們可以設置集羣消費和廣播消息,分別對應 RocketMQ 中的 CLUSTERING 和 _BROADCASTING**。_

再比如我們知道,我們可以設置順序消費和併發消費等等,接下來就讓我們用源碼來看看這些功能在 RocketMQ 中是怎麼實現的。

消費模型

在 Consumer 中,默認都是採用集羣消費,這塊在 Consumer 的代碼中也有體現。

而消費模式的不同,會影響到管理 offset 的具體實現。

可以看到,當消費模型是廣播模式時,Offset 的持久化管理會使用實現 LocalFileOffsetStorage

當消費模式是集羣消費時,則會使用 RemoteBrokerOffsetStore。

具體原因是什麼呢?首先我們得知道廣播模式和集羣模式的區別在哪兒:

所以在廣播模式下,每個 ConsumerGroup 的消費進度都不一樣,所以需要由 Consumer 自身來管理 Offset。而集羣模式下,同個 ConsumerGroup 下的消費進度其實是一樣的,所以可以交由 Broker 統一管理。

消費模式

消費模式則分爲順序消費和併發消費,分別對應實現 MessageListenerOrderly 和 MessageListenerConcurrently 兩種方式。

不同的消費方式會採取不同的底層實現,配置完成之後就會調用 start。

拉取消息

接下來我們來看一個跟我們最最相關的問題,那就是我們平時消費的消息到底是怎麼樣從 Broker 發到的 Consumer。在靠近啓動 Rebalance 的地方,Consumer 也開啓了一個定時拉取消息的線程。

這個線程做了什麼事呢?它會不停的從一個維護在內存中的 Queue 中獲取一個在寫入的時候就構建好的 PullRequest 對象,調用具體實現去不停的拉取消息了。

處理消費結果

在這裏是否開啓 AutoCommit,所做的處理差不了很多,大家也都知道,唯一區別就在於是否自動的提交 Offset。對於處理成功的邏輯也差不多,我們平時業務邏輯中可能也並不關心消費成功的消息。我們更多關注的是如果消費失敗了,RocketMQ 是怎麼處理的?

這是在 AutoCommit 下,如果消費失敗了的處理邏輯。會記錄一個失敗的 TPS,然後這裏有一個非常關鍵的邏輯,那就是 checkReconsumeTimes。

如果當前消息的重試次數,如果大於了最大的重試消費次數,就會把消費發回給 Broker。那最大重試次數是如何定義的。

如果值爲 - 1,那麼最大次數就是 MAX_VALUE,也就是 2147483647。這裏有點奇怪啊,按照我們平常的認知,難道不是重試 16 次嗎?然後就看到了很騷的一句註釋。

-1 means 16 times,這代碼確實有點,一言難盡。

然後,如果超過了最大的次數限制,就會將該消息調用 Prodcuer 的默認實現,將其發送到死信隊列中。當然,死信隊列也不是什麼特殊的存在,就是一個單獨的 Topic 而已。

通過 getRetryTopic 來獲取的,默認是給當前的 ConsumerGroup 名稱加上一個前綴。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eiW3gweMe1s-aowAde1egg