Kafka 在美團數據平臺的實踐

Kafka 在美團數據平臺承擔着統一的數據緩存和分發的角色,隨着數據量的增長,集羣規模的擴大,Kafka 面臨的挑戰也愈發嚴峻。本文分享了美團 Kafka 面臨的實際挑戰,以及美團針對性的一些優化工作,希望能給從事相關開發工作的同學帶來幫助或啓發。

1. 現狀和挑戰

1.1 現狀

Kafka 是一個開源的流處理平臺,我們首先了解一下 Kafka 在美團數據平臺的現狀。

圖 1-1 Kafka 在美團數據平臺的現狀

如圖 1-1 所示,藍色部分描述了 Kafka 在數據平臺定位爲流存儲層。主要的職責是做數據的緩存和分發,它會將收集到的日誌分發到不同的數據系統裏,這些日誌來源於系統日誌、客戶端日誌以及業務數據庫。下游的數據消費系統包括通過 ODS 入倉提供離線計算使用、直接供實時計算使用、通過 DataLink 同步到日誌中心,以及做 OLAP 分析使用。

Kafka 在美團的集羣規模總體機器數已經超過了 15000 + 臺,單集羣的最大機器數也已經到了 2000 + 臺。在數據規模上,天級消息量已經超過了 30+P,天級消息量峯值也達到了 4 + 億 / 秒。不過隨着集羣規模的增大,數據量的增長,Kafka 面臨的挑戰也愈發嚴峻,下面講一下具體的挑戰都有哪些。

1.2 挑戰

圖 1-2 Kafka 在美團數據平臺面臨的挑戰

如圖 1-2 所示,具體的挑戰可以概括爲兩部分:

第一部分是慢節點影響讀寫,這裏慢節點參考了 HDFS 的一個概念,具體定義指的是讀寫延遲 TP99 大於 300ms 的 Broker。造成慢節點的原因有三個:

  1. 集羣負載不均衡會導致局部熱點,就是整個集羣的磁盤空間很充裕或者 ioutil 很低,但部分磁盤即將寫滿或者 ioutil 打滿。

  2. PageCache 容量,比如說,80GB 的 PageCache 在 170MB/s 的寫入量下僅能緩存 8 分鐘的數據量。那麼如果消費的數據是 8 分鐘前的數據,就有可能觸發慢速的磁盤訪問。

  3. Consumer 客戶端的線程模型缺陷會導致端到端延時指標失真。例如當 Consumer 消費的多個分區處於同一 Broker 時,TP90 可能小於 100ms,但是當多個分區處於不同 Broker 時,TP90 可能會大於 1000ms。

第二部分是大規模集羣管理的複雜性,具體表現有 4 類問題:

  1. 不同 Topic 之間會相互影響,個別 Topic 的流量突增,或者個別消費者的回溯讀會影響整體集羣的穩定性。

  2. Kafka 原生的 Broker 粒度指標不夠健全,導致問題定位和根因分析困難。

  3. 故障感知不及時,處理成本較高。

  4. Rack 級別的故障會造成部分分區不可用。

2. 讀寫延遲優化

接下來我們先介紹一下針對讀寫延遲問題,美團數據平臺做了哪些優化。首先從宏觀層面,我們將受影響因素分爲應用層和系統層,然後詳細介紹應用層和系統層存在的問題,並給出對應的解決方案,包括流水線加速、Fetcher 隔離、遷移取消和 Cgroup 資源隔離等,下面具體介紹各種優化方案的實現。

2.1 概覽

圖 2-1 Kafka 讀寫延遲優化概覽

圖 2-1 是針對讀寫延遲碰到的問題以及對應優化方案的概覽圖。我們把受影響的因素分爲應用層和系統層。

應用層主要包括 3 類問題:

1)Broker 端負載不均衡,例如磁盤使用率不均衡、ioutil 不均衡等問題。個別磁盤負載升高影響整個 Broker 的請求受到影響。

