面試官:怎麼不用定時任務實現關閉訂單?

在電商、支付等領域,往往會有這樣的場景,用戶下單後放棄支付了,那這筆訂單會在指定的時間段後進行關閉操作,細心的你一定發現了像某寶、某東都有這樣的邏輯,而且時間很準確,誤差在 1s 內;那他們是怎麼實現的呢?

一般的做法有如下幾種:

一、定時任務關閉訂單(最 low)

一般情況下,最不推薦的方式就是關單方式就是定時任務方式,原因我們可以看下面的圖來說明。

我們假設,關單時間爲下單後 10 分鐘,定時任務間隔也是 10 分鐘;通過上圖我們看出,如果在第 1 分鐘下單,在第 20 分鐘的時候才能被掃描到執行關單操作,這樣誤差達到 10 分鐘,這在很多場景下是不可接受的,另外需要頻繁掃描主訂單號造成網絡 IO 和磁盤 IO 的消耗,對實時交易造成一定的衝擊,所以 PASS。

二、rocketmq 延遲隊列方式

延遲消息生產者把消息發送到消息服務器後,並不希望被立即消費,而是等待指定時間後纔可以被消費者消費,這類消息通常被稱爲延遲消息。在 RocketMQ 開源版本中,支持延遲消息,但是不支持任意時間精度的延遲消息,只支持特定級別的延遲消息。消息延遲級別分別爲 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共 18 個級別。

發送延遲消息(生產者)

/**
 * 推送延遲消息
 * @param topic 
 * @param body 
 * @param producerGroup 
 * @return boolean
 */
public boolean sendMessage(String topic, String body, String producerGroup) {
    try {
        Message recordMsg = new Message(topic, body.getBytes());
        producer.setProducerGroup(producerGroup);

        //設置消息延遲級別,我這裏設置14,對應就是延時10分鐘
        // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        recordMsg.setDelayTimeLevel(14);
        // 發送消息到一個Broker
        SendResult sendResult = producer.send(recordMsg);
        // 通過sendResult返回消息是否成功送達
        log.info("發送延遲消息結果:======sendResult:{}", sendResult);
        DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("發送時間:{}", format.format(new Date()));

        return true;
    } catch (Exception e) {
        e.printStackTrace();
        log.error("延遲消息隊列推送消息異常:{},推送內容:{}", e.getMessage(), body);
    }
    return false;
}

消費延遲消息(消費者)

/**
 * 接收延遲消息
 * 
 * @param topic
 * @param consumerGroup
 * @param messageHandler
 */
public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler) {
    ThreadPoolUtil.execute(() -> {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup(consumerGroup);
            consumer.setVipChannelEnabled(false);
            consumer.setNamesrvAddr(address);
            //設置消費者拉取消息的策略,*表示消費該topic下的所有消息,也可以指定tag進行消息過濾
            consumer.subscribe(topic, "*");
            //消費者端啓動消息監聽,一旦生產者發送消息被監聽到,就打印消息,和rabbitmq中的handlerDelivery類似
            consumer.registerMessageListener(messageHandler);
            consumer.start();
            log.info("啓動延遲消息隊列監聽成功:" + topic);
        } catch (MQClientException e) {
            log.error("啓動延遲消息隊列監聽失敗:{}", e.getErrorMessage());
            System.exit(1);
        }
    });
}

實現監聽類,處理具體邏輯

/**
 * 延遲消息監聽
 * 
 */
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private MQUtil mqUtil;
    @Resource
    private CourseOrderTimeoutHandler courseOrderTimeoutHandler;
    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        // 訂單超時監聽
        mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
    }
}

這種方式相比定時任務好了很多,但是有一個致命的缺點,就是延遲等級只有 18 種(商業版本支持自定義時間),如果我們想把關閉訂單時間設置在 15 分鐘該如何處理呢?顯然不夠靈活。

三、rabbitmq 死信隊列的方式

Rabbitmq 本身是沒有延遲隊列的,只能通過 Rabbitmq 本身隊列的特性來實現,想要 Rabbitmq 實現延遲隊列,需要使用 Rabbitmq 的死信交換機(Exchange)和消息的存活時間 TTL(Time To Live)

死信交換機 一個消息在滿足如下條件下,會進死信交換機,記住這裏是交換機而不是隊列,一個交換機可以對應很多隊列。

一個消息被 Consumer 拒收了,並且 reject 方法的參數裏 requeue 是 false。也就是說不會被再次放在隊列裏,被其他消費者使用。上面的消息的 TTL 到了,消息過期了。

隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。死信交換機就是普通的交換機,只是因爲我們把過期的消息扔進去,所以叫死信交換機,並不是說死信交換機是某種特定的交換機

