RabbitMQ 如何實現 2 小時未支付關閉訂單?

場景

開發中經常需要用到定時任務,對於商城來說,定時任務尤其多,比如優惠券定時過期、訂單定時關閉、微信支付 2 小時未支付關閉訂單等等,都需要用到定時任務,但是定時任務本身有一個問題,一般來說我們都是通過定時輪詢查詢數據庫來判斷是否有任務需要執行,也就是說不管怎麼樣,我們需要先查詢數據庫,而且有些任務對時間準確要求比較高的,需要每秒查詢一次,對於系統小倒是無所謂,如果系統本身就大而且數據也多的情況下,這就不大現實了,所以需要其他方式的,當然實現的方式有多種多樣的,比如 Redis 實現定時隊列、基於優先級隊列的 JDK 延遲隊列、時間輪等。因爲我們項目中本身就使用到了 Rabbitmq,所以基於方便開發和維護的原則,我們使用了 Rabbitmq 延遲隊列來實現定時任務, 不知道 rabbitmq 是什麼的和不知道 springboot 怎麼集成 Rabbitmq 的可以查看我之前的文章 Spring boot 集成 RabbitMQ

Rabbitmq 延遲隊列

Rabbitmq 本身是沒有延遲隊列的,只能通過 Rabbitmq 本身隊列的特性來實現,想要 Rabbitmq 實現延遲隊列,需要使用 Rabbitmq 的死信交換機(Exchange)和消息的存活時間 TTL(Time To Live)

死信交換機

一個消息在滿足如下條件下,會進死信交換機,記住這裏是交換機而不是隊列,一個交換機可以對應很多隊列。

  1. 一個消息被 Consumer 拒收了,並且 reject 方法的參數裏 requeue 是 false。也就是說不會被再次放在隊列裏,被其他消費者使用。

  2. 上面的消息的 TTL 到了,消息過期了。

  3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

== 死信交換機就是普通的交換機 ==,只是因爲我們把過期的消息扔進去,所以叫死信交換機,並不是說死信交換機是某種特定的交換機

消息 TTL(消息存活時間)

消息的 TTL 就是消息的存活時間。RabbitMQ 可以對隊列和消息分別設置 TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認爲這個消息就死了,稱之爲死信。如果隊列設置了,消息也設置了,那麼會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這裏單講單個消息的 TTL,因爲它纔是實現延遲任務的關鍵。

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange""queue-key", properties, messageBodyBytes);

可以通過設置消息的 expiration 字段或者 x-message-ttl 屬性來設置時間,兩者是一樣的效果。只是 expiration 字段是字符串參數,所以要寫個 int 類型的字符串:
當上面的消息扔到隊列中後,過了 60 秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有 “死掉” 的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去

處理流程圖

創建交換機(Exchanges)和隊列(Queues)

創建死信交換機

如圖所示,就是創建一個普通的交換機,這裏爲了方便區分,把交換機的名字取爲:delay

創建自動過期消息隊列

這個隊列的主要作用是讓消息定時過期的,比如我們需要 2 小時候關閉訂單,我們就需要把消息放進這個隊列裏面,把消息過期時間設置爲 2 小時

創建一個一個名爲 delay_queue1 的自動過期的隊列,當然圖片上面的參數並不會讓消息自動過期,因爲我們並沒有設置 x-message-ttl 參數,如果整個隊列的消息有消息都是相同的,可以設置,這裏爲了靈活,所以並沒有設置,另外兩個參數 x-dead-letter-exchange 代表消息過期後,消息要進入的交換機,這裏配置的是 delay,也就是死信交換機,x-dead-letter-routing-key 是配置消息過期後,進入死信交換機的 routing-key, 跟發送消息的 routing-key 一個道理,根據這個 key 將消息放入不同的隊列

創建消息處理隊列

這個隊列纔是真正處理消息的隊列,所有進入這個隊列的消息都會被處理

消息隊列的名字爲 delay_queue2

消息隊列綁定到交換機

進入交換機詳情頁面,將創建的 2 個隊列(delay_queue1 和 delay_queue2)綁定到交換機上面

自動過期消息隊列的 routing key 設置爲 delay

綁定 delay_queue2

delay_queue2 的 key 要設置爲創建自動過期的隊列的 x-dead-letter-routing-key 參數,這樣當消息過期的時候就可以自動把消息放入 delay_queue2 這個隊列中了

綁定後的管理頁面如下圖:

當然這個綁定也可以使用代碼來實現,只是爲了直觀表現,所以本文使用的管理平臺來操作

發送消息

String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("6000");
        messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("delay""delay",message);

主要的代碼就是

messageProperties.setExpiration("6000");

設置了讓消息 6 秒後過期

== 注意:== 因爲要讓消息自動過期,所以 == 一定不能設置 delay_queue1 的監聽,不能讓這個隊列裏面的消息被接受到,否則消息一旦被消費,就不存在過期了 ==

接收消息

接收消息配置好 delay_queue2 的監聽就好了

package wang.raye.rabbitmq.demo1;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayQueue {
    /** 消息交換機的名字*/
    public static final String EXCHANGE = "delay";
    /** 隊列key1*/
    public static final String ROUTINGKEY1 = "delay";
    /** 隊列key2*/
    public static final String ROUTINGKEY2 = "delay_key";

    /**
     * 配置鏈接信息
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);

        connectionFactory.setUsername("kberp");
        connectionFactory.setPassword("kberp");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必須要設置
        return connectionFactory;
    }

    /**  
     * 配置消息交換機
     * 針對消費者配置  
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念  
        HeadersExchange :通過添加屬性key-value匹配  
        DirectExchange:按照routingkey分發到指定隊列  
        TopicExchange:多關鍵字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE, true, false);
    } 

    /**
     * 配置消息隊列2
     * 針對消費者配置  
     * @return
     */
    @Bean
    public Queue queue() {  
       return new Queue("delay_queue2"true); //隊列持久  

    }
    /**
     * 將消息隊列2與交換機綁定
     * 針對消費者配置  
     * @return
     */
    @Bean  
    @Autowired
    public Binding binding() {  
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  
    } 

    /**
     * 接受消息的監聽,這個監聽會接受消息隊列1的消息
     * 針對消費者配置  
     * @return
     */
    @Bean  
    @Autowired
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認  
        container.setMessageListener(new ChannelAwareMessageListener() {

            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();  
                System.out.println("delay_queue2 收到消息 : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag()false); //確認消息成功消費  

            }  

        });  
        return container;  
    }  

}

在消息監聽中處理需要定時處理的任務就好了,因爲 Rabbitmq 能發送消息,所以可以把任務特徵碼發過來,比如關閉訂單就把訂單 id 發過來,這樣就避免了需要查詢一下那些訂單需要關閉而加重 MySQL 負擔了,畢竟一旦訂單量大的話,查詢本身也是一件很費 IO 的事情

總結

基於 Rabbitmq 實現定時任務,就是將消息設置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期後自動轉入另外一個隊列中,監控這個隊列消息的監聽處來處理定時任務具體的操作

作者:RayeWang

來源:blog.csdn.net/wantnrun/article/details/80401641

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MHLeVoJK9F9P6lbVb3HPOA