rabbitmq 如何保證消息可靠性不丟失
之前我們簡單介紹了 rabbitmq 的功能。他的作用就是方便我們的消息解耦。緊接着問題就會暴露出來。解耦就設計到雙方系統不穩定問題。在 mq 中有生產者、mq、消費者三個角色。其中一個角色 down 機或者重啓後。就設計到消息的丟失問題。
因爲 MQ 整個消息週期設計到上述的三個角色,所以我們從這個三個角色開始討論丟失數據的情況。並如何解決
- 在生產數據程序中,消息已經處理好還未發送給 MQ 這個階段,生產者因爲意外情況中斷了。這個時候生產者這條消息就會丟失。因爲程序重啓好之後可能不會再次生產該消息。
實際案列 1:
- 購物商城中已經選購了商品提交到支付界面。在支付成功後我們的程序需要發送消息給商家。這個時候程序中斷了。待重啓後客戶界面訂單狀態是付款成功的。但是這個訂單就沒有及時通知給商家。這會造成商家延遲發貨。
實際案例 2:
- 同樣是購物支付, A 客戶先付款 order1 訂單,支付成功後發送 MQ 前直線異常但並未導致程序中斷。這個時候 order1 商家收不到通知,然後 B 客戶對 order2 訂單進行支付且整個過程正常。order2 訂單就會通知到對應的商家。整個週期 order1 訂單就屬於丟失
總結:
- 兩種情況都是在發送消息是出現問題。第一種是程序中斷,第二種是訂單異常,第一種異常級別高會影響整個程序使用反而是好排查。第二種程序不異常。這種情況很難發現,只會是個別情況。
解決方案:
- 針對上述情況 mq 也提供了兩種方法解決。
- 1、開啓 rabbitmq 事務 (同步)
- 2、開啓 confirm 模式 (異步)
代碼模擬
1Map<String, Object> resultMap = new HashMap<String, Object>(){
2 {
3 put("code", 200);
4 }
5};
6String msg = "";
7Integer index = 0;
8if (params.containsKey("msg")) {
9 msg = params.get("msg").toString();
10}
11if (params.containsKey("index")) {
12 index = Integer.valueOf(params.get("index").toString());
13}
14if (index != 0) {
15
16 int i = 1 / 0;
17}
18rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", msg);
19return resultMap;
20
21複製代碼
22
- 上述代碼
http://localhost:8282/rabbitmq/sendTopic?msg=test&index=1
就會發生異常,這個時候數據丟失 http://localhost:8282/rabbitmq/sendTopic?msg=test
可以正常發送。讀者可以自行測試- 其實通過 rabbitmq 的事務並不能解決上面的丟失情況。但是加上事務會保證消息發送的可靠性。上面發送消息後出異常這時候我們就沒法回退消息了。但是事務可以幫我們實現
事務
1String msg = "trantest";
2Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
3Channel channel = connection.createChannel(true);
4try {
5 channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
6 int i = 1 / 0;
7} catch (IOException e) {
8 channel.txRollback();
9 e.printStackTrace();
10}
11channel.txCommit();
12connection.close();
13
14複製代碼
15
- 最終測試效果是 mq 沒有收到消息的。
confirm 模式確實
1Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
2Channel channel = connection.createChannel(false);
3channel.confirmSelect();
4try {
5 channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
6} catch (IOException e) {
7 e.printStackTrace();
8}
9boolean b = channel.waitForConfirms();
10if (b) {
11 System.out.println("mq接收消息成功");
12 Thread.sleep(1000*5);
13}
14System.out.println("end1");
15channel.confirmSelect();
16channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
17channel.addConfirmListener(new ConfirmListener() {
18 @SneakyThrows
19 @Override
20 public void handleAck(long deliveryTag, boolean multiple) throws IOException {
21 System.out.println("~~~~~消息成功發送到交換機");
22 Thread.sleep(1000 * 5);
23 }
24
25 @Override
26 public void handleNack(long deliveryTag, boolean multiple) throws IOException {
27 System.out.println("~~~~~消息發送到交換機失敗");
28 }
29});
30System.out.println("end2");
31channel.close();
32connection.close();
33
34複製代碼
35
- 上面使用了兩種確認方式,前者是同步確認,後者是異步確認。因爲在同一個方法裏。msg 都是能獲取到的。所以在 ConfimListener 中就沒有返回消息。
數據退回監聽
- 上面兩種一個增加安全可靠性。一個增加確認機制。還有一種情況是數據回退。當交換機沒有隊列綁定是這個時候發送數據後如果設置了回退屬性,那麼消息會回退到監聽器匯中的。channel 中的 mandatory 表示是否檢測分發到隊列中。
1String msg = "Hello World!";
2Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
3Channel channel = connection.createChannel(false);
4channel.confirmSelect();
5
6channel.addReturnListener(new ReturnListener() {
7 @Override
8 public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
9
10 System.out.println("*****"+i);
11 System.out.println("*****"+s);
12 System.out.println("*****"+s1);
13 System.out.println("*****"+s2);
14 System.out.println("*****"+new String(bytes));
15 }
16});
17
18
19channel.basicPublish(RabbitConfig.DIRECTEXCHANGE, "c", true, null, msg.getBytes());
20channel.addConfirmListener(new ConfirmListener() {
21 @SneakyThrows
22 @Override
23 public void handleAck(long deliveryTag, boolean multiple) throws IOException {
24 System.out.println("~~~~~消息成功發送到交換機");
25 Thread.sleep(1000 * 5);
26 }
27
28 @Override
29 public void handleNack(long deliveryTag, boolean multiple) throws IOException {
30 System.out.println("~~~~~消息發送到交換機失敗");
31 }
32});
33
34複製代碼
35
-
上面 ReturnListener 就會被觸發,這個時候 confirm 監聽器也被觸發認爲成功接收的只不過被退回。
-
在發送消息到 MQ 時我們可以設置消息屬性是否爲可持久化。如果設置了直接就會存儲在磁盤上。在內存可用時也會同步到內存中提高效率。如果消息屬性中設置的是非持久化的話,就會直接存儲在內存裏,當內存不足是會將數據備份至磁盤上。
-
消費端如果沒有單獨設置的話默認就是 MQ 不管理。換句話說 MQ 只負責發送消息。mq 爲我們提供了三種模式 NONE, MANUAL, AUTO; 默認的
-
我們需要手動將連接工廠設置 MANUAL 後再接收到消息後我們需要手動確認,mq 纔會刪除消息。否則會一直等待到消費端重啓纔會進行重新分發數據
-
channel.basicAck(long,boolean); 確認收到消息,消息將被隊列移除,false 只確認當前 consumer 一個消息收到,true 確認所有 consumer 獲得的消息。
-
channel.basicNack(long,boolean,boolean); 確認否定消息,第一個 boolean 表示一個 consumer 還是所有,第二個 boolean 表示 requeue 是否重新回到隊列,true 重新入隊。
-
channel.basicReject(long,boolean); 拒絕消息,requeue=false 表示不再重新入隊,如果配置了死信隊列則進入死信隊列。
當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會又接收到這條消息,如果想消息進入隊尾,須確認消息後再次發送消息。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://juejin.cn/post/6939709371093876766?utm_source=gold_browser_extension