RabbitMQ、RocketMQ、Kafka 延遲隊列實現
延遲隊列在實際項目中有非常多的應用場景,最常見的比如訂單未支付,超時取消訂單,在創建訂單的時候發送一條延遲消息,達到延遲時間之後消費者收到消息,如果訂單沒有支付的話,那麼就取消訂單。
那麼,今天我們需要來談的問題就是 RabbitMQ、RocketMQ、Kafka 中分別是怎麼實現延時隊列的,以及他們對應的實現原理是什麼?
RabbitMQ
RabbitMQ 本身並不存在延遲隊列的概念,在 RabbitMQ 中是通過 DLX 死信交換機和 TTL 消息過期來實現延遲隊列的。
TTL(Time to Live)過期時間
有兩種方式可以設置 TTL。
-
1. 通過隊列屬性設置,這樣的話隊列中的所有消息都會擁有相同的過期時間
-
2. 對消息單獨設置過期時間,這樣每條消息的過期時間都可以不同
那麼如果同時設置呢?這樣將會以兩個時間中較小的值爲準。
針對隊列的方式通過參數x-message-ttl
來設置。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
針對消息的方式通過setExpiration
來設置。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());
DLX(Dead Letter Exchange)死信交換機
一個消息要成爲死信消息有 3 種情況:
-
1. 消息被拒絕,比如調用
reject
方法,並且需要設置requeue
爲false
-
2. 消息過期
-
3. 隊列達到最大長度
可以通過參數dead-letter-exchange
設置死信交換機,也可以通過參數dead-letter- exchange
指定 RoutingKey(未指定則使用原隊列的 RoutingKey)。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
原理
當我們對消息設置了 TTL 和 DLX 之後,當消息正常發送,通過 Exchange 到達 Queue 之後,由於設置了 TTL 過期時間,並且消息沒有被消費(訂閱的是死信隊列),達到過期時間之後,消息就轉移到與之綁定的 DLX 死信隊列之中。
這樣的話,就相當於通過 DLX 和 TTL 間接實現了延遲消息的功能,實際使用中我們可以根據不同的延遲級別綁定設置不同延遲時間的隊列來達到實現不同延遲時間的效果。
RocketMQ
RocketMQ 和 RabbitMQ 不同,它本身就有延遲隊列的功能,但是開源版本只能支持固定延遲時間的消息,不支持任意時間精度的消息(這個好像只有阿里雲版本的可以)。
他的默認時間間隔分爲 18 個級別,基本上也能滿足大部分場景的需要了。
默認延遲級別:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。
使用起來也非常的簡單,直接通過setDelayTimeLevel
設置延遲級別即可。
setDelayTimeLevel(level)
原理
實現原理說起來比較簡單,Broker 會根據不同的延遲級別創建出多個不同級別的隊列,當我們發送延遲消息的時候,根據不同的延遲級別發送到不同的隊列中,同時在 Broker 內部通過一個定時器去輪詢這些隊列(RocketMQ 會爲每個延遲級別分別創建一個定時任務),如果消息達到發送時間,那麼就直接把消息發送到指 topic 隊列中。
RocketMQ 這種實現方式是放在服務端去做的,同時有個好處就是相同延遲時間的消息是可以保證有序性的。
談到這裏就順便提一下關於消息消費重試的原理,這個本質上來說其實是一樣的,對於消費失敗需要重試的消息實際上都會被丟到延遲隊列的 topic 裏,到期後再轉發到真正的 topic 中。
Kafka
對於 Kafka 來說,原生並不支持延遲隊列的功能,需要我們手動去實現,這裏我根據 RocketMQ 的設計提供一個實現思路。
這個設計,我們也不支持任意時間精度的延遲消息,只支持固定級別的延遲,因爲對於大部分延遲消息的場景來說足夠使用了。
只創建一個 topic,但是針對該 topic 創建 18 個 partition,每個 partition 對應不同的延遲級別,這樣做和 RocketMQ 一樣有個好處就是能達到相同延遲時間的消息達到有序性。
原理
-
• 首先創建一個單獨針對延遲隊列的 topic,同時創建 18 個 partition 針對不同的延遲級別
-
• 發送消息的時候根據延遲參數發送到延遲 topic 對應的 partition,對應的
key
爲延遲時間,同時把原 topic 保存到 header 中
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
-
• 內嵌的
consumer
單獨設置一個ConsumerGroup
去消費延遲 topic 消息,消費到消息之後如果沒有達到延遲時間那麼就進行pause
,然後seek
到當前ConsumerRecord
的offset
位置,同時使用定時器去輪詢延遲的TopicPartition
,達到延遲時間之後進行resume
-
• 如果達到了延遲時間,那麼就獲取到
header
中的真實 topic ,直接轉發
這裏爲什麼要進行pause
和resume
呢?因爲如果不這樣的話,如果超時未消費達到max.poll.interval.ms
最大時間(默認 300s),那麼將會觸發 Rebalance。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/vkasHD20UoGcISzW-8lVZw