RocketMQ-Streams 架構設計淺析

作者:倪澤,RocketMQ 資深貢獻者, RocketMQ-Streams 維護者之一,阿里雲技術專家。

RocketMQ-Streams 是一款輕量級流處理引擎,應用以 SDK 的形式嵌入並啓動,即可進行流處理計算,不依賴於其他組件,最低 1 核 1G 可部署,在資源敏感場景具有很大優勢。同時它支持 UTF/UTAF/UTDF 多種計算類型。目前已經廣泛運用於安全,風控,邊緣計算等場景。

本期將帶領大家從源碼的角度,解析 RocketMQ-Streams 的構建,數據流轉過程。也會討論 RocketMQ-Streams 是如何實現故障恢復和擴縮容的。

01

使用示例

代碼示例:

public class RocketMQWindowExample {
    public static void main(String[] args) {
        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
        source.fromRocketmq(
                "topicName",
                "groupName",
                false,
                "namesrvAddr")
                .map(message -> JSONObject.parseObject((String) message))
                .window(TumblingWindow.of(Time.seconds(10)))
                .groupBy("groupByKey")
                .sum("字段名", "輸出別名")
                .count("total")
                .waterMark(5)
                .setLocalStorageOnly(true)
                .toDataSteam()
                .toPrint(1)
                .start();
    }
}

pom 文件依賴:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-streams-clients</artifactId>
  <version>1.0.1-preview</version>
</dependency>

上述代碼是一個簡單的使用例子,它主要的功能是從 RocketMQ 中指定 topic 讀取數據,經過轉化成 JSON 格式,以 groupByKey 字段值分組、10 秒一個窗口,對 OutFlow 字段值進行累加,結果輸出到 total 字段,並打印到控制檯上。上述計算中還允許輸入亂序 5 秒,即窗口時間到達後不會馬上觸發,而是會等待 5s,如果這個段時間內,有窗口數據到達依然有效。上述 setLocalStorageOnly 爲 true 表示不對狀態進行遠程存儲,僅使用 RocksDB 做本地存儲。目前 1.0.1 的 RocketMQ-Streams 版本依然使用 Mysql 作爲遠程狀態存儲,下一版本將使用 RocketMQ 作爲遠程狀態存儲。

02

RocketMQ 總體架構圖

圖片

RocketMQ-Streams 作爲輕量流處理引擎,本質上是作爲 RocketMQ 的客戶端消費數據,一個流處理實例可以處理多個隊列,而一個隊列只能被一個實例消費。若干 RocketMQ-Streams 實例組成消費者組共同消費數據,通過擴容實例達到增加處理能力的消費,減少實例則會發生 rebalance,消費的隊列自動重平衡到其他消費實例上。從上述圖中,我們還可以看出計算實例間不需要直接交換任何數據,可各自獨立完成所有計算處理。這種架構簡化了 RocketMQ-Streams 本身的設計,同時也可非常方便的進行實例擴縮容。

處理拓撲

處理器拓撲爲應用定義了流處理過程的計算邏輯,它由一系列的處理器節點和數據流向組成。例如,在開頭的代碼示例中,整個處理拓撲由 source、map、groupBy、sum、count、print 等處理節點組成。有兩種特殊的處理節點:

他沒有任何上游節點,從外部讀入數據到 RocketMQ-Streams,並交由下游處理。

他沒有任何下游節點,他將處理後的數據寫出到外部。

處理拓撲僅僅是流處理代碼的邏輯抽象,在流計算啓動時將會被實例化。爲了設計簡單,目前一個流處理實例中僅有一張計算拓撲。

在所有流處理算子之中,有兩種特別的算子,一種是涉及數據分組的算子 groupBy,另一種是有狀態計算例如 count 等。這兩種算子會影響整個計算拓撲的構建,下面將具體分析 RocketMQ-Streams 是如何處理他們的。

groupBy

分組算子 groupBy 特殊是因爲經過 groupBy 操作,後續算子期望對相同 key 的數據進行操作,例如經過 groupBy("年級")之後再進行 sum 就是對按照年級分組求和,這就要求需要將具有相同 “年級” 的數據重新路由到一個流計算實例上處理,如果不這樣做,每個實例上得出的結果都將是不完整的,整體輸出結果也將是錯誤的。

