Redis 實現延遲隊列的方案總結
redis 是我們項目開發中常見的技術中間件,它除了可以實現常見的分佈式鎖和分佈式緩存功能之外,還可以幫助我們實現很多的功能,如延遲隊列。下面介紹幾種 redis 常見的實現延遲隊列的方案。
1、通過過期 key 通知實現
實現思路:首先開啓 redis 的 key 過期通知,然後在業務中給 key 設置過期時間,到了過期時間後 redis 會自動的將過期的 key 消息推送給監聽者,從而實現延遲任務。
核心的代碼實現:
#1、開始redis的過期通知
notify-keyspace-events Ex
#2、監聽redis的過期key
@Component
@Slf4j
public class RedisExpireKeyService extends
KeyExpirationEventMessageListener {
/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public RedisExpireKeyService(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 監聽過期的key
*
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String expireKey = message.toString();
//執行具體的業務
System.out.println("監聽到key=" + expireKey + ",已經過期");
}
}
生產環境是不推薦使用此方案,原因 Redis 的過期策略採用的是惰性刪除和定期刪除相結合的方式,redis 並不保證 key 在過期時會被立即刪除操作,此方案適用於平時自己項目練習的時候使用。
2、通過 Zset 數據類型 + 定時任務實現
實現思路:ZSet 是一種有序集合類型,它可以存儲不重複的元素,並且給每個元素賦予一個 double 類型的排序權重值 (score),所以可以將元素的過期時間作爲分值,通過定時任務掃描的方式判斷是否達到過期時間,從而實現延遲隊列。
核心的代碼實現:
#使用xxl-job
@JobName("consumerTaskJob")
public void consumerTaskJob() {
String expireKey = "ExPIRE_KEY";
try {
//獲取當前時間
double currentTime = System.currentTimeMillis();
//獲取超時的數據
Set<String> expiredMemberSet = redisTemplate.opsForZSet().rangeByScore(expireKey, Double.MIN_VALUE, currentTime);
//過期key
for (String expiredMember : expiredMemberSet) {
//todo 做實際的延遲任務
//從ZSet中移除數據
redisTemplate.opsForZSet().remove(expireKey, expiredMember);
}
} catch (Exception e) {
log.error("數據處理失敗",e);
}
}
Zset + 定時任務的實現延遲任務的方式雖然比監聽過期 key 方案合理一些,但是它還是存在一定的缺陷,如無重試機制、延遲時間固定化(依賴定時任務的執行時間)、不適用於大規模的延遲任務。
3、Redisson 實現延遲隊列
Redisson 是一個操作 Redis 的 Java 客戶框架,它提供了 RDelayedQueue 接口和 RQueue 接口可以實現延遲隊列(Redisson 提供的延遲隊列底層也是基於 Zset 數據結構實現的)。
核心的代碼實現:
#1、添加依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
#2、添加數據到隊列中
//創建RedissonClient實例
RedissonClient redissonClient = Redisson.create();
//創建阻塞隊列
RBlockingDeque<String> queue = redissonClient.getBlockingDeque("delayQueue");
//創建延遲隊列並關聯到阻塞隊列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
//添加延遲任務
delayedQueue.offer("Task1", 5000, TimeUnit.MILLISECONDS);
#3、消費數據
while (true) {
try {
//獲取並移除隊首元素,如果隊列爲空,則阻塞等待
String task = queue.take();
System.out.println("Task: " + task);
} catch (Exception e) {
log.error("消費失敗",e);
}
}
總結:Redis 實現的延遲隊列適用於處理一些比較簡單的業務,如發送郵件、發送通知等,對於複雜的業務不適用於 Redis 的延遲任務方案。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/2JhVLLsUU8vCc3cRr83jyw