消息 TTL(消息存活時間) 消息的 TTL 就是消息的存活時間。RabbitMQ 可以對隊列和消息分別設置 TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認爲這個消息就死了,稱之爲死信。如果隊列設置了,消息也設置了,那麼會取值較小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這裏單講單個消息的 TTL,因爲它纔是實現延遲任務的關鍵。

byte[] messageBodyBytes = "Hello, world!".getBytes();  
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通過設置消息的 expiration 字段或者 x-message-ttl 屬性來設置時間,兩者是一樣的效果。只是 expiration 字段是字符串參數,所以要寫個 int 類型的字符串:當上面的消息扔到隊列中後,過了 60 秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有 “死掉” 的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去。

處理流程圖

創建交換機(Exchanges)和隊列(Queues)

創建死信交換機

如圖所示,就是創建一個普通的交換機,這裏爲了方便區分,把交換機的名字取爲:delay。

創建自動過期消息隊列 這個隊列的主要作用是讓消息定時過期的,比如我們需要 2 小時候關閉訂單,我們就需要把消息放進這個隊列裏面,把消息過期時間設置爲 2 小時。

創建一個一個名爲 delay_queue1 的自動過期的隊列,當然圖片上面的參數並不會讓消息自動過期,因爲我們並沒有設置 x-message-ttl 參數,如果整個隊列的消息有消息都是相同的,可以設置,這裏爲了靈活,所以並沒有設置,另外兩個參數 x-dead-letter-exchange 代表消息過期後,消息要進入的交換機,這裏配置的是 delay,也就是死信交換機,x-dead-letter-routing-key 是配置消息過期後,進入死信交換機的 routing-key, 跟發送消息的 routing-key 一個道理,根據這個 key 將消息放入不同的隊列。

創建消息處理隊列 這個隊列纔是真正處理消息的隊列,所有進入這個隊列的消息都會被處理。

消息隊列的名字爲 delay_queue2 消息隊列綁定到交換機 進入交換機詳情頁面,將創建的 2 個隊列(delayqueue1 和 delayqueue2)綁定到交換機上面

自動過期消息隊列的 routing key 設置爲 delay 綁定 delayqueue2

delayqueue2 的 key 要設置爲創建自動過期的隊列的 x-dead-letter-routing-key 參數,這樣當消息過期的時候就可以自動把消息放入 delay_queue2 這個隊列中了 綁定後的管理頁面如下圖:

當然這個綁定也可以使用代碼來實現,只是爲了直觀表現,所以本文使用的管理平臺來操作發送消息

String msg = "hello word";  
MessageProperties messageProperties = newMessageProperties();  
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = newMessage(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);

設置了讓消息 6 秒後過期 注意:因爲要讓消息自動過期,所以一定不能設置 delay_queue1 的監聽,不能讓這個隊列裏面的消息被接受到,否則消息一旦被消費,就不存在過期了。

接收消息接收消息配置好 delay_queue2 的監聽就好了。

