一文總結 MetaQ-RocketMQ 原理

簡介—— 消息隊列中間件 MetaQ/RocketMQ

中間件 MetaQ 是一種基於隊列模型的消息中間件,MetaQ 據說最早是受 Kafka 的影響開發的,第一版的名字 "metamorphosis",是奧地利作家卡夫卡的名作——《變形記》。RocketMQ 是 MetaQ 的開源版本。

消息隊列中間件一般用於在分佈式場景下解決集羣單機瓶頸的問題。在傳統的分佈式計算環境中,常常會出現由於某個單機節點的性能瓶頸,即使其他節點仍有餘力,仍然會導致整個系統的性能無法進一步提升的情況,這一現象通常是由於任務負載不均衡,網絡延遲等常見且難以解決的問題。消息隊列本質上是提供了一種非常合理的任務分配策略,通過將任務分給消費者實現異步和分佈式處理,提高整個集羣的性能。

消息隊列(mq) 的核心思想是將耗時的任務異步化,通過消息隊列緩存任務,從而實現消息發送方和接收方的解耦,使得任務的處理能夠異步、並行,從而提高系統或集羣的吞吐量和可擴展性。在這個過程中,整個系統強依賴於消息隊列,起到類似橋樑的作用。消息隊列有着經典的三大應用場景:解耦、異步削峯填谷。

解耦場景: 消息隊列一般使用發佈 / 訂閱的模型,如果服務 B C D 依賴服務 A 的消息,此時新增服務 E 也需要依賴 A ,而 B 服務不再需要消息,需要頻繁且複雜的業務改造,效率低,穩定性差,此時引入消息隊列進行解耦,服務 A 只需要將產生的消息發佈到 mq 中,就不用管了,其它服務會自己根據需要訂閱 mq 中的消息,或者說去 mq 中消費,這就使得每個服務可以更多地關注自身業務,而不需要把精力用在維護服務之間的關係上,可擴展性提高。

異步場景: 如用戶的業務需要一系列的服務進行處理,按順序處理的話,用戶需要等待的時間過長。例如電商平臺的用戶下單、支付、積分、郵件、短信通知等流程,長時間等待用戶無法接受,就可以通過 mq 進行服務的異步處理,例如積分、郵件和短信通知服務訂閱了支付服務的消息,將支付完成作爲消息發佈到 mq ,這些服務就可以同時對這一訂單進行處理,降低了請求等待時間(rt) 。

削峯填穀場景: 削峯表示的含義是,流量如果太大,就控制服務器處理的 QPS,不要讓大流量打掛數據庫等導致服務器宕機,讓服務處理請求更加平緩,節省服務器資源,其本質上是控制用戶的請求速率,或是延緩或是直接拒絕。填谷的含義是將階段性的大流量請求緩存起來,在流量平緩的時候慢慢處理,防止過多的請求被拒絕後的重試導致更大的流量。mq 很適合這一場景,QPS 超出服務端接收請求的能力時,服務端仍然保持在安全範圍內地從消息隊列中獲取消息進行處理,多餘的消息會積壓在消息隊列中,或由於超時直接拒絕,到 QPS 低於這一閾值的時候,這些積壓的消息就會被逐漸消費掉。相當於在系統前修建了一個流量蓄水池。

除此之外還可以利用消息隊列進行消息通信,日誌處理等業務,但消息隊列也會引入系統可用性,系統複雜度,數據一致性等問題(強依賴消息隊列的正確執行,需要確保消息不會丟失,確保消息的順序性等)。這意味着如果系統中的消息隊列承擔着重要的角色,那麼消息隊列的可靠性和穩定性也至關重要,本文介紹的 MetaQ/RocketMQ 是側重於維持消息一致性和高可靠性的消息隊列中間件。

物理架構

MetaQ 的高可用性是基於其物理部署架構實現的,在生產者爲消息定義了一個 topic 之後,消費者可以訂閱這個 topic ,於是消息就有了從生產到消費的路由指向。