RocketMQ-Streams 採用 shuffle topic 這種方式來處理。具體說來,計算實例將 groupBy 數據重新發回 RocketMQ 的一個 topic,並且在發回過程中按照 key 的 hash 值來選擇目標隊列,再從這個 topic 讀取數據進行後續流處理。按照 key hash 後相同的 key 一定在一個隊列裏面,而一個隊列只會被一個流處理實例消費,這樣就達到相同 key 被路由到一個實例上處理的效果。

有狀態算子

有狀態算子與無狀態算子相對。如果計算結果只與當前輸入有關,和上一次輸入無關就是無狀態算子,例如 filter、map、foreach 結果只與當前輸入有關係。還有一種算子的輸出結果不僅與當前算子有關係還與上一次輸入有關,例如 sum,需要對一段時間內輸入進行求和,他就是有狀態算子。

RocketMQ-Streams 利用 RocksDB 作爲本地存儲,Mysql 作爲遠程存儲來保存狀態數據。他具體做法是:

  1. 當發現消息來自新的隊列時,檢查是否需要加載狀態,如果需要異步加載狀態到 RocksDB。

  2. 數據到達有狀態算子時,如果加載完成使用 RocksDB 中狀態進行計算,如果沒有,使用 Mysql 中狀態計算。

  3. 計算完成後,將狀態數據保存到 RocksDB 和 Mysql 中。

  4. 窗口觸發後,從 RocksDB 中查詢出狀態數據,並將結果向下遊算子傳遞。

整體數據流向圖如下:

圖片

03

擴縮容與故障恢復

擴縮容和故障恢復是一個硬幣的兩面,即同一個事物的兩種表達,計算集羣如果能正確擴縮容就等於具備故障恢復的能力,反之亦然。通過前面介紹我們知道,RocketMQ-Streams 具有非常良好的擴縮容性能,擴容時只需要新部署一個流計算實例即可,縮容時停止計算實例即可。對於無狀態的計算來說比較簡單,擴容後,數據計算不需要之前的狀態。有狀態計算的擴縮容涉及到狀態的遷移。有狀態的擴縮容可由下圖表示:

圖片

當計算實例從 3 個縮容到 2 個,藉助於 RocketMQ 的 rebalance,MQ 會在計算實例之間重新分配。

Instance1 上消費的 MQ2 和 MQ3 被分配到 Instance2 和 Instance3 上,這兩個 MQ 的狀態數據也需要遷移到 Instance2 和 Instance3 上,這也暗示,狀態數據是根據源數據分片保存的;擴容則是剛好相反的過程。

具體實現上,RocketMQ-Streams 採用系統消息來觸發狀態的加載和持久化。

系統消息類別:

//新增消費隊列
NewSplitMessage
//不在消費某個隊列
RemoveSplitMessage
//客戶端持久化消費位點到MQ
CheckPointMessage

當發現消息來自一個新的 RocketMQ 隊列(MessageQueue),RocketMQ-Streams 之前沒有處理過來自該隊列的消息,會先於數據前發送 NewSplitMessage 消息,通過處理拓撲下游算子傳遞,當有狀態算子收到該消息時會將新增隊列對應的狀態加載到本地內存 RocksDB 中,當數據真正到達時,就根據這個狀態繼續計算。

當因爲計算實例增加或者 RocketMQ 集羣變動,rebalance 後,計算實例不再消費某個隊列(MessageQueue)時,會發出 RemoveSplitMessage 消息,有狀態算子刪除本地 RocksDB 中的狀態。

CheckPointMessage 是一種特別的系統消息,他的作用與實現 exactly-once 有關。我們在擴縮容過程中需要做到 exactly-once,才能保證擴縮容或故障恢復對計算結果沒有影響。RocketMQ-streams 向 broker 提交消費 offset 前會產生 CheckPointMessage 消息,向下遊拓撲傳遞,他將保證即將提交消費位點的所有消息都已經被 sink 處理掉。

開源地址:

RocketMQ-Streams 倉庫地址:

https://github.com/apache/rocketmq-streams

RocketMQ 倉庫地址:

https://github.com/apache/rocketmq

04

加入 Apache RocketMQ 社區

十年鑄劍,Apache RocketMQ 的成長離不開全球接近 500 位開發者的積極參與貢獻,相信在下個版本你就是 Apache RocketMQ 的貢獻者,在社區不僅可以結識社區大牛,提升技術水平,也可以提升個人影響力,促進自身成長。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/SBQiyga9tgZa7Q46CHOIRw