RabbitMQ 如何做到全鏈路 100- 不丟失?

-     前言    -

我們都知道,消息從生產端到消費端消費要經過 3 個步驟:

  1. 生產端發送消息到 RabbitMQ;

  2. RabbitMQ 發送消息到消費端;

  3. 消費端消費這條消息;

這 3 個步驟中的每一步都有可能導致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統的可靠性。

這裏的可靠並不是一定就 100% 不丟失了,磁盤損壞,機房爆炸等等都能導致數據丟失,當然這種都是極小概率發生,能做到 99.999999% 消息不丟失,就是可靠的了。下面來具體分析一下問題以及解決方案。

-     生產端可靠性投遞    -

生產端可靠性投遞,即生產端要確保將消息正確投遞到 RabbitMQ 中。生產端投遞的消息丟失的原因有很多,比如消息在網絡傳輸的過程中發生網絡故障消息丟失,或者消息投遞到 RabbitMQ 時 RabbitMQ 掛了,那消息也可能丟失,而我們根本不知道發生了什麼。針對以上情況,RabbitMQ 本身提供了一些機制。

-     事務消息機制    -

事務消息機制由於會嚴重降低性能,所以一般不採用這種方法,我就不介紹了,而採用另一種輕量級的解決方案——confirm 消息確認機制。

-     Confirm 消息確認機制    -

什麼是 confirm 消息確認機制?顧名思義,就是生產端投遞的消息一旦投遞到 RabbitMQ 後,RabbitMQ 就會發送一個確認消息給生產端,讓生產端知道我已經收到消息了,否則這條消息就可能已經丟失了,需要生產端重新發送消息了。

通過下面這句代碼來開啓確認模式:

channel.confirmSelect();// 開啓發送方確認模式

然後異步監聽確認和未確認的消息:

channel.addConfirmListener(new ConfirmListener() {
    //消息正確到達broker
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("已收到消息");
        //做一些其他處理
    }

    //RabbitMQ因爲自身內部錯誤導致消息丟失,就會發送一條nack消息
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未確認消息,標識:" + deliveryTag);
        //做一些其他處理,比如消息重發等
    }
});

這樣就可以讓生產端感知到消息是否投遞到 RabbitMQ 中了,當然這樣還不夠,稍後我會說一下極端情況。

-     消息持久化    -

那消息持久化呢?我們知道,RabbitMQ 收到消息後將這個消息暫時存在了內存中,那這就會有個問題,如果 RabbitMQ 掛了,那重啓後數據就丟失了,所以相關的數據應該持久化到硬盤中,這樣就算 RabbitMQ 重啓後也可以到硬盤中取數據恢復。那如何持久化呢?

message 消息到達 RabbitMQ 後先是到 exchange 交換機中,然後路由給 queue 隊列,最後發送給消費端。

所有需要給 exchange、queue 和 message 都進行持久化:

//第三個參數true表示這個exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct"true);

queue 持久化:

//第二個參數true表示這個queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message 持久化:

//第三個參數MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

這樣,如果 RabbitMQ 收到消息後掛了,重啓後會自行恢復消息。

到此,RabbitMQ 提供的幾種機制都介紹完了,但這樣還不足以保證消息可靠性投遞 RabbitMQ 中,上面我也提到了會有極端情況,比如 RabbitMQ 收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ 掛了,這樣消息還是丟失了,或者 RabbitMQ 在發送確認消息給生產端的過程中,由於網絡故障而導致生產端沒有收到確認消息,這樣生產端就不知道 RabbitMQ 到底有沒有收到消息,就不好做接下來的處理。

所以除了 RabbitMQ 提供的一些機制外,我們自己也要做一些消息補償機制,以應對一些極端情況。接下來我就介紹其中的一種解決方案——消息入庫。

消息入庫

消息入庫,顧名思義就是將要發送的消息保存到數據庫中。

首先發送消息前先將消息保存到數據庫中,有一個狀態字段 status=0,表示生產端將消息發送給了 RabbitMQ 但還沒收到確認;在生產端收到確認後將 status 設爲 1,表示 RabbitMQ 已收到消息。這裏有可能會出現上面說的兩種情況,所以生產端這邊開一個定時器,定時檢索消息表,將 status=0 並且超過固定時間後(可能消息剛發出去還沒來得及確認這邊定時器剛好檢索到這條 status=0 的消息,所以給個時間)還沒收到確認的消息取出重發(第二種情況下這裏會造成消息重複,消費者端要做冪等性),可能重發還會失敗,所以可以做一個最大重發次數,超過就做另外的處理。

這樣消息就可以可靠性投遞到 RabbitMQ 中了,而生產端也可以感知到了。

消費端消息不丟失

既然已經可以讓生產端 100% 可靠性投遞到 RabbitMQ 了,那接下來就改看看消費端的了,如何讓消費端不丟失消息。

默認情況下,以下 3 種情況會導致消息丟失:

其實,上述 3 中情況導致消息丟失歸根結底是因爲 RabbitMQ 的自動 ack 機制,即默認 RabbitMQ 在消息發出後就立即將這條消息刪除,而不管消費端是否接收到,是否處理完,導致消費端消息丟失時 RabbitMQ 自己又沒有這條消息了。

所以就需要將自動 ack 機制改爲手動 ack 機制。

消費端手動確認消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        //接收到消息,做處理
        //手動確認
        channel.basicAck(delivery.getEnvelope().getDeliveryTag()false);
    } catch (Exception e) {
        //出錯處理,這裏可以讓消息重回隊列重新發送或直接丟棄消息
    }
};
//第二個參數autoAck設爲false表示關閉自動確認機制,需手動確認
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

這樣,當 autoAck 參數置爲 false,對於 RabbitMQ 服務端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費端的消息;一部分是已經投遞給消費端,但是還沒有收到消費端確認信號的消息。

如果 RabbitMQ 一直沒有收到消費端的確認信號,並且消費此消息的消費端已經斷開連接或宕機(RabbitMQ 會自己感知到),則 RabbitMQ 會安排該消息重新進入隊列(放在隊列頭部),等待投遞給下一個消費者,當然也有能還是原來的那個消費端,當然消費端也需要確保冪等性。

好了,到此從生產端到 RabbitMQ 再到消費端的全鏈路,就可以保證數據的不丟失。

由於個人水平有限,有些地方可能理解錯了或理解不到位的,請大家多多指出!

作者:指尖涼

來源:blog.csdn.net/hsz2568952354/article/details/8655947

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/UCo7ucOahR2i32jkbwhDKw