FreeWheel 全球範圍 Kafka 集羣上雲實踐

Kafka 憑藉其高吞吐量、高可靠等特性一直是數據領域重要的消息中間件,雲廠商消息隊列的推出,促使越來越多的企業從自建遷移上公有云。那麼將 Kafka 部署到公有云需要怎麼做?它的彈性伸縮、容災能力怎麼樣?成本如何控制?

本文將以 FreeWheel 的 Kafka 服務遷移上 AWS 爲例,闡述 Kafka 在遷移公有云方面的實踐過程,解答上面的問題,也希望可以給大家帶來啓發。

1 業務場景

首先介紹一下 Kafka 在 FreeWheel 的應用場景。FreeWheel 是一家廣告投放公司,我們的主要產品是廣告投放系統。用戶在瀏覽廣告時會產生一些業務數據,我們使用 Kafka 作爲處理和分析這些數據的數據總線。我們的廣告服務器部署在全球 10+ 個數據中心,是一個 On-Prem 和 AWS 混合雲的架構,Kafka 集羣和這些服務器部署在一起。我們使用 Mirrormaker 將多個 DC 的廣告日誌數據同步到 AWS 的 Global Kafka 集羣進行統一處理。如下圖所示,在單個 DC 部署的 Kafka 集羣,簡稱爲 Local Kafka,中心數據集羣爲 Global Kafka。本文主要論述的是混合雲架構上 Kafka 的治理經驗。

2 Kafka 與 AWS 原生服務的融合

既然選擇了 AWS,讀者肯定會問:如何利用 AWS 的原生服務來支持 Kafka?本章就來介紹我們使用 AWS 的原生服務來提升 Kafka 運維、彈性伸縮和容災能力的祕訣。

部署方案

首先講講我們的部署方式。與在實體服務器上部署應用不同,在 AWS 上部署應用可以使用一些雲服務廠家提供的獨特能力來減輕運維的壓力。ASG(Auto Scaling group) 是一些機器實例組成的運算組。ELB(Elastic Load Balancing) 是 AWS 提供的負載均衡服務,它可以綁定在 ASG 上。Route53 是 AWS 提供的域名解析服務。我們根據這些組件設計了 Kafka 的部署方案:將 Kafka 機器實例啓動在三組 ASG 裏面 (部署三組 ASG 的原因在本文論述容災的部分),然後將一個 ELB 綁定這些 ASG。最後將 ELB 綁定在一個 Route53 上面。這樣一個帶有負載均衡與固定域名的 Kafka 集羣就搭建好了。

彈性伸縮

突發流量是個令人頭痛的問題,處理不好可能對整個服務的穩定性產生致命影響。然而云上部署最吸引人的特點就是彈性計算,我們可以利用這個特點來輕鬆的應對突發流量。

FreeWheel 的客戶主要位於北美和歐洲,因此在這些地方發生一些重大事件或者體育賽事 (比如超級碗或者美國大選) 的時候會產生突發流量。此時需要考慮擴容 Kafka。對於這種短時間內增加的流量,我們會調整 Kafka 的機型。只需要更換更大的機型再輪番重啓 Kafka,就可以實現彈性縮擴容。

EKS(Elastic Kubernetes Service) 是 AWS 實現的 K8S 託管服務。我們把 Mirrormaker 部署在 EKS 上。每一個 Local Kafka 的流量是不同的,因此我們對 Mirrormaker 進行了差異化的配置。對於流量大的數據鏈路,我們配置了更多的 Pod 和 CPU 數量。當突發流量到來時,使用 EKS 可以快速擴充 Mirrormaker 的數據同步能力。如下圖所示,當位於 Data Center B 中的 Local Kafka 的流量增大時,我們可以通過修改 Pod 數量來增加該條鏈路上的 Mirrormaker 進程數量,以此加快數據同步的速度。

容災

熟悉 Kafka 的讀者應該知道,Kafka 對於每個 topic 可以設置不同的保存時間,保存時間外的數據會被刪除。正常情況下,Local Kafka 和 Global Kafka 保留數據的時間都是一天。如果想長時間保留數據怎麼辦呢?

這裏我們使用了 S3 和 Kafka connect。S3(Amazon Simple Storage Service) 是 AWS 提供的對象存儲服務,它的特點是廉價、易用,適合長時間存儲數據。Kafka connect 是 Kafka 提供的數據同步框架,可以把其他數據源的數據導入 Kafka,也可以把 Kafka 內的數據同步到其他地方。connector 是運行在 Kafka connect 集羣中的應用。我們使用 Confluent 公司開源的 S3 connector 把需要長期存儲的數據備份到 S3。

