數十萬定時任務,如何高效觸發定時和超時

項目產品中,大家都會有 "定時任務" 和 "定時超時" 的需求,初始階段,我們基本都是用少數的一些 timer,即使是任務量越來越大的時候,我們就難免維護着大量的 timer,或者進行了大量低效的掃描。

定時任務使用場景:
當訂單一直處於未支付狀態時,如何及時的關閉訂單 (已經使用)

如何定期檢查處於退款狀態的訂單是否已經退款成功 (後期重構使用)

設計方案:

咱們公司現階段就是使用的這套方法:

  1. 新增一個 job,會 job_pool 中插入一條數據,記錄了業務方消費方。也會在 bucket 插入一條記錄,記錄執行的時間戳

  2. 搬運線程會去 bucket 中查找哪些執行時間戳的 RunTimeMillis 比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的 Topic 是什麼,然後將這些任務 PUSH 到 TOPIC 對應的列表 queue 中

3 每個 topic 的 list 都會有一個監聽線程去批量獲取 list 中的待消費數據,獲取到的數據全部扔給這個 topic 的消費線程池

  1. 消費線程池執行會去 job_pool 查找數據結構,返回給回調結構,執行回調方法。

待優化的內容:

  1. 目前只有一個 Queue 隊列存放消息,當需要消費的消息大量堆積後,會影響消息通知的時效。改進的辦法是,開啓多個 Queue,進行消息路由,再開啓多個消費線程進行消費,提供吞吐量

  2. 消息沒有進行持久化,存在風險,後續會將消息持久化到 MongoDB 中

一般來說還有什麼其他方法實現這類需求呢?

“輪詢掃描法”

  1. 用一個 Map<uid, last_packet_time> 來記錄每一個 uid 最近一次請求時間 last_packet_time

  2. 當某個用戶 uid 有請求包來到,實時更新這個 Map

  3. 啓動一個 timer,當 Map 中不爲空時,輪詢掃描這個 Map,看每個 uid 的 last_packet_time 是否超過 30s,如果超過則進行超時處理

“多 timer 觸發法”

  1. 用一個 Map<uid, last_packet_time> 來記錄每一個 uid 最近一次請求時間 last_packet_time

  2. 當某個用戶 uid 有請求包來到,實時更新這個 Map,並同時對這個 uid 請求包啓動一個 timer,30s 之後觸發

  3. 每個 uid 請求包對應的 timer 觸發後,看 Map 中,查看這個 uid 的 last_packet_time 是否超過 30s,如果超過則進行超時處理

方案一:只啓動一個 timer,但需要輪詢,效率較低

方案二:不需要輪詢,但每個請求包要啓動一個 timer,比較耗資源

ZSet(有序集合)數據結構來實現

  1. 創建 ZSet:首先,你需要創建一個 ZSet 數據結構,其中每個訂單將作爲一個成員,其分數將表示訂單的創建時間戳。你可以使用 Redis 等支持 ZSet 的數據庫來實現。

  2. 添加訂單:每當用戶創建新訂單時,將訂單添加到 ZSet 中,其中成員是訂單 ID,分數是訂單的創建時間戳。

  3. 定時檢查訂單:定期(例如,每分鐘)執行一個程序或定時任務來檢查 ZSet 中的訂單。你可以使用程序來查詢 ZSet,找到創建時間超過一定時間閾值的訂單,表示它們長時間未支付。

  4. 取消訂單:對於那些長時間未支付的訂單,可以將其從 ZSet 中刪除,並執行取消訂單的操作(例如,將訂單狀態設置爲 "已取消")。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Set;
public class OrderCancellationSystem {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost"); // 連接到本地Redis服務器
        // 模擬添加訂單
        addOrder(jedis, "Order1");
        addOrder(jedis, "Order2");
        // 定時任務,每分鐘檢查訂單並自動取消
        while (true) {
            cancelLongUnpaidOrders(jedis);
            try {
                Thread.sleep(60000); // 等待一分鐘再次檢查
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void addOrder(Jedis jedis, String orderId) {
        long currentTime = System.currentTimeMillis();
        jedis.zadd("orders", currentTime, orderId);
    }
    public static void cancelOrder(String orderId) {
        // 執行取消訂單操作,例如更新訂單狀態
        System.out.println("Cancelling order: " + orderId);
    }
    public static void cancelLongUnpaidOrders(Jedis jedis) {
        long expirationTime = System.currentTimeMillis() - 3600 * 1000; // 60分鐘前的時間戳
        Set<Tuple> longUnpaidOrders = jedis.zrangeByScoreWithScores("orders", "-inf", String.valueOf(expirationTime));
        for (Tuple order : longUnpaidOrders) {
            String orderId = order.getElement();
            cancelOrder(orderId);
            jedis.zrem("orders", orderId); // 從ZSet中刪除已取消的訂單
        }
    }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Et8zdye295_hIZSzi-mnmg