2)Broker 的數據遷移存在效率問題和資源競爭問題。具體來講,包括以下 3 個層面:

3)Consumer 端單線程模型存在缺陷導致運維指標失真,並且單 Consumer 消費的分區數不受限制,消費能力不足就無法跟上實時最新的數據,當消費的分區數增多時可能會引起回溯讀。

系統層也主要包括 3 類問題:

1)PageCache 污染。Kafka 利用內核層提供的 ZeroCopy 技術提升性能,但是內核層無法區分實時讀寫請求和回溯讀請求,導致磁盤讀可能污染 PageCache,影響實時讀寫。

2)HDD 在隨機讀寫負載下性能差。HDD 對於順序讀寫友好,但是面對混合負載場景下的隨機讀寫,性能顯著下降。

3)CPU 和內存等系統資源在混部場景下的資源競爭問題。在美團大數據平臺,爲了提高資源的利用率,IO 密集型的服務(比如 Kafka)會和 CPU 密集型的服務(比如實時計算作業)混布,混布存在資源競爭,影響讀寫延遲。

以上提到的問題,我們採取了針對性的策略。比如應用層的磁盤均衡、遷移流水線加速、支持遷移取消和 Consumer 異步化等。系統層的 Raid 卡加速、Cgroup 隔離優化等。此外,針對 HDD 隨機讀寫性能不足的問題,我們還設計並實現了基於 SSD 的緩存架構。

2.2 應用層

① 磁盤均衡

圖 2-2 Kafka 應用層磁盤均衡

磁盤熱點導致兩個問題:

針對這兩個問題,我們採用了基於空閒磁盤優先的分區遷移計劃,整個計劃分爲 3 步,由組件 Rebalancer 統籌管理:

  1. 生成遷移計劃。Rebalancer 通過目標磁盤使用率和當前磁盤使用率(通過 Kafka Monitor 上報)持續生成具體的分區遷移計劃。

  2. 提交遷移計劃。Rebalancer 向 Zookeeper 的 Reassign 節點提交剛纔生成的遷移計劃,Kafka 的 Controller 收到這個 Reassign 事件之後會向整個 Kafka Broker 集羣提交 Reassign 事件。

  3. 檢查遷移計劃。Kafka Broker 負責具體執行數據遷移任務,Rebalancer 負責檢查任務進展。

如圖 2-2 所示,每塊 Disk 持有 3 個分區是一個相對均衡的狀態,如果部分 Disk 持有 4 個分區,比如 Broker1-Disk1 和 Broker4-Disk4;部分 Disk 持有 2 個分區,比如 Broker2-Disk2,Broker3-Disk3,Reblanacer 就會將 Broker1-Disk1 和 Broker4-Disk4 上多餘的分區分別遷移到 Broker2-Disk2 和 Broker3-Disk3,最終儘可能地保證整體磁盤利用率均衡。

② 遷移優化

雖然基於空閒磁盤優先的分區遷移實現了磁盤均衡,但是遷移本身仍然存在效率問題和資源競爭問題。接下來,我們會詳細描述我們採取的針對性策略。

  1. 採取流水線加速策略優化遷移緩慢引起的遷移效率問題。

  2. 支持遷移取消解決長尾分區遷移緩慢引起的讀寫請求受影響問題。

  3. 採取 Fetcher 隔離緩解數據遷移請求和實時讀寫請求共用 Fetcher 線程的問題。

優化一,流水線加速

圖 2-3 流水線加速

如圖 2-3 所示,箭頭以上原生 Kafka 版本只支持按批提交,比如說一批提交了四個分區,當 TP4 這個分區一直卡着無法完成的時候,後續所有分區都無法繼續進行。採用流水線加速之後,即使 TP4 這個分區還沒有完成,可以繼續提交新的分區。在相同的時間內,原有的方案受阻於 TP4 沒有完成,後續所有分區都沒辦法完成,在新的方案中,TP4 分區已經遷移到 TP11 分區了。圖中虛線代表了一個無序的時間窗口,主要用於控制併發,目的是爲了和原有的按組提交的個數保持一致,避免過多的遷移影響讀寫請求服務。

