關於 rabbitmq,你想知道的都在這裏(下)

持久化

我們將消息發送到 mq 消息隊列了,如果 mq 服務器掛掉了,在 mq 中未消費的消息是否就丟失了,重啓 mq 服務器未消費的是否還在?我們可以將 comsumer 關掉,然後只保留 producer 發送消息,然後看看 mq 的管理端,發現消息並沒人消費。此時,將 mq 服務重啓,重啓後看消息是否還在?

奇蹟般的消息都還在。上節課我們也沒配置什麼東西,怎麼消息默認就持久化了嗎?

@Bean
public Queue DirectQueue() {
    return new Queue(DirectRabbitConfig.directQueue);
}

那是因爲我們在定義隊列的時候使用的構造方法底層實現的時候durable參數設置爲 true(持久化),因此消息當 mq 掛掉時,未消費的消息重啓後還會在。

大家可以設置爲 false 試試,但是需要注意我們未設置virtual-host,默認的使用的是/,而這個host不允許設置爲false。我們可以在rabbitmq管理控制檯增加一個host如下操作:

點擊增加一個 host 名稱爲 myVirtualHost,並在 yml 文件中增加一個配置:

這樣就可以設置 queue 的 durable 爲 false,將消費者關閉(模擬消費者異常),然後生產消息,在 mq 的控制檯看到有未消費的消息,重啓 mq,消息丟失~~~

消息確認

發送端確認

增加配置
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //設置開啓Mandatory,才能觸發回調函數,無論消息推送結果怎麼樣都強制調用回調函數
    rabbitTemplate.setMandatory(true);

    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm");
            System.out.println("correlationData:"+correlationData);
            System.out.println("ack:"+ack);
            System.out.println("cause:"+cause);
            System.out.println();
        }
    });

    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("returnedMessage");
            System.out.println("returnedMessage.getMessage()-->"+returnedMessage.getMessage());
            System.out.println("returnedMessage.getReplyCode()-->"+returnedMessage.getReplyCode());
            System.out.println("returnedMessage.getReplyText()-->"+returnedMessage.getReplyText());
            System.out.println("returnedMessage.getExchange()-->"+returnedMessage.getExchange());
            System.out.println("returnedMessage.getRoutingKey()-->"+returnedMessage.getRoutingKey());
            System.out.println();
        }
    });

    return rabbitTemplate;
}
yml 開始配置:

eJlXTm

注:√代表存在,× 代表不存在

消費端確認

basicAck,表示成功確認,使用此回執方法後,消息會被 rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple)

basicNack,表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

消息重新投放,將投放到隊列的頭部,也就是馬上會再被消費,如果業務不能處理成功,將陷入死循環。能瞬間將 cpu 撐滿。良好實踐是,先將這條消息設置爲 basicAck,然後再投遞另外一條同樣的消息,這樣消息就進入了消息隊列尾部,但是也需要處理消息一直消費不成功,當消費多少次都是失敗後進入特有的失敗隊列。

basicReject,拒絕消息,與 basicNack 區別在於不能進行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)
@Slf4j
@Component
@RabbitListener(queues = "directQueue",group = "DirectReceiver")//監聽的隊列名稱 directQueue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DirectReceiver {
//    @RabbitHandler
//    public void handler(Map testMessage) {
//        System.out.println("directReceiver消費者收到消息  : " + testMessage.toString());
//        for (Object item : testMessage.keySet()) {
//            log.info("item:{}-->value:{}",item,testMessage.get(item));
//        }
//        log.info("");
//    }

    @RabbitHandler
    public void processHandler(Map testMessage,Message message,Channel channel) throws IOException {
        try{
            System.out.println("directReceiver消費者收到消息  : " + testMessage.toString());
            log.info("channel:{}",channel);
            log.info("message:{}",message);
            int i=1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag()false);
        }catch (Exception e){
            //是否是重新投遞的消息
            if(message.getMessageProperties().getRedelivered()){
                log.error("消息已重複處理,不再消費");
                channel.basicReject(message.getMessageProperties().getDeliveryTag()false);
            }else{
                log.error("消息即將再次返回隊列處理");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            }
        }
    }
}

一個消息兩個訂閱

如果一個消息隊列被兩個消息消費者訂閱,那兩個訂閱者都可以收到消息嗎?還是搶奪式的?我們 copy 一份 directReceiver 如下:

@Component
@RabbitListener(queues = "directQueue")//監聽的隊列名稱 directQueue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DirectAnotherReceiver {
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("directAnotherReceiver消費者收到消息  : " + testMessage.toString());
    }
}

然後用 postman 發消息,發現每次消息只有一個 listener 消費。且是輪流消費的。即使你啓動多個服務也是隻有一個消費者消費。這樣可以在部署多個實例的時候保證消息不會被重複消費。

