Apache pulsar 技術系列 -- 消息重推的幾種方式

Apache Pulsar 是一個多租戶、高性能的服務間消息傳輸解決方案,支持多租戶、低延時、讀寫分離、跨地域複製(GEO replication)、快速擴容、靈活容錯等特性。在很多場景下,用戶需要通過 MQ 實現消息的重新推送能力,比如超時重推、處理異常時重推等,本文介紹 Apache Pulsar 提供的幾種消息重推方案。

在 MQ 實際的使用中,用戶消費數據時,可能會遇到消息處理異常或者需要推遲處理的場景,這裏就涉及到消息的重推邏輯,Pulsar 自己提供了消息重推的能力。本文主要介紹 Pulsar 的消息重推機制。

消息獲取(拉取 / 推送)機制

Pulsar 的消費採用了推、拉結合的消息獲取機制,Consumer 獲取消息之前會首先通知 Broker(FLOW 請求),Broker 會根據配置的 ReceiveQueue 大小以及 Consumer 當前可以接收的消息數量來推送消息給 Consumer。

詳細的交互流程如下圖所示:

圖片

  1. Consumer 在創建之後,會以 MaxReceiveQueue 的大小作爲 Permit 值,這個值就是 Consumer 可以緩存的的最大消息條數。

  2. 然後,Consumer 向 Broker 發起 FLOW 請求,攜帶 Permit 信息(Consumer Permit 減少到 0),Broker 接收之後會記錄這個 Permit 作爲 Consumer 的 AvailablePermit,AvailablePermit 決定 Broker 可以向 Consumer 發送數據的數量(實際是在讀取數據時判斷)。

  3. 如果 AvailablePermit > 0, Broker 開始讀取數據(假設有 N 條),然後推送給 Consumer,推送之後,AvailablePermit 自減 N。

  4. Consumer 接收到消息之後,並不會直接返回給用戶,而是放在 ReceiveQueue 中,當用戶調用 Receive() 方法來獲取消息時,Consumer 將 Permit + 1。

  5. 當 Permit > MaxReceiveQueueSize / 2,Consumer 會再次發起 Flow 請求,並且攜帶當前的 Permit 值。

上述流程,就是 Consumer 和 Broker 的消息傳遞過程。

在默認的情況下,數據推送給 Consumer 之後,就完全交給用戶處理,數據不會重複推送。這種方式滿足不了需要重推的場景,下面介紹目前 Pulsar 的幾種重推機制。

SDK 統一的重推

一個比較直觀的做法是超過一定時間,如果消息沒有 Ack 就重新推送。

目前 Pulsar 提供了通過超時時間來控制數據重推的能力,Consumer 可以配置 AckTimeout(默認關閉),在設置了 AckTimeout 之後,Client 會構建一個 UnAckedMessageTracker ,用戶 Receive() 的所有的消息都會被 UnAckedMessageTracker 跟蹤。用戶 Ack 消息時,會從 UnAckedMessageTracker 刪除,對於沒有 Ack 的消息,UnAckedMessageTracker 會有定時任務來檢查,如果已經超過了 AckTimeout 時間,則會觸發重推。

重推是通過 RedeliverUnackMessage 來實現的,UnAckedMessageTracker 會主動發起 Redeliver 的請求,Broker 會根據請求的 MessageId 信息重新推送。

AckTimeout 在 Consumer 初始化時設置:

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                  .ackTimeout(10, TimeUnit.SECOND)

用戶決策的重推 -- NegativeAck

通過 AckTimeout 實現的重推,是 SDK 內部統一實現的,用戶不能控制重推的行爲,如果用戶希望根據自己的使用場景,決定哪些消息需要重推,Pulsar 提供了 NegativeAck 的能力。

NegativeAck 和 AckTimeout 方式類似,有一個 NegativeAcksTracker 來管理消息的重推,NegativeAcksTracker 只會跟蹤用戶主動調用 NegativeAcknowledge() 方法的 MessageID,重推的邏輯也是通過 RedeliverUnackMessage 實現。

NegativeAck 可以設置 Redelivery 的 Delay 時間。

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)

使用的時候,需要明確調用。

// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);

用戶決策的重推 -- RLQ

除了 NegativeAck 的方式,用戶還可以通過重試隊列( RLQ )來實現主動的消息重推,RLQ 一般會使用在用戶暫時不能處理某些消息,並且希望之後再處理的場景。

Pulsar 提供了 ReconsumeLater() 方法來實現重試隊列,和 Negative 不同的是,RLQ 會創建一個新的 Topic,Topic 的格式是 TopicName-SubscriptionName_RLQ , 每次 ReconsumeLater() 時,都會產生一個新的消息寫入到 RLQ Topic 中,並且會對之前的消息 Ack。

設置了 RLQ 的 Consumer,SDK 內部默認會啓動 RLQ 的訂閱,所以 RLQ 的消息也會被 Consumer 消費到。

RLQ 是通過 DeadLetterPolicy 來配置的(DLQ 下文會解釋)。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
     .topic("my-topic")
     .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
     .enableRetry(true)
    .deadLetterPolicy(DeadLetterPolicy.builder()
     .maxRedeliverCount(maxRedeliveryCount)
    .build())
     .subscribe();

RLQ Topic 中的消息屬性中會添加一下信息:

daCfyO

RLQ 也需要主動調用: consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS)。

爲重推次數加上限制 --DLQ

對於數據持續處理失敗,一直重試並不是一個很好的策略,此時死信隊列(DLQ)就是一個比較好的選擇,DLQ 允許用戶將持續處理失敗的數據寫入到一個獨立的 Dead Letter Topic 中,DLQ 的數據需要單獨的訂閱來消費。

DLQ Topic 的格式爲 TopicName-SubscriptionName_DLQ。DLQ 需要爲重試設置一個上限,當重試次數超過上限之後,就會被寫入到 DLQ Topic 中。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
       .topic("my-topic")
      .subscriptionName("my-subscription")
      .subscriptionType(SubscriptionType.Shared)
         .deadLetterPolicy(DeadLetterPolicy.builder()
             .maxRedeliverCount(maxRedeliveryCount)
             .build())
         .subscribe();

幾種重推和 DLQ 的關係

如果配置了 DLQ,那麼使用 AckTimeout、NegativeAck 或者 ReconsumeLater 引起的數據重推都會觸發 DLQ,也就是說重試的次數達到上限之後,都會被寫入到 DLQ topic 裏。

重試次數的統計有所區別:

AckTimeout 和 NegativeAck 都是通過 Redelivery 機制來計數的,SDK 發起 Redelivery 請求之後,Broker 側的 RedeliveryTracker 會記錄重推的次數,並且在推送給 Consumer 的 Message 中會包含 RedeliveryCount 的字段。

對於 RLQ,則是從 RECONSUMETIMES 屬性中獲取重複消費的次數,這個屬性在 Client 生成,並且也是在 Client 計數。

總的來說,Apache Pulsar 提供了多種消息重推的方式,用戶可以結合自己的場景,靈活使用,滿足自己的業務需求。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/V3Y2yc7nYlCzVVs8Td7hFg