RocketMQ 架構簡析

Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分佈式消息中間件。

-     整體架構    -

RocketMQ 主要由 Producer、Broker、Consumer 三部分組成,其中 Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個 Topic 的消息,每個 Topic 的消息也可以分片存儲於集羣中的不同的 Broker Group。

-     Namesrv    -

說道 Namesrv 首先會想到服務註冊與發現。分佈式服務 SOA 架構體系中會有服務註冊與發現中心。主要作用是指導服務調用方找到服務提供者提供的服務實例。RocketMQ 體系中 Namesrv 主要作用是:爲 producer 和 consumer 提供關於 topic 的路由信息。管理 broker 節點:監控更新 broker 的實時狀態。路由註冊、路由刪除(故障剔除)。

Namesrv 充當路由消息的提供者。Namesrv 是一個幾乎無狀態節點,多個 Namesrv 實例組成集羣,但相互獨立,沒有信息交換。

  1. 路由元信息
  1. 路由註冊  RocketMQ 路由註冊是通過 broker 與 Namesrv 的心跳功能實現的。broker 啓動時向集羣中所有 Namesrv 發送心跳包,之後每隔 30 秒向集羣中所有 Namesrv 發送心跳包。心跳包中包含:broker 集羣信息、broker 信息、topic 配置信息、broker 關聯的 FilterServer 列表等。如果 brokerA 爲 Master。並且 brokerA 上的 topic1 的配置信息發生變化或初次註冊,Namesrv 會根據報文創建或更新 Topic 路由元數據,填充 topicQueueTable。

  2. 路由刪除  Namesrv 收到 brokerA 的心跳包會更新 brokerLiveTable 中的 brokerA 對應的 BrokerLiveInfo 中的 lastUpdateTimestamp。Namesrv 每隔 10 秒掃描 brokerLiveTable 一次。如果 brokerA 對應的 BrokerLiveInfo 中 lastUpdateTimestamp 距當前時間超過 120 秒,Namesrv 認爲 brokerA 失效,會將 brokerA 的路由信息移除並關閉與 broker 的 socket 連接。更新:topicQueueInfo、brokerAddrTable、brokerLiveTable、filterServerTable 等。

  3. 路由發現  RocketMQ 路由發現是非實時的。當 Topic 路由信息發生變化是,Namesrv 不會主動推送給客戶端(Producer、Consumer)。而是由客戶端定時到 Namesrv 拉去最新的路由信息並緩存(包含 Topic 路由信息)。

與 kafka 對比
kafka 由 zookeeper 集羣提供命名服務(Naming Service)。
Kafka 通過 ZooKeeper 管理集羣配置、選舉 Leader 以及在 consumer g

-     Broker    -

消息中轉角色,負責存儲消息、轉發消息。代理服務器在 RocketMQ 系統中負責接收從生產者發送來的消息並存儲、同時爲消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

Broker 是以 group 爲單位提供服務。一個 group 裏面分 Master 和 Slave。Master 和 Slave 存儲的數據一樣,slave 從 master 同步數據(同步雙寫或異步複製看配置)。一個 Master 可以對應多個 Slave,一個 Slave 只能對應一個 Master。Master 與 Slave 的對應關係通過指定相同的 BrokerName、不同的 BrokerId 來定義,BrokerId 爲 0 表示 Master,非 0 表示 Slave。Master 也可以部署多個。broker 不必須是物理機或虛擬機:

每個 Broker 與 Namesrv 集羣中的所有節點建立長連接,定時發送心跳包到所有 Namesrv,更新 broker 信息、topic 路由信息等。一個 Topic 的不同 queue(分區)可分佈到集羣中不同的 broker group 上。

與 kafka 對比:
kafka 和 RocketMQ 的 broker 都可以容納多個一個或多個分區數據(kafka 分區:partition;RocketMQ 分區:queue)。
kafka 基於 partition(分區) 做備份 / 高可用(partition follower)。
RocketMQ 增加了 broker group 的概念,基於 broker(可能包含多個分區)。

-     Producer    -

