深入研究 Pulsar 中的消息分塊

本文翻譯自 StreamNative 博客《Deep Dive into Message Chunking in Pulsar》,作者楊子棵,Apache Pulsar Committer,StreamNative 工程師。

譯者簡介

饒璐,2021 年畢業於南京大學計算機系,現就職於京東零售實時平臺研發部消息中間件團隊,無論是工作,還是 Pulsar 社區志願者都算是新人一枚~ 目前正在參與社區文檔的翻譯工作,期望早日全面瞭解社區並期待能貢獻代碼。

Apache Pulsar 與所有消息系統一樣,會對發送到 broker 的每條消息限制大小。這可以防止每條消息的負載超過 broker 中設置的 maxMessageSize,其默認值爲 5 MB。

然而,在諸如圖像處理和音頻處理等應用場景中,很多用戶需要 Pulsar 客戶端向 broker 發送大消息。你可以通過調整 maxMessageSize 來實現此目的。但是,這種方法可能會導致許多問題。例如,如果客戶端發佈了一條 100 MB 的消息,並且 broker 允許將該消息存儲到 bookie 中,那麼 bookie 將花費大量資源來處理該消息。這將影響其他客戶端發佈消息並導致積壓丟失(Backlog Draining)。

因此,Pulsar 在發送大消息時,除了增加 maxMessageSize 的值外,也提供了消息分塊功能來支持。通過消息分塊,生產者可以根據 maxMessageSize 將大消息拆分爲多個塊,並將每個塊作爲普通消息發送給 broker。消費者之後會將這些塊消息組合回原始消息。

在這篇博客中,我們將解釋消息分塊的概念,深入探討其實現,並分享此功能的最佳實踐,包括:

消息分塊的工作原理

消息分塊是一個過程,通過該過程可以將大消息拆分爲多個塊的普通消息以進行生產和消費。使用消息分塊時,你無需擔心 Pulsar 如何拆分和組合大消息,或處理分塊消息的細節,Pulsar 爲你完成所有相關工作。在本節中,讓我們一起探索消息分塊在不同場景中是如何工作的。

單個生產者向主題發佈分塊消息

啓用消息分塊後,如果生產者收到來自用戶的大消息並且消息大小超過了來自 broker 的單個消息大小限制,則生產者將拆分消息。在上圖中,大消息 M 被分爲三部分:M-C1、M-C2 和 M-C3。生產者確保每個塊的有效負載不超過 maxMessageSize 並按順序發佈塊。

消費者緩衝分塊的消息,直到它接收到該消息的所有分塊。消費者然後聚合分塊,消費消息,並將原始消息返回給應用程序。

單個生產者同時向主題發佈普通和分塊消息

上圖包括四條消息,M1 和 M2 是超過 maxMessageSize 的大消息(將會被分塊),M3 和 M4 是普通消息。M1 分爲三個普通消息:M1-C1、M1-C2 和 M1-C3。消息 M2 分爲兩個普通消息:M2-C1 和 M2-C2。M3 和 M4 保持原樣發送。

Broker 將所有消息存儲在主題中,並以相同的順序將它們分派給消費者。消費者能夠確定傳入消息是否爲分塊消息。消費者接收到分塊消息後,將其緩衝在消費者內存中,直到接收完該消息的所有塊,然後將這些塊組合成一條消息重新形成原始的大型有效負載消息,並移交給應用程序。

多個生產者同時向一個主題發佈分塊消息

Pulsar 允許多個生產者同時生產關於同一主題的消息。broker 將來自不同生產者的所有分塊消息存儲在同一主題中。當多個生產者同時向同一個主題生產分塊消息時,來自不同分塊消息的塊被交織存儲在該主題中。這些塊仍然有序,但它們在該主題中可能並不連續。這給消費者帶來了一定的內存壓力,因爲消費者需要爲每條大消息保留一個單獨的緩衝區。

