解析 RocketMQ 多樣消費功能 - 消息過濾

什麼是消息過濾

在消息中間件的使用過程中,一個主題對應的消費者想要通過規則只消費這個主題下具備某些特徵的消息,過濾掉自己不關心的消息,這個功能就叫消息過濾。

就如上圖所描述的,生產者會向主題中寫入形形色色的消息,有橙色的、黃色的、還有灰色的,而這個主題有兩個消費者,第一個消費者只想要消費橙色的消息,第二個消費者只想要消費黃色的和灰色的消息,那麼這個效果就需要通過消息過濾來實現。

消息過濾的應用場景

我們以常見的電商場景爲例,來看看消息過濾在實際應用過程中起到的作用。

電商平臺在設計時,往往存在系統拆分細、功能模塊多、調用鏈路長、系統依賴複雜等特點,消息中間件在其中就起到了異步解耦、異步通信的作用,特別是在雙十一這樣的流量高峯期,消息中間件還起到了削峯填谷的作用。

而在消息中間件使用方面,電商平臺因爲覆蓋的領域衆多會產生很多的消息主題,消息收發量也隨着交易量和訂閱系統的增加而增大。隨着業務系統的水平拆解和垂直增加,相關的消息呈現出高訂閱比和低投遞比的狀態,比如一個主題訂閱比是 300:1,即 1 個主題的訂閱者有 300 個,但是投遞比卻只有 15:300,即一條消息只有 15 個訂閱者需要投遞,其他 285 個訂閱者全部過濾了這條消息。那解決這些場景,就需要使用到消息過濾。

舉例來說,在交易鏈路中,一個訂單的處理流程分爲下單、扣減庫存、支付等流程,這個流程會涉及訂單操作和狀態機的變化。下游的系統,如積分、物流、通知、實時計算等,他們會通過消息中間件監聽訂單的變更消息。但是它們對訂單不同操作和狀態的消息有着不同的需求,如積分系統只關心下單消息,只要下單就扣減積分。物流系統只關係支付和收貨消息,支付就發起物流訂單,收貨就完成物流訂單。實時計算系統會統計訂單不同狀態的數據,所有消息都要接收。

試想一下如果沒有消息過濾這個功能,我們會怎麼支持以上消息過濾的功能呢?能想到的一般有以下兩個方案:

1. 通過將主題進行拆分,將不同的消息發送到不同主題上。

對於生產者來說,這意味着消費者有多少消費場景,就需要新建多少個 Topic,這無疑會給生產者帶來巨大的維護成本。對消費者來說,消費者有可能需要同時訂閱多個 Topic,這同樣帶來了很大的維護成本。另外,消息被主題拆分後,他們之間的消費順序就無法保證了,比如對於一個訂單,它的下單、支付等操作顯然是要被順序處理的。

2. 消費者收到消息後,根據消息體對消息按照規則硬編碼自行過濾。

這意味着所有的消息都會推送到消費者端進行計算,這無疑增加了網絡帶寬,也增加了消費者在內存和 CPU 上的消耗。

有了消息過濾這個功能,生產者只需向一個主題進行投遞消息,服務端根據訂閱規則進行計算,並按需投遞給每個消費者。這樣對生產者和消費者的代碼維護就非常友好,同時也能很大程度上降低網絡帶寬,同時減少消費者的內存佔用和 CPU 的消耗。

RocketMQ 消息過濾的模式

RocketMQ 是衆多消息中間件中爲數不多支持消息過濾的系統。這也是其作爲業務集成消息首選方案的重要基礎之一。

在功能層面,RocketMQ 支持兩種過濾方式,Tag 標籤過濾和 SQL 屬性過濾,下面我來這兩個過濾方式使用方式和技術原理進行介紹

Tag 標籤過濾