我們對 S3 connector 進行了二次開發,使得它在備份的時候可以按照我們期望的 parquet 格式進行保存。這些格式化數據可以通過 Presto 進行查詢,意味着流經 Kafka 的數據可以被長期訪問,這對排查故障與比對數據有很大的作用。我們還會對這部分數據定期進行 ETL,以此監控數據的體積分佈和增長趨勢。

如果把服務部署在實體機房,常常要考慮跨機房容災問題,事實上 AWS 也有類似的概念。AWS 的機房分佈於全球各地,範圍最大的地理概念是 region。比如美國東部有弗吉尼亞,俄亥俄等幾個 region。一個 region 又包含若干個 AZ(Availability Zone),這裏指的是距離較近的幾個機房。因此容災包含了 AZ 和 Region 兩個級別。

上文講到,我們把 Kafka 的 EC2 實例啓動在三個 ASG 裏,這是因爲這三個 ASG 指定了三個 AZ,每個 ASG 裏面的節點都保存有一個 topic 的完整副本。因此當其中一個 AZ 發生故障的時候,其他的 AZ 仍然可以繼續服務,這樣就實現了 AZ 級別的容災。至於 region 級別的容災,我們則使用了 S3 上備份的數據。首先,我們使用了 AWS S3 CRR(Cross-Region Replication) 服務,將 S3 上的數據實時同步到其他的 region。當 region 級別的災難發生的時候,我們會在其他的 region 啓動 Kafka 集羣,然後將一段時間內 S3 上的數據寫回 Kafka。最後根據 timestamp 將各個 local Kafka 的 Mirrormaker consumer offset 調整到事故發生的時候,打開 Mirrormaker, 將新的數據寫入 Kafka。請注意,這裏的操作必然會引發一部分數據重複或者丟失,然而對應 region 級別的小概率災難,這些代價是值得付出的。

3 Kafka 在 AWS 降本增效的實踐

有云平臺使用經驗的讀者會說,雲計算好是好,但是使用起來太貴了!別急,這裏我來教大家幾招 Kafka 雲上部署的省錢祕籍。節約成本,要從計算和存儲兩個方向考慮。

計算成本節約

AWS 的計算成本體現在機型上。對於 Kafka,我們的最佳實踐是使用 R 系列的機型。R 系列機型和其他機型相比,在 CPU 核心數相同的情況下,它的內存會更大。因爲 Kafka 會消耗比較多的堆外內存,同時高內存也意味着更高的 cache 命中率,所以 R 系列機型最適合 Kafka。

但是 R 系列的機型也非常昂貴,那麼如何有效降低成本呢?我們做了兩方面的優化。

第一個方面是部署架構的優化。上文提到,我們的 Kafka 是跨 AZ 部署的。根據 Kafka 的原理,Kafka 的所有讀寫均由 Leader 支持,所以我們把 Leader 集中到一個 AZ 當中,此時該集羣的主要壓力都轉移到了該 AZ。其他 AZ 中只有 Follower,那麼在這些 AZ 中的實例機型就可以比較小。只有在主 AZ 發生故障時,其他 AZ 的機器再動態擴容。通過 AZ 間非對稱部署,可以降低 Kafka 的計算費用。另外,AWS 會對跨 AZ 的數據傳輸收取費用。如果 Kafka 的下游應用也部署在和 Kafka Leader 相同的 AZ,就可以避免跨 AZ 的數據傳輸的收費。

第二個方面是使用 ARM 架構的機器。傳統的服務器架構以 x86 爲主,然而近些年 AWS 推出了 ARM 架構的機器。這部分機器在 CPU 核心、帶寬以及內存大小等參數不變的情況下,比同等的 x86 機器價格上便宜了 15% 左右,CPU 性能提高了 20% 左右。我們的 Kafka connect 集羣已經替換成了 ARM 架構的機器,節約了 30% 左右的成本。未來我們會測試 Kafka 和 Mirrormaker 與 ARM 架構的兼容性,如果能成功遷移,成本還會進一步降低。

存儲成本節約

AWS 的存儲成本體現在 S3 和 EBS 上。我們針對它們也進行了優化。

