Kafka 流 - 抑制

在這篇文章中,我將解釋 Kafka Streams 抑制的概念。儘管它看起來很容易理解,但還是有一些內在的問題 / 事情是必須要了解的。這是我上一篇博文 CDC 分析的延續。

架構

一個典型的 CDC 架構可以表示爲:。

使用 Kafka 及其組件的 CDC 架構

在上述架構中。

Kafka Streams 應用程序可以用 Java/Scala 編寫。

我的要求是將 CDC 事件流從多個表中加入,並每天創建統計。爲了做到這一點,我們不得不使用 Kafka Streams 的抑制功能。

要理解 Kafka 流的壓制概念,我們首先要理解聚合(Aggregation)。

聚合的概念

Kafka Streams Aggregation 的概念與其他函數式編程(如 Scala/Java Spark Streaming、Akka Streams)相當相似。這篇文章只是涵蓋了其中一些重要的概念。關於詳細的聚合概念,請訪問 confluent 文檔。

聚合的概念

聚合是一種有狀態的轉換操作,它被應用於相同鍵的記錄。Kafka Streams 支持以下聚合:聚合、計數和減少。你可以在 KStream 或 KTable 上運行 groupBy(或其變體),這將分別產生一個 KGroupedStream 和 KGroupedTable。

要在 Kafka 流中進行聚合,可以使用。

在 Kafka Streams 中,有不同的窗口處理方式。請參考文檔。我們對 1 天的 Tumbling 時間窗口感興趣。

注意:所有的聚合操作都會忽略空鍵的記錄,這是顯而易見的,因爲這些函數集的目標就是對特定鍵的記錄進行操作。因此,我們需要確 f 保我們首先對我們的事件流做 selectKeyoperation。

Kafka-streams-windowing

在程序中添加 suppress(untilWindowClose...) 告訴 Kafka Streams 抑制所有來自 reduce 操作的輸出結果,直到 "窗口關閉"。" 當窗口關閉時,它的結果不能再改變,所以任何從 suppress(untilWindowClose...) 出來的結果都是其窗口的最終結果。

根據上述文件中的定義,我們希望每天在寬限期過後產生一個彙總的統計信息(與 UTC 一致)。但是,有一個注意點。在遇到相同的 group-by key 之前,suppress 不會刷新聚合的記錄!!。

在 CDC 事件流中,每個表都會有自己的 PK,我們不能用它作爲事件流的鍵。

爲了在所有事件中使用相同的 group-by key,我不得不在創建統計信息時在轉換步驟中對 key 進行硬編碼,如 "KeyValue.pair("store-key", statistic)"。然後,groupByKey() 將正確地將所有的統計信息分組。

在 CDC 架構中,我們不能期望在寬限期後就有 DB 操作發生。在非高峯期 / 週末,可能沒有數據庫操作。但我們仍然需要生成聚合消息。

爲了從壓制中刷新聚集的記錄,我不得不創建一個虛擬的 DB 操作(更新任何具有相同內容的錶行,如 update tableX set id=(select max(id) from tableX);。這個假的 DB 更新操作,我必須每天在寬限期後立即通過 cronjob 進行。也許這個 cronjob 可以取代 ProcessorContext#schedule(), Processor#punctuate()(還沒有嘗試,因爲我需要在這個應用程序中引入硬編碼的表名)。

壓制和重放問題

當我們重放來計算一個較長時期的彙總統計時,問題就更明顯了。流媒體時間變得很奇怪,聚合窗口也過期了,我們得到以下警告。

2021-04-15 08:30:49 WARN 跳過過期窗口的記錄。key=[statistics-store-msg-key] topic=[statistics-streaming-aggregates-statistics-stream-store-repartition] partition=[0] offset=[237] timestamp=[1618420920468] window=[1618358400000,1618444800000) expiration=[1618459200477] streamTime=[1618459200477]

爲了防止這種過期的窗口並得到奇怪的彙總結果,我們需要將寬限期增加到相當大的數值,如下圖所示。

自動計算梯度長度

如上圖所示,當我們進行重放並給出 "event-collection-start" 時,我們應該自動設置 "grace duration"(足夠大)。然後,kafka 流將處理所有聚集的事件,沒有任何過期。但最終的結果仍然不會被 "衝出" 壓制窗口。我們需要通過在啓動應用程序後創建一個假的更新來強行做到這一點。由於這是一個批處理程序,我們還需要 "kill $pid" 來關閉(直到 KIP-95 完成:開放 3 年)。

我希望很多人像我一樣在使用 suppress 時偶然發現了這個問題,對他們來說,這相當有用。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/n2pF8xgR66zfTy3z7kqfsg