NameServer 負責暴露消息的 topic ,因此可以以將 NameServer 理解成一個註冊中心,用來關聯 topic 和對應的 broker ,即消息的存儲位置。NameServer 的每個節點都維護着 topic 和 broker 的映射關係,每個節點彼此獨立,無同步。在每個 NameServer 節點內部都維護着所有 Broker 的地址列表,所有 Topic 和 Topic 對應 Queue 的信息等。消息生產者在發送消息之前先與任意一臺 NameServer 建立連接,獲取 Broker 服務器的地址列表,然後根據負載均衡算法從列表中選擇一臺消息服務器發送消息。

Broker 主要負責消息的存儲和轉發,分爲 masterslave,是一寫多讀的關係。broker 節點可以按照處理的數據相同劃分成副本組,同一組 master 和 slave 的關係可以通過指定相同 brokerName,不同的 brokerId 來定義,brokerId 爲 0 標識 master,非 0 是 slave。每個 broker 服務器會與 NameServer 集羣建立長連接(注意是跟所有的 NameServer 服務器,因爲 NameServer 彼此之間獨立不同步),並且會註冊 topic 信息到 NameServer 中。複製策略是 Broker 的 Master 與 Slave 間的數據同步方式,分爲同步複製與異步複製。由於異步複製、異步刷盤可能會丟失少量消息,因此 Broker 默認採用的是同步雙寫的方式,消息寫入 master 成功後,master 會等待 slave 同步數據成功後才向 Producer 返回成功 ACK ,即 Master 與 Slave 都要寫入成功後纔會返回成功 ACK 。這樣可以保證消息發送時消息不丟失。副本組中,各個節點處理的速度不同,也就有了日誌水位的概念 (高水位對消費者不可見)。在 master 宕機時,同步副本集中的其餘節點會自動選舉出新的 master 代替工作(Raft 協議)。



Producer,消息生產者,與 NameServer 隨機一個節點建立長連接,定時從 NameServer 獲取 topic 路由信息,與 master broker 建立長連接,定時發送心跳,Producer 只與 master 建立連接產生通信,不與 slave 建立連接。生產者和消費者都有組(Group)的概念,同一組節點的生產 / 消費邏輯相同。

Consumer,消息消費者,與 NameServer 隨機一個節點建立長連接,定時從 NameServer 獲取 topic 的路由信息,並獲取想要消費的 queue 。可以和提供服務的 master 或 slave 建立長連接,定時向 master 和 slave 發送心跳,既可以從 master 訂閱消息,也可以從 slave 訂閱消息。



消息的存儲

MetaQ 將消息存儲(持久化)到位於生產者和消費者之間的一個消息代理(Message Broker)上。

MetaQ 消息模型:

消息的存儲方式對消息隊列的性能有很大影響,如 ActiveMQ 會使用隊列表來存儲消息,依靠輪訓、加鎖等方式檢查和處理消息,但對於 QPS 很高的系統來說,一下子積壓龐大的數據量在表中會導致 B+ 樹索引層級加深,影響查詢效率。KV 數據庫採用如 LSM 樹作爲索引結構,對讀性能有較大的犧牲,這對於消息隊列而言很難接受,因爲消息隊列常常需要面對消費失敗需要重試的情況。

基於這樣的存儲結構,MetaQ 對客戶端暴露的主要是 Consume queue 邏輯視圖,提供隊列訪問接口。消費者通過指定 Consume queue 的位點來讀取消息,通過提交 Consume queue 的位點來維護消費進度。Concume queue 每個條目長度固定(8 個字節 CommitLog 物理偏移量、4 字節消息長度、8 字節 tag 哈希碼),單個 ConsumeQueue 文件默認最多包括 30 萬個條目。這樣做的好處是隊列非常輕量級,Consume Queue 非常小,且在消費過程中都是順序讀取,其速度幾乎能與內存讀寫相比,而在 page cache 和良好的空間局部性作用下,CommitLog 的訪問也非常快速。



MetaQ 會啓動一個定時服務 ReputMessageService 定時調用(間隔 1ms)來生成 Consume queue 和 其它索引文件。

Consume queue 解決了順序消費的問題,但如果需要根據屬性進行篩選,就必須用到 index 索引。

