談談 SpringBoot-RabbitMQ 死信隊列

前言

死信:無法被消費的消息,稱爲死信。

如果死信一直留在隊列中,會導致一直被消費,卻從不消費成功。

所以我們專門開闢了一個來存放死信的隊列,叫死信隊列(DLX,dead-letter-exchange)。

死信的幾種來源:

環境準備配置

準備 MQ 的隊列和環境:

主要配置文件如下:

@Configuration
public class DeadConfig {

    /* 正常配置 **********************************************************************************************************/

    /**
     * 正常交換機,開啓持久化
     */
    @Bean
    DirectExchange normalExchange() {
        return new DirectExchange("normalExchange", true, false);
    }

    @Bean
    public Queue normalQueue() {
        // durable: 是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前連接有效
        // exclusive: 默認也是false,只能被當前創建的連接使用,而且當連接關閉後隊列即被刪除。此參考優先級高於durable
        // autoDelete: 是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        Map<String, Object> args = deadQueueArgs();
        // 隊列設置最大長度
        args.put("x-max-length", 5);
        return new Queue("normalQueue", true, false, false, args);
    }

    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = deadQueueArgs();
        // 隊列設置消息過期時間 60 秒
        args.put("x-message-ttl", 60 * 1000);
        return new Queue("ttlQueue", true, false, false, args);
    }

    @Bean
    Binding normalRouteBinding() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRouting");
    }

    @Bean
    Binding ttlRouteBinding() {
        return BindingBuilder.bind(ttlQueue()).to(normalExchange()).with("ttlRouting");
    }

    /* 死信配置 **********************************************************************************************************/

    /**
     * 死信交換機
     */
    @Bean
    DirectExchange deadExchange() {
        return new DirectExchange("deadExchange", true, false);
    }

    /**
     * 死信隊列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue("deadQueue", true, false, false);
    }

    @Bean
    Binding deadRouteBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");
    }

    /**
     * 轉發到 死信隊列,配置參數
     */
    private Map<String, Object> deadQueueArgs() {
        Map<String, Object> map = new HashMap<>();
        // 綁定該隊列到私信交換機
        map.put("x-dead-letter-exchange""deadExchange");
        map.put("x-dead-letter-routing-key""deadRouting");
        return map;
    }

}

arguments 具體參數如下:

隊列達到最大長度

首先測試最簡單的,沒有消費者。

調用 6 次正常隊列的生產方法。

 /**
  * 正常消息隊列,隊列最大長度5
  */
 @GetMapping("/normalQueue")
 public String normalQueue() {

     Map<String, Object> map = new HashMap<>();
     map.put("messageId", String.valueOf(UUID.randomUUID()));
     map.put("data", System.currentTimeMillis() + ", 正常隊列消息,最大長度 5");

     rabbitTemplate.convertAndSend("normalExchange""normalRouting", map, new CorrelationData());
     return JSONObject.toJSONString(map);
 }

MQ 結果如下:

消息 TTL 過期

消息的 TTL 指的是消息的存活時間,我們可以通過設置消息的 TTL 或者隊列的 TTL 來實現。

兩種配置都行,一般都用在定時任務,限時支付這種地方。

 /**
  * 消息 TTL, time to live
  */
 @GetMapping("/ttlToDead")
 public String ttlToDead() {

     Map<String, Object> map = new HashMap<>();
     map.put("messageId", String.valueOf(UUID.randomUUID()));
     map.put("data", System.currentTimeMillis() + ", ttl隊列消息");

     rabbitTemplate.convertAndSend("normalExchange""ttlRouting", map, new CorrelationData());
     return JSONObject.toJSONString(map);
 }

發送後:

等待過期後:

Demo 中只是爲了方便,代碼中儘量使用 消息 TTL,不要用 隊列 TTL

拒絕消息

正常隊列消費後拒絕消息,並且不進行重新入隊:

@Component
@RabbitListener(queues = "normalQueue")
public class NormalConsumer {
    @RabbitHandler
    public void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {
        System.out.println("收到消息,並拒絕重新入隊 : " + message.toString());
        channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag()false);
    }
}

MQ 控制檯:

死信隊列消費:

@Component
@RabbitListener(queues = "deadQueue")
public class DeadConsumer {
    @RabbitHandler
    public void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {
        System.out.println("死信隊列收到消息 : " + message.toString());
        channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag()false);
    }
}

消息順序和實驗一致:

死信隊列收到消息 : {data=1631534291765, 正常隊列消息,最大長度 5, messageId=bce3888b-da38-4299-ac88-d22cbe164739}
死信隊列收到消息 : {data=1631535222745, ttl隊列消息, messageId=a4617445-5aab-4fac-aec7-5709ea699598}
死信隊列收到消息 : {data=1631534503765, 正常隊列消息,最大長度 5, messageId=b65ecaab-5ce7-4597-a32c-c90b67ec46da}
死信隊列收到消息 : {data=1631534511468, 正常隊列消息,最大長度 5, messageId=d63d2a4c-e7d3-4f00-a6ca-78e2d62d1d92}
死信隊列收到消息 : {data=1631534585087, 正常隊列消息,最大長度 5, messageId=eed0c349-415b-43dc-aa79-c683122a1289}
死信隊列收到消息 : {data=1631534588311, 正常隊列消息,最大長度 5, messageId=7a7bd152-f2fa-4a74-b9e6-943ac7cbb3d4}
死信隊列收到消息 : {data=1631534608504, 正常隊列消息,最大長度 5, messageId=9de512a1-4ca4-4060-9096-27aba01c1687}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OVIRkUOUkymmISfnUF8M_g