S3 上面主要存儲了 Kafka 的備份數據。一條數據從寫入 Local Kafka 到寫入 Global Kafka 共被備份到 S3 三次。它第一次是被 Local Kafka 的 connect 集羣備份到 S3;第二次是被 Global Kafka 的 connect 集羣備份到 S3;最後一次是被 CRR 備份到跨 region 的 S3。顯然這種多次備份是不經濟的。

如下圖所示,爲了減少重複備份,我們每天會對比 Local Kafka 備份的數據和 Global Kafka 備份的數據,計算出它們的差集。然後我們會將這個差集轉移到備份 Global Kafka 的 S3 中,再刪除 Local Kafka 的備份數據。一旦出現差集,就說明 Mirrormaker 同步過程中丟失了數據。此時我們會發出報警。至於同步到其他 region 中 S3 的數據,我們會選用更便宜的 S3 存儲等級,降低它的存儲成本。

EBS(Amazon Elastic Block Store) 是 AWS 提供的硬盤類存儲。因爲 Kafka 的吞吐較大,我們使用了 gp3 類型的 EBS。一般 EBS 的 iops 和吞吐和它的容量成正比,而 EBS 的價格又和容量成正比。gp3 的 iops 和吞吐可以動態調整,那麼相同的吞吐能力下,gp3 的價格比其他類型要便宜很多。

爲了進一步降低磁盤佔用,壓縮使用成本,我們嘗試引入了 ZSTD 壓縮算法。該算法是 Facebook 開源的新型壓縮算法,在 Kafka 2.1 版本後可用。在這之前,Kafka 提供的壓縮率最高的算法是 Gzip。經過我們的測試,在消息大小超過 50Kb 時,ZSTD 的壓縮比比 Gzip 的壓縮比大 20%,同時壓縮速度和解壓速度更快。而我們當前磁盤佔用最大的 topic 的平均消息大小已經超過 60Kb,所以我們將壓縮算法換成了 ZSTD。

這裏需要注意的是,Kafka 2.1 對 ZSTD 的支持並不好。

首先,適配該版本的其他語言客戶端在解壓 / 壓縮 ZSTD 時可能有 bug。我們發現 Go 語言的 Sarama 客戶端解壓 ZSTD 時有嚴重的性能 bug。該 bug 的問題是在 ZSTD 解壓過程中預分配了過多的內存,這會導致 GVM 產生大量 GC,使得 GVM 不斷擴張直至耗盡服務器內存而崩潰。該問題在客戶端更新至 1.28.0 之後才得以修復。

其次,Kafka 的 Java client 對 ZSTD 的壓縮和解壓縮支持也有缺陷。我們使用的 client 版本是 2.4.1,它在壓縮 ZSTD 文件的時候會產生大量 GC。爲解決這個問題,社區建立了相關的 issue,希望用一個可複用的 buffer 來減少對象的數量,進而降低 GC 次數。ZSTD 官方也在更新中提供了類似的 buffer。這個 feature 以及 ZSTD 包的升級在 Kafka 2.8.0 版本中已經生效。因此,如果希望使用 ZSTD,建議將 Java 客戶端升級至 2.8.0,同時對非 Java 客戶端進行充分測試。

使用 ZSTD 壓縮後,該 Topic 的數據平均每條大小減少了 25%,上游的 Producer 寫 Kafka 時間減少了 63%。下圖展示了 Producer 在使用 ZSTD 前後的性能變化。紅線是每秒寫入的數據量,藍線是寫入時間,可以觀察到在修改前後,紅線的變化不大,但是藍線下降了很多,這反映出 Producer 的寫入性能有巨大提升。

4 總結

本文以 Kafka 上云爲例,回顧和總結了 Kafka 向 AWS 遷移部署實踐方面的經驗。

FreeWheel 大數據團隊從 2018 年開始向 AWS 上遷移服務,在遷移到 AWS 上之後,Kafka 的部署和運維變得更加容易,團隊享受到了彈性計算帶來的便捷,容災能力得到提升,也爲公司業務帶來了更好的支持。

最後,非常感謝每一位爲此辛勞工作的同事,是數據團隊的共同努力完成了這項任務。值得一提的是 AWS 的 service 團隊爲我們提供了很多技術支持和指導,在此對他們付出的汗水與努力表示感謝,將來我們還會持續推進數據團隊在 AWS 上的技術演進。

5 作者簡介

袁藝,Apache Druid Committer,現任 FreeWheel 大數據基礎架構團隊高級工程師,主要負責公司 Kafka 的維護和開發工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GoIpvjPFk_9etl1wbtQw7A