優化二,遷移取消

圖 2-4-1 遷移問題

如圖 2-4-1 所示,箭頭左側描述了因爲遷移影響的三種線上類型。第一種是因爲遷移會觸發最舊讀,同步大量的數據,在這個過程中會首先將數據回刷到 PageCache 上引起 PageCache 污染,導致某個實時讀的分區發生 Cache Miss,觸發磁盤度進而影響讀寫請求;第二種是當存在某些異常節點導致遷移 Hang 住時,部分運維操作無法執行,比如流量上漲觸發的 Topic 自動擴分區。因爲在 Kafka 遷移過程中這類運維操作被禁止執行。第三種和第二種類似,它的主要問題是當目標節點 Crash,Topic 擴分區也無法完成,用戶可能一直忍受讀寫請求受影響。

圖 2-4-2 遷移取消

針對上面提到的 3 種問題,我們支持了遷移取消功能。管理員可以調用遷移取消命令,中斷正在遷移的分區,針對第一種場景,PageCache 就不會被污染,實時讀得以保證;在第二、三種場景中,因爲遷移取消,擴分區得以完成。遷移取消會刪除未完成遷移的分區,刪除可能會導致磁盤 IO 出現瓶頸影響讀寫,因此我們通過支持平滑刪除避免大量刪除引起的性能問題。

優化三,Fetcher 隔離

圖 2-5 Fetcher 隔離

如圖 2-5,綠色代表實時讀,紅色代表延時讀。當某一個 Follower 的實時讀和延時讀共享同一個 Fetcher 時,延時讀會影響實時讀。因爲每一次延時讀的數據量是顯著大於實時讀的,而且延時讀容易觸發磁盤讀,可能數據已經不在 PageCache 中了,顯著地拖慢了 Fetcher 的拉取效率。

針對這種問題,我們實施的策略叫 Fetcher 隔離。也就是說所有 ISR 的 Follower 共享 Fetcher,所有非 ISR 的 Follower 共享 Fetcher,這樣就能保證所有 ISR 中的實時讀不會被非 ISR 的回溯讀所影響。

③ Consumer 異步化

圖 2-6 Kafka-Broker 分階段延時統計模型

在講述 Consumer 異步化前,需要解釋下圖 2-6 展示的 Kafka-Broker 分階段延時統計模型。Kafka-Broker 端是一個典型的事件驅動架構,各組件通過隊列通信。請求在不同組件流轉時,會依次記錄時間戳,最終就可以統計出請求在不同階段的執行耗時。

具體來說,當一個 Kafka 的 Producer 或 Consumer 請求進入到 Kafka-Broker 時,Processor 組件將請求寫入 RequestQueue,RequestHandler 從 RequestQueue 拉取請求進行處理,在 RequestQueue 中的等待時間是 RequestQueueTime,RequestHandler 具體的執行時間是 LocalTime。當 RequestHandler 執行完畢後會將請求傳遞給 DelayedPurgatory 組件中,該組件是一個延時隊列。

當觸發某一個延時條件完成了以後會把請求寫到 ResponseQueue 中,在 DelayedPurgatory 隊列持續的時間爲 RemoteTime,Processor 會不斷的從 ResponseQueue 中將數據拉取出來發往客戶端,標紅的 ResponseTime 是可能會被客戶端影響的,因爲如果客戶端接收能力不足,那麼 ResponseTime 就會一直持續增加。從 Kafka-Broker 的視角,每一次請求總的耗時時 RequestTotalTime,包含了剛纔所有流程分階段計時總和。

圖 2-7 Consumer 異步化