(消息)生產者。Producer 與 Namesrv 集羣中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,並向提供 Topic 服務的 broker master 建立長連接,且定時向 broker master 發送心跳。Producer 完全無狀態,可集羣部署。

Producer 負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統裏產生的消息發送到 broker 服務器。RocketMQ 提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要 Broker 返回確認信息,單向發送不需要。

-     Consumer    -

(消息)消費者 Consumer 與 Namesrv 集羣中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,並向提供 Topic 服務的 Master、Slave 建立長連接,且定時向 Master、Slave 發送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規則由 Broker 配置決定。

Consumer 負責消費消息,一般是後臺系統負責異步消費。一個消息消費者會從 Broker 服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

集羣模式下:相同 Consumer Group 的每個 Consumer 實例平均分攤消息。一個條消息僅能被一個 Consumer Group 消費一次。

Producer、Consumer都只需要和集羣中一個Namesrv建立長連接。Broker需要向集羣中所有的Namesrv發送心跳包。
其實很好理解:
Namesrv集羣提供高可用的命名服務。
Producer、Consumer只需要從其中一臺定期同步路由信息。
如果Broker只隨機調一臺發送心跳包。那麼不同的Namesrv保存的路由信息會出現

消費者類型:

  1. 拉取式消費(Pull Consumer) Consumer 消費的一種類型,應用通常主動調用 Consumer 的拉消息方法從 Broker 服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啓動消費過程。Pull 方式裏,取消息的過程需要用戶自己寫(包括提交 offset 等操作)。

  2. 推動式消費(Push Consumer) Consumer 消費的一種類型,該模式下 Broker 收到數據後會主動推送給消費端,該消費模式一般實時性較高。Push Consumer 原理上也是採取 pull 模式。實際上就是長輪詢的 pull 模式。

-     一些概念    -

  1. 主題(Topic) 表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是 RocketMQ 進行消息訂閱的基本單位。每個 topic 可分爲若干個分區(queue)。

  2. 生產者組(Producer Group) 同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之後崩潰,則 Broker 服務器會聯繫同一生產者組的其他生產者實例以提交或回溯消費。

  3. 消費者組(Consumer Group) 同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的 Topic。RocketMQ 支持兩種消息模式:集羣消費(Clustering)和廣播消費(Broadcasting)。

  4. 普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

  5. 嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

  6. 消息(Message) 消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ 中每個消息擁有唯一的 Message ID,且可以攜帶具有業務標識的 Key。系統提供了通過 Message ID 和 Key 查詢消息的功能。

  7. 標籤(Tag) 爲消息設置的標誌,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標籤。標籤能夠有效地保持代碼的清晰度和連貫性,並優化 RocketMQ 提供的查詢系統。消費者可以根據 Tag 實現對不同子主題的不同消費邏輯,實現更好的擴展性。

-     關於消息中間件    -

消息中間件需要解決的問題:異步化、削峯填谷。

消息中間件應具備的基礎能力是:消息發佈、訂閱、消費。概念相對簡單這裏不過多描述。

消息中間件的一些重要的機制:

1. 消息優先級(Message Priority;RocketMQ 不支持)

優先級是指在一個消息隊列中,每條消息都有不同的優先級,一般用整數來描述,優先級高的消息先投遞,如果消息完全在一個內存隊列中,那麼在投遞前可以按照優先級排序,令優先級高的先投遞。由於 RocketMQ 所有消息都是持久化的,所以如果按照優先級來排序,開銷會非常大,因此 RocketMQ 沒有特意支持消息優先級,但是可以通過變通的方式實現類似功能,即單獨配置一個優先級高的隊列,和一個普通優先級的隊列,將不同優先級發送到不同隊列即可。

2. 順序消息(Message Order)

消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單創建,訂單付款,訂單完成。消費時,要按照這個順序消費纔能有意義。但是同時訂單之間是可以並行消費的。RocketMQ 可以嚴格的保證消息有序。

3. 高可用、消息可靠性

3.1 消息持久化

RocketMQ、Kafka 以文件記錄形式持久化。

