Flink 的處理背壓​原理及問題 - 面試必備

轉自:https://zhuanlan.zhihu.com/p/38157397

反壓機制(BackPressure)被廣泛應用到實時流處理系統中,流處理系統需要能優雅地處理反壓(backpressure)問題。反壓通常產生於這樣的場景:短時負載高峯導致系統接收數據的速率遠高於它處理數據的速率。許多日常問題都會導致反壓,例如,垃圾回收停頓可能會導致流入的數據快速堆積,或者遇到大促或秒殺活動導致流量陡增。反壓如果不能得到正確的處理,可能會導致資源耗盡甚至系統崩潰。反壓機制就是指系統能夠自己檢測到被阻塞的 Operator,然後系統自適應地降低源頭或者上游的發送速率。目前主流的流處理系統 Apache Storm、JStorm、Spark Streaming、S4、Apache Flink、Twitter Heron 都採用反壓機制解決這個問題,不過他們的實現各自不同。

不同的組件可以不同的速度執行(並且每個組件中的處理速度隨時間改變)。例如,考慮一個工作流程,或由於數據傾斜或任務調度而導致數據被處理十分緩慢。在這種情況下,如果上游階段不減速,將導致緩衝區建立長隊列,或導致系統丟棄元組。如果元組在中途丟棄,那麼效率可能會有損失,因爲已經爲這些元組產生的計算被浪費了。並且在一些流處理系統中比如 Strom,會將這些丟失的元組重新發送,這樣會導致數據的一致性問題,並且還會導致某些 Operator 狀態疊加。進而整個程序輸出結果不準確。第二由於系統接收數據的速率是隨着時間改變的,短時負載高峯導致系統接收數據的速率遠高於它處理數據的速率的情況,也會導致 Tuple 在中途丟失。所以實時流處理系統必須能夠解決發送速率遠大於系統能處理速率這個問題,大多數實時流處理系統採用反壓(BackPressure)機制解決這個問題。下面我們就來介紹一下不同的實時流處理系統採用的反壓機制:

1.Strom 反壓機制

1.1 Storm 1.0 以前的反壓機制

對於開啓了 acker 機制的 storm 程序,可以通過設置 conf.setMaxSpoutPending 參數來實現反壓效果,如果下游組件 (bolt) 處理速度跟不上導致 spout 發送的 tuple 沒有及時確認的數超過了參數設定的值,spout 會停止發送數據,這種方式的缺點是很難調優 conf.setMaxSpoutPending 參數的設置以達到最好的反壓效果,設小了會導致吞吐上不去,設大了會導致 worker OOM;有震盪,數據流會處於一個顛簸狀態,效果不如逐級反壓;另外對於關閉 acker 機制的程序無效;

1.2 Storm Automatic Backpressure

新的 storm 自動反壓機制 (Automatic Back Pressure) 通過監控 bolt 中的接收隊列的情況,當超過高水位值時專門的線程會將反壓信息寫到 Zookeeper ,Zookeeper 上的 watch 會通知該拓撲的所有 Worker 都進入反壓狀態,最後 Spout 降低 tuple 發送的速度。

每個 Executor 都有一個接受隊列和發送隊列用來接收 Tuple 和發送 Spout 或者 Bolt 生成的 Tuple 元組。每個 Worker 進程都有一個單的的接收線程監聽接收端口。它從每個網絡上進來的消息發送到 Executor 的接收隊列中。Executor 接收隊列存放 Worker 或者 Worker 內部其他 Executor 發過來的消息。Executor 工作線程從接收隊列中拿出數據,然後調用 execute 方法,發送 Tuple 到 Executor 的發送隊列。Executor 的發送線程從發送隊列中獲取消息,按照消息目的地址選擇發送到 Worker 的傳輸隊列中或者其他 Executor 的接收隊列中。最後 Worker 的發送線程從傳輸隊列中讀取消息,然後將 Tuple 元組發送到網絡中。

  1. 當 Worker 進程中的 Executor 線程發現自己的接收隊列滿了時,也就是接收隊列達到 high watermark 的閾值後,因此它會發送通知消息到背壓線程。

  2. 背壓線程將當前 worker 進程的信息註冊到 Zookeeper 的 Znode 節點中。具體路徑就是 /Backpressure/topo1/wk1 下

  3. Zookeepre 的 Znode Watcher 監視 / Backpreesure/topo1 下的節點目錄變化情況,如果發現目錄增加了 znode 節點說明或者其他變化。這就說明該 Topo1 需要反壓控制,然後它會通知 Topo1 所有的 Worker 進入反壓狀態。

  4. 最終 Spout 降低 tuple 發送的速度。

  5. JStorm 反壓機制