ResponseTime 持續增加的主要問題是因爲 Kafka 原生 Consumer 基於 NIO 的單線程模型存在缺陷。如圖 2-7 所示,在 Phase1,User 首先發起 Poll 請求,Kafka-Client 會同時向 Broker1、Broker2 和 Broker3 發送請求,Broker1 的數據先就緒時,Kafka Client 將數據寫入 CompleteQueue,並立即返回,而不是繼續拉取 Broker2 和 Broker3 的數據。後續的 Poll 請求會直接從 CompleteQueue 中讀取數據,然後直接返回,直到 CompleteQueue 被清空。在 CompleteQueue 被清空之前,即使 Broker2 和 Broker3 的端的數據已經就緒,也不會得到及時拉取。如圖中 Phase2,因爲單線程模型存在缺陷導致 WaitFetch 這部分時長變大,導致 Kafka-Broker 的 RespnseTime 延時指標不斷升高,帶來的問題是無法對服務端的處理瓶頸進行精準的監控與細分。

圖 2-8 引入異步拉取線程

針對這個問題,我們的改進是引入異步拉取線程。異步拉取線程會及時地拉取就緒的數據,避免服務端延時指標受影響,而且原生 Kafka 並沒有限制同時拉取的分區數,我們在這裏做了限速,避免 GC 和 OOM 的發生。異步線程在後臺持續不斷地拉取數據並放到 CompleteQueue 中。

2.3 系統層

① Raid 卡加速

圖 2-9 Raid 卡加速

HDD 存在隨機寫性能不足的問題,表現爲延時升高,吞吐降低。針對這個問題我們引入了 Raid 卡加速。Raid 卡自帶緩存,與 PageCache 類似,在 Raid 這一層會把數據 Merge 成更大的 Block 寫入 Disk,更加充分利用順序寫 HDD 的帶寬,藉助 Raid 卡保證了隨機寫性能。

② Cgroup 隔離優化

圖 2-10 Cgroup 隔離

爲了提高資源利用率,美團數據平臺將 IO 密集型應用和 CPU 密集型應用混合部署。IO 密集型應用在這裏指的就是 Kafka,CPU 密集型應用在這裏指的是 Flink 和 Storm。但是原有的隔離策略存在兩個問題:首先是物理核本身會存在資源競爭,在同一個物理核下,共享的 L1Cache 和 L2Cache 都存在競爭,當實時平臺 CPU 飆升時會導致 Kafka 讀寫延時受到影響;其次,Kafka 的 HT 跨 NUMA,增加內存訪問耗時,如圖 2-10 所示,跨 NUMA 節點是通過 QPI 去做遠程訪問,而這個遠程訪問的耗時是 40ns。

針對這兩個問題,我們改進了隔離策略,針對物理核的資源競爭,我們新的混布策略保證 Kafka 獨佔物理核,也就是說在新的隔離策略中,不存在同一個物理核被 Kafka 和 Flink 同時使用;然後是保證 Kafka 的所有超線程處於同一側的 NUMA,避免 Kafka 跨 NUMA 帶來的訪問延時。通過新的隔離策略,Kafka 的讀寫延時不再受 Flink CPU 飆升的影響。

2.4 混合層 - SSD 新緩存架構

圖 2-11 Page 污染引起的性能問題

背景和挑戰

Kafka 利用操作系統提供的 ZeroCopy 技術處理數據讀取請求,PageCache 容量充裕時數據直接從 PageCache 拷貝到網卡,有效降低了讀取延時。但是實際上,PageCache 的容量往往是不足的,因爲它不會超過一個機器的內存。容量不足時,ZeroCopy 就會觸發磁盤讀,磁盤讀不僅顯著變慢,還會污染 PageCache 影響其他讀寫。

如圖 2-11 中左半部分所示,當一個延遲消費者去拉取數據時,發現 PageCache 中沒有它想要的數據,這個時候就會觸發磁盤讀。磁盤讀後會將數據回寫到 PageCache,導致 PageCache 污染,延遲消費者消費延遲變慢的同時也會導致另一個實時消費受影響。因爲對於實時消費而言,它一直讀的是最新的數據,最新的數據按正常來說時不應該觸發磁盤讀的。

選型和決策

