談談 SpringBoot-RabbitMQ 死信隊列
前言
死信:無法被消費的消息,稱爲死信。
如果死信一直留在隊列中,會導致一直被消費,卻從不消費成功。
所以我們專門開闢了一個來存放死信的隊列,叫死信隊列(DLX,dead-letter-exchange
)。
死信的幾種來源:
-
消息 TTL 過期(time to live,存活時間,可以用在限時支付消息)
-
隊列達到最大長度(隊列滿了,無法路由到該隊列)
-
消息被拒絕(
basic.reject / basic.nack
),並且requeue = false
環境準備配置
準備 MQ 的隊列和環境:
-
正常交換機
-
正常隊列(最長隊列 5) ---- 正常消費者,拒絕消息
-
ttl 隊列(過期時間 60 秒) ---- 沒有消費者
-
死信交換機
-
死信隊列
主要配置文件如下:
@Configuration
public class DeadConfig {
/* 正常配置 **********************************************************************************************************/
/**
* 正常交換機,開啓持久化
*/
@Bean
DirectExchange normalExchange() {
return new DirectExchange("normalExchange", true, false);
}
@Bean
public Queue normalQueue() {
// durable: 是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前連接有效
// exclusive: 默認也是false,只能被當前創建的連接使用,而且當連接關閉後隊列即被刪除。此參考優先級高於durable
// autoDelete: 是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
Map<String, Object> args = deadQueueArgs();
// 隊列設置最大長度
args.put("x-max-length", 5);
return new Queue("normalQueue", true, false, false, args);
}
@Bean
public Queue ttlQueue() {
Map<String, Object> args = deadQueueArgs();
// 隊列設置消息過期時間 60 秒
args.put("x-message-ttl", 60 * 1000);
return new Queue("ttlQueue", true, false, false, args);
}
@Bean
Binding normalRouteBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRouting");
}
@Bean
Binding ttlRouteBinding() {
return BindingBuilder.bind(ttlQueue()).to(normalExchange()).with("ttlRouting");
}
/* 死信配置 **********************************************************************************************************/
/**
* 死信交換機
*/
@Bean
DirectExchange deadExchange() {
return new DirectExchange("deadExchange", true, false);
}
/**
* 死信隊列
*/
@Bean
public Queue deadQueue() {
return new Queue("deadQueue", true, false, false);
}
@Bean
Binding deadRouteBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");
}
/**
* 轉發到 死信隊列,配置參數
*/
private Map<String, Object> deadQueueArgs() {
Map<String, Object> map = new HashMap<>();
// 綁定該隊列到私信交換機
map.put("x-dead-letter-exchange", "deadExchange");
map.put("x-dead-letter-routing-key", "deadRouting");
return map;
}
}
arguments 具體參數如下:
隊列達到最大長度
首先測試最簡單的,沒有消費者。
調用 6 次正常隊列的生產方法。
/**
* 正常消息隊列,隊列最大長度5
*/
@GetMapping("/normalQueue")
public String normalQueue() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("data", System.currentTimeMillis() + ", 正常隊列消息,最大長度 5");
rabbitTemplate.convertAndSend("normalExchange", "normalRouting", map, new CorrelationData());
return JSONObject.toJSONString(map);
}
MQ 結果如下:
消息 TTL 過期
消息的 TTL 指的是消息的存活時間,我們可以通過設置消息的 TTL 或者隊列的 TTL 來實現。
-
消息的 TTL :對於設置了過期時間屬性 (expiration) 的消息,消息如果在過期時間內沒被消費,會過期
-
隊列的 TTL :對於設置了過期時間屬性 (x-message-ttl) 的隊列,所有路由到這個隊列的消息,都會設置上這個過期時間
兩種配置都行,一般都用在定時任務,限時支付這種地方。
/**
* 消息 TTL, time to live
*/
@GetMapping("/ttlToDead")
public String ttlToDead() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("data", System.currentTimeMillis() + ", ttl隊列消息");
rabbitTemplate.convertAndSend("normalExchange", "ttlRouting", map, new CorrelationData());
return JSONObject.toJSONString(map);
}
發送後:
等待過期後:
Demo 中只是爲了方便,代碼中儘量使用 消息 TTL,不要用 隊列 TTL
拒絕消息
正常隊列消費後拒絕消息,並且不進行重新入隊:
@Component
@RabbitListener(queues = "normalQueue")
public class NormalConsumer {
@RabbitHandler
public void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {
System.out.println("收到消息,並拒絕重新入隊 : " + message.toString());
channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(), false);
}
}
MQ 控制檯:
死信隊列消費:
@Component
@RabbitListener(queues = "deadQueue")
public class DeadConsumer {
@RabbitHandler
public void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {
System.out.println("死信隊列收到消息 : " + message.toString());
channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(), false);
}
}
消息順序和實驗一致:
死信隊列收到消息 : {data=1631534291765, 正常隊列消息,最大長度 5, messageId=bce3888b-da38-4299-ac88-d22cbe164739}
死信隊列收到消息 : {data=1631535222745, ttl隊列消息, messageId=a4617445-5aab-4fac-aec7-5709ea699598}
死信隊列收到消息 : {data=1631534503765, 正常隊列消息,最大長度 5, messageId=b65ecaab-5ce7-4597-a32c-c90b67ec46da}
死信隊列收到消息 : {data=1631534511468, 正常隊列消息,最大長度 5, messageId=d63d2a4c-e7d3-4f00-a6ca-78e2d62d1d92}
死信隊列收到消息 : {data=1631534585087, 正常隊列消息,最大長度 5, messageId=eed0c349-415b-43dc-aa79-c683122a1289}
死信隊列收到消息 : {data=1631534588311, 正常隊列消息,最大長度 5, messageId=7a7bd152-f2fa-4a74-b9e6-943ac7cbb3d4}
死信隊列收到消息 : {data=1631534608504, 正常隊列消息,最大長度 5, messageId=9de512a1-4ca4-4060-9096-27aba01c1687}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/OVIRkUOUkymmISfnUF8M_g