死信及死信隊列
1.1 什麼是死信
一般來說,生產者將消息投遞到隊列中,消費者從隊列取出消息進行消費,但某些時候由於特定的原因導致隊列中的某些消息無法被消費,這樣的消息如果沒有後續的處理,就變成了死信 (Dead Letter),所有的死信都會放到死信隊列中。
爲什麼爲有死信?消息變成死信一般是以下三種情況:
-
消息被拒絕,即
basicReject/basicNack
,並且設置 requeue 參數爲 false,這種情況一般消息丟失 。 -
消息過期(TTL) ,TTL 全稱爲 Time-To-Live,表示的是消息的有效期,默認情況下 Rabbit 中的消息不過期,但是可以設置隊列的過期時間和消息的過期時間以達到消息過期的效果 ,消息如果在隊列中一直沒有被消費並且存在時間超過了 TTL,消息就會變成了 "死信" ,後續無法再被消費。
-
隊列達到最大長度,一般當設置了最大隊列長度或大小並達到最大值時。
1.2 死信交換器 DLX
在消息的拒絕操作都是在requeue = true
情形下,如果爲 false 可以發現當發生異常確認後,消息丟失了,這肯定是不能容忍的,所以提出了死信交換器(dead-letter-exchange)的概念。
死信交換器仍然只是一個普通的交換器,創建時並沒有特別要求和操作。在創建隊列的時候,聲明該交換器將用作保存被拒絕的消息即可,相關的參數是 x-dead-letter-exchange
。當這個隊列中有死信時,RabbitMQ 就會自動的將這個消息重新發布到設置的 Exchange 上去,進而被路由到另一個隊列。
舉個栗子
1、生產者生產 3 條消息
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;public class DlxProducer {
public final static String EXCHANGE_NAME = "dlx_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接
Connection connection = RabbitMQUtils.getConnection();
// 創建一個信道
Channel channel = connection.createChannel();
// 指定轉發
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String[] routekeys = {"rabbit", "cat", "dog"};
for (int i = 0; i < 3; i++) {
String routekey = routekeys[i % 3];
String msg = "Hello,RabbitMq" + (i + 1);
channel.basicPublish(EXCHANGE_NAME, routekey, null, msg.getBytes());
System.out.println("Sent " + routekey + ":" + msg);
}
// 關閉頻道和連接
channel.close();
connection.close();
}
}
2、普通消費者消費消息,但是不能消費全部的消息,並把不能消費得消息投遞到死信隊列。如果是我們還想做點其他事情,我們可以在死信交換的時候改變死信消息的路由鍵,具體的相關的參數是 x-dead-letter-routing-key
。
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;/**
* 類說明:普通的消費者,但是自己無法消費的消息,將投入死信隊列
*/public class NormalDlxConsumer { public static void main(String[] args) throws IOException, TimeoutException { //建立連接
Connection connection = RabbitMQUtils.getConnection(); // 創建一個信道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //綁定死信交換器
//聲明一個隊列,並綁定死信交換器
String queueName = "dlx_queue";
Map<String, Object> argos = new HashMap<String, Object>();
argos.put("x-dead-letter-exchange", DlxConsumer.DLX_EXCHANGE_NAME); //死信路由鍵,會替換消息原來的路由鍵
//args.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(queueName, false, true, false, argos); //綁定,將隊列和交換器通過路由鍵進行綁定
channel.queueBind(queueName, DlxProducer.EXCHANGE_NAME, "#");
System.out.println("waiting for message........"); final Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8"); //如果是cat的消息確認
if (envelope.getRoutingKey().equals("cat")) {
System.out.println("Received[" + envelope.getRoutingKey() + "]" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} else { //如果是其他的消息拒絕(queue=false),成爲死信消息
System.out.println("Will reject[" + envelope.getRoutingKey() + "]" + message);
channel.basicReject(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(queueName, false, consumer);
}
}
3、申明一個消費者,負責消費死信隊列
mport com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.concurrent.TimeoutException;/**
* 類說明:普通的消費者,負責消費死信隊列dlx_accept
*/public class DlxConsumer {
public final static String DLX_EXCHANGE_NAME = "dlx_accept";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接
Connection connection = RabbitMQUtils.getConnection();
// 創建一個信道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "dlx_accept";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, DLX_EXCHANGE_NAME, "#");
System.out.println("waiting for message........");
//聲明瞭一個死信消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received dead letter[" + envelope.getRoutingKey() + "]" + message);
}
};
//消費者正式開始在指定隊列上消費消息
channel.basicConsume(queueName, true, consumer);
}
}
測試結果:
DLX 和備用交換器的區別
-
備用交換器是主交換器無法路由消息,那麼消息將被路由到這個新的備用交換器,而死信交換器則是接收過期或者被拒絕的消息。
-
備用交換器是在聲明主交換器時發生聯繫,而死信交換器則聲明隊列時發生聯繫。
場景分析:備用交換器一般是用於生產者生產消息時,確保消息可以儘量進入 RabbitMQ,而死信交換器主要是用於消費者消費消息產生死信的場景(比如消息過期,隊列滿了,消息拒絕且不重新投遞)。
作者:Ayue
鏈接:https://juejin.cn/post/7111501986440151071
來源:稀土掘金
著作權歸作者所有。商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/tdMR7g9VlGvA_4nqJifjSA