針對這個問題,我們這邊在做方案選型時提供了兩種方案:

方案一,讀磁盤時不回寫 PageCache,比如使用 DirectIO,不過 Java 並不支持;

方案二,在內存和 HDD 之間引入中間層,比如 SSD。衆所周知,SSD 和 HDD 相比具備良好的隨機讀寫能力,非常適合我們的使用場景。針對 SSD 的方案我們也有兩種選型:

方案一,可以基於操作系統的內核實現,這種方案 SSD 與 HDD 存儲空間按照固定大小分塊,並且 SSD 與 HDD 建立映射關係,同時會基於數據局部性原理,Cache Miss 後數據會按 LRU 和 LFU 替換 SSD 中部分數據,業界典型方案包括 OpenCAS 和 FlashCache。其優勢是數據路由對應用層透明,對應用代碼改動量小,並且社區活躍可用性好;但是問題在於局部性原理並不滿足 Kafka 的讀寫特性,而且緩存空間污染問題並未得到根本解決,因爲它會根據 LRU 和 LFU 去替換 SSD 中的部分數據。

方案二,基於 Kafka 的應用層去實現,具體就是 Kafka 的數據按照時間維度存儲在不同設備上,對於近實時數據直接放在 SSD 上,針對較爲久遠的數據直接放在 HDD 上,然後 Leader 直接根據 Offset 從對應設備讀取數據。這種方案的優勢是它的緩存策略充分考慮了 Kafka 的讀寫特性,確保近實時的數據消費請求全部落在 SSD 上,保證這部分請求處理的低延遲,同時從 HDD 讀取的數據不回刷到 SSD 防止緩存污染,同時由於每個日誌段都有唯一明確的狀態,因此每次請求目的明確,不存在因 Cache Miss 帶來的額外性能開銷。同時劣勢也很明顯,需要在 Server 端代碼上進行改進,涉及的開發以及測試的工作量較大。

圖 2-13 KafkaSSD 新緩存架構

具體實現

下面來介紹一下 SSD 新緩存架構的具體實現。

  1. 首先新的緩存架構會將 Log 內的多個 Segment 按時間維度存儲在不同的存儲設備上,如圖 2-14 中的紅圈 1,新緩存架構數據會有三種典型狀態,一種叫 Only Cache,指的是數據剛寫進 SSD,還未同步到 HDD 上;第 2 個是 Cached,指數據既同步到了 HDD 也有一部分緩存在 SSD 上;第三種類型叫 WithoutCache,指的是同步到了 HDD 但是 SSD 中已經沒有緩存了。

  2. 然後後臺異步線程持續地將 SSD 數據同步到 HDD 上。

  3. 隨着 SSD 的持續寫入,當存儲空間達到閾值後,會按時間順序刪除距當前時間最久的數據,因爲 SSD 的數據空間有限。

  4. 副本可根據可用性要求靈活開啓是否寫入 SSD。

  5. 從 HDD 讀取的數據是不會回刷到 SSD 上的,防止緩存污染。

圖 2-14 SSD 新緩存架構細節優化

細節優化

介紹了具體實現之後,再來看一下細節優化。

  1. 首先是關於日誌段同步,就是剛纔說到的 Segment,只同步 Inactive 的日誌段,Inactive 指的是現在並沒有在寫的日誌段,低成本解決數據一致性問題。

  2. 其次是做同步限速優化,在 SSD 向 HDD 同步時是需要限速的,同時保護了兩種設備,不會影響其他 IO 請求的處理。

3. 大規模集羣管理優化

3.1 隔離策略

美團大數據平臺的 Kafka 服務於多個業務,這些業務的 Topic 混布在一起的話,很有可能造成不同業務的不同 Topic 之間相互影響。此外,如果 Controller 節點同時承擔數據讀寫請求,當負載明顯變高時,Controller 可能無法及時控制類請求,例如元數據變更請求,最終可能會造成整個集羣發生故障。