在上圖示例中,生產者 1 發佈一條大消息 M1,分爲三個塊:M1-C1、M1-C2 和 M1-C3。生產者 2 發佈一條大消息 M2 ,分爲三個塊:M2-C1、M2-C2 和 M2-C3。消費者需要爲兩條大消息分別創建一個單獨的緩衝區。當分別接受完大消息的所有塊時,重新聚合成的大消息將返回到應用程序。

注意:在以上討論的所有示例中,一次只有一個消費者消費來自主題的消息。換句話說,這些消費者的訂閱模式要麼是獨佔(exclusive),要麼是災備(failover)。目前,Pulsar 不支持共享(shared)或鍵共享(key_shared)訂閱模式的消費者的消息分塊。有關此功能的討論,請參閱 Issue Pulsar 中的大消息處理:共享訂閱的分塊 [1]。

如何啓用消息分塊

消息分塊功能默認關閉。如果要使用此功能,則需要在創建生產者時啓用消息分塊。

使用消息分塊功能時有一些限制:

    1. 主題必須是持久化主題。
    1. 生產者不能同時啓用消息分塊功能和批量消息處理功能。
    1. 消費者的訂閱模式必須是獨佔(exclusive),或者是災備(failover)。

以下是創建生產者時如何啓用消息分塊的示例。

Producer<byte[]producer = pulsarClient.newProducer()
       .topic(topic)
       .enableChunking(true)
       .enableBatching(false)
       .create();

然後,你可以使用此生產者生產如以下的大型有效負載消息:

byte[] data = new byte[10 * 1024 * 1024];
producer.send(data);

你還可以創建一個消費者來消費和確認大型有效負載消息,如下所示:

Consumer<byte[]consumer = pulsarClient.newConsumer()
       .topic(topic)
       .subscriptionName("test")
       .subscribe();

Message<byte[]msg = consumer.receive();
consumer.acknowledge(msg);

如何實現消息分塊

本節深入探討消息分塊的實現。首先,讓我們看一下與分塊消息相關的結構和字段,然後我們將探索生產和消費分塊消息的過程。

如下所示,MessageMetadata 中有四個與分塊相關的字段。生產者和消費者都使用這些字段。

optional string uuid;
optional int32 num_chunks_from_msg;
optional int32 total_chunk_msg_size;
optional int32 chunk_id;
  1. 1. uuid 是整個分塊消息的唯一標識符。同一分塊消息中的每個塊具有相同的 uuid,這樣我們就可以知道這個塊屬於哪個消息。

  2. 2. num_chunks_from_msg 是分塊消息中的塊數。同一分塊消息中的每個塊具有相同的 num_chunks_from_msg

  3. 3. total_chunk_msg_size 是整個分塊消息的總有效負載大小。同一分塊消息中的每個塊具有相同的 total_chunk_msg_size

  4. 4. chunk_id 用於標記當前塊的 ID。

拆分分塊消息

讓我們看看生產者端消息分塊的實現。在客戶端和 broker 成功建立連接後,broker 通過 proto 命令 CommandConnected 將 maxMessageSize 的值傳遞給客戶端。這是個可選的字段,默認值爲 5 MB。

當生產者發送消息時,會判斷當前消息的負載大小是否超過 maxMessageSize。如果消息超出大小限制,則此大消息將按 maxMessageSize 拆分。如下圖所示,有效負載大小爲 12 MB 的消息將被拆分爲三個塊(5MB、5MB、2MB)。

發佈分塊消息

一旦大消息被分割成塊,則生產者將每個塊作爲普通消息發送給 broker。與普通消息類似,每個塊仍然受到生產者的流量控制和客戶端的內存限制器的控制。生產者配置中有一個 maxPendingMessages 參數,用於限制生產者可以同時發佈的最大消息數。每個塊在 maxPendingMessages 中單獨計算(一個塊計數爲 1)。這意味着發送具有三個塊的大消息將佔用生產者中的三個消息用於掛起的消息(the pending message)。

