作爲程序員,你可能不知道,Stream 竟然還有應用進階學習?

Stream 應用進階

在微服務架構下,細粒度服務之間更容易發生頻繁的分佈式集成與交互。

基於消息中間件的服務交互模式,或者建立以事件驅動爲主導的架構模型,可以幫助業務建立和實現核心的領域事件驅動交互機制。

領域事件(Domain Event)的通信改變了領域對象的狀態,比如訂單創建事件、庫存添加事件。

一個領域事件可以表達正在發生在一個領域對象上的行爲。領域事件的發生伴隨着修改的數據、時間戳、聚合 ID 及其他附加信息。在微服務架構下,領域事件以發佈 / 訂閱的模式,將事件發佈到 MQ 中間件,允許多個不同微服務訂閱事件並消費事件,這個事件可能是一個訂單創建事件(OrderCreateEvent)或訂單修改事件(CustomerModifyEvent)。

以上方式都遵循面向對象的方式,然而這些對象穿梭在生產者、分佈式消息隊列和消費者中,變成了共享類庫,當衆多微服務需要依賴共享類庫時,就產生了高度的耦合。這種共享分佈式對象實現遠程調用是通過把共享對象打成 jar 包被不同微服務共享依賴的,也是分佈式系統中的典型反模式,當一個領域事件被修改後,每個依賴的微服務都會受到影響。

使用 SCS 通過 Spring Message 的公共消息事件機制,可以支持事件驅動架構,同時避免依賴共享 Domain 對象類型。下面我們在 SCS 的一些元註解的基礎上結合 Spring 的一些小特性實現類似 CORS 的 EDA 架構。

SpringCloudStream 處理事件

SCS 提供了 @StreamListener 註解來控制序列化的方式,它作爲方法的入參並執行方法,例如:

新的事件分發特性在 @StreamListener 上增加了 Condition 屬性來使消息路由到多個監聽器成爲可能,Condition 的值是用 SPEL 表達式運算出來的一個 boolean 值。Condition 應用到傳入的消息上,能夠計算任何消息載荷、特定的消息頭或其組合。這提供了一種極其靈活的路由機制,且不需要不同的事件類型定義類。例如,我們定義一個帶 String eventType 屬性的 Event 類型,SCS 將提供開箱即用的功能:

其中,Event 類型的定義如下:

定製化事件註解

雖 然 可 以 使 用 通 用 的 Event 類 型 , 但 是 仍 然 需 要 通 過 @StreamListener 註解加入 SEL Condition 註解來過濾,使用起來比較麻煩。現在我們基於 @StreamListener 註解實現自定義事件註解,我們通過事件類型直接定位到我們的方法。自定義註解 @EventHandler 的定義如下:

通過 @EventHandler 的定義,我們可以把事件監聽註解簡化爲如下形式:

這裏,我們可以通過將 Condition 表達式轉化爲一個模板實現,重載 SCS 處 理 @StreamListener 注 解 的 BeanPostProcessor , 這 樣 通 過 eventType 就可以實現識別事件的方法。重載的函數如下:

接下來,我們可以用自定義的 @EnableEventHandling 註解來引入這個 configuration:

我 們 修 改 EventHandler 注 解 , 定 義 一 個 eventType 屬 性 來 做 Condition 的別名:

然後,我們可以使用自定義的事件處理註解,只需要提供事件類型,就可以從消費者發送訂閱事件:

微服務集成架構傾向於使用標準化的 HTTP、基於 REST API 的架構交互模式進行集成。此外,考慮到性能也可以採用 RPC 的調用方式。對於異步交互過程,使用消息隊列可以實現微服務之間的充分解耦和異構集成。

Spring Cloud 提供了 Spring Cloud Stream 框架,它可以屏蔽底層通信技術細節,並且實現了基於消息的輕量級微服務集成解決方案。

還可用使用 Spring Cloud Stream 實現基於事件驅動和 CQRS 的系統架構。

來源:

https://www.toutiao.com/article/7093880049954259464/?log_from=1742314f90e49_1651885489530

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KhDXO7Ua5E4lrvQYvorROw