index 索引支持根據 key 值進行篩選,查找時,可以根據消息的 key 計算 hash 槽的位置,hash 槽中存儲着 Index 條目的位置,可以根據這個 index 條目獲得一個鏈表(尾),每個 index 條目包含在 CommitLog 上的消息主體的物理偏移量。

消息鏈路

MetaQ 的消息可以根據 topic-queue 劃分出確定的從生產者到消費者路由指向。



1.producer 指定 broker 和 queue 發送消息 msg ;

2.broker 接收消息,並完成緩存、刷盤和生成摘要(同時根據 tag 和 user properties 對 msg 進行打標)等操作;

3.consumer 每隔一段時間( pullInterval )從 broker 端的(根據服務端消息過濾模式 tag 或 sql 過濾後)獲取一定量的消息到本地消息隊列中(單線程)

4.consumer 按照配置併發分配上述隊列消息並執行消費方法;

5.consumer 返回 broker 消費結果並重置消費位點;

生產者

Topic 是消息的主題,每個 topic 對應多個隊列,多個隊列會均勻的分佈在多個 broker 上,Producer 發送的消息在 broker 上會均衡的分佈在多個隊列中,Producer 發送消息時在多個隊列間輪詢確保消息的均衡。



發送消息的具體操作如下:

1、查詢本地緩存是否存儲了 TopicPublishInfo ,否則從 NameServer 獲取

2、根據負載均衡選擇策略獲取待發送隊列並輪訓訪問

3、獲取消息隊列對應的 broker 實際 IP

4、設置消息 Unique ID ,zip 壓縮消息

5、消息校驗(長度等),發送消息

Producer 發送的每條消息都包含一個 Topic,表示一類消息的集合。同時還有一個 Tag,用於區分同一 Topic 下不同類型的消息。一個 Topic 包括多個 Queue,每個 Queue 中存放該 Topic 對應消息的位置。一個 Topic 的 Queue 相當於該 Topic 中消息的分區,Queue 可以存儲在不同的 Broker 上。發送消息時,Producer 通過負載均衡模塊選擇相應的 Broker 集羣隊列進行消息投遞。

消息發送時如果出現失敗,默認會重試 2 次,在重試時會盡量避開剛剛接收失敗的 Broker,而是選擇其它 Broker 上的隊列進行發送,從而提高消息發送的成功率。

消費者

消費方式

MetaQ 消費者端有多套負載均衡算法的實現,比較常見的是平均分配和平均循環分配,默認使用平均分配算法,給每個 Consumer 分配均等的隊列。一個 Consumer 可以對應多個隊列,而一個隊列只能給一個 Consumer 進行消費,Consumer 和隊列之間是一對多的關係。

集羣模式下有一點需要注意:消費隊列負載機制遵循一個通用的思想,一個消息隊列同時只允許被一個消費者消費,一個消費者可以消費多個消費隊列。因此當 Consumer 的數量大於隊列的數量,會有部分 Consumer 分配不到隊列,這些分配不到隊列的 Consumer 機器不會有消息到達。

平均分配算法舉例:

如果消費集羣規模較大:例如 topic 隊列資源是 128 個,而消費機器數有 160 臺,按照一個隊列只會被一個消費集羣中一臺機器處理的原則,會有 32 臺機器不會收到消息,此種情況需要聯繫 MetaQ 人員進行擴容評估。

消費重試:當出現消費失敗的消息時,Broker 會爲每個消費者組設置一個重試隊列。當一條消息初次消費失敗,消息隊列會自動進行消費重試。達到最大重試次數後,若消費仍然失敗,此時會將該消息發送到死信隊列。對於死信消息,通常需要開發人員進行手動處理。

在消費時間過程中可能會遇到消息消費隊列增加和減少、消息消費者增加或減少,此時需要對消息消費隊列進行重新平衡,既重新分配 (rebalance),這就是所謂的重平衡機制。在 RocketMQ 中,每隔 20s 會根據當前隊列數量、消費者數量重新進行隊列負載計算,如果計算出來的結果與當前不一樣,則觸發消息消費隊列的重分配。

Consumer 啓動時會啓動定時器,還執行一些定時同步任務,包括:同步 nameServer 地址,從 nameServer 同步 topic 的路由信息,清理 offline 的 broker,並向所有 broker 發送心跳,分配給當前 consumer 的每個隊列將最新消費的 offset 同步給 broker。