Jstorm 做了兩級的反壓,第一級和 Jstorm 類似,通過執行隊列來監測,但是不會通過 ZK 來協調,而是通過 Topology Master 來協調。在隊列中會標記 high water mark 和 low water mark,當執行隊列超過 high water mark 時,就認爲 bolt 來不及處理,則向 TM 發一條控制消息,上游開始減慢發送速率,直到下游低於 low water mark 時解除反壓。

此外,在 Netty 層也做了一級反壓,由於每個 Worker Task 都有自己的發送和接收的緩衝區,可以對緩衝區設定限額、控制大小,如果 spout 數據量特別大,緩衝區填滿會導致下游 bolt 的接收緩衝區填滿,造成了反壓。

限流機制:jstorm 的限流機制, 當下遊 bolt 發生阻塞時, 並且阻塞 task 的比例超過某個比例時(現在默認設置爲 0.1),觸發反壓

限流方式:計算阻塞 Task 的地方執行線程執行時間,Spout 每發送一個 tuple 等待相應時間,然後講這個時間發送給 Spout, 於是, spout 每發送一個 tuple,就會等待這個執行時間。

Task 阻塞判斷方式:在 jstorm 連續 4 次採樣週期中採樣,隊列情況,當隊列超過 80%(可以設置)時,即可認爲該 task 處在阻塞狀態。

  1. SparkStreaming 反壓機制

3.1 爲什麼引入反壓機制 Backpressure

默認情況下,Spark Streaming 通過 Receiver 以生產者生產數據的速率接收數據,計算過程中會出現 batch processing time > batch interval 的情況,其中 batch processing time 爲實際計算一個批次花費時間, batch interval 爲 Streaming 應用設置的批處理間隔。這意味着 Spark Streaming 的數據接收速率高於 Spark 從隊列中移除數據的速率,也就是數據處理能力低,在設置間隔內不能完全處理當前接收速率接收的數據。如果這種情況持續過長的時間,會造成數據在內存中堆積,導致 Receiver 所在 Executor 內存溢出等問題(如果設置 StorageLevel 包含 disk, 則內存存放不下的數據會溢寫至 disk, 加大延遲)。Spark 1.5 以前版本,用戶如果要限制 Receiver 的數據接收速率,可以通過設置靜態配製參數 “spark.streaming.receiver.maxRate” 的值來實現,此舉雖然可以通過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。比如:producer 數據生產高於 maxRate,當前集羣處理能力也高於 maxRate,這就會造成資源利用率下降等問題。爲了更好的協調數據接收速率與資源處理能力,Spark Streaming 從 v1.5 開始引入反壓機制(back-pressure), 通過動態控制數據接收速率來適配集羣數據處理能力。

3.2 反壓機制 Backpressure

Spark Streaming Backpressure: 根據 JobScheduler 反饋作業的執行信息來動態調整 Receiver 數據接收率。通過屬性 “spark.streaming.backpressure.enabled” 來控制是否啓用 backpressure 機制,默認值 false,即不啓用。

SparkStreaming 架構圖如下所示:

SparkStreaming 反壓過程執行如下圖所示:

在原架構的基礎上加上一個新的組件 RateController, 這個組件負責監聽 “OnBatchCompleted” 事件,然後從中抽取 processingDelay 及 schedulingDelay 信息. Estimator 依據這些信息估算出最大處理速度(rate),最後由基於 Receiver 的 Input Stream 將 rate 通過 ReceiverTracker 與 ReceiverSupervisorImpl 轉發給 BlockGenerator(繼承自 RateLimiter).

  1. Heron 反壓機制

當下遊處理速度跟不上上游發送速度時,一旦 StreamManager 發現一個或多個 Heron Instance 速度變慢,立刻對本地 spout 進行降級,降低本地 Spout 發送速度, 停止從這些 spout 讀取數據。並且受影響的 StreamManager 會發送一個特殊的 start backpressure message 給其他的 StreamManager ,要求他們對 spout 進行本地降級。當其他 StreamManager 接收到這個特殊消息時,他們通過不讀取當地 Spout 中的 Tuple 來進行降級。一旦出問題的 Heron Instance 恢復速度後,本地的 SM 會發送 stop backpressure message 解除降級。