當發送每個塊時,會爲每個塊創建一個單獨的 OpSendMsg。每個 OpSendMsg 共享相同的 ChunkMessageCtxChunkMessageCtx 用於將分塊消息 ID 返回給用戶。每個塊還在其消息元數據中共享分塊消息的相同 uuid

生產者按順序將每個塊發送給 broker,broker 按順序接收塊。這樣可以確保在收到最後一個塊的發佈確認時,成功發送整個分塊消息。這也確保了分塊消息的所有塊都按順序存儲在主題中。消費者依靠這個排序保證來按順序消費塊。

如果是分區主題,生產者會將同一條大消息的所有塊發佈到同一個分區。

分塊消息 ID

在 Pulsar 中,普通消息發佈或消費後,其消息 ID 會返回給用戶。對於分塊消息,Pulsar 如何將消息 ID 返回給用戶?

Pulsar 2.10.0 之前

在 Pulsar 2.10.0 之前,生產者或消費者只返回最後一個塊的消息 ID 作爲整個分塊消息的消息 ID,這種實現有時會引發一些問題。例如,當我們使用這個消息 ID 進行 seek 時,消費者會從最後一個塊的位置開始消費。消費者會錯誤地認爲之前的塊已經丟失,並選擇跳過當前消息。如果我們使用包容性搜索(inclusive seek),消費者可能會跳過第一條消息,從而導致意外行爲。

如上圖所示,消費者將最後一個塊的消息作爲分塊消息的消息 ID 返回給用戶。如果消費者使用這個消息 ID 進行 inclusive seek,在 broker 看來,消費者是在尋找 M1-C3。根據 inclusive seek 的語義,消費者進行 inclusive seek 後消費的第一條消息應該是當前 seek 位置的消息。所以消費者在 seek 之後消費的第一條消息應該是消息 M1。但實際上,到達消費者的第一條消息是 M1-C3。消費者然後發現它沒有收到該分塊消息的先前塊並且無法繼續接收先前的塊,於是將丟棄 M1。因此,消費的第一條消息實際上是 M2。這是一個意外的行爲。

在 Pulsar 2.10.0 中引入分塊消息 ID

爲了解決這個問題,Pulsar 在 2.10.0 版本中引入了分塊消息 ID 的功能,GitHub Issue 上的 PIP 107[2] 詳細介紹了此功能的提議。爲了實現與原始邏輯的兼容,分塊消息 ID 與原始行爲一致,它包含兩個普通的消息 ID:第一個塊的消息 ID 和最後一個塊的消息 ID。生產者和消費者都使用分塊消息上下文來生成分塊消息 ID。

如上圖所示,生產者在接收每個塊的發佈確認的同時,會獲取第一個塊的消息 ID(對應 “first chunk mid”)和最後一個塊的消息 ID。生產者將它們緩存在分塊消息上下文中,並在收到最後一個分塊消息 ID 後生成分塊消息 ID。對於消費者來說也是如此。不同的是,消費者通過接收消息來獲取消息 ID。

分塊消息 ID 功能不僅解決了由於查找(seeking)引起的問題,還可以讓你獲得有關分塊消息的更多信息。

將塊組合起來

消費者需要將所有塊組合成原始消息,然後再將其返回給應用程序。消費者使用分塊消息上下文(ChunkedMessageCtx)來緩衝所有塊數據,例如有效負載、元數據和消息 ID。在處理分塊消息時,消費者假設收到的分塊是按順序接收的,如果接收到的分塊無序,則將丟棄整個分塊消息。

假設我們發佈了兩條大消息,“abc” 和 “de”,如上圖所示。它們正在等待就該主題進行消費。broker 的 maxMessageSize 設置得太小(只有 1 個字節),導致每個塊中的有效負載都很小。