消息消費過程淺析

三個關鍵服務: RebalanceService、PullMessageService、MessageConsumeService

RebalanceService 負載均衡服務

定時執行一次負載均衡(20 s)分配消息隊列給消費者。負載均衡針對每個 topic 獨立進行,具體如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);//廣播模式下每個消費者要消費所有 queue 的消息
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//找到該topic下的消息隊列集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);//找到給消費者組下的所有消費者id
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);// 根據分配策略進行分配
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);// 更新處理隊列表
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

這裏主要做了幾件事:

i. 找到 topic 下的消息隊列(queue)集合

ii. 更新處理隊列表

i. 找到 topic 下的消息隊列集合

ii. 找到消費者組下所有消費者 id

iii. 根據分配策略進行分配

iv. 更新處理隊列表,開始真正拉取消息請求

消費者會將消費位點更新到 NameServer 上,Rebalance 發生時,讀取消費者的消費位點信息,需要注意在消費者數量大於隊列數量的情況下,如果消費者不及時更新消費位點信息,可能會導致消息被重複消費。因此,消費者需要及時更新消費位點信息,確保消費進度正確。

Consumer 創建的時候 Rebalance 會被執行。整個 rebalanceService 的作用就是不斷的通過負載均衡,重新分配隊列的過程。根據分配好的隊列構建拉取消息的請求,然後放到 pullRequestQueue 中。

PullMessageService 拉取消息服務

首先拉取消息時最重要的是確定偏移量 offset,這存儲在消費者端的 OffsetStore 對象中。

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
          this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
        } else {
          switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
              this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
              break;
            case CLUSTERING:
              this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
              break;
            default:
              break;
          }
}
this.offsetStore.load();

可以看到廣播模式和集羣模式的對象類型不同,這是因爲對 offset 的維護的方式不一樣,在 load 的時候 LocalFileOffsetStore 會從本地文件加載這個 offset,而 RemoteBrokerOffsetStore 的 load 函數是空的。

兩種對象類型分別有 readOffset 函數支持從內存中獲取 offset 值,以及分別從本地文件存儲和 broker 獲取 offset。需要注意集羣模式下消費者只需要關心 broker 上維護的消費進度,因爲不論 queue 和 消費者的映射關係如何切換, 只有 offset 之後的未消費消息是消費者需要關心的。



消息的拉取過程是一個不斷循環的生產者消費者模型,一個 PullRequest 就對應一個拉取任務,並和一對 MessageQueue(保存 Consume queue 的信息)和 ProcessQueue 關聯,消息拉取的過程中,PullMessageService 拉取線程不停的讀取 PullRequestQueue 根據 PullRequest 拉取消息。拉取到消息時,消息提交到 ProcessQueue 中並新建 ConsumeRequest 提交到 ConsumeService 處理, 然後生成下一批的 PullRequest 丟到 PullRequestQueue。如果沒有拉取到消息或出現異常,則會重新將請求放回拉取隊列。ProcessQueue 中以 TreeMap 形式保存待處理的消息, key 爲消息對應的 offset ,並自動進行排序。



消息拉取過程:

1.PullMessageService 不斷循環遍歷,從 PullRequestQueue 中提取 PullRequest,根據 nextOffset 去 broker 拉取消息,若該隊列 已經 dropped 則更新 offset 到 broker 並丟棄此拉消息請求。

2.PullMessageService 異步拉取消息,同時將 PullRequest 封裝在 PullCallback 中,PullCallback 封裝在 ResponseFuture 中,並以自增的請求 id 爲鍵,ResponseFuture 爲值放入 ResponseTable 中。

3.Broker 收到請求,如果 offset 之後有新的消息會立即發送異步響應;否則等待直到 producer 有新的消息發送後返回或者超時。如果通信異常或者 Broker 超時未返回響應,nettyClient 會定時清理超時的請求,釋放 PullRequest 回到 PullRequestQueue。

  1. 用最新的 offset 更新 ResponseFuture 裏的 PullRequest 並推送給 PullRequestQueue 裏以進行下一次拉取。批量拉取到的消息分批提交給 consumeExecutor 線程處理。

