構建面向生產的消息隊列(全)
- 消息隊列的本質
消息隊列: 的作用有:業務解耦、異步調用、流量削峯。如下圖所示:
在正式介紹 MQ 之前,我們需要明確幾個容易混淆的概念: Topic、Queue、Broker:
Topic 是一個邏輯上的概念,實際上 Message 是在每個 Broker 上以 Queue 的形式記錄。
1、消費者發送的 Message 會在 Broker 中的 Queue 隊列中記錄。
2、一個 Topic 的數據可能會存在多個 Broker 中。
3、一個 Broker 存在多個 Queue。
4、單個的 Queue 也可能存儲多個 Topic 的消息。
5、每個 Topic 在 Broker 上會劃分成幾個邏輯隊列,每個邏輯隊列保存一部分消息數據,但是保存的消息數據實際上不是真正的消息數據,而是指向 commit log 的消息索引。
6、Broker 是真正存儲 Message 的地方,需要集羣部署。commit log 存在 Broker 上。
在生產環境中,Broker 服務器一般需要配置性能比較好的本地盤,如 SSD、PCI-E 磁盤
MQ 的三大作用詳解
MQ 的三大作用:業務解耦、異步調用、流量削峯。
在介紹消息中間件之前,我們先想一下,如果不用 MQ,業務之間最常用的調用方式是什麼?比較常見的是 RPC 和 REST 方式。兩種簡單的對比如下:
因爲 REST 靈活度高、性能低;而 RPC 性能高而靈活度低,因此我們可以對外接口用 REST,內部通訊用 RPC。
內部採用 RPC 的架構如下所示:
如果是核心應用,上面架構沒有問題。但如果業務邏輯被加入了有時效性的運營活動邏輯,頻繁修改代碼邏輯顯然不靠譜。
藉助於 MQ,我們將運行活動業務邏輯單拎出來,放到 MQ 後面。客戶端的發起的請求,都發到 MQ 一份,後面的業務邏輯根據自己的需求進行消費。這樣,通過 MQ,我們實現了業務解耦(運營活動與核心業務)
上面我們提到很多時候內部業務調用是 RPC。在一些場景下,我們可以通過 MQ 替換 RPC,實現業務異步調用。
那麼,MQ 能否完全替代 RPC?理論上可以替代 RPC。但是在這種情況下,整個系統的穩定性都靠 MQ,存在一定的風險。而 PRC 是個獨立的框架,業務節點有問題,對全局沒有影響。所以不建議把 RPC 都換成 MQ。
接下來,我們看一個具體的場景(電商場景),如何通過 MQ 實現業務異步調用。
在下圖中,不使用 MQ 之前,交易服務向賣家推送服務、記錄交易數據等都是通過 RPC 實現。實際上,這與核心業務邏輯無關。因此可以改造成第二張圖的模式。
在介紹了 MQ 實現業務邏輯拆分和異步調用後,接下來我們查看 MQ 實現流量削峯的場景。
京東經常舉辦秒殺的活動。在秒殺開始那一刻,很多用戶發起請求,訪問量突增。此時爲了保證業務邏輯層的平穩運行,需要在 API 網關和業務員邏輯之間增加一個 MQ。客戶端請求都被堆積在 MQ,然後業務邏輯處理。此時,我們可以給用戶開放一個秒殺結果的查詢接口,通過 RPC 調用業務邏輯層。
MQ 雖然能夠實現流量削峯、業務解耦、異步調用,但引入 MQ 也有一切缺點,例如增加了故障診斷的複雜度、增加了架構的複雜度。因此 MQ 不能完全替代 RPC,兩者要結合使用。在微服務的場景中,MQ 通常放在 API 網關和業務邏輯層之間,如上圖所示。
幾種 MQ 的對比
接下來,我們對比一下幾種主流 MQ 的優劣勢:
從上圖中,我們可以看到,RocketMQ 在功能上是有一定的優勢的。Rocket 目前是一個很受歡迎的 MQ 開源項目:
如果要對比 Kafka 和 RocketMQ 的應用場景,簡單而言:Kafka 做的不是業務的事情,它不保證業務的一致性,它是系統間的數據流通道;RocketMQ 做的是業務相關的事情,它保證業務的一致性,它實現高性能可靠信息傳輸。
正常的消息會負載均衡發送。一個 Topic 會跨多個隊列。如果想要順序消息,就必須發到一個隊列,並且只有一個線程消費消息。
RocketMQ 被廣泛使用的幾個核心原因是:支持事務消息、支持延遲消息、支持消息重發、支持 customer 端 tag 過濾、支持消息回放。
RocketMQ 的拓撲與邏輯圖
RocketMQ 的拓撲圖如下。
RocketMQ 的 Name Server 集羣中每個節點都是相互獨立的,負責 MQ 集羣的服務註冊發現以及 Topic 的尋址,例如尋找 Topic 在哪個 Broker 上,找到 Broker 的 IP。
邏輯圖如下:
由於 RocketMQ 不能實現主從切換,因此消息生產者只連接到主上,消費者連接到主和從上。
在上圖中,Broker 的 Master 和 Slave 之間是同步的。他們之間有兩種同步方式:半同步 / 異步同步(AP 模型,類似 SNS 場景)、強同步(CP 模型,類似訂單場景)。
我們查看客戶端的尋址,例如消費者尋找 topic1,它會去 NameNode 集羣去找,例如找到 Topic1 在 Broker1 上,那麼是返回 Master 還是 Slave 的 IP? 這看業務場景,如果是 CP 業務,就可以返回 slave 的 P。這樣消息寫入通過 Master,消息消費通過 Slave Node。如果 Broker 中節點的磁盤性能比較高,直接連接 Master 也可以。如果客戶端本身連接的是 master,master down 了,那麼客戶端可以連接到 slave 繼續讀。但這就牽扯到一個斷點問題。消費者連接到 slave 就知道從斷點開始讀了。
消息分爲時序消息和非時序消息。什麼叫非時序消息,就是例如消息存在 Topic 的順序是 A1、A2、A3。但消費端的消費順序可以是:A3、A2、A1,並且不影響業務,這就是非時序消息。
如果是時序消息,就要求消息的消費順序和存儲順序是一致的。存儲的時候是 A1、A2、A3。那麼消費的時候,也需要 A1、A2、A3 的順序。想要保證消息的時序性,單機單線程是很重要。也就是說,Topic 就不要跨 Broker 了。
結合 RockerMQ 的可靠性和可用性,我們通常的使用方式如下圖所示,即多 Master 多 Salve + 異步複製。
在 RockerMQ 中,消息生產者在寫消息是以順序追加的方式,寫在 commitlog 中的。commitlog 是消息主體。然後 dispatch 線程把消息的偏移量和大小從 commitlog 中讀出來,放到按主題、以隊列的形式消費隊列。也就是說,隊列放的不是消息主體。而是消息的索引信息。
CommitLog 以物理文件的方式存放,每臺 Broker 上的 CommitLog 被本機器所有 ConsumeQueue 共享。爲了提升消息讀取的性能,RocketMQ 默認會把 commitlog 以 1G 大小進行拆分。此外,對於 MQ 系統,儘量使用 SDD 或 NVRAM。
這樣做的好處很明顯:
-
隊列輕量化,單個隊列數據量非常少。
-
對磁盤訪問串行化,避免磁盤競爭。
但缺點也很明顯:
-
寫消息是順序寫,但讀消息變成了完全隨機讀。
-
讀一條消息,先讀 consume queue,再讀 commit log,增加了開銷。
-
保證了 commit log 和 consume queue 完全一致,增加了架構設計的複雜性。
RocketMQ 的消息傳遞方式
我們知道,服務之間消息傳遞主要有兩種模式:隊列(Queue)和主題(Topic)。
-
Queue 模式是一種一對一的傳輸模式。在這種模式下,消息的生產者(Producer)將消息傳遞的目的地類型是 Queue。Queue 中一條消息只能傳遞給一個消費者(Consumer),如果沒有消費者在監聽隊列,消息將會一直保留在隊列中,直至消息消費者連接到隊列爲止,消費者會從隊列中請求獲得消息 ---> 消費者 pull 方式。
-
Topic 模式是一種一對多的消息傳輸模式。在這種模式下,消息的生產者(Producer)將消息傳遞的目的地類型是 Topic。消息到達 Topic 後,消息服務器將消息發送至所有訂閱此主題的消費者。--->push 給消費者模式。
在 RocketMQ 中,Topic 是個抽象的概念,每個 Topic 底層對應 N 個 queue,而數據也真實存在 queue 上的。Queue 是 Topic 在一個 Broker 上的分片等分爲指定份數後的其中一份,是負載均衡過程中資源分配的基本單元。也就是說,一個 topic 跨多個 queue,每個 queue 在一個 broker 上,因此 topic 可以跨多個 broker。
實際上,push 和 pull 方式都各優缺點。
push 方式:消息實時性高,但沒有考慮客戶端的消費能力、即消息處理速度。
pull 方式:消息實時性低,可能造大量無效請求。
爲了取長補短,RocketMQ 採用 LongPoll 方式,即長輪詢方式。在這種方式下,消費者主動發送 pull 消息,然後 broker hold 住請求,直到有消息才返回。請求的超時時間是 30s。當請求超時後,消費端再度發起請求。
RocketMQ 的消費者消費方式
關於消費者消費消息的方式,主要有兩種:
集羣消費:單條消息只消費一次,各階段均勻消費 Topic 的消息。
廣播消費:各集羣消費全量的消息,單條消息在每個集羣都被消費一次。
RocketMQ 最大的吞吐量設計
RocketMQ 的消息是存儲到磁盤上的,這樣既能保證斷電後恢復,又可以讓存儲的消息量超出內存的限制。
RocketMQ 爲了提高性能,會盡可能地保證磁盤的順序寫。消息在通過 Producer 寫入 RocketMQ 的時候,有兩種
寫磁盤方式:
1)異步刷盤方式:在返回寫成功狀態時,消息可能只是被寫入了內存的 PAGECACHE,寫操作的返回快,吞吐量大;當內存裏的消息量積累到一定程度時,統一觸發寫磁盤操作,快速寫入
2)同步刷盤方式:在返回寫成功狀態時,消息已經被寫入磁盤。具體流程是,消息寫入內存的 PAGECACHE 後,立刻通知刷盤線程刷盤,然後等待刷盤完成,刷盤線程執行完成後喚醒等待的線程,返回消息寫成功的狀態。
結合 MQ 的刷盤策略和同步策略,我們想想一下,RocketMQ 能夠承載的最大的吞吐量,怎麼設計?
採用 Broker 異步複製 + 異步刷盤模式:
-
消息寫到 Broker 的 Master 節點的內存中,就返回成功。
-
master 節點內存中的消息,異步地刷到本地磁盤(ASYNC_FLUSH)。
-
消息異步地同步到 Salve node 內存
-
Salve node 內存中的消息異步地刷到本地的磁盤。
RocketMQ 最大的高可用設計
針對 RocketMQ,我們如果做高可用?
對於非時序消息,我們可以將 Topic 存在多個 Broker 上。這樣即使一個 Master Node 出現問題,消息生產者還可以向其他 Broker 的 master 上寫消息。
嚴格時序消息的話,Broker 就必須要單機單線程(Master 和 Slave 強同步)。如果 master 掛了,消息發送會失敗,消息也不能被消費。就需要做同一個 Broker 內的 master 和 slave 的主從切換。
Broker 主從如何自動切換?
Broker 的主從切換由 Name Server 控制。Name Server 的主從切換由 Zookeeper 控制,如下圖所示:
上圖方案有一個小的缺點是:三個 name server 之間沒有強一致的協議,客戶會初選路由信息短暫不一致的問題。
Topic 的高可用
Topic 的高可用可以採用分佈式主從部部署:
我們先看下圖:
在 MQ 中,通過我們把一個 Broker 認爲是一個 MQ 的 Server。因此 MQ 的高可用就分爲兩種:
-
明確的主備劃分:Master 和 Slave。它倆各式一個服務器。每個服務器上有 queue,topic 存在這些 queue 上,兩個服務器之間主備複製。
-
Leader flower 多一致性要求比較高。可能會造成 leader 壓力很高。因此這種模式我們需要把每個 topic 的 leader 分散到不同的 broker 上。也就是說,broker 之間互爲主從。
高可用延遲消息的設計
延時消息的設計,分爲兩種:
-
帶外模式:Redis-MySQL
-
帶內模式:改造 MQ,實現延時集羣。
延時任務需求:
-
例如產品訂單,需要半個小時內完成支付。否則自動取消。這個延遲的時間,需要可以自定義。
-
如何檢查出於退款狀態的訂單是否已經成功退款?
具體的實現:
-
少部分實現:通過 RocketMQ 做演示消息,但延遲級別是有限的。因此需要基於 rockermq 做改造。----- 內置模式。
-
大部分是基於定時任務掃描庫來實現的。mysql 裏存儲消息本身,redis sorted set 存儲時間輪。----- 帶外模式。
Redis-MySQL:每個 topic 都有自己的 delay queus 和 ready queue。mysql 裏存儲消息本身。delay queue 和 ready queue 用 redis,提供 HTTP/RPC 形式的接口,client 包裝 http/rpc 請求。
Server 節點提供寫入和讀取:提供 HTTP/RPC 形式的接口,提供發送,拉取,消費 ack 三個接口
delay queue 有多個配置,比如有 20 個 bucket。
如下面兩個圖所示:
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WdJxhVXwmxjGmKeYw0INhA