rabbitmq 如何保證消息可靠性不丟失

之前我們簡單介紹了 rabbitmq 的功能。他的作用就是方便我們的消息解耦。緊接着問題就會暴露出來。解耦就設計到雙方系統不穩定問題。在 mq 中有生產者、mq、消費者三個角色。其中一個角色 down 機或者重啓後。就設計到消息的丟失問題。

因爲 MQ 整個消息週期設計到上述的三個角色,所以我們從這個三個角色開始討論丟失數據的情況。並如何解決

實際案列 1

實際案例 2

總結

解決方案

代碼模擬

 1Map<String, Object> resultMap = new HashMap<String, Object>(){
 2    {
 3        put("code", 200);
 4    }
 5};
 6String msg = "";
 7Integer index = 0;
 8if (params.containsKey("msg")) {
 9    msg = params.get("msg").toString();
10}
11if (params.containsKey("index")) {
12    index = Integer.valueOf(params.get("index").toString());
13}
14if (index != 0) {
15    
16    int i = 1 / 0;
17}
18rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", msg);
19return resultMap;
20
21複製代碼
22

事務

 1String msg = "trantest";
 2Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
 3Channel channel = connection.createChannel(true);
 4try {
 5    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
 6    int i = 1 / 0;
 7} catch (IOException e) {
 8    channel.txRollback();
 9    e.printStackTrace();
10}
11channel.txCommit();
12connection.close();
13
14複製代碼
15

confirm 模式確實

 1Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
 2Channel channel = connection.createChannel(false);
 3channel.confirmSelect();
 4try {
 5    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
 6} catch (IOException e) {
 7    e.printStackTrace();
 8}
 9boolean b = channel.waitForConfirms();
10if (b) {
11    System.out.println("mq接收消息成功");
12    Thread.sleep(1000*5);
13}
14System.out.println("end1");
15channel.confirmSelect();
16channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
17channel.addConfirmListener(new ConfirmListener() {
18    @SneakyThrows
19    @Override
20    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
21        System.out.println("~~~~~消息成功發送到交換機");
22        Thread.sleep(1000 * 5);
23    }
24
25    @Override
26    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
27        System.out.println("~~~~~消息發送到交換機失敗");
28    }
29});
30System.out.println("end2");
31channel.close();
32connection.close();
33
34複製代碼
35

數據退回監聽

 1String msg = "Hello World!";
 2Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
 3Channel channel = connection.createChannel(false);
 4channel.confirmSelect();
 5
 6channel.addReturnListener(new ReturnListener() {
 7    @Override
 8    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
 9        
10        System.out.println("*****"+i);  
11        System.out.println("*****"+s);  
12        System.out.println("*****"+s1); 
13        System.out.println("*****"+s2); 
14        System.out.println("*****"+new String(bytes));  
15    }
16});
17
18
19channel.basicPublish(RabbitConfig.DIRECTEXCHANGE, "c", true, null, msg.getBytes());
20channel.addConfirmListener(new ConfirmListener() {
21    @SneakyThrows
22    @Override
23    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
24        System.out.println("~~~~~消息成功發送到交換機");
25        Thread.sleep(1000 * 5);
26    }
27
28    @Override
29    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
30        System.out.println("~~~~~消息發送到交換機失敗");
31    }
32});
33
34複製代碼
35

當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會又接收到這條消息,如果想消息進入隊尾,須確認消息後再次發送消息。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://juejin.cn/post/6939709371093876766?utm_source=gold_browser_extension