消費控速

MetaQ 爲消費者端拉取消息提供了消費控速的能力:

消息種類

普通消息

可選擇同步、異步或單向發送。同步:Producer 發出一條消息後,會在收到 MQ 返回的 ACK 之後再發送下一條消息。異步:Producer 發出消息後無需等待 MQ 返回 ACK ,直接發送下一條消息。單向: Producer 僅負責發送消息,不等待,MQ 也不返回 ACK。

順序消息

消息的順序性分爲兩種:

MetaQ 只支持同一個 queue 的順序消息,且同一個 queue 只能被一臺機器的一個線程消費,如果想要支持全局消息,那需要將該 topic 的 queue 的數量設置爲 1,犧牲了可用性。

消息事務



  1. 發送方向 MQ 服務端發送消息。

2.MQ Server 將消息持久化成功之後,向發送方 ACK 確認消息已經發送成功,此時消息爲半消息。

  1. 發送方開始執行本地事務邏輯。

  2. 發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半消息標記爲可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。

  3. 在斷網或者是應用重啓的特殊情況下,上述步驟 4 提交的二次確認最終未到達 MQ Server,經過固定時間後 MQ Server 將對該消息發起消息回查。

  4. 發送方收到消息回查後,需要檢查對應消息的本地事務執行的最終結果。

  5. 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟 4 對半消息進行操作。

MetaQ 3.0 以後,新的版本提供更加豐富的功能,支持消息屬性、無序消息、延遲消息、廣播消息、長輪詢消費、高可用特性,這些功能基本上覆蓋了大部分應用對消息中間件的需求。除了功能豐富之外,MetaQ 基於順序寫,大概率順序讀的隊列存儲結構和 pull 模式的消費方式,使得 MetaQ 具備了最快的消息寫入速度和百億級的堆積能力,特別適合用來削峯填谷。在 MetaQ 3.0 版本的基礎上,衍生了開源版本 RocketMQ。

高可用

如何做到不重複消費也不丟失消息?

重複消費問題

MetaQ 不能保證消息不重複,因此對於重複消費情況,需要業務自定義唯一標識作爲冪等處理的依據。

消息丟失問題

MetaQ 避免消息丟失的機制主要包括:重試、冗餘消息存儲。在生產者的消息投遞失敗時,默認會重試兩次。消費者消費失敗時,在廣播模式下,消費失敗僅會返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,而不會重試。在未指定順序消息的集羣模式下,消費失敗的消息會進入重試隊列自動重試,默認最大重試次數爲 16 。在順序消費的集羣模式下,消費失敗會使得當前隊列暫停消費,並重試到成功爲止。

主從同步

RocketMQ/MetaQ 爲每個存儲數據的 Broker 節點配置 ClusterName,BrokerName 標識來更好的進行資源管理。多個 BrokerName 相同的節點構成一個副本組。每個副本還擁有一個從 0 開始編號,不重複也不一定連續的 BrokerId 用來表示身份,編號爲 0 的節點是這個副本組的 Leader / Primary / Master,故障時通過選舉來重新對 Broker 編號標識新的身份。例如 BrokerId = {0, 1, 3},則 0 爲主,其他兩個爲備。

從模型的角度來看,RocketMQ /MetaQ 單節點上 Topic 數量較多,如果像 kafka 以 topic 粒度維護狀態機,節點宕機會導致上萬個狀態機切換,這種驚羣效應會帶來很多潛在風險,因此新版本的 RocketMQ/MetaQ 選擇以單個 Broker 作爲切換的最小粒度來管理,相比於其他更細粒度的實現,副本身份切換時只需要重分配 Broker 編號,對元數據節點壓力最小。由於通信的數據量少,可以加快主備切換的速度,單個副本下線的影響被限制在副本組內,減少管理和運維成本。這種實現也存在一些缺點,例如存儲節點的負載無法以最佳狀態在集羣上進行負載均衡。