當消費者消費第一條消息時,發現該消息是分塊消息,並創建一個 ChunkedMessageCtx。(在上面的示例中,我們沒有列出分塊消息上下文中的所有字段。)通過分塊消息的唯一標識符 uuid,我們可以知道應該將塊放置在哪個上下文中。上下文中的 lastChunkedMessageId 表示最後收到的塊的分塊 ID。每當消費者收到新的塊時,它將會更新。上下文的有效負載是當前緩衝的整個分塊消息的有效負載。隨着消費者收到更多的塊,它將繼續增長。

一旦消費者收到了 uuid 爲 uuid1 的所有消息塊,它就可以使用分塊消息上下文來生成原始消息。然後將完整的消息返回給應用程序,消費者釋放該上下文。請注意,因爲我們在此過程中收到了一個消息塊 uuid2,所以在消費者中創建了第二個塊消息上下文。

就像前面的示例一樣,當消費者收到 uuid2 的所有分塊時,它會從分塊的消息上下文中生成一條新消息,並將完整的消息返回給該應用程序。

消息分塊的最佳實踐

在本節中,我們將分享一些使用消息分塊的最佳實踐。

不要使用過大的消息元數據

不建議在分塊消息中設置非常大的消息元數據。生產者經常發佈具有最大有效負載大小的塊。在將塊寫入 bookie 的過程中,如果塊的 header 部分(其中包括消息元數據)超過 10 KB(bookie 的 padding max frame 大小),就會出錯。

maxMessageSize 參數限制從客戶端到 broker 的每條消息的有效負載大小,但它不計算消息頭(header)的大小。在 BookKeeper 中,有一個類似的參數設置,名爲 nettyMaxFrameSizeBytes,它限制寫入每條消息的大小。任何發送到 BookKeeper 的消息若大於 nettyMaxFrameSizeBytes 都將被拒絕。在 BookKeeper 中,消息大小的計算包括消息頭和有效負載。BookKeeper 和 broker 計算 maxMessageSize 的方式不同。因此,Broker 會爲消息頭保留一些填充大小(padding size),該值爲 10 KB(在 Commands.MESSAGE_SIZE_FRAME_PADDING 中定義)。與 BookKeeper 建立連接時,broker 會將 nettyMaxFrameSizeBytes 設置爲 maxMessageSize 加上 10 KB。

因此,在發送大消息時,需要確保消息頭的大小不超過 10 KB。在設置大消息的鍵值和其他屬性時,也不應超過這個限制大小。目前有一個新的 PIP[3] 提出,通過在拆分消息時在客戶端包含消息頭部分來解除此限制。

主題級別 maxMessageSize 限制

如果要將分塊消息發佈到某主題,不建議爲該主題設置主題級別的 maxMessageSize

主題級別的 maxMessageSize 參數在 Pulsar 2.7.1 中引入,在啓用消息分塊時該參數可能會導致一些問題。如前文所述,分塊消息拆分使用 broker 級別的 maxMessageSize 作爲塊大小。在大多數的用例當中,主題級別的 maxMessageSize 始終小於或等於 broker 級別的 maxMessageSize。在這種情況下,發佈分塊消息將被 broker 拒絕。因爲這條消息中某些塊的有效負載大小會達到 broker 級別的 maxMessageSize,超過主題級別的 maxMessageSize,導致被 broker 拒絕。此外,broker 在檢查消息是否超過主題級別 maxMessageSize 時,會同時計算消息頭和有效負載。因此,當我們使用分塊消息功能時,需要注意不要在該主題上設置主題級別的 maxMessageSize 參數。

Pulsar 2.10 已修復了此問題。刪除了對分塊消息的主題級別的 maxMessageSize 參數檢查。將 Pulsar 版本升級到 2.10 後可以修復此問題。有關更多信息請閱讀 PIP-131[4]。

消費者的最佳實踐

你可以在消費者配置中使用 maxPendingChunkedMessageexpireTimeOfIncompleteChunkedMessage 和 autoAckOldestChunkedMessageOnQueueFull 三個參數來控制消費者用於分塊消息的內存。