directAnotherReceiver消費者收到消息  : {datetime=1624238518016, data=hello,i am direct msg!, id=eb99a77f-0657-4f98-bee8-4815f88436d9}
directReceiver消費者收到消息  : {datetime=1624238539087, data=hello,i am direct msg!, id=2da0d4d3-1707-43f3-96f9-1790aa411be9}
directAnotherReceiver消費者收到消息  : {datetime=1624238540193, data=hello,i am direct msg!, id=5cfbfba5-d577-4992-82ec-29e3504f40b9}
directReceiver消費者收到消息  : {datetime=1624238541016, data=hello,i am direct msg!, id=9d687827-29f9-459d-9da8-f79f775becf9}
directAnotherReceiver消費者收到消息  : {datetime=1624238541879, data=hello,i am direct msg!, id=cddee0cc-0a4e-4448-ae1a-be02fa8530a3}
directReceiver消費者收到消息  : {datetime=1624238542683, data=hello,i am direct msg!, id=f1bb5c39-ffc4-4bdd-a34a-c0c394bebfe0}
directAnotherReceiver消費者收到消息  : {datetime=1624238543150, data=hello,i am direct msg!, id=48b6449f-0720-4321-92b0-e954728d258d}
directReceiver消費者收到消息  : {datetime=1624238545120, data=hello,i am direct msg!, id=120ef701-b762-46a1-a36b-39537b801a4a}
directAnotherReceiver消費者收到消息  : {datetime=1624238545586, data=hello,i am direct msg!, id=6a3df339-53d8-440d-ae76-8e3c8b131897}
directReceiver消費者收到消息  : {datetime=1624238545911, data=hello,i am direct msg!, id=7894f42e-357e-4996-84ed-88a51d283c66}
directAnotherReceiver消費者收到消息  : {datetime=1624238546260, data=hello,i am direct msg!, id=b6bb6399-7c59-42eb-a3da-aea72f75541c}
directReceiver消費者收到消息  : {datetime=1624238546605, data=hello,i am direct msg!, id=7c93063b-c195-467c-9ad9-d244c68d422d}
directAnotherReceiver消費者收到消息  : {datetime=1624238546930, data=hello,i am direct msg!, id=90405b7c-ae15-4456-8b07-5ec4ed78a84c}
directReceiver消費者收到消息  : {datetime=1624238548702, data=hello,i am direct msg!, id=63583d09-ed93-474c-9a69-264a152fca03}
directAnotherReceiver消費者收到消息  : {datetime=1624238549108, data=hello,i am direct msg!, id=55328371-eb0f-4341-9659-fd5256a3adc7}
directReceiver消費者收到消息  : {datetime=1624238549423, data=hello,i am direct msg!, id=c50faa16-885d-4855-9e92-9463cd21da2f}
directAnotherReceiver消費者收到消息  : {datetime=1624238570665, data=hello,i am direct msg!, id=1dad00f5-6bb8-4add-be9e-340e2f77a008}
directReceiver消費者收到消息  : {datetime=1624238571162, data=hello,i am direct msg!, id=5d604580-5f2d-430f-a11d-540aa3a9f263}
directAnotherReceiver消費者收到消息  : {datetime=1624238571909, data=hello,i am direct msg!, id=b0ae512e-153a-4e3a-a3c4-0581ac4d9c08}
directReceiver消費者收到消息  : {datetime=1624238572399, data=hello,i am direct msg!, id=cee8755d-80ad-4185-87b6-c015cb70126c}
directAnotherReceiver消費者收到消息  : {datetime=1624238573032, data=hello,i am direct msg!, id=3d0153c5-a7ac-4820-8884-6e53ad3a80e4}
directReceiver消費者收到消息  : {datetime=1624238573615, data=hello,i am direct msg!, id=fc6f4f8e-522c-4d47-9557-5ca7622e2567}
directAnotherReceiver消費者收到消息  : {datetime=1624238574171, data=hello,i am direct msg!, id=7dc9355f-e6fa-4a72-8f11-0b7ed5f3820f}

延時消費

延時隊列需要在 rabbitmq 安裝一個插件

https://www.rabbitmq.com/community-plugins.html

否則會報錯誤:沒有找到對應x-delayed-message的exchange type

下載後放到 rabbitmq 安裝目錄的 plugins 目錄下,並執行命令如下:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安裝成功後如下圖,然後需要重啓 rabbitmq

聲明延時隊列

@Configuration
public class DelayRabbitConfig {
    /**
     * 延時隊列交換機
     * 注意這裏的交換機類型:CustomExchange
     * @return
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type""direct");//直連型
        //屬性參數 交換機名稱 交換機類型 是否持久化 是否自動刪除 配置參數
        return new CustomExchange("delay_exchange11""x-delayed-message", true, false, args);
    }

    /**
     * 延時隊列
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        //屬性參數 隊列名稱 是否持久化
        return new Queue("delay_queue11"true);
    }

    /**
     * 綁定交換機
     *
     * @return
     */
    @Bean
    public Binding bindDelay() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("abc").noargs();
    }
}
發送消息
//延遲隊列
@GetMapping("/sendDelayMsg")
public String sendDelayMsg() {
    Map<String,Object> map=new HashMap<String,Object>();
    map.put("id",UUID.randomUUID().toString());
    map.put("data","hello,i am sendDelayMsg msg!");
    map.put("datetime",System.currentTimeMillis());
    //交換機 路由 消息(發送消息的時候不需要管隊列,因爲隊列已經在DirectRabbitConfig中配置了,隊列應該是消費者關心的事情)
    rabbitTemplate.convertAndSend("delay_exchange11""abc", map,message -> {
        //配置消息的過期時間
        message.getMessageProperties().setDelay(5000);//延遲5秒
        return message;
    });

    return "ok";
}
接收消息
@Slf4j
@Component
@RabbitListener(queues = "delay_queue11")//監聽的隊列名稱 delay_queue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DelayReceiver {
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("DelayReceiver消費者收到消息  : " + testMessage.toString());
        for (Object item : testMessage.keySet()) {
            log.info("item:{}-->value:{}",item,testMessage.get(item));
        }
        log.info("");
    }
}

注意:需要將 Mandatory=false,否則將報錯路由找不到

文章已經同步更新到 Java 實驗室官方站點:

https://javawu.com/archives/2919

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/JOE_VbrI2HUXVf8QEE5a9A