Kafka 流 - 抑制
在這篇文章中,我將解釋 Kafka Streams 抑制的概念。儘管它看起來很容易理解,但還是有一些內在的問題 / 事情是必須要了解的。這是我上一篇博文 CDC 分析的延續。
架構
一個典型的 CDC 架構可以表示爲:。
使用 Kafka 及其組件的 CDC 架構
在上述架構中。
-
單獨的表交易信息被存儲在 Kafka 的獨立主題中。這些信息可以通過 Kafka 的 sink 連接器傳輸到目標目的地。
-
爲了做聚合,如計數、統計、與其他流(CRM 或靜態內容)的連接,我們使用 Kafka 流。有些事情也可以用 KSQL 來完成,但是用 KSQL 實現需要額外的 KSQL 服務器和額外的部署來處理。相反,Kafka Streams 是一種優雅的方式,它是一個獨立的應用程序。
Kafka Streams 應用程序可以用 Java/Scala 編寫。
我的要求是將 CDC 事件流從多個表中加入,並每天創建統計。爲了做到這一點,我們不得不使用 Kafka Streams 的抑制功能。
要理解 Kafka 流的壓制概念,我們首先要理解聚合(Aggregation)。
聚合的概念
Kafka Streams Aggregation 的概念與其他函數式編程(如 Scala/Java Spark Streaming、Akka Streams)相當相似。這篇文章只是涵蓋了其中一些重要的概念。關於詳細的聚合概念,請訪問 confluent 文檔。
聚合的概念
聚合是一種有狀態的轉換操作,它被應用於相同鍵的記錄。Kafka Streams 支持以下聚合:聚合、計數和減少。你可以在 KStream 或 KTable 上運行 groupBy(或其變體),這將分別產生一個 KGroupedStream 和 KGroupedTable。
要在 Kafka 流中進行聚合,可以使用。
-
Count。用來計算元素的簡單操作
-
Aggregation。
當我們希望改變結果類型時,就會使用聚合函數。聚合函數有兩個關鍵部分。Initializer 和 Aggregator。當收到第一條記錄時,初始化器被調用,並作爲聚合器的起點。對於隨後的記錄,聚合器使用當前的記錄和計算的聚合(直到現在)進行計算。從概念上講,這是一個在無限數據集上進行的有狀態計算。它是有狀態的,因爲計算當前狀態要考慮到當前狀態(鍵值記錄)和最新狀態(當前聚合)。這可以用於移動平均數、總和、計數等場景。 -
Reduce。
你可以使用 Reduce 來組合數值流。上面提到的聚合操作是 Reduce 的一種通用形式。reduce 操作的結果類型不能被改變。在我們的案例中,使用窗口化操作的 Reduce 就足夠了。
在 Kafka Streams 中,有不同的窗口處理方式。請參考文檔。我們對 1 天的 Tumbling 時間窗口感興趣。
注意:所有的聚合操作都會忽略空鍵的記錄,這是顯而易見的,因爲這些函數集的目標就是對特定鍵的記錄進行操作。因此,我們需要確 f 保我們首先對我們的事件流做 selectKeyoperation。
Kafka-streams-windowing
在程序中添加 suppress(untilWindowClose...) 告訴 Kafka Streams 抑制所有來自 reduce 操作的輸出結果,直到 "窗口關閉"。" 當窗口關閉時,它的結果不能再改變,所以任何從 suppress(untilWindowClose...) 出來的結果都是其窗口的最終結果。
根據上述文件中的定義,我們希望每天在寬限期過後產生一個彙總的統計信息(與 UTC 一致)。但是,有一個注意點。在遇到相同的 group-by key 之前,suppress 不會刷新聚合的記錄!!。
在 CDC 事件流中,每個表都會有自己的 PK,我們不能用它作爲事件流的鍵。
爲了在所有事件中使用相同的 group-by key,我不得不在創建統計信息時在轉換步驟中對 key 進行硬編碼,如 "KeyValue.pair("store-key", statistic)"。然後,groupByKey() 將正確地將所有的統計信息分組。
在 CDC 架構中,我們不能期望在寬限期後就有 DB 操作發生。在非高峯期 / 週末,可能沒有數據庫操作。但我們仍然需要生成聚合消息。
爲了從壓制中刷新聚集的記錄,我不得不創建一個虛擬的 DB 操作(更新任何具有相同內容的錶行,如 update tableX set id=(select max(id) from tableX);。這個假的 DB 更新操作,我必須每天在寬限期後立即通過 cronjob 進行。也許這個 cronjob 可以取代 ProcessorContext#schedule(), Processor#punctuate()(還沒有嘗試,因爲我需要在這個應用程序中引入硬編碼的表名)。
壓制和重放問題
當我們重放來計算一個較長時期的彙總統計時,問題就更明顯了。流媒體時間變得很奇怪,聚合窗口也過期了,我們得到以下警告。
2021-04-15 08:30:49 WARN 跳過過期窗口的記錄。key=[statistics-store-msg-key] topic=[statistics-streaming-aggregates-statistics-stream-store-repartition] partition=[0] offset=[237] timestamp=[1618420920468] window=[1618358400000,1618444800000) expiration=[1618459200477] streamTime=[1618459200477]
爲了防止這種過期的窗口並得到奇怪的彙總結果,我們需要將寬限期增加到相當大的數值,如下圖所示。
自動計算梯度長度
如上圖所示,當我們進行重放並給出 "event-collection-start" 時,我們應該自動設置 "grace duration"(足夠大)。然後,kafka 流將處理所有聚集的事件,沒有任何過期。但最終的結果仍然不會被 "衝出" 壓制窗口。我們需要通過在啓動應用程序後創建一個假的更新來強行做到這一點。由於這是一個批處理程序,我們還需要 "kill $pid" 來關閉(直到 KIP-95 完成:開放 3 年)。
我希望很多人像我一樣在使用 suppress 時偶然發現了這個問題,對他們來說,這相當有用。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/n2pF8xgR66zfTy3z7kqfsg