Redis 實現延遲隊列的方案總結

    redis 是我們項目開發中常見的技術中間件,它除了可以實現常見的分佈式鎖和分佈式緩存功能之外,還可以幫助我們實現很多的功能,如延遲隊列。下面介紹幾種 redis 常見的實現延遲隊列的方案。

1、通過過期 key 通知實現

    實現思路:首先開啓 redis 的 key 過期通知,然後在業務中給 key 設置過期時間,到了過期時間後 redis 會自動的將過期的 key 消息推送給監聽者,從而實現延遲任務。

核心的代碼實現:

#1、開始redis的過期通知
notify-keyspace-events Ex
#2、監聽redis的過期key
@Component
@Slf4j
public class RedisExpireKeyService extends 
                  KeyExpirationEventMessageListener {
    /**
     * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
     *
     * @param listenerContainer must not be {@literal null}.
     */
    public RedisExpireKeyService(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    /**
     * 監聽過期的key
     *
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expireKey = message.toString();
        //執行具體的業務
        System.out.println("監聽到key=" + expireKey + ",已經過期");
    }
}

    生產環境是不推薦使用此方案,原因 Redis 的過期策略採用的是惰性刪除和定期刪除相結合的方式,redis 並不保證 key 在過期時會被立即刪除操作,此方案適用於平時自己項目練習的時候使用。

2、通過 Zset 數據類型 + 定時任務實現

    實現思路:ZSet 是一種有序集合類型,它可以存儲不重複的元素,並且給每個元素賦予一個 double 類型的排序權重值 (score),所以可以將元素的過期時間作爲分值,通過定時任務掃描的方式判斷是否達到過期時間,從而實現延遲隊列。

    核心的代碼實現:

#使用xxl-job
@JobName("consumerTaskJob")
public void consumerTaskJob() {
   String expireKey = "ExPIRE_KEY";
   try {
       //獲取當前時間
        double currentTime = System.currentTimeMillis();
        //獲取超時的數據
        Set<String> expiredMemberSet = redisTemplate.opsForZSet().rangeByScore(expireKey, Double.MIN_VALUE, currentTime);
        //過期key
        for (String expiredMember : expiredMemberSet) {
            //todo 做實際的延遲任務
            //從ZSet中移除數據
            redisTemplate.opsForZSet().remove(expireKey, expiredMember);
        } 
       } catch (Exception e) {
          log.error("數據處理失敗",e);
       }
    }

      Zset + 定時任務的實現延遲任務的方式雖然比監聽過期 key 方案合理一些,但是它還是存在一定的缺陷,如無重試機制、延遲時間固定化(依賴定時任務的執行時間)、不適用於大規模的延遲任務。

3、Redisson 實現延遲隊列

    Redisson 是一個操作 Redis 的 Java 客戶框架,它提供了 RDelayedQueue 接口和 RQueue 接口可以實現延遲隊列(Redisson 提供的延遲隊列底層也是基於 Zset 數據結構實現的)。

    核心的代碼實現:

#1、添加依賴
<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.16.0</version> 
</dependency>
#2、添加數據到隊列中
//創建RedissonClient實例
RedissonClient redissonClient = Redisson.create();
//創建阻塞隊列
RBlockingDeque<String> queue = redissonClient.getBlockingDeque("delayQueue");
//創建延遲隊列並關聯到阻塞隊列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
//添加延遲任務
delayedQueue.offer("Task1", 5000, TimeUnit.MILLISECONDS);
#3、消費數據
while (true) {
    try {
          //獲取並移除隊首元素,如果隊列爲空,則阻塞等待
          String task = queue.take();
          System.out.println("Task: " + task);
       } catch (Exception e) {
          log.error("消費失敗",e);
     }
}

總結:Redis 實現的延遲隊列適用於處理一些比較簡單的業務,如發送郵件、發送通知等,對於複雜的業務不適用於 Redis 的延遲任務方案。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2JhVLLsUU8vCc3cRr83jyw