關於 rabbitmq,你想知道的都在這裏(下)
持久化
我們將消息發送到 mq 消息隊列了,如果 mq 服務器掛掉了,在 mq 中未消費的消息是否就丟失了,重啓 mq 服務器未消費的是否還在?我們可以將 comsumer 關掉,然後只保留 producer 發送消息,然後看看 mq 的管理端,發現消息並沒人消費。此時,將 mq 服務重啓,重啓後看消息是否還在?
奇蹟般的消息都還在。上節課我們也沒配置什麼東西,怎麼消息默認就持久化了嗎?
@Bean
public Queue DirectQueue() {
return new Queue(DirectRabbitConfig.directQueue);
}
那是因爲我們在定義隊列的時候使用的構造方法底層實現的時候durable
參數設置爲 true(持久化),因此消息當 mq 掛掉時,未消費的消息重啓後還會在。
大家可以設置爲 false 試試,但是需要注意我們未設置virtual-host
,默認的使用的是/
,而這個host
不允許設置爲false
。我們可以在rabbitmq
管理控制檯增加一個host
如下操作:
點擊增加一個 host 名稱爲 myVirtualHost,並在 yml 文件中增加一個配置:
這樣就可以設置 queue 的 durable 爲 false,將消費者關閉(模擬消費者異常),然後生產消息,在 mq 的控制檯看到有未消費的消息,重啓 mq,消息丟失~~~
消息確認
發送端確認
增加配置
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//設置開啓Mandatory,才能觸發回調函數,無論消息推送結果怎麼樣都強制調用回調函數
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm");
System.out.println("correlationData:"+correlationData);
System.out.println("ack:"+ack);
System.out.println("cause:"+cause);
System.out.println();
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("returnedMessage");
System.out.println("returnedMessage.getMessage()-->"+returnedMessage.getMessage());
System.out.println("returnedMessage.getReplyCode()-->"+returnedMessage.getReplyCode());
System.out.println("returnedMessage.getReplyText()-->"+returnedMessage.getReplyText());
System.out.println("returnedMessage.getExchange()-->"+returnedMessage.getExchange());
System.out.println("returnedMessage.getRoutingKey()-->"+returnedMessage.getRoutingKey());
System.out.println();
}
});
return rabbitTemplate;
}
yml 開始配置:
注:√代表存在,× 代表不存在
消費端確認
basicAck
,表示成功確認,使用此回執方法後,消息會被 rabbitmq broker 刪除。
-
deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞後,deliveryTag 都會增加。手動消息確認模式下,我們可以對指定 deliveryTag 的消息進行 ack、nack、reject 等操作。
-
multiple:是否批量確認,值爲 true 則會一次性 ack 所有小於當前消息 deliveryTag 的消息。
void basicAck(long deliveryTag, boolean multiple)
basicNack
,表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。
-
deliveryTag:表示消息投遞序號。
-
multiple:是否批量確認。
-
requeue:值爲 true 消息將重新入隊列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
注:消息重新投放,將投放到隊列的頭部,也就是馬上會再被消費,如果業務不能處理成功,將陷入死循環。能瞬間將 cpu 撐滿。良好實踐是,先將這條消息設置爲 basicAck,然後再投遞另外一條同樣的消息,這樣消息就進入了消息隊列尾部,但是也需要處理消息一直消費不成功,當消費多少次都是失敗後進入特有的失敗隊列。
basicReject
,拒絕消息,與 basicNack 區別在於不能進行批量操作,其他用法很相似。
-
deliveryTag:表示消息投遞序號。
-
requeue:值爲 true 消息將重新入隊列。
void basicReject(long deliveryTag, boolean requeue)
@Slf4j
@Component
@RabbitListener(queues = "directQueue",group = "DirectReceiver")//監聽的隊列名稱 directQueue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DirectReceiver {
// @RabbitHandler
// public void handler(Map testMessage) {
// System.out.println("directReceiver消費者收到消息 : " + testMessage.toString());
// for (Object item : testMessage.keySet()) {
// log.info("item:{}-->value:{}",item,testMessage.get(item));
// }
// log.info("");
// }
@RabbitHandler
public void processHandler(Map testMessage,Message message,Channel channel) throws IOException {
try{
System.out.println("directReceiver消費者收到消息 : " + testMessage.toString());
log.info("channel:{}",channel);
log.info("message:{}",message);
int i=1/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
//是否是重新投遞的消息
if(message.getMessageProperties().getRedelivered()){
log.error("消息已重複處理,不再消費");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else{
log.error("消息即將再次返回隊列處理");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
}
一個消息兩個訂閱
如果一個消息隊列被兩個消息消費者訂閱,那兩個訂閱者都可以收到消息嗎?還是搶奪式的?我們 copy 一份 directReceiver 如下:
@Component
@RabbitListener(queues = "directQueue")//監聽的隊列名稱 directQueue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DirectAnotherReceiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("directAnotherReceiver消費者收到消息 : " + testMessage.toString());
}
}
然後用 postman 發消息,發現每次消息只有一個 listener 消費。且是輪流消費的。即使你啓動多個服務也是隻有一個消費者消費。這樣可以在部署多個實例的時候保證消息不會被重複消費。
directAnotherReceiver消費者收到消息 : {datetime=1624238518016, data=hello,i am direct msg!, id=eb99a77f-0657-4f98-bee8-4815f88436d9}
directReceiver消費者收到消息 : {datetime=1624238539087, data=hello,i am direct msg!, id=2da0d4d3-1707-43f3-96f9-1790aa411be9}
directAnotherReceiver消費者收到消息 : {datetime=1624238540193, data=hello,i am direct msg!, id=5cfbfba5-d577-4992-82ec-29e3504f40b9}
directReceiver消費者收到消息 : {datetime=1624238541016, data=hello,i am direct msg!, id=9d687827-29f9-459d-9da8-f79f775becf9}
directAnotherReceiver消費者收到消息 : {datetime=1624238541879, data=hello,i am direct msg!, id=cddee0cc-0a4e-4448-ae1a-be02fa8530a3}
directReceiver消費者收到消息 : {datetime=1624238542683, data=hello,i am direct msg!, id=f1bb5c39-ffc4-4bdd-a34a-c0c394bebfe0}
directAnotherReceiver消費者收到消息 : {datetime=1624238543150, data=hello,i am direct msg!, id=48b6449f-0720-4321-92b0-e954728d258d}
directReceiver消費者收到消息 : {datetime=1624238545120, data=hello,i am direct msg!, id=120ef701-b762-46a1-a36b-39537b801a4a}
directAnotherReceiver消費者收到消息 : {datetime=1624238545586, data=hello,i am direct msg!, id=6a3df339-53d8-440d-ae76-8e3c8b131897}
directReceiver消費者收到消息 : {datetime=1624238545911, data=hello,i am direct msg!, id=7894f42e-357e-4996-84ed-88a51d283c66}
directAnotherReceiver消費者收到消息 : {datetime=1624238546260, data=hello,i am direct msg!, id=b6bb6399-7c59-42eb-a3da-aea72f75541c}
directReceiver消費者收到消息 : {datetime=1624238546605, data=hello,i am direct msg!, id=7c93063b-c195-467c-9ad9-d244c68d422d}
directAnotherReceiver消費者收到消息 : {datetime=1624238546930, data=hello,i am direct msg!, id=90405b7c-ae15-4456-8b07-5ec4ed78a84c}
directReceiver消費者收到消息 : {datetime=1624238548702, data=hello,i am direct msg!, id=63583d09-ed93-474c-9a69-264a152fca03}
directAnotherReceiver消費者收到消息 : {datetime=1624238549108, data=hello,i am direct msg!, id=55328371-eb0f-4341-9659-fd5256a3adc7}
directReceiver消費者收到消息 : {datetime=1624238549423, data=hello,i am direct msg!, id=c50faa16-885d-4855-9e92-9463cd21da2f}
directAnotherReceiver消費者收到消息 : {datetime=1624238570665, data=hello,i am direct msg!, id=1dad00f5-6bb8-4add-be9e-340e2f77a008}
directReceiver消費者收到消息 : {datetime=1624238571162, data=hello,i am direct msg!, id=5d604580-5f2d-430f-a11d-540aa3a9f263}
directAnotherReceiver消費者收到消息 : {datetime=1624238571909, data=hello,i am direct msg!, id=b0ae512e-153a-4e3a-a3c4-0581ac4d9c08}
directReceiver消費者收到消息 : {datetime=1624238572399, data=hello,i am direct msg!, id=cee8755d-80ad-4185-87b6-c015cb70126c}
directAnotherReceiver消費者收到消息 : {datetime=1624238573032, data=hello,i am direct msg!, id=3d0153c5-a7ac-4820-8884-6e53ad3a80e4}
directReceiver消費者收到消息 : {datetime=1624238573615, data=hello,i am direct msg!, id=fc6f4f8e-522c-4d47-9557-5ca7622e2567}
directAnotherReceiver消費者收到消息 : {datetime=1624238574171, data=hello,i am direct msg!, id=7dc9355f-e6fa-4a72-8f11-0b7ed5f3820f}
延時消費
延時隊列需要在 rabbitmq 安裝一個插件
https://www.rabbitmq.com/community-plugins.html
否則會報錯誤:沒有找到對應x-delayed-message的exchange type
下載後放到 rabbitmq 安裝目錄的 plugins 目錄下,並執行命令如下:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安裝成功後如下圖,然後需要重啓 rabbitmq
聲明延時隊列
@Configuration
public class DelayRabbitConfig {
/**
* 延時隊列交換機
* 注意這裏的交換機類型:CustomExchange
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");//直連型
//屬性參數 交換機名稱 交換機類型 是否持久化 是否自動刪除 配置參數
return new CustomExchange("delay_exchange11", "x-delayed-message", true, false, args);
}
/**
* 延時隊列
*
* @return
*/
@Bean
public Queue delayQueue() {
//屬性參數 隊列名稱 是否持久化
return new Queue("delay_queue11", true);
}
/**
* 綁定交換機
*
* @return
*/
@Bean
public Binding bindDelay() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("abc").noargs();
}
}
發送消息
//延遲隊列
@GetMapping("/sendDelayMsg")
public String sendDelayMsg() {
Map<String,Object> map=new HashMap<String,Object>();
map.put("id",UUID.randomUUID().toString());
map.put("data","hello,i am sendDelayMsg msg!");
map.put("datetime",System.currentTimeMillis());
//交換機 路由 消息(發送消息的時候不需要管隊列,因爲隊列已經在DirectRabbitConfig中配置了,隊列應該是消費者關心的事情)
rabbitTemplate.convertAndSend("delay_exchange11", "abc", map,message -> {
//配置消息的過期時間
message.getMessageProperties().setDelay(5000);//延遲5秒
return message;
});
return "ok";
}
接收消息
@Slf4j
@Component
@RabbitListener(queues = "delay_queue11")//監聽的隊列名稱 delay_queue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DelayReceiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("DelayReceiver消費者收到消息 : " + testMessage.toString());
for (Object item : testMessage.keySet()) {
log.info("item:{}-->value:{}",item,testMessage.get(item));
}
log.info("");
}
}
注意:需要將 Mandatory=false,否則將報錯路由找不到
文章已經同步更新到 Java 實驗室官方站點:
https://javawu.com/archives/2919
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/JOE_VbrI2HUXVf8QEE5a9A