Tag 標籤過濾方式是 RocketMQ 提供的基礎消息過濾能力,基於生產者爲消息設置的 Tag 標籤進行匹配。生產者在發送消息時,設置消息的 Tag 標籤,消費者按需指定已有的 Tag 標籤來進行匹配訂閱。

  1. 單 Tag 匹配:過濾表達式爲目標 Tag,表示只有消息標籤爲指定目標 Tag 的消息符合匹配條件,會被髮送給消費者;

  2. 多 Tag 匹配:多個 Tag 之間爲或的關係,不同 Tag 間使用兩個豎線(||)隔開。例如,Tag1||Tag2||Tag3,表示標籤爲 Tag1 或 Tag2 或 Tag3 的消息都滿足匹配條件,都會被髮送給消費者進行消費;

  3. 全 Tag 匹配:使用星號(*)作爲全匹配表達式。表示主題下的所有消息都將被髮送給消費者進行消費。

  1. 發送消息,設置 Tag 標籤
Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //設置消息Tag,用於消費端根據指定Tag過濾消息
    .setTag("TagA")
    .setBody("messageBody".getBytes())
    .build();
  1. 訂閱消息,匹配單個 Tag 標籤
//只訂閱消息標籤爲“TagA”的消息
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);
  1. 訂閱消息,匹配多個 Tag 標籤
sion filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);
  1. 訂閱消息,匹配所有 Tag 標籤,即不過濾
//使用Tag標籤過濾消息,訂閱所有消息
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

RocketMQ 在存儲消息的時候,是通過 Append-Only 的方式將所有主題的消息都寫在同一個 CommitLog 文件中,這可以有效的提升了消息的寫入速率。爲了消費時能夠快速檢索消息,它會在後臺啓動異步方式將消息所在位點、消息的大小,以及消息的標籤哈希值存儲到 ConsumeQueue 索引文件中。將標籤存儲到這個索引文件中,就是爲了在通過標籤進行消息過濾的時候,可以在索引層面就可以獲取到消息的標籤,不需要從 CommitLog 文件中讀取,這樣就減少消息讀取產生的系統 IO 和內存開銷。標籤存儲哈希值,主要是爲了保證 ConsumeQueue 索引文件能夠定長處理,這樣可以有效較少存儲空間,提升這個索引文件的讀取效率。

整個 Tag 標籤過濾的流程如下:

  1. 生產者對消息打上自己的業務標籤,發送給我們的服務端 Broker;

  2. Broker 將消息寫入 CommitLog 中,然後通過異步線程將消息分發到 ConsumeQueue 索引文件中;

  3. 消費者啓動後,定時向 Broker 發送心跳請求,將訂閱關係上傳到 Broker 端,Broker 將訂閱關係及標籤的哈希值保存在內存中;

  4. 消費者向 Broker 拉取消息,Broker 會通過訂閱關係和隊列去 ConsumeQueue 中檢索消息,將訂閱關係中的標籤哈希值和消息中的標籤哈希值做比較,如果匹配就返回給消費者;

  5. 消費者收到消息後,會將消息中的標籤值和本地訂閱關係中標籤值做精確匹配,匹配成功纔會交給消費線程進行消費。

SQL 屬性過濾

SQL 屬性過濾是 RocketMQ 提供的高級消息過濾方式,通過生產者爲消息設置的屬性(Key)及屬性值(Value)進行匹配。生產者在發送消息時可設置多個屬性,消費者訂閱時可設置 S QL 語法的過濾表達式過濾多個屬性。

  1. 數值比較:>, >=, <, <=, BETWEEN, =

  2. 字符比較:=, <>, IN

  3. 判空運算:IS NULL or IS NOT NULL

  4. 邏輯運算:AND, OR, NOT

  1. 發送消息,設置屬性
Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //設置消息屬性,用於消費端根據指定屬性過濾消息。
    .addProperty("Channel", "TaoBao")
    .addProperty("Price", "5999")
    .setBody("messageBody".getBytes())
    .build();
  1. 訂閱消息,匹配單個屬性
FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);
  1. 訂閱消息,匹配多個屬性
FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price>5000", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);
  1. 訂閱消息,匹配所有屬性

FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);

由於 SQL 過濾需要將消息的屬性和 SQL 表達式進行匹配,這會對服務端的內存和 CPU 增加很大的開銷。爲了降低這個開銷,RocketMQ 採用了布隆過濾器進行優化。當 Broker 在收到消息後,會預先對所有的訂閱者進行 SQL 匹配,並將匹配結果生成布隆過濾器的位圖存儲在 ConsumeQueueExt 索引擴展文件中。在消費時,Broker 就會使用使用這個過濾位圖,通過布隆過濾器對消費者的 SQL 進行過濾,這可以避免消息在一定不匹配的時候,不需要去 CommitLog 中將消息的屬性拉取到內存進行計算,可以有效地降低屬性和 SQL 進行匹配的消息量,減少服務端的內存和 CPU 開銷。

整個 SQL 過濾的處理流程如下:

  1. 消費者通過心跳上傳訂閱關係,Broker 判斷如果是 SQL 過濾,就會通過布隆過濾器的算法,生成這個 SQL 對應的布隆過濾匹配參數;

  2. 生產者對消息設置上自己的業務屬性,發送給我們的服務端 Broker;

  3. Broker 收到後將消息寫入 CommitLog 中,然後通過異步線程將消息分發到 ConsumeQueue 索引文件中。在寫入之前,會將這條消息的屬性和當前所有訂閱關係中 SQL 進行匹配,如果通過,則將 SQL 對應的布隆過濾匹配參數合併成一個完整的布隆過濾位圖;

  4. 消費者消費消息的時候,Broker 會先獲取預先生成的布隆過濾匹配參數,然後通過布隆過濾器對 ConsumeQueueExt 的布隆過濾位圖和消費者的布隆過濾匹配參數進行匹配;

  5. 布隆過濾器返回匹配成功只能說明消息屬性和 SQL 可能匹配,Broker 還需要從 CommitLog 中將消息屬性取出來,再做一次和 SQL 的精確匹配,這個時候匹配成功纔會將消息投遞給消費者

差異及對比

最佳實踐

主題劃分及消息定義

主題和消息背後的本質其實就是業務實體的屬性、行爲或狀態發生了變化。只有發生了變化,生產者纔會往主題裏面發送消息,消費者才需要監聽這些的消息,去完成自身的業務邏輯。

那麼如何做好主題劃分和消息定義呢,我們以訂單實體爲例,來看看主題劃分和消息定義的原則。

  1. 業務領域是否一致

不同的業務領域背後有不同的業務實體,其屬性、行爲及狀態的定義天差地別。比如商品和訂單,他們屬於兩個完全獨立且不同的領域,就不能定義成同一個主題。

  1. 業務場景是否一致

同一個業務領域不同的業務場景或者技術場景,不能定義一個主題。如訂單流程和訂單緩存刷新都和訂單有關係,但是訂單緩存刷新可能需要被不同的流程觸發,放在一起就會導致部分場景訂單緩存不刷新的情況。

  1. 消息類型是否一致

同一個業務領域和業務場景,對消息類型有不同需求,比如訂單處理過程中,我們需要發送一個事務消息,同時也需要發送一個定時消息,那麼這兩個消息就不能共用一個主題。

  1. 無標籤無屬性

對於業務實體極其簡單的消息,是可以不需要定義標籤和屬性,比如 MySQLBinlog 的同步。所有的消費者都沒有消息過濾需求的,也無需定義標籤和屬性。

  1. 如何定義標籤

標籤過濾是 RocketMQ 中使用最簡單,且過濾性能最好的一種過濾方式。爲了發揮其巨大的優勢,可以考慮優先使用。在使用時,我們需要確認這個字段在業務實體和業務流程中是否是唯一定義的,並且它是被絕大多數消費者作爲過濾條件的,那麼可以將它作爲標籤來定義。比如訂單中有下單渠道和訂單操作這兩個字段,並且在單次消息發送過程中都是唯一定義,但是訂單操作被絕大多數消費者應用爲過濾條件,那麼它最合適作爲標籤。

  1. 如何定義屬性

