RocketMQ 高級使用
消息存儲
分佈式隊列因爲有高可靠性的要求,所以數據要進行持久化存儲。
-
消息生成者發送消息
-
MQ 收到消息,將消息進行持久化,在存儲中新增一條記錄
-
返回 ACK 給生產者
-
MQ push 消息給對應的消費者,然後等待消費者返回 ACK
-
如果消息消費者在指定時間內成功返回 ack,那麼 MQ 認爲消息消費成功,在存儲中刪除消息,即執行第 6 步;如果 MQ 在指定時間內沒有收到 ACK,則認爲消息消費失敗,會嘗試重新 push 消息, 重複執行 4、5、6 步驟
-
MQ 刪除消息
存儲介質
- 關係型數據庫 DB
Apache 下開源的另外一款 MQ—ActiveMQ(默認採用的 KahaDB 做消息存儲)可選用 JDBC 的方式來做消息持久化,通過簡單的 xml 配置信息即可實現 JDBC 消息存儲。由於,普通關係型數據庫(如 Mysql)在單表數據量達到千萬級別的情況下,其 IO 讀寫性能往往會出現瓶頸。在可靠性方面,該種方案非常依賴 DB,如果一旦 DB 出現故障,則 MQ 的消息就無法落盤存儲會導致線上故障
-
文件系統
目前業界較爲常用的幾款產品(RocketMQ/Kafka/RabbitMQ)均採用的是消息刷盤至所部署虛擬機 / 物理機的文件系統來做持久化(刷盤一般可以分爲異步刷盤和同步刷盤兩種模式)。消息刷盤爲消息存儲提供了一種高效率、高可靠性和高性能的數據持久化方式。除非部署 MQ 機器本身或是本地磁盤掛了,否則一般是不會出現無法持久化的故障問題。
性能對比
文件系統 > 關係型數據庫 DB
消息的存儲和發送
1)消息存儲
磁盤如果使用得當,磁盤的速度完全可以匹配上網絡 的數據傳輸速度。目前的高性能磁盤,順序寫速度可以達到 600MB/s, 超過了一般網卡的傳輸速度。但是磁盤隨機寫的速度只有大概 100KB/s,和順序寫的性能相差 6000 倍!因爲有如此巨大的速度差別,好的消息隊列系統會比普通的消息隊列系統速度快多個數量級。RocketMQ 的消息用順序寫, 保證了消息存儲的速度。
2)消息發送
Linux 操作系統分爲【用戶態】和【內核態】,文件操作、網絡操作需要涉及這兩種形態的切換,免不了進行數據複製。
一臺服務器 把本機磁盤文件的內容發送到客戶端,一般分爲兩個步驟:
-
read;讀取本地文件內容;
-
write;將讀取的內容通過網絡發送出去。
這兩個看似簡單的操作,實際進行了 4 次數據複製,分別是:
-
從磁盤複製數據到內核態內存;
-
從內核態內存復 制到用戶態內存;
-
然後從用戶態 內存複製到網絡驅動的內核態內存;
-
最後是從網絡驅動的內核態內存復 制到網卡中進行傳輸。
RocketMQ 充分利用了上述特性,也就是所謂的 “零拷貝” 技術,提高消息存盤和網絡發送的速度。
這裏需要注意的是,採用 MappedByteBuffer 這種內存映射的方式有幾個限制,其中之一是一次只能映射 1.5~2G 的文件至用戶態的虛擬內存,這也是爲何 RocketMQ 默認設置單個 CommitLog 日誌數據文件爲 1G 的原因了
消息存儲結構
RocketMQ 消息的存儲是由 ConsumeQueue 和 CommitLog 配合完成 的,消息真正的物理存儲文件是 CommitLog,ConsumeQueue 是消息的邏輯隊列,類似數據庫的索引文件,存儲的是指向物理存儲的地址。每 個 Topic 下的每個 Message Queue 都有一個對應的 ConsumeQueue 文件。
-
CommitLog:存儲消息的元數據
-
ConsumerQueue:存儲消息在 CommitLog 的索引
-
IndexFile:爲了消息查詢提供了一種通過 key 或時間區間來查詢消息的方法,這種通過 IndexFile 來查找消息的方法不影響發送與消費消息的主流程
刷盤機制
RocketMQ 的消息是存儲到磁盤上的,這樣既能保證斷電後恢復, 又可以讓存儲的消息量超出內存的限制。RocketMQ 爲了提高性能,會盡可能地保證磁盤的順序寫。消息在通過 Producer 寫入 RocketMQ 的時 候,有兩種寫磁盤方式,分佈式同步刷盤和異步刷盤。
1)同步刷盤
在返回寫成功狀態時,消息已經被寫入磁盤。具體流程是,消息寫入內存的 PAGECACHE 後,立刻通知刷盤線程刷盤, 然後等待刷盤完成,刷盤線程執行完成後喚醒等待的線程,返回消息寫 成功的狀態。
2)異步刷盤
在返回寫成功狀態時,消息可能只是被寫入了內存的 PAGECACHE,寫操作的返回快,吞吐量大;當內存裏的消息量積累到一定程度時,統一觸發寫磁盤動作,快速寫入。
3)配置
同步刷盤還是異步刷盤,都是通過 Broker 配置文件裏的 flushDiskType 參數設置的,這個參數被配置成 SYNC_FLUSH、ASYNC_FLUSH 中的 一個。
高可用性機制
RocketMQ 分佈式集羣是通過 Master 和 Slave 的配合達到高可用性的。
Master 和 Slave 的區別:在 Broker 的配置文件中,參數 brokerId 的值爲 0 表明這個 Broker 是 Master,大於 0 表明這個 Broker 是 Slave,同時 brokerRole 參數也會說明這個 Broker 是 Master 還是 Slave。
Master 角色的 Broker 支持讀和寫,Slave 角色的 Broker 僅支持讀,也就是 Producer 只能和 Master 角色的 Broker 連接寫入消息;Consumer 可以連接 Master 角色的 Broker,也可以連接 Slave 角色的 Broker 來讀取消息。
消息消費高可用
在 Consumer 的配置文件中,並不需要設置是從 Master 讀還是從 Slave 讀,當 Master 不可用或者繁忙的時候,Consumer 會被自動切換到從 Slave 讀。有了自動切換 Consumer 這種機制,當一個 Master 角色的機器出現故障後,Consumer 仍然可以從 Slave 讀取消息,不影響 Consumer 程序。這就達到了消費端的高可用性。
消息發送高可用
在創建 Topic 的時候,把 Topic 的多個 Message Queue 創建在多個 Broker 組上(相同 Broker 名稱,不同 brokerId 的機器組成一個 Broker 組),這樣當一個 Broker 組的 Master 不可 用後,其他組的 Master 仍然可用,Producer 仍然可以發送消息。 RocketMQ 目前還不支持把 Slave 自動轉成 Master,如果機器資源不足, 需要把 Slave 轉成 Master,則要手動停止 Slave 角色的 Broker,更改配置文 件,用新的配置文件啓動 Broker。
消息主從複製
如果一個 Broker 組有 Master 和 Slave,消息需要從 Master 複製到 Slave 上,有同步和異步兩種複製方式。
1)同步複製
同步複製方式是等 Master 和 Slave 均寫 成功後才反饋給客戶端寫成功狀態;
在同步複製方式下,如果 Master 出故障, Slave 上有全部的備份數據,容易恢復,但是同步複製會增大數據寫入 延遲,降低系統吞吐量。
2)異步複製
異步複製方式是隻要 Master 寫成功 即可反饋給客戶端寫成功狀態。
在異步複製方式下,系統擁有較低的延遲和較高的吞吐量,但是如果 Master 出了故障,有些數據因爲沒有被寫 入 Slave,有可能會丟失;
3)配置
同步複製和異步複製是通過 Broker 配置文件裏的 brokerRole 參數進行設置的,這個參數可以被設置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE 三個值中的一個。
4)總結
實際應用中要結合業務場景,合理設置刷盤方式和主從複製方式, 尤其是 SYNC_FLUSH 方式,由於頻繁地觸發磁盤寫動作,會明顯降低 性能。通常情況下,應該把 Master 和 Save 配置成 ASYNC_FLUSH 的刷盤 方式,主從之間配置成 SYNC_MASTER 的複製方式,這樣即使有一臺 機器出故障,仍然能保證數據不丟,是個不錯的選擇。
集羣 Master 選舉機制
基於 raft 協議的過半寫入機制
在這裏考慮有三個 Broker 節點的情況,即 Broker01 作爲主節點,以及 Broker02 和 Broker03 作爲從節點。
當 Producer 將消息寫入 Broker01 主節點時,Broker01 只需將消息順利寫入 pageCache(頁高速緩存)即視爲寫入成功,該節點會記入寫入操作並在後臺進行異步持久化。
緊接着,基於 RAFT 協議,Broker01 節點會將消息同步至從節點 Broker02 和 Broker03,並將消息寫入他們的 pageCache。只要在 Broker02 和 Broker03 中有一個節點成功寫入,整個寫入操作即視爲成功。這是因爲在這三節點系統中,只需要有過半節點(即 2 個節點)寫入成功,整個系統即認定消息已成功寫入。
假設此時 Broker01 節點發生故障,系統會通過 Leader 選舉機制,讓 Broker02 或 Broker03 中的一個節點升級爲主節點,保證消息能繼續被寫入。此時只有兩個節點,只有當寫入消息的操作在這兩個節點中全部成功完成,才能被視爲成功。這同樣體現了 "過半寫入" 的原則。
基於 raft 協議的 Leader 選舉機制
這裏要討論 RocketMQ 中的領導選舉機制,類似於設計模式中的狀態機模式,爲了便於理解,我們將其分爲三種角色:Follower(跟隨者)、Candidate(候選人)、以及 Leader(領導)。
首先關注 Follower 角色,它是節點初始狀態,即 Broker 節點在一開始就處於 Follower 狀態。在此狀態中,節點設有一個隨機倒計時,如果 Follower 收到了 Leader 的生命信號(心跳),這個倒計時將被重置,這意味着只要有心跳信息,Follower 狀態將一直保持。然而,如果這個隨機倒計時結束了,Follower 角色會升級爲 Candidate 角色。
作爲 Candidate,其主動行爲是發起投票尋求成爲 Leader,對於接收的投票,如果投票數大於或等於整個集羣節點數的一半,它將升級爲 Leader 狀態,如果小於這個數量,它會降級爲 Follower 狀態。
Leader 角色則是系統的核心,它的主動行爲是發送心跳信號保持其他 Follower 節點的跟隨狀態,避免他們試圖搶奪領導地位。
舉例來說,這裏有三個 Broker 節點,即 Broker01,Broker02,和 Broker03。他們剛啓動,目的是要選舉出一個作爲 Leader。假設 Broker01 節點首先完成了隨機倒計時,它將首先變成 Candidate,並開始發起投票。如果其餘兩個節點,即 Broker02 和 Broker03 都把票投給了 Broker01,那麼 Broker01 將會升級爲 Leader,併發送心跳給 Broker02 和 Broker03。一旦 Broker02 和 Broker03 接收到心跳,他們會重置自己的倒計時和狀態,一直保持在 Follower 狀態。這就是一種狀態轉換的情況。
接下來,來討論另一種領導選舉的情形。假設在三個 Broker 節點剛啓動時,Broker01 和 Broker02 都完成了隨機倒計時並升級爲 Candidate 狀態,並同時發起投票。
在這次投票中,Broker01 投自己一票,Broker02 也投自己一票,而 Broker03 則選擇投給 Broker01。那麼由於 Broker01 的票數達到了過半(二分之一以上),它將成功升級爲 Leader。在升級後,Broker01 將開始發出心跳信號。
另一方面,Broker02 由於未能得到過半的選票(只有自己的一票),將不得不降級爲 Follower,然後重新開始新一輪的隨機倒計時,等待下一次的領導選舉機會。
在這種情況下,雖然 Broker01 和 Broker02 同時成爲 Candidate 併發起投票,但由於得票數量的原因,Broker01 最終升級爲 Leader,而 Broker02 則只能繼續留在 Follower 狀態。這也是 RocketMQ 中領導選舉機制的一種典型應用。
負載均衡
Producer 負載均衡
Producer 端,每個實例在發消息的時候,默認會輪詢所有的 message queue 發送,以達到讓消息平均落在不同的 queue 上。而由於 queue 可以散落在不同的 broker,所以消息就發送到不同的 broker 下,如下圖:
圖中箭頭線條上的標號代表順序,發佈方會把第一條消息發送至 Queue 0,然後第二條消息發送至 Queue 1,以此類推。
Consumer 負載均衡
1)集羣模式
在集羣消費模式下,每條消息只需要投遞到訂閱這個 topic 的 Consumer Group 下的一個實例即可。RocketMQ 採用主動拉取的方式拉取並消費消息,在拉取的時候需要明確指定拉取哪一條 message queue。
而每當實例的數量有變更,都會觸發一次所有實例的負載均衡,這時候會按照 queue 的數量和實例的數量平均分配 queue 給每個實例。
默認的分配算法是 AllocateMessageQueueAveragely,如下圖:
還有另外一種平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分攤每一條 queue,只是以環狀輪流分 queue 的形式,如下圖:
需要注意的是,集羣模式下,queue 都是隻允許分配只一個實例,這是由於如果多個實例同時消費一個 queue 的消息,由於拉取哪些消息是 consumer 主動控制的,那樣會導致同一個消息在不同的實例下被消費多次,所以算法上都是一個 queue 只分給一個 consumer 實例,一個 consumer 實例可以允許同時分到不同的 queue。
通過增加 consumer 實例去分攤 queue 的消費,可以起到水平擴展的消費能力的作用。而有實例下線的時候,會重新觸發負載均衡,這時候原來分配到的 queue 將分配到其他實例上繼續消費。
但是如果 consumer 實例的數量比 message queue 的總數量還多的話,多出來的 consumer 實例將無法分到 queue,也就無法消費到消息,也就無法起到分攤負載的作用了。所以需要控制讓 queue 的總數量大於等於 consumer 的數量。
2)廣播模式
由於廣播模式下要求一條消息需要投遞到一個消費組下面所有的消費者實例,所以也就沒有消息被分攤消費的說法。
在實現上,其中一個不同就是在 consumer 分配 queue 的時候,所有 consumer 都分到所有的 queue。
消息重試
順序消息的重試
對於順序消息,當消費者消費消息失敗後,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間爲 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,在使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。
無序消息的重試
對於無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。
無序消息的重試只針對集羣消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗消息不再重試,繼續消費新的消息。
1)重試次數
消息隊列 RocketMQ 默認允許每條消息最多重試 16 次,每次重試的間隔時間如下:
| 第幾次重試
|
與上次重試的間隔時間
|
第幾次重試
|
與上次重試的間隔時間
| | --- | --- | --- | --- | |
1
|
10 秒
|
9
|
7 分鐘
| |
2
|
30 秒
|
10
|
8 分鐘
| |
3
|
1 分鐘
|
11
|
9 分鐘
| |
4
|
2 分鐘
|
12
|
10 分鐘
| |
5
|
3 分鐘
|
13
|
20 分鐘
| |
6
|
4 分鐘
|
14
|
30 分鐘
| |
7
|
5 分鐘
|
15
|
1 小時
| |
8
|
6 分鐘
|
16
|
2 小時
|
如果消息重試 16 次後仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鐘之內進行 16 次重試,超過這個時間範圍消息將不再重試投遞。
注意: 一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。
2)配置方式
消費失敗後,重試配置方式
集羣消費方式下,消息消費失敗後期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):
-
返回 Action.ReconsumeLater (推薦)
-
返回 Null
-
拋出異常
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//處理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息將重試
return Action.ReconsumeLater;
//方式2:返回 null,消息將重試
return null;
//方式3:直接拋出異常, 消息將重試
throw new RuntimeException("Consumer Message exceotion");
}
}
消費失敗後,不重試配置方式
集羣消費方式下,消息失敗後期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage,此後這條消息將不會再重試。
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕獲消費邏輯中的所有異常,並返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息處理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
自定義消息最大重試次數
消息隊列 RocketMQ 允許 Consumer 啓動的時候設置最大重試次數,重試時間間隔將按照如下策略:
-
最大重試次數小於等於 16 次,則重試時間間隔同上表描述。
-
最大重試次數大於 16 次,超過 16 次的重試時間間隔均爲每次 2 小時。
Properties properties = new Properties();
//配置對應 Group ID 的最大消息重試次數爲 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
注意:
-
消息最大重試次數的設置對相同 Group ID 下的所有 Consumer 實例有效。
-
如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了 MaxReconsumeTimes,那麼該配置對兩個 Consumer 實例均生效。
-
配置採用覆蓋的方式生效,即最後啓動的 Consumer 實例會覆蓋之前的啓動實例的配置
獲取消息重試次數
消費者收到消息後,可按照如下方式獲取消息的重試次數:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//獲取消息的重試次數
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}
死信隊列
當一條消息初次消費失敗,消息隊列 RocketMQ 會自動進行消息重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列 RocketMQ 不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。
在消息隊列 RocketMQ 中,這種正常情況下無法被消費的消息稱爲死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱爲死信隊列(Dead-Letter Queue)。
死信特性
死信消息具有以下特性
-
不會再被消費者正常消費。
-
有效期與正常消息相同,均爲 3 天,3 天后會被自動刪除。因此,請在死信消息產生後的 3 天內及時處理。
死信隊列具有以下特性:
-
一個死信隊列對應一個 Group ID, 而不是對應單個消費者實例。
-
如果一個 Group ID 未產生死信消息,消息隊列 RocketMQ 不會爲其創建相應的死信隊列。
-
一個死信隊列包含了對應 Group ID 產生的所有死信消息,不論該消息屬於哪個 Topic。
查看死信信息
- 在控制檯查詢出現死信隊列的主題信息
- 在消息界面根據主題查詢死信消息
- 選擇重新發送消息
一條消息進入死信隊列,意味着某些因素導致消費者無法正常消費該消息,因此,通常需要您對其進行特殊處理。排查可疑因素並解決問題後,可以在消息隊列 RocketMQ 控制檯重新發送該消息,讓消費者重新消費一次。
消費冪等
消息隊列 RocketMQ 消費者在接收到消息以後,有必要根據業務上的唯一 Key 對消息做冪等處理的必要性。
消費冪等的必要性
在互聯網應用中,尤其在網絡不穩定的情況下,消息隊列 RocketMQ 的消息有可能會出現重複,這個重複簡單可以概括爲以下情況:
-
發送時消息重複
當一條消息已被成功發送到服務端並完成持久化,此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。 如果此時生產者意識到消息發送失敗並嘗試再次發送消息,消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。
-
投遞時消息重複
消息消費的場景下,消息已投遞到消費者並完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。 爲了保證消息至少被消費一次,消息隊列 RocketMQ 的服務端將在網絡恢復後再次嘗試投遞之前已被處理過的消息,消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。
-
負載均衡時消息重複(包括但不限於網絡抖動、Broker 重啓以及訂閱方應用重啓)
當消息隊列 RocketMQ 的 Broker 或客戶端重啓、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重複消息。
處理方式
因爲 Message ID 有可能出現衝突(重複)的情況,所以真正安全的冪等處理,不建議以 Message ID 作爲處理依據。 最好的方式是以業務唯一標識作爲冪等處理的關鍵依據,而業務的唯一標識可以通過消息 Key 進行設置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
訂閱方收到消息時可以根據消息的 Key 進行冪等處理:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根據業務唯一標識的 key 做冪等處理
}
});
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/tCgMsdtM2d_SHqfkumKkeg