微服務消息驅動組件 SpringCloud Strea
簡介
SpringCloud Stream 是一個用於構建與共享消息系統連接的高度可擴展的事件驅動微服務組件。它提供了一個靈活的編程模型,基於 Spring Boot 建立獨立的生產級 Spring 應用程序,並使用 Spring Integration 提供與消息代理的連接可以讓我們在使用時幾乎無需關心具體的消息隊列實現。它屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型,讓開發人員能夠更多的關注自己的業務。
架構模型
或許我們也可以看一個更爲簡潔的圖
我們可以看到,每個系統只依賴於自己的 Binder
和消息中間件或者說其他系統交互, Stream 隱藏了所有消息的發送細節,對於它來說只關心三個核心模塊
-
Destination Binders:目標綁定器,告訴 Stream 你需要綁定到哪個消息隊列服務的
Binder
實現即可。例如RabbitMQ
還是Kafka
的Binder
?這是它的核心構建塊,負責支持和提供與我們擁有的外部系統或外部消息傳遞系統的集成 -
Destination Bindings:目的地綁定,把消息生產者和消費者之間的橋樑提供給 Stream 。例如對於
RabbitMQ
來說,你需要告訴 Stream 當前系統發送消息所使用的的channel -> exchange -> routingKey -> queue
分別是什麼(當然這些都是在配置文件中完成的) -
Message:就是我們需要發送的消息
對於任何消息來說,只需要提供上述三個核心模塊即可,我們無需去關心發送的細節。
直至 SpringCloud Stream 3.2.1 版本,它已經支持了幾乎所有市面上流行的消息隊列產品。RabbitMQ、Kafka、RocketMQ、AWS SNS/SQS
等等,主要是因爲這種一統江湖的趨勢讓不同的消息中間件廠商都開發了自己的綁定器 Binder
提供給 SpringCloud Stream。
初體驗
下面以 RabbitMQ 爲例體驗一下 Stream 消息驅動開發。首先我們需要引入依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
接下來我們需要在配置文件中指定相關配置,在此之前請確保你對 RabbitMQ 中的組件有一個基本的認識,否則請先閱讀 RabbitMQ 基礎篇
生產者
配置文件:
spring:
#消息隊列地址
rabbitmq:
host: 129.204.178.49 #你的 rabbitmq 服務地址
port: 5672
username: guest
password: guest
cloud:
stream: #SpringCloud Stream 配置
bindings:
output-channel-demo: # channel 消息輸出通道
destination: demo-exchange # 交換機
binder: demo-binder # 綁定器
binders:
demo-binder: #綁定器
type: rabbit # rabbitmq
rabbit:
bindings:
output-channel-demo: # channel 消息輸出通道
producer: # 生產者
routing-key-expression: '''demoRoutingKey'''
聲明輸出通道
/**
* 聲明消息輸出通道 channel
* */
public interface MessageSource {
@Output("output-channel-demo")
MessageChannel output();
}
定義一個通道綁定類
/**
* 該註解用來指定一個或多個定義了 @Input 或 @Output 註解的接口,以此實現對消息通道(Channel)的綁定
* */
@EnableBinding(MessageSource.class)
public class MessageSourceHandler {
}
接下來我們寫一個集成測試發送消息即可
@Autowired
MessageSource messageSource;
/** 發送消息測試 */
@Test
public void test() {
messageSource.output().send(MessageBuilder.withPayload("測試消息").build());
}
此時消息就成功的發送出去了,接下來我們來寫消費者
消費者
配置文件
spring:
cloud:
stream:
binders:
demo-binder: #綁定器
type: rabbit #rabbitmq
rabbit:
bindings:
input-channel-demo: #消息輸入通道 channel
consumer:
binding-routing-key: 'demoRoutingKey'
bindings:
input-channel-demo: #消息輸入通道 channel
group: someGroup #防止多個消費者實例重複接收消息,這樣一條消息只會發送給相同組的其中一個實例
destination: demo-exchange #交換機
binder: demo-binder #綁定器
rabbitmq:
host: 129.204.178.49
port: 5672
username: guest
password: guest
聲明輸入通道
/**
* 聲明消息輸入通道 channel
* */
public interface MessageSink {
@Input("input-channel-demo")
SubscribableChannel input();
}
聲明綁定類
@EnableBinding(MessageSink.class)
public class MessageSinkHandler {
/**
* 監聽 input-channel-demo 通道的消息,該 @StreamListener 註解支持 SPEL 表達式,但是被標註的方法不能有返回值
* */
@StreamListener("input-channel-demo")
public void consume(String message){
System.out.println("接受到消息:"+message);
}
}
這樣一個完整的 SpringCloud Stream 微服務消息驅動的 demo 就完成了,啓動應用,消費者能成功的收到生產者發送的測試消息。要用好 SpringCloud Stream 你必須弄懂配置文件的內容!
GitHub 源碼地址 https://github.com/yanzhisishui/springcloud-stream.git ,或者微信公衆號回覆 “SpringCloud Stream 入門案例源碼”。
發送延遲消息
在 SpringCloud Stream 中發送延遲消息非常簡單,首先我們需要在生產者、消費者的配置文件中指定交換機的類型是延遲交換機
rabbit:
bindings:
input-channel-demo: #消息輸入通道 channel
consumer:
delayed-exchange: true
binding-routing-key: 'demoRoutingKey'
生產者一樣,這裏省略。然後只需要在上面發送的代碼中加一個 header 即可
//設置消息30秒後發送到消費者
messageSource.output().send(MessageBuilder.withPayload("測試消息")
.setHeader("x-delay",30 * 1000).build());
如果你發送延遲消息拋出 unknown exchange type 'x-delayed-message' 異常,那麼是因爲你的 RabbitMQ 服務沒有安裝延遲隊列插件。去官網安裝一下即可
這樣一個延遲消息的業務就實現了,看到這裏你會發現使用 SpringCloud Stream 整合消息很簡單,例如實際上對於整合 RabbitMQ
來說,幾乎所有的配置都在 RabbitConsumerProperties、RabbitProductProperties
中,生產者和消費者共有的屬性在它們的父類 RabbitCommonProperties
中。幾乎 RabbitMQ 的所有特性和功能都可以直接在配置文件中完成。作者能力有限,其他高級特性配置詳情可以參考官網 RabbitMQ Consumer Properties (https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.2.1/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties )。
但如果你真這麼覺得那你就大錯特錯了,正如 SpringBoot ,用起來很簡單可能只需要花費 20% 的精力,但是想玩的好,可能要付出 200% 的精力。SpringCloud Stream 其實包含了一系列複雜技術體系,Spring Intergration、Spring Message、Spring AMQP
等等,其內部原理實現、組件的集成非常複雜。
我想 SpringCloud Stream 出生這麼久還不廣泛流行的原因之一就是,這一套技術體系涉及的東西太多了,萬一生產環境出現什麼疑難雜症,需要去閱讀源碼解決的話,這樣的技術工作量是很超出預期的。
Spring Message
Spring Message
是 Spring Framework
的一個子模塊,它定義了消息的統一編程模型,實際上 SpringCloud Stream 也是基於它實現的統一。
Spring Message 定義了上圖的消息編程模型,提出了通道 Channel
和 消息 Message
的抽象,所有的消息都由生產者發送到輸出通道 Output
中給消息中間件,然後所有的消費者都從輸入通道 Input
中獲取消息,而消息 Message
本身由兩部分組成,消息頭 header
和 消息體 payload
。
在上述的 初體驗 中,我們涉及到的幾個核心註解正是該模型的體現
-
@Output:代表輸出通道,生產者從這發出消息
-
@Input:代表輸入通道,消費者從這讀取消息
-
@EnableBinding:將定義通道的接口綁定到某個
Bean
以便於我們可以通過該Bean
操作通道進行發送和接收消息。 -
@StreamListener:訂閱輸入通道中的消息
SpringCloud Function 函數式編程
在 SpringCloud Stream 3.1 版本之後,你會發現 @EnableBinding
等幾個核心註解被官方標註廢棄了,這是因爲官方推出了更新的函數式編程模型 SpringCloud Function,試圖用這個組件將編程推向一個更高的層次。本篇文章不詳細介紹該組件,簡單介紹在 SpringCloud Stream 中如何結合 SpringCloud Function 進行消息發送和消費。
在結合 SpringCloud Function 時消息的通道命名要遵循以下約定
-
輸入 :
<functionName> + -in- + <index>
-
輸出 :
<functionName> + -out- + <index>
index
代表輸入或輸出綁定的索引,目前我們直接寫 0
即可。
任務型消息
參考官方文檔 Suppliers (Sources) (https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_suppliers_sources),我們開始寫一個生產者的發送消息方法。
@Bean
public Supplier<String> source1() {
return () -> "測試定時消息";
}
然後根據通道規則在 application.yml
中配置通道名爲 source1-out-0
,再配置 spring.cloud.function.destination = source1
,指定 function
的函數方法名。
接下來我們開始寫消費者,同樣我們需要一個消費方法。
@Bean
public Consumer<String> sink1() {
return message -> System.out.println("收到消息:" + message);
}
然後根據通道規則把配置文件中的通道名改爲 sink1-in-0
。這樣一個簡單的定時消息的發送和接收就完成了,生產者會每秒給消費者發送一條消息,不得不說,SpringCloud Stream 和 SpringCloud Function 的集成真的是...... 太神奇了。
業務觸發型消息
但通常我們更多的應用場景是業務觸發發送消息,所以 SpringCloud Stream 給我們提供了一個 StreamBridge
組件。使用它發送消息只要指定通道名即可
@Test
public void test() {
streamBridge.send("source1-out-0","測試消息");
}
這樣我們就已經完成了消息的發送,消費者還是用上面的消費函數即可。
總結
不得不說集成 SpringCloud Function 之後,消息的發送和接收又邁進了一個嶄新的階段,但 <functionName> + -in- + <index>
這樣的配置規約我覺得讓我有些難受...... 甚至目前我認爲 3.1 之前被廢棄的註解方式也許更適合我們開發使用。
結語
去年在生產項目中使用 SpringCloud Stream 的時候它才只支持 RabbitMQ 和 Kafka,但現在幾乎所有流行的消息中間件都開發了 Binder
去適配它,這也說明了它一統江湖的趨勢。
雖然我一直推崇技術的更新迭代,但這次我也要由衷的提醒,如果是新項目我們可以去嘗試引入使用,如果是老項目更新技術組件,還是要慎重,畢竟 SpringCloud Stream 涉及的一套技術體系太多,太複雜,本文僅僅是 SpringCloud Stream 的冰山一角。我們目前並不能很好的駕馭它,但我仍然相信它以後會成爲消息中間件的對接主流!
我們能夠看到 SpringCloud Stream 的一套技術體系試圖把消息驅動推動到一個更高的層次,但就目前實際使用情況來看我覺得這個目標還是有些遙遠......
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/HAmCsccbPznAR5YLexWH-Q