RocketMQ 採用了單一的日誌文件,即把同 1 個 broker 上面所有 topic 的所有 queue 的消息,存放在一個文件裏面,從而避免了隨機的磁盤寫入。

如上圖所示,所有消息都存在一個單一的 CommitLog 文件裏面,然後有後臺線程異步的同步到 ConsumeQueue,再由 Consumer 進行消費。

TODO 同步、異步刷盤。

TODO RocketMQ 充分利用 Linux 文件系統內存 cache 來提高性能。TODO CommitLog index Commitlog segment 的大小與頁緩存一致。

RocketMQ 消息存儲機制會在後面的文章詳細說明。

3.2 broker master/salve

TODO broker group master/salve

TODO Async/Sync Master;

4. 高併發、可擴展 ==> 分佈式

提高併發效率 => 提高生產、消費並行度 => 提高分區數量。

RocketMQ、kafka 都支持 topic 數據分區存放、動態擴展。

以 RocketMQ 爲例:

topic 創建的時候可以用集羣模式去創建(這樣集羣裏面每個 broker 的 queue 的數量相同),也可以用單個 broker 模式去創建(這樣每個 broker 的 queue 數量可以不一致)。

4.1 生產並行度

RocketMQ 的生產並行度是由其自身機制及 broker 的數量決定的。這塊後面的文章會詳細分析。

4.2 消費並行度

廣播模式下所有消費者會接受並消費當前 topic 下所有 Queue 的消息。

集羣模式下,一個 queue 只分配給一個 consumer 實例:這是由於拉取消息是 consumer 主動控制的,如果多個實例同時消費一個 queue 的消息,會導致同一個消息在不同的實例下被消費多次,所以算法上都是一個 queue 只分給一個 consumer 實例,一個 consumer 實例可以允許同時分到不同的 queue。

Kafka 的消費並行度依賴 Topic 配置的分區數,如分區數爲 10,那麼最多 10 臺機器來並行消費(每臺機器只能開啓一個線程),或者一臺機器消費(10 個線程並行消費)。即消費並行度和分區數一致。RocketMQ 消費並行度分兩種情況:順序消費方式並行度同卡夫卡完全一致;亂序方式並行度取決於 Consumer 的線程數,如 Topic 配置 10 個隊列,10 臺機器消費,每臺機器 100 個線程,那麼並行度爲 1000。

4.3 消息隊列分配策略

Producer 使用 MessageQueueSelector 選擇將消息投放到哪個分區 使用 AllocateMessageQueueStrategy 將不同分區分配給 Consumer Group 中的不同 Consumer。一個分區(queue)僅允許分配給同一個 Consumer Group 下的一個 Consumer(防止重複消費)。

MessageQueueSelector

內置實現類:SelectMessageQueueByMachineRoom SelectMessageQueueByHash SelectMessageQueueByRandom

可以通過實現 MessageQueueSelector 接口,來自定義 Producer 投遞消息時選擇分區的算法。

AllocateMessageQueueStrategy

內置實現類:

AllocateMessageQueueAveragely:平均分配算法 

AllocateMessageQueueAveragelyByCircle:基於環形平均分配算法

AllocateMachineRoomNearby:基於機房臨近原則算法

AllocateMessageQueueByMachineRoom:基於機房分配算法

AllocateMessageQueueConsistentHash:基於一致性 hash 算法

AllocateMessageQueueByConfig:基於配置分配算法

可以通過實現 AllocateMessageQueueStrategy 來自定義 queue 分配給特定 Consumer Group 下不同 Consumer 的策略。

參考:

https://github.com/apache/rocketmq/blob/master/docs/cn/

https://juejin.im/post/6844903589819875336

https://jaskey.github.io/blog/2016/12/19/rocketmq-rebalance/

http://objcoding.com/2019/09/13/kafka-partition-and-rmq-queue/

http://www.itmuch.com/books/rocketmq

作者:RyanLee86799

來源:https://juejin.im/post/6844904130822029320

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Rbs2ZSm69MnmNU_q5pjFOA