很多 Socket Channel 與應用程序級別的 Buffer 相關聯,該緩衝區由 high watermark 和 low watermark 組成。當緩衝區大小達到 high watermark 時觸發反壓,並保持有效,直到緩衝區大小低於 low watermark。此設計的基本原理是防止拓撲在進入和退出背壓緩解模式之間快速振盪。

  1. Flink 反壓機制

Flink 沒有使用任何複雜的機制來解決反壓問題,因爲根本不需要那樣的方案!它利用自身作爲純數據流引擎的優勢來優雅地響應反壓問題。下面我們會深入分析 Flink 是如何在 Task 之間傳輸數據的,以及數據流如何實現自然降速的。

Flink 在運行時主要由 operators 和 streams 兩大組件構成。每個 operator 會消費中間態的流,並在流上進行轉換,然後生成新的流。對於 Flink 的網絡機制一種形象的類比是,Flink 使用了高效有界的分佈式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。還記得經典的線程間通信案例:生產者消費者模型嗎?使用 BlockingQueue 的話,一個較慢的接受者會降低發送者的發送速率,因爲一旦隊列滿了(有界隊列)發送者會被阻塞。Flink 解決反壓的方案就是這種感覺。

在 Flink 中,這些分佈式阻塞隊列就是這些邏輯流,而隊列容量是通過緩衝池來(LocalBufferPool)實現的。每個被生產和被消費的流都會被分配一個緩衝池。緩衝池管理着一組緩衝 (Buffer),緩衝在被消費後可以被回收循環利用。這很好理解:你從池子中拿走一個緩衝,填上數據,在數據消費完之後,又把緩衝還給池子,之後你可以再次使用它。

如下圖所示展示了 Flink 在網絡傳輸場景下的內存管理。網絡上傳輸的數據會寫到 Task 的 InputGate(IG) 中,經過 Task 的處理後,再由 Task 寫到 ResultPartition(RS) 中。每個 Task 都包括了輸入和輸入,輸入和輸出的數據存在 Buffer 中(都是字節數據)。Buffer 是 MemorySegment 的包裝類。

  1. TaskManager(TM)在啓動時,會先初始化 NetworkEnvironment 對象,TM 中所有與網絡相關的東西都由該類來管理(如 Netty 連接),其中就包括 NetworkBufferPool。根據配置,Flink 會在 NetworkBufferPool 中生成一定數量(默認 2048 個)的內存塊 MemorySegment(關於 Flink 的內存管理,後續文章會詳細談到),內存塊的總數量就代表了網絡傳輸中所有可用的內存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個 TM 只會實例化一個。

  2. Task 線程啓動時,會向 NetworkEnvironment 註冊,NetworkEnvironment 會爲 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創建一個 LocalBufferPool(緩衝池)並設置可申請的 MemorySegment(內存塊)數量。IG 對應的緩衝池初始的內存塊數量與 IG 中 InputChannel 數量一致,RP 對應的緩衝池初始的內存塊數量與 RP 中的 ResultSubpartition 數量一致。不過,每當創建或銷燬緩衝池時,NetworkBufferPool 會計算剩餘空閒的內存塊數量,並平均分配給已創建的緩衝池。注意,這個過程只是指定了緩衝池所能使用的內存塊數量,並沒有真正分配內存塊,只有當需要時才分配。爲什麼要動態地爲緩衝池擴容呢?因爲內存越多,意味着系統可以更輕鬆地應對瞬時壓力(如 GC),不會頻繁地進入反壓狀態,所以我們要利用起那部分閒置的內存塊。

  3. 在 Task 線程執行過程中,當 Netty 接收端收到數據時,爲了將 Netty 中的數據拷貝到 Task 中,InputChannel(實際是 RemoteInputChannel)會向其對應的緩衝池申請內存塊(上圖中的①)。如果緩衝池中也沒有可用的內存塊且已申請的數量還沒到池子上限,則會向 NetworkBufferPool 申請內存塊(上圖中的②)並交給 InputChannel 填上數據(上圖中的③和④)。如果緩衝池已申請的數量達到上限了呢?或者 NetworkBufferPool 也沒有可用內存塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上游的發送端會立即響應停止發送,拓撲會進入反壓狀態。當 Task 線程寫數據到 ResultPartition 時,也會向緩衝池請求內存塊,如果沒有可用內存塊時,會阻塞在請求內存塊的地方,達到暫停寫入的目的。

  4. 當一個內存塊被消費完成之後(在輸入端是指內存塊中的字節被反序列化成對象了,在輸出端是指內存塊中的字節寫入到 Netty Channel 了),會調用 Buffer.recycle() 方法,會將內存塊還給 LocalBufferPool (上圖中的⑤)。如果 LocalBufferPool 中當前申請的數量超過了池子容量(由於上文提到的動態容量,由於新註冊的 Task 導致該池子容量變小),則 LocalBufferPool 會將該內存塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量,則會繼續留在池子中,減少反覆申請的開銷。