分塊消息上下文緩衝在客戶端的內存當中。隨着消費者創建更多的上下文,它可能會佔用過多的內存並導致內存不足的錯誤。因此,Pulsar 在消費者配置中引入了 maxPendingChunkedMessage 參數,用於限制消費者可以同時維護的分塊消息上下文的最大數量。

此外,你可以在消費者配置中設置 expireTimeOfIncompleteChunkedMessage 參數來定義分塊消息上下文的過期時間。如果生產者未能成功發佈消息的所有塊,導致消費者在這個設置過期時間內無法接收到所有塊,則消費者會將不完整的塊設置爲過期。該參數的默認值爲一分鐘。

你還可以設置 autoAckOldestChunkedMessageOnQueueFull 參數使得,當掛起的分塊消息數量達到了最大值,或者上下文過期,去刪除最舊的上下文。如果設置爲 true,消費者將通過靜默確認的方式刪除你要刪除的分塊消息上下文;反之,消費者會將消息標記爲重新發送。

下面是如何在消費者上配置消息分塊的示例。

Consumer<byte[]consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName("test")
        .autoAckOldestChunkedMessageOnQueueFull(true)
        .maxPendingChunkedMessage(100)
        .expireTimeOfIncompleteChunkedMessage(10, TimeUnit.MINUTES)
        .subscribe();

不建議發佈太大的分塊消息,因爲這會導致消費者端的內存使用率偏高。雖然消費者可以限制同時緩衝的分塊消息數量,但卻沒有簡單的方法來控制緩衝的分塊消息使用的內存量。

調試的最佳實踐

在 broker 中,你可以通過檢查主題中的某些統計信息來調試消息分塊功能。以下是常用的三個指標:

  1. 1. msgChunkPublished:布爾類型。它表示主題上是否發佈了分塊消息。

  2. 2. 發佈者中的 chunkedMessageRate:它表示生產者收到的關於某主題的分塊消息總數。

  3. 3. 訂閱和消費者中的 chunkedMessageRate:它表示訂閱或消費者中的分塊消息調度速率。

閱讀 Pulsar 文檔中的管理主題 (https://pulsar.apache.org/docs/admin-api-topics/#get-stats) 以瞭解更多信息。

升級你的 Pulsar 版本

隨着 Pulsar 社區不斷優化消息分塊功能,建議使用最新的 Pulsar 版本。我們建議將你的 Pulsar 版本更新到 2.10 或更高版本。這些版本在消息分塊有重要的錯誤修復,例如修復分塊消息的流控制可能引起的內存泄漏問題以及修復查找分塊消息引起的問題。

更多資源

如果在使用消息分塊時有任何更好的想法或遇到任何問題,請隨時在 Pulsar repo[5] 中創建 Issue。

你可以在以下鏈接中找到有關消息分塊實現的更多詳細信息:

引用鏈接

[1] Pulsar 中的大消息處理:共享訂閱的分塊: https://github.com/apache/pulsar/issues/7645
[2] PIP 107: https://github.com/apache/pulsar/issues/12402
[3] 新的 PIP: https://github.com/apache/pulsar/issues/13591
[4] PIP-131: https://github.com/apache/pulsar/issues/13544
[5] Pulsar repo: https://github.com/apache/pulsar/issues
[6] PIP 37: Large message size handling in Pulsar: https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar
[7] PR PIP 37 Pulsar-client:support large message size: https://github.com/apache/pulsar/pull/4400
[8] PIP 107: Introduce the chunk message ID: https://github.com/apache/pulsar/issues/12402
[9] PR PIP 107 Client Introduce chunk message ID: https://github.com/apache/pulsar/pull/12403

ApachePulsar Apache 軟件基金會頂級項目,下一代雲原生分佈式消息流平臺,集消息、存儲、輕量化函數式計算爲一體,採用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據複製,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/IJeaHURfr2EvBsJ1oDtBKQ