針對這些相互影響的問題,我們從業務、角色和優先級三個維度來做隔離優化。

圖 3-1 隔離優化

3.2 全鏈路監控

隨着集羣規模增長,集羣管理碰到了一系列問題,主要包括兩方面:

圖 3-2 全鏈路監控

針對這兩個問題,我們採取的策略是全鏈路監控。全鏈路監控收集和監控 Kafka 核心組件的指標和日誌。全鏈路監控架構如圖 3-2 所示。當某一個客戶端讀寫請求變慢時,我們通過全鏈路監控可以快速定位到具體慢在哪個環節,全鏈路指標監控如圖 3-3 所示。

圖 3-3 全鏈路指標監控

圖 3-4 是一個根據全鏈路指標定位請求瓶頸的示例,可以看出服務端 RemoteTime 佔比最高,這說明耗時主要花費在數據複製。日誌和指標的解析服務可以自動實時感知故障和慢節點,大部分的故障(內存、磁盤、Raid 卡以及網卡等)和慢節點都已經支持自動化處理,還有一類故障是計劃外的故障,比如分區多個副本掛掉導致的不可用,遷移 Hang 住以及非預期的錯誤日誌等,需要人工介入處理。

圖 3-4 全鏈路監控指標示例

3.3 服務生命週期管理

圖 3-5 服務生命週期管理

美團線上 Kafka 的服務器規模在萬級別,隨着服務規模的增長,我們對服務和機器本身的管理,也在不斷迭代。我們的自動化運維繫統能夠處理大部分的機器故障和服務慢節點,但對於機器和服務本身的管理是割裂的,導致存在兩類問題:

  1. 狀態語義存在歧義,無法真實反映系統狀態,往往需要藉助日誌和指標去找到真實系統是否健康或者異常。

  2. 狀態不全面,異常 Case 需人工介入處理,誤操作風險極大。

爲了解決這兩類問題,我們引入了生命週期管理機制,確保能夠真實反映系統狀態。生命週期管理指的是從服務開始運行到機器報廢停止服務的全流程管理,並且做到了服務狀態和機器狀態聯動,無需人工同步變更。而且新的生命週期管理機制的狀態變更由特定的自動化運維觸發,禁止人工變更。

3.4 TOR 容災

圖 3-6 TOR 容災挑戰

我們從工程實現的角度,歸納總結了當前主流圖神經網絡模型的基本範式,實現一套通用框架,以期涵蓋多種 GNN 模型。以下按照圖的類型(同質圖、異質圖和動態圖)分別討論。

圖 3-7 TOR 容災

TOR 容災保證同一個分區的不同副本不在同一個 Rack 下,如圖 3-7 所示,即使 Rack1 整個發生故障,也能保證所有分區可用。

4 未來展望

過去一段時間,我們圍繞降低服務端的讀寫延遲做了大量的優化,但是在服務高可用方面,依然有一些工作需要完成。未來一段時間,我們會將重心放在提升魯棒性和通過各種粒度的隔離機制縮小故障域。比如,讓客戶端主動對一些故障節點進行避讓,在服務端通過多隊列的方式隔離異常請求,支持服務端熱下盤,網絡層主動反壓與限流等等。

另外,隨着美團實時計算業務整體的發展,實時計算引擎(典型如 Flink)和流存儲引擎(典型如 Kafka)混合部署的模式越來越難以滿足業務的需求。因此,我們需要在保持當前成本不變的情況下對 Kafka 進行獨立部署。這就意味着需要用更少的機器(在我們的業務模式下,用原來 1/4 的機器)來承載不變的業務流量。如何在保障服務穩定的情況下,用更少的機器扛起業務請求,也是我們面臨的挑戰之一。

最後,隨着雲原生趨勢的來臨,我們也在探索流存儲服務的上雲之路。

5 作者簡介

海源、仕祿、肖恩、鴻洛、啓帆、胡榮、李傑等,均來自美團數據科學與平臺部。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/waVLtusovUkVDt7rKCcdDg