package wang.raye.rabbitmq.demo1;import org.springframework.amqp.core.AcknowledgeMode;  
import org.springframework.amqp.core.Binding;  
import org.springframework.amqp.core.BindingBuilder;  
import org.springframework.amqp.core.DirectExchange;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;@ConfigurationpublicclassDelayQueue{  
    /** 消息交換機的名字*/
    publicstaticfinalString EXCHANGE = "delay";    /** 隊列key1*/
    publicstaticfinalString ROUTINGKEY1 = "delay";    /** 隊列key2*/
    publicstaticfinalString ROUTINGKEY2 = "delay_key";    /**
     * 配置鏈接信息
     * @return
     */
    @Bean
    publicConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);
        connectionFactory.setUsername("kberp");
        connectionFactory.setPassword("kberp");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必須要設置
        return connectionFactory;
    }    /**  
     * 配置消息交換機
     * 針對消費者配置  
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念  
        HeadersExchange :通過添加屬性key-value匹配  
        DirectExchange:按照routingkey分發到指定隊列  
        TopicExchange:多關鍵字匹配  
     */  
    @Bean  
    publicDirectExchange defaultExchange() {  
        returnnewDirectExchange(EXCHANGE, true, false);
    }    /**
     * 配置消息隊列2
     * 針對消費者配置  
     * @return
     */
    @Bean
    publicQueue queue() {  
       returnnewQueue("delay_queue2", true); //隊列持久  
    }    /**
     * 將消息隊列2與交換機綁定
     * 針對消費者配置  
     * @return
     */
    @Bean  
    @Autowired
    publicBinding binding() {  
        returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  
    }    /**
     * 接受消息的監聽,這個監聽會接受消息隊列1的消息
     * 針對消費者配置  
     * @return
     */
    @Bean  
    @Autowired
    publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {  
        SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認  
        container.setMessageListener(newChannelAwareMessageListener() {            publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{                byte[] body = message.getBody();  
                System.out.println("delay_queue2 收到消息 : "+ newString(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費  
            }  
        });  
        return container;  
    }  
}

這種方式可以自定義進入死信隊列的時間;是不是很完美,但是有的小夥伴的情況是消息中間件就是 rocketmq,公司也不可能會用商業版,怎麼辦?那就進入下一節

四、時間輪算法

(1) 創建環形隊列,例如可以創建一個包含 3600 個 slot 的環形隊列 (本質是個數組)。

(2) 任務集合,環上每一個 slot 是一個 Set 同時,啓動一個 timer,這個 timer 每隔 1s,在上述環形隊列中移動一格,有一個 Current Index 指針來標識正在檢測的 slot。

Task 結構中有兩個很重要的屬性:(1)Cycle-Num:當 Current Index 第幾圈掃描到這個 Slot 時,執行任務 (2) 訂單號,要關閉的訂單號(也可以是其他信息,比如:是一個基於某個訂單號的任務)。

假設當前 Current Index 指向第 0 格,例如在 3610 秒之後,有一個訂單需要關閉,只需:(1) 計算這個訂單應該放在哪一個 slot,當我們計算的時候現在指向 1,3610 秒之後,應該是第 10 格,所以這個 Task 應該放在第 10 個 slot 的 Set 中 (2) 計算這個 Task 的 Cycle-Num,由於環形隊列是 3600 格 (每秒移動一格,正好 1 小時),這個任務是 3610 秒後執行,所以應該繞 3610/3600=1 圈之後再執行,於是 Cycle-Num=1。

Current Index 不停的移動,每秒移動到一個新 slot,這個 slot 中對應的 Set,每個 Task 看 Cycle-Num 是不是 0:(1)如果不是 0,說明還需要多移動幾圈,將 Cycle-Num 減 1 (2)如果是 0,說明馬上要執行這個關單 Task 了,取出訂單號執行關單 (可以用單獨的線程來執行 Task),並把這個訂單信息從 Set 中刪除即可。(1) 無需再輪詢全部訂單,效率高 (2)一個訂單,任務只執行一次 (3)時效性好,精確到秒(控制 timer 移動頻率可以控制精度)。

五、redis 過期監聽

1. 修改 redis.windows.conf 配置文件中 notify-keyspace-events 的值 默認配置 notify-keyspace-events 的值爲 "" 修改爲 notify-keyspace-events Ex 這樣便開啓了過期事件

2. 創建配置類 RedisListenerConfig(配置 RedisMessageListenerContainer 這個 Bean)

package com.zjt.shop.config;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer; 
 
@Configurationpublic class RedisListenerConfig { 
    @Autowired
    private RedisTemplate redisTemplate; 
    /**
     * @return
     */
    @Bean
    public RedisTemplate redisTemplateInit() { 
        // key序列化
        redisTemplate.setKeySerializer(new StringRedisSerializer()); 
        //val實例化
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); 
        return redisTemplate;
    } 
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);        return container;
    }
 
}

3. 繼承 KeyExpirationEventMessageListener 創建 redis 過期事件的監聽類

package com.zjt.shop.common.util;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;import com.zjt.shop.modules.order.service.OrderInfoService;import com.zjt.shop.modules.product.entity.OrderInfoEntity;import com.zjt.shop.modules.product.mapper.OrderInfoMapper;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.stereotype.Component; 

@Slf4j@Componentpublic class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { 
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {        super(listenerContainer);
    } 
    @Autowired
    private OrderInfoMapper orderInfoMapper; 
    /**
     * 針對redis數據失效事件,進行數據處理
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {      try {
          String key = message.toString();          //從失效key中篩選代表訂單失效的key
          if (key != null && key.startsWith("order_")) {              //截取訂單號,查詢訂單,如果是未支付狀態則爲-取消訂單
              String orderNo = key.substring(6);
              QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>();
              queryWrapper.eq("order_no",orderNo);
              OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper);              if (orderInfo != null) {                  if (orderInfo.getOrderState() == 0) {   //待支付
                      orderInfo.setOrderState(4);         //已取消
                      orderInfoMapper.updateById(orderInfo);
                      log.info("訂單號爲【" + orderNo + "】超時未支付-自動修改爲已取消狀態");
                  }
              }
          }
      } catch (Exception e) {
          e.printStackTrace();
          log.error("【修改支付訂單過期狀態異常】:" + e.getMessage());
      }
    }
}

4:測試

通過 redis 客戶端存一個有效時間爲 3s 的訂單:

結果:

總結: 以上方法只是個人對於關單的一些想法,可能有些地方有疏漏,請在公衆號直接留言進行指出,當然如果你有更好的關單方式也可以隨時溝通交流。

作者:程序員阿牛

來源:juejin.cn/post/6987233263660040206

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/K78tXrQvZBl5KdDTEOGO2Q