RocketMQ/MetaQ 採用物理複製的方法,存儲層的 CommitLog 通過鏈表和內核的 MappedFile 機制抽象出一條 append only 的數據流。主副本將未提交的消息按序傳輸給其他副本(相當於 redo log),並根據一定規則計算確認位點(confirm offset)判斷日誌流是否被提交。最終一致性通過數據水位對齊的方式來實現(越近期的消息價值越高):

副本組的消息複製也支持同步和異步的模式。

KvyxBi

slave broker 會定時(60 s)從 master 同步信息

  public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
        this.syncMessageRequestMode();
        if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
            this.syncTimerMetrics();
        }
    }
主從切換

RocketMQ 衍生出了很多不同的主從切換架構。

無切換架構

最早的時候,RocketMQ 基於 Master-Slave 模式提供了主備部署的架構,這種模式提供了一定的高可用能力,在 Master 節點負載較高情況下,讀流量可以被重定向到備機。由於沒有選主機制,在 Master 節點不可用時,這個副本組的消息發送將會完全中斷,還會出現延遲消息、事務消息等無法消費或者延遲。此外,備機在正常工作場景下資源使用率較低,造成一定的資源浪費。爲了解決這些問題,社區提出了在一個 Broker 進程內運行多個 BrokerContainer,這個設計類似於 Flink 的 slot,讓一個 Broker 進程上可以以 Container 的形式運行多個節點,複用傳輸層的連接,業務線程池等資源,通過單節點主備交叉部署來同時承擔多份流量,無外部依賴,自愈能力強。這種方式下隔離性弱於使用原生容器方式進行隔離,同時由於架構的複雜度增加導致了自愈流程較爲複雜。

切換架構

另一條演進路線則是基於可切換的,RocketMQ 也嘗試過依託於 Zookeeper 的分佈式鎖和通知機制進行 HA 狀態的管理。引入外部依賴的同時給架構帶來了複雜性,不容易做小型化部署,部署運維和診斷的成本較高。另一種方式就是基於 Raft 在集羣內自動選主,Raft 中的副本身份被透出和複用到 Broker Role 層面去除外部依賴,然而強一致的 Raft 版本並未支持靈活的降級策略,無法在 C(Consistency)和 A (Availability)之間靈活調整。兩種切換方案都是 CP 設計,犧牲高可用優先保證一致性。主副本下線時選主和路由定時更新策略導致整個故障轉移時間依然較長,Raft 本身對三副本的要求也會面臨較大的成本壓力。

RocketMQ DLedger 融合模式

RocketMQ DLedger (基於 Raft 的分佈式日誌存儲)融合模式是 RocketMQ 5.0 演進中結合上述兩條路線後的一個系統的解決方案。

M4A8VE

總結

相比較於 RocketMQ/MetaQ,Kafka 具有更高的吞吐量。Kafka 默認採用異步發送的機制,並且還擁有消息收集和批量發送的機制,這樣的設置可以顯著提高其吞吐量。由於 Kafka 的高吞吐量,因此通常被用於日誌採集、大數據等領域。

RocketMQ/MetaQ 不採用異步的方式發送消息。因爲當採用異步的方式發送消息時,Producer 發送的消息到達 Broker 就會返回成功。此時如果 Producer 宕機,而消息在 Broker 刷盤失敗時,就會導致消息丟失,從而降低系統的可靠性。

RocketMQ/MetaQ 單機可以支持更多的 topic 數量。因爲 Kafka 在 Broker 端是將一個分區存儲在一個文件中的,當 topic 增加時,分區的數量也會增加,就會產生過多的文件。當消息刷盤時,就會出現性能下降的情況。而 RocketMQ/MetaQ 是將所有消息順序寫入文件的,因此不會出現這種情況。

當 Kafka 單機的 topic 數量從幾十到幾百個時,就會出現吞吐量大幅度下降、load 增高、響應時間變長等現象。而 RocketMQ/MetaQ 的 topic 數量達到幾千,甚至上萬時,也只是會出現小幅度的性能下降。

綜上所述,Kafka 具有更高的吞吐量,適合應用於日誌採集、大數據等領域。而 RocketMQ/MetaQ 單機支持更多的 topic,且具有更高的可靠性(一致性支持),因此適用於淘寶這樣複雜的業務處理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/EEkjBrVYQFwBiGQObrM_TQ