如何保證 RabbitMQ 消息不丟失?

一. RabbitMQ 消息丟失的三種情況

第一種:生產者弄丟了數據。生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,因爲網絡問題啥的,都有可能。

第二種:RabbitMQ 弄丟了數據。MQ 還沒有持久化自己掛了。

第三種:消費端弄丟了數據。剛消費到,還沒處理,結果進程掛了,比如重啓了。

二. RabbitMQ 消息丟失解決方案

  1. 針對生產者

方案 1 :開啓 RabbitMQ 事務

可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啓 RabbitMQ 事務 channel.txSelect,然後發送消息,如果消息沒有成功被 RabbitMQ 接收到,那麼生產者會收到異常報錯,此時就可以回滾事務 channel.txRollback,然後重試發送消息;如果收到了消息,那麼可以提交事務 channel.txCommit。

// 開啓事務  
channel.txSelect();  
try {  
   // 這裏發送消息  
} catch (Exception e) {  
   channel.txRollback(); 
// 這裏再次重發這條消息
}
// 提交事務  
channel.txCommit();

缺點:

RabbitMQ 事務機制是同步的,你提交一個事務之後會阻塞在那兒,採用這種方式基本上吞吐量會下來,因爲太耗性能。

方案 2:使用 confirm 機制

事務機制和 confirm 機制最大的不同在於,事務機制是同步的,你提交一個事務之後會阻塞在那兒,但是 confirm 機制是異步的

在生產者開啓了 confirm 模式之後,每次寫的消息都會分配一個唯一的 id,然後如果寫入了 rabbitmq 之中,rabbitmq 會給你回傳一個 ack 消息,告訴你這個消息發送 OK 了;如果 rabbitmq 沒能處理這個消息,會回調你一個 nack 接口,告訴你這個消息失敗了,你可以進行重試。而且你可以結合這個機制知道自己在內存裏維護每個消息的 id,如果超過一定時間還沒接收到這個消息的回調,那麼你可以進行重發。

//開啓confirm  
channel.confirm();  
//發送成功回調  
public void ack(String messageId){
}
// 發送失敗回調  
public void nack(String messageId){  
    //重發該消息  
}
  1. 針對 RabbitMQ

主要需要應對三點:

(1)消息持久化

RabbitMQ 的消息默認存放在內存上面,如果不特別聲明設置,消息不會持久化保存到硬盤上面的,如果節點重啓或者意外 crash 掉,消息就會丟失。

所以就要對消息進行持久化處理。如何持久化,下面具體說明下。要想做到消息持久化,必須滿足以下三個條件,缺一不可。

(2)設置集羣鏡像模式

先來介紹下 RabbitMQ 三種部署模式:

爲什麼設置鏡像模式集羣,因爲隊列的內容僅僅存在某一個節點上面,不會存在所有節點上面,所有節點僅僅存放消息結構和元數據。下面畫了一張圖介紹普通集羣丟失消息情況:

如果想解決上面途中問題,保證消息不丟失,需要採用 HA 鏡像模式隊列。

下面介紹下三種 HA 策略模式:

命令處理 HA 策略模版:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)爲每個以rock.wechat開頭的隊列設置所有節點的鏡像,並且設置爲自動同步模式

rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'  
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)爲每個以rock.wechat.開頭的隊列設置兩個節點的鏡像,並且設置爲自動同步模式

rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)爲每個以node.開頭的隊列分配指定的節點做鏡像

rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 鏡像隊列有一個很大的缺點就是系統的吞吐量會有所下降。

(3)消息補償機制

爲什麼還要消息補償機制呢?難道消息還會丟失,沒錯,系統是在一個複雜的環境,不要想的太簡單了,雖然以上的三種方案,基本可以保證消息的高可用不丟失的問題。

但是作爲有追求的程序員來講,要絕對保證我的系統的穩定性,有一種危機意識。

比如:持久化的消息,保存到硬盤過程中,當前隊列節點掛了,存儲節點硬盤又壞了,消息丟了,怎麼辦?

1)生產端首先將業務數據以及消息數據入庫,需要在同一個事務中,消息數據入庫失敗,則整體回滾。

2)根據消息表中消息狀態,失敗則進行消息補償措施,重新發送消息處理。

  1. 針對消費者

方案一:ACK 確認機制

多個消費者同時收取消息,比如消息接收到一半的時候,一個消費者死掉了 (邏輯複雜時間太長,超時了或者消費被停機或者網絡斷開鏈接),如何保證消息不丟?

使用 rabbitmq 提供的 ack 機制,服務端首先關閉 rabbitmq 的自動 ack,然後每次在確保處理完這個消息之後,在代碼裏手動調用 ack。這樣就可以避免消息還沒有處理完就 ack。才把消息從內存刪除。

這樣就解決了,即使一個消費者出了問題,但不會同步消息給服務端,會有其他的消費端去消費,保證了消息不丟的 case。

總結

如果需要保證消息在整條鏈路中不丟失,那就需要生產端、mq 自身與消費端共同去保障。

通過以上的處理,理論上不存在消息丟失的情況,但是系統的吞吐量以及性能有所下降。在實際開發中,需要考慮消息丟失的影響程度,來做出對可靠性以及性能之間的權衡。

原文地址:https://blog.csdn.net/w20001118/article/details/12659597

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/tKARjiybVGnwOqIoy5wB-w