數十萬定時任務,如何高效觸發定時和超時
項目產品中,大家都會有 "定時任務" 和 "定時超時" 的需求,初始階段,我們基本都是用少數的一些 timer,即使是任務量越來越大的時候,我們就難免維護着大量的 timer,或者進行了大量低效的掃描。
定時任務使用場景:
當訂單一直處於未支付狀態時,如何及時的關閉訂單 (已經使用)
如何定期檢查處於退款狀態的訂單是否已經退款成功 (後期重構使用)
設計方案:
-
整個 Redis 當做消息池,以 KV 形式存儲消息
-
使用 ZSET 做優先隊列,按照 Score 維持優先級
-
使用 LIST 結構,以先進先出的方式消費
-
ZSET 和 LIST 存儲消息地址 (對應消息池的每個 KEY)
-
使用定時器維護路由
-
根據 TTL 規則實現消息延遲
咱們公司現階段就是使用的這套方法:
-
新增一個 job,會 job_pool 中插入一條數據,記錄了業務方消費方。也會在 bucket 插入一條記錄,記錄執行的時間戳
-
搬運線程會去 bucket 中查找哪些執行時間戳的 RunTimeMillis 比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的 Topic 是什麼,然後將這些任務 PUSH 到 TOPIC 對應的列表 queue 中
3 每個 topic 的 list 都會有一個監聽線程去批量獲取 list 中的待消費數據,獲取到的數據全部扔給這個 topic 的消費線程池
- 消費線程池執行會去 job_pool 查找數據結構,返回給回調結構,執行回調方法。
待優化的內容:
-
目前只有一個 Queue 隊列存放消息,當需要消費的消息大量堆積後,會影響消息通知的時效。改進的辦法是,開啓多個 Queue,進行消息路由,再開啓多個消費線程進行消費,提供吞吐量
-
消息沒有進行持久化,存在風險,後續會將消息持久化到 MongoDB 中
一般來說還有什麼其他方法實現這類需求呢?
“輪詢掃描法”
-
用一個 Map<uid, last_packet_time> 來記錄每一個 uid 最近一次請求時間 last_packet_time
-
當某個用戶 uid 有請求包來到,實時更新這個 Map
-
啓動一個 timer,當 Map 中不爲空時,輪詢掃描這個 Map,看每個 uid 的 last_packet_time 是否超過 30s,如果超過則進行超時處理
“多 timer 觸發法”
-
用一個 Map<uid, last_packet_time> 來記錄每一個 uid 最近一次請求時間 last_packet_time
-
當某個用戶 uid 有請求包來到,實時更新這個 Map,並同時對這個 uid 請求包啓動一個 timer,30s 之後觸發
-
每個 uid 請求包對應的 timer 觸發後,看 Map 中,查看這個 uid 的 last_packet_time 是否超過 30s,如果超過則進行超時處理
方案一:只啓動一個 timer,但需要輪詢,效率較低
方案二:不需要輪詢,但每個請求包要啓動一個 timer,比較耗資源
ZSet(有序集合)數據結構來實現
-
創建 ZSet:首先,你需要創建一個 ZSet 數據結構,其中每個訂單將作爲一個成員,其分數將表示訂單的創建時間戳。你可以使用 Redis 等支持 ZSet 的數據庫來實現。
-
添加訂單:每當用戶創建新訂單時,將訂單添加到 ZSet 中,其中成員是訂單 ID,分數是訂單的創建時間戳。
-
定時檢查訂單:定期(例如,每分鐘)執行一個程序或定時任務來檢查 ZSet 中的訂單。你可以使用程序來查詢 ZSet,找到創建時間超過一定時間閾值的訂單,表示它們長時間未支付。
-
取消訂單:對於那些長時間未支付的訂單,可以將其從 ZSet 中刪除,並執行取消訂單的操作(例如,將訂單狀態設置爲 "已取消")。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Set;
public class OrderCancellationSystem {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost"); // 連接到本地Redis服務器
// 模擬添加訂單
addOrder(jedis, "Order1");
addOrder(jedis, "Order2");
// 定時任務,每分鐘檢查訂單並自動取消
while (true) {
cancelLongUnpaidOrders(jedis);
try {
Thread.sleep(60000); // 等待一分鐘再次檢查
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void addOrder(Jedis jedis, String orderId) {
long currentTime = System.currentTimeMillis();
jedis.zadd("orders", currentTime, orderId);
}
public static void cancelOrder(String orderId) {
// 執行取消訂單操作,例如更新訂單狀態
System.out.println("Cancelling order: " + orderId);
}
public static void cancelLongUnpaidOrders(Jedis jedis) {
long expirationTime = System.currentTimeMillis() - 3600 * 1000; // 60分鐘前的時間戳
Set<Tuple> longUnpaidOrders = jedis.zrangeByScoreWithScores("orders", "-inf", String.valueOf(expirationTime));
for (Tuple order : longUnpaidOrders) {
String orderId = order.getElement();
cancelOrder(orderId);
jedis.zrem("orders", orderId); // 從ZSet中刪除已取消的訂單
}
}
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Et8zdye295_hIZSzi-mnmg