MQ 如何保證消息可靠性傳輸

1 介紹

這篇我們來說說 MQ 消息的可靠性傳輸。可靠性傳輸其實包含兩種情況:一種是重複消費的情況,我們上一篇的冪等性消費解決的就是這個問題;另外一種是消息丟失的情況的,要確保我們生產的消息一定最終會得到消費。這時候就要從消息執行的幾個階段去保證,每一個階段都不能出現問題。

2 消息生產階段

消息生產階段指的是消息從生產到消息發送出去,經過網絡傳輸,再到達 Broker 服務器並被接收的這整個階段,我們需要一個健壯的確認機制(ACK)來保證消息傳遞的可靠性。如果說消息被接收到之後可以反饋給消息生產方去確認,那這個過程就比較完美了。

3 消息服務器處理階段

Broker 作爲消息服務器,主要用於消息收發的操作。一般情況下只要消息服務正常運行,並依賴數據持久化能力,丟消息的可能行就比較小。
但是在很多場景下,爲了提升消息隊列的效率,爲了提升吞吐能力,在沒有確定完成持久化動作(刷盤)之前,就會把確認消息返回。即只要消息進行
Commit 了,那就是成功的。但是如果還沒持久化成功便發生了宕機,那就有存在消息丟失的風險。可以參照如下優化:

4 消息消費階段

消息存儲到了 Broker 之後,剩下的就是消息消費了。消息消費階段跟生產階段大概一致,都是使用確認機制來保證消息的可靠性和傳輸的。
當 Consumer 從 Broker 拉取到消息之後,開始消費消息,執行業務的的邏輯程序,業務程序執行成功後,纔給 Broker 發送消費確認響應。
如果沒成功或者消息在發送中途丟失,就沒有確認響應,這樣的話,在下一輪消息拉取的時候,Broker 依舊會返回這一條消費數據給你,避免網絡抖動原因或者 Consumer 在執行消費出錯導致丟失。

4.1 消費分區的策略模式

多個消費者消費用一個分區,我們經常會出現這種情況:同一個 Consumer Group 裏面有多個 Consumer,比如 Comsumer A 拉走了某一批數據,但是還沒返回確認消息,Consumer B 又過來要 拉數據了,Broker 要怎麼判定呢?
這邊舉個例子:Consumer A 拉取 index = 106 位置的數據,但是還沒返回消費完成的確認信息,這時候消費位置依然是 index = 10086,如果 Consumer B 也過拉取數據,則

4.2 消費重試和死信隊列

在 RocketMQ 中,當消息第一次消費失敗時,消息隊列會自動進行消息重試,達到最大重試次數(可配置閾值,比如 5)後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息。此時,消息隊列 RocketMQ 版不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。這種無法被消費的消息稱爲死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱爲死信隊列(Dead-Letter Queue)。
可以使用單獨的作業服務進行獨立處理,比如重新發送死信消息進行消費,避免消息漏處理導致業務服務可用性問題。

5 總結

總得來說:MQ 可以從三個角度來分析:生產者丟數據、消息隊列服務器(Broker)丟數據、消費者丟數據
生產者丟數據:RabbitMQ 提供 transaction 和 confirm 模式來確保生產者不丟消息。
消息隊列服務丟數據:開啓持久化磁盤的配置。這個持久化配置可以和 confirm 機制配合使用,你可以在消息持久化磁盤後,再給生產者發送一個 Ack 信號。
消費者丟數據:與生產者基本一直,等消費完成並接收到 confirm 才能確認是消費成功。超時或者失敗則重試,重試超過指定閾值的時候,計入死信隊列並獨立處理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3aTXmPmQy0Fjf9qaRKoEsQ