屬性過濾的開銷相對比較大,所以只有在標籤過濾無法滿足時,才推薦使用。比如標籤已經被其他字段佔用,或者過濾條件不可枚舉,需要支持多屬性複雜邏輯的過濾,就只能使用屬性過濾了。

保持訂閱關係一致

訂閱關係一致是指同一個消費者組下面的所有的消費者所訂閱的 Topic 和過濾表達式都必須完全一致。

正如上圖所示,一個消費者組包含兩個消費者,他們同時訂閱了 Topic-A 這個主題,但是消費者一訂閱的是 Tag-A 這個標籤的消息,消費者二訂閱的是 Tag-B 這個標籤的消息,那麼他們兩者的訂閱關係就存在不一致。

那麼訂閱關係不一致會導致什麼問題呢?

  1. 頻繁複雜均衡

在 RocketMQ 實現中,消費者客戶端默認每 30 秒向 Broker 發送一次心跳,這個過程會上傳訂閱關係,Broker 發現變化了就進行訂閱關係覆蓋,同時會觸發客戶端進行負載均衡。那麼訂閱關係不一致的兩個客戶端會交叉上傳自己的訂閱關係,從而導致客戶端頻繁進行負載均衡。

  1. 消費速率下降

客戶端觸發了負載均衡,會導致消費者所持有的消費隊列發生變化,出現間斷性暫停消息拉取,導致整體消費速率下降,甚至出現消息積壓。

  1. 消息重複消費

客戶端觸發了負載均衡,會導致已經消費成功的消息因爲消費隊列發生變化而放棄向 Broker 提交消費位點。Broker 會認爲這條消息沒有消費成功而重新向消費者發起投遞,從而導致消息重複消費。

  1. 消息未消費

訂閱關係的不一致,會有兩種場景會導致消息未消費。第一種是消費者的訂閱關係和 Broker 當前訂閱關係不一致,導致消息在 Broker 服務端就被過濾了。第二種是消費者的訂閱關係和 Broker 當前的雖然一致,但是 Broker 投遞給了其他的消費者,被其他消費者本地過濾了。

在消息過濾使用中,有以下建議:

  1. 不要共用消費者組

不同業務系統千萬不要使用同一個消費者組訂閱同一個主題的消息。一般不同業務系統由不同團隊維護,很容易發生一個團隊修改了訂閱關係而沒有通知到其他團隊,從而導致訂閱關係不一致的情況。

  1. 不頻繁變更訂閱關係

頻繁變更訂閱關係這種情況比較少,但也存在部分用戶實現在線規則或者動態參數來設置訂閱關係。這有可能導致訂閱關係發生變化,觸發客戶端負載均衡的情況。

  1. 變更做好風險評估

由於業務的發展,需求的變更,訂閱關係不可能一直不變,但是變更訂閱關係過程中,需要考慮整體發佈完成需要的總體時間,以及發佈過程中訂閱關係不一致而對業務可能帶來的風險。

  1. 消費做好冪等處理

不管是訂閱關係不一致,還是客戶端上下線,都會導致消息的重複投遞,所以消息冪等處理永遠是消息消費的黃金法則。在業務邏輯中,消費者需要保證對已經處理過的消息直接返回成功,避免二次消費對業務造成的損害,如果返回失敗就會導致消息一直重複投遞直到進死信。

到此,本文關於消息過濾的分享就到此結束了,非常感謝大家能夠花費寶貴的時間閱讀,有不對的地方麻煩指正,感謝大家對 RocketMQ 的關注,希望大家能夠多多參與社區的討論和貢獻。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/mou7lsifmou_y2sHDttjNg