下面這張圖簡單展示了兩個 Task 之間的數據傳輸以及 Flink 如何感知到反壓的:

  1. 記錄 “A” 進入了 Flink 並且被 Task 1 處理。(這裏省略了 Netty 接收、反序列化等過程)

  2. 記錄被序列化到 buffer 中。

  3. 該 buffer 被髮送到 Task 2,然後 Task 2 從這個 buffer 中讀出記錄。

不要忘了:記錄能被 Flink 處理的前提是,必須有空閒可用的 Buffer。

結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩衝池 1),Task 2 在輸入端也有一個相關聯的 LocalBufferPool(稱緩衝池 2)。如果緩衝池 1 中有空閒可用的 buffer 來序列化記錄 “A”,我們就序列化併發送該 buffer。

這裏我們需要注意兩個場景:

這種固定大小緩衝池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產數據的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更復雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。

5.3 反壓實驗

另外,官方博客中爲了展示反壓的效果,給出了一個簡單的實驗。下面這張圖顯示了:隨着時間的改變,生產者(黃色線)和消費者(綠色線)每 5 秒的平均吞吐與最大吞吐(在單一 JVM 中每秒達到 8 百萬條記錄)的百分比。我們通過衡量 task 每 5 秒鐘處理的記錄數來衡量平均吞吐。該實驗運行在單 JVM 中,不過使用了完整的 Flink 功能棧。

首先,我們運行生產 task 到它最大生產速度的 60%(我們通過 Thread.sleep() 來模擬降速)。消費者以同樣的速度處理數據。然後,我們將消費 task 的速度降至其最高速度的 30%。你就會看到背壓問題產生了,正如我們所見,生產者的速度也自然降至其最高速度的 30%。接着,停止消費 task 的人爲降速,之後生產者和消費者 task 都達到了其最大的吞吐。接下來,我們再次將消費者的速度降至 30%,pipeline 給出了立即響應:生產者的速度也被自動降至 30%。最後,我們再次停止限速,兩個 task 也再次恢復 100% 的速度。總而言之,我們可以看到:生產者和消費者在 pipeline 中的處理都在跟隨彼此的吞吐而進行適當的調整,這就是我們希望看到的反壓的效果。

在 Storm/JStorm 中,只要監控到隊列滿了,就可以記錄下拓撲進入反壓了。但是 Flink 的反壓太過於天然了,導致我們無法簡單地通過監控隊列來監控反壓狀態。Flink 在這裏使用了一個 trick 來實現對反壓的監控。如果一個 Task 因爲反壓而降速了,那麼它會卡在向 LocalBufferPool 申請內存塊上。那麼這時候,該 Task 的 stack trace 就會長下面這樣:

java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
[...]

那麼事情就簡單了。通過不斷地採樣每個 task 的 stack trace 就可以實現反壓監控。

Flink 的實現中,只有當 Web 頁面切換到某個 Job 的 Backpressure 頁面,纔會對這個 Job 觸發反壓檢測,因爲反壓檢測還是挺昂貴的。JobManager 會通過 Akka 給每個 TaskManager 發送TriggerStackTraceSample消息。默認情況下,TaskManager 會觸發 100 次 stack trace 採樣,每次間隔 50ms(也就是說一次反壓檢測至少要等待 5 秒鐘)。並將這 100 次採樣的結果返回給 JobManager,由 JobManager 來計算反壓比率(反壓出現的次數 / 採樣的次數),最終展現在 UI 上。UI 刷新的默認週期是一分鐘,目的是不對 TaskManager 造成太大的負擔。

總結

Flink 不需要一種特殊的機制來處理反壓,因爲 Flink 中的數據傳輸相當於已經提供了應對反壓的機制。因此,Flink 所能獲得的最大吞吐量由其 pipeline 中最慢的組件決定。相對於 Storm/JStorm 的實現,Flink 的實現更爲簡潔優雅,源碼中也看不見與反壓相關的代碼,無需 Zookeeper/TopologyMaster 的參與也降低了系統的負載,也利於對反壓更迅速的響應。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/uUNtDIwGUXgZIV-KYQ4mFA