Stream 消息驅動
一、什麼是 Spring Cloud Stream?
-
官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。
-
應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中 binder 對象交互。
-
通過我們配置來 binding(綁定),而 Spring Cloud Stream 的 binder 對象負責與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式。
-
通過使用 Spring Integration 來連接消息代理中間件以實現消息事件驅動。
-
Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈 - 訂閱、消費組、分區的三個核心概念。
-
目前僅支持 RabbitMQ、 Kafka。
二、Stream 的設計思想
1、標準 MQ
-
生產者 / 消費者之間靠消息媒介傳遞信息內容
-
消息必須走特定的通道 - 消息通道 Message Channel
-
消息通道里的消息如何被消費呢,誰負責收發處理 - 消息通道 MessageChannel 的子接口 SubscribableChannel,由 MessageHandler 消息處理器所訂閱。
2、爲什麼用 Cloud Stream?
比方說我們用到了 RabbitMQ 和 Kafka,由於這兩個消息中間件的架構上的不同,像 RabbitMQ 有 exchange,kafka 有 Topic 和 Partitions 分區。
這些中間件的差異性導致我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,後面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因爲它跟我們的系統耦合了,這時候 Spring Cloud Stream 給我們提供了—種解耦合的方式。
3、Stream 憑什麼可以統一底層差異?
在沒有綁定器這個概念的情況下,我們的 SpringBoot 應用要直接與消息中間件進行信息交互的時候,由於各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性通過定義綁定器作爲中間層,完美地實現了應用程序與消息中間件細節之間的隔離。通過嚮應用程序暴露統一的 Channel 通道,使得應用程序不需要再考慮各種不同的消息中間件實現。
4、通過定義綁定器 Binder 作爲中間層,實現了應用程序與消息中間件細節之間的隔離。
Binder:
-
INPUT 對應於消費者
-
OUTPUT 對應於生產者
Stream 中的消息通信方式遵循了發佈 - 訂閱模式
Topic 主題進行廣播
-
在 RabbitMQ 就是 Exchange
-
在 Kakfa 中就是 Topic
三、Stream 編碼常用註解簡介
1. Spring Cloud Stream 標準流程套路
-
Binder - 很方便的連接中間件,屏蔽差異。
-
Channel - 通道,是隊列 Queue 的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過 Channel 對隊列進行配置。
-
Source 和 Sink - 簡單的可理解爲參照對象是 Spring Cloud Stream 自身,從 Stream 發佈消息就是輸出,接受消息就是輸入。
2. 編碼 API 和常用註解
四、Stream 之消息重複消費
運行後有兩個問題
-
有重複消費問題
-
消息持久化問題
消費
-
http://localhost:8801/sendMessage
-
目前是 8802/8803 同時都收到了,存在重複消費問題
-
如何解決:分組和持久化屬性 group(重要)
生產實際案例
比如在如下場景中,訂單系統我們做集羣部署,都會從 RabbitMQ 中獲取訂單信息,那如果一個訂單同時被兩個服務獲取到,那麼就會造成數據錯誤,我們得避免這種情況。這時我們就可以使用 Stream 中的消息分組來解決。
注意在 Stream 中處於同一個 group 中的多個消費者是競爭關係,就能夠保證消息只會被其中一個應用消費一次。不同組是可以全面消費的 (重複消費)。
五、Stream 之 group 解決消息重複消費
1. 原理
微服務應用放置於同一個 group 中,就能夠保證消息只會被其中一個應用消費一次。
不同的組是可以重複消費的,同一個組內會發生競爭關係,只有其中一個可以消費。
8802/8803 都變成不同組,group 兩個不同
group: A_Group、B_Group
六、Stream 之消息持久化
-
通過上述,解決了重複消費問題,再看看持久化。
-
停止 8802/8803 並去除掉 8802 的分組 group: A_Group,8803 的分組 group: A_Group 沒有去掉。
-
8801 先發送 4 條消息到 RabbitMq。
-
先啓動 8802,無分組屬性配置,後臺沒有打出來消息。
-
再啓動 8803,有分組屬性配置,後臺打出來了 MQ 上的消息。(消息持久化體現)
source:https://www.yuque.com/yanzipang-wf7ur/hkyrfw/vbkxz8
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/l0_xVXt2uHEda05gDA52SQ