Kafka 萬億級消息實戰

作者:vivo 互聯網服務器團隊 - Yang Yijun

一、Kafka 應用

本文主要總結當 Kafka 集羣流量達到 萬億級記錄 / 天或者十萬億級記錄 / 天  甚至更高後,我們需要具備哪些能力才能保障集羣高可用、高可靠、高性能、高吞吐、安全的運行。

這裏總結內容主要針對 Kafka2.1.1 版本,包括集羣版本升級、數據遷移、流量限制、監控告警、負載均衡、集羣擴 / 縮容、資源隔離、集羣容災、集羣安全、性能優化、平臺化、開源版本缺陷、社區動態等方面。本文主要是介紹核心脈絡,不做過多細節講解。下面我們先來看看 Kafka 作爲數據中樞的一些核心應用場景。

圖片

下圖展示了一些主流的數據處理流程,Kafka 起到一個數據中樞的作用。

圖片

接下來看看我們 Kafka 平臺整體架構;

圖片

1.1 版本升級

1.1.1  開源版本如何進行版本滾動升級與回退

官網地址:http://kafka.apache.org

1.1.1.2 源碼改造如何升級與回退

由於在升級過程中,必然出現新舊代碼邏輯交替的情況。集羣內部部分節點是開源版本,另外一部分節點是改造後的版本。所以,需要考慮在升級過程中,新舊代碼混合的情況,如何兼容以及出現故障時如何回退。

1.2 數據遷移

由於 Kafka 集羣的架構特點,這必然導致集羣內流量負載不均衡的情況,所以我們需要做一些數據遷移來實現集羣不同節點間的流量均衡。Kafka 開源版本爲數據遷移提供了一個腳本工具 “bin/kafka-reassign-partitions.sh”,如果自己沒有實現自動負載均衡,可以使用此腳本。

開源版本提供的這個腳本生成遷移計劃完全是人工干預的,當集羣規模非常大時,遷移效率變得非常低下,一般以天爲單位進行計算。當然,我們可以實現一套自動化的均衡程序,當負載均衡實現自動化以後,基本使用調用內部提供的 API,由程序去幫我們生成遷移計劃及執行遷移任務。需要注意的是,遷移計劃有指定數據目錄和不指定數據目錄兩種,指定數據目錄的需要配置 ACL 安全認證。

官網地址:http://kafka.apache.org

1.2.1 broker 間數據遷移

不指定數據目錄

//未指定遷移目錄的遷移計劃
{
    "version":1,
    "partitions":[
        {"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}
    ]
}

指定數據目錄

//指定遷移目錄的遷移計劃
{
    "version":1,
    "partitions":[
        {"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}
    ]
}

1.2.2 broker 內部磁盤間數據遷移

生產環境的服務器一般都是掛載多塊硬盤,比如 4 塊 / 12 塊等;那麼可能出現在 Kafka 集羣內部,各 broker 間流量比較均衡,但是在 broker 內部,各磁盤間流量不均衡,導致部分磁盤過載,從而影響集羣性能和穩定,也沒有較好的利用硬件資源。在這種情況下,我們就需要對 broker 內部多塊磁盤的流量做負載均衡,讓流量更均勻的分佈到各磁盤上。

1.2.3 併發數據遷移

當前 Kafka 開源版本(2.1.1 版本)提供的副本遷移工具 “bin/kafka-reassign-partitions.sh” 在同一個集羣內只能實現遷移任務的串行。對於集羣內已經實現多個資源組物理隔離的情況,由於各資源組不會相互影響,但是卻不能友好的進行並行的提交遷移任務,遷移效率有點低下,這種不足直到 2.6.0 版本才得以解決。如果需要實現併發數據遷移,可以選擇升級 Kafka 版本或者修改 Kafka 源碼。

1.2.4 終止數據遷移

當前 Kafka 開源版本(2.1.1 版本)提供的副本遷移工具 “bin/kafka-reassign-partitions.sh” 在啓動遷移任務後,無法終止遷移。當遷移任務對集羣的穩定性或者性能有影響時,將變得束手無策,只能等待遷移任務執行完畢(成功或者失敗),這種不足直到 2.6.0 版本才得以解決。如果需要實現終止數據遷移,可以選擇升級 Kafka 版本或者修改 Kafka 源碼。

1.3 流量限制

1.3.1 生產消費流量限制

經常會出現一些突發的,不可預測的異常生產或者消費流量會對集羣的 IO 等資源產生巨大壓力,最終影響整個集羣的穩定與性能。那麼我們可以對用戶的生產、消費、副本間數據同步進行流量限制,這個限流機制並不是爲了限制用戶,而是避免突發的流量影響集羣的穩定和性能,給用戶可以更好的服務。

如下圖所示,節點入流量由 140MB/s 左右突增到 250MB/s,而出流量則從 400MB/s 左右突增至 800MB/s。如果沒有限流機制,那麼集羣的多個節點將有被這些異常流量打掛的風險,甚至造成集羣雪崩。

圖片生產 / 消費流量限制官網地址:點擊鏈接

對於生產者和消費者的流量限制,官網提供了以下幾種維度組合進行限制(當然,下面限流機制存在一定缺陷,後面在 “Kafka 開源版本功能缺陷” 我們將提到):

/config/users/<user>/clients/<client-id> //根據用戶和客戶端ID組合限流
/config/users/<user>/clients/<default>
/config/users/<user>//根據用戶限流 這種限流方式是我們最常用的方式
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

在啓動 Kafka 的 broker 服務時需要開啓 JMX 參數配置,方便通過其他應用程序採集 Kafka 的各項 JMX 指標進行服務監控。當用戶需要調整限流閾值時,根據單個 broker 所能承受的流量進行智能評估,無需人工干預判斷是否可以調整;對於用戶流量限制,主要需要參考的指標包括以下兩個:

(1)消費流量指標:ObjectName:kafka.server:type=Fetch,user=acl認證用戶名稱 屬性:byte-rate(用戶在當前broker的出流量)、throttle-time(用戶在當前broker的出流量被限制時間)
(2)生產流量指標:ObjectName:kafka.server:type=Produce,user=acl認證用戶名稱 屬性:byte-rate(用戶在當前broker的入流量)、throttle-time(用戶在當前broker的入流量被限制時間)

圖片

圖片

1.3.2 follower 同步 leader / 數據遷移流量限制

副本遷移 / 數據同步流量限制官網地址:鏈接

涉及參數如下:

//副本同步限流配置共涉及以下4個參數
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas

輔助指標如下:

(1)副本同步出流量指標:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2)副本同步入流量指標:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec

圖片

圖片

1.4 監控告警

關於 Kafka 的監控有一些開源的工具可用使用,比如下面這幾種:

Kafka Manager
Kafka Eagle
Kafka Monitor
KafkaOffsetMonitor;

我們已經把 Kafka Manager 作爲我們查看一些基本指標的工具嵌入平臺,然而這些開源工具不能很好的融入到我們自己的業務系統或者平臺上。所以,我們需要自己去實現一套粒度更細、監控更智能、告警更精準的系統。其監控覆蓋範圍應該包括基礎硬件、操作系統(操作系統偶爾出現系統進程 hang 住情況,導致 broker 假死,無法正常提供服務)、Kafka 的 broker 服務、Kafka 客戶端應用程序、zookeeper 集羣、上下游全鏈路監控。

1.4.1 硬件監控

網絡監控:

核心指標包括網絡入流量、網絡出流量、網絡丟包、網絡重傳、處於 TIME.WAIT 的 TCP 連接數、交換機、機房帶寬、DNS 服務器監控(如果 DNS 服務器異常,可能出現流量黑洞,引起大面積業務故障)等。

圖片

圖片

圖片

圖片

圖片

磁盤監控:

核心指標包括監控磁盤 write、磁盤 read(如果消費時沒有延時,或者只有少量延時,一般都沒有磁盤 read 操作)、磁盤 ioutil、磁盤 iowait(這個指標如果過高說明磁盤負載較大)、磁盤存儲空間、磁盤壞盤、磁盤壞塊 / 壞道(壞道或者壞塊將導致 broker 處於半死不活狀態,由於有 crc 校驗,消費者將被卡住)等。

圖片

圖片

圖片

圖片

CPU 監控:

監控 CPU 空閒率 / 負載,主板故障等,通常 CPU 使用率比較低不是 Kafka 的瓶頸。

內存 / 交換區監控:

內存使用率,內存故障。一般情況下,服務器上除了啓動 Kafka 的 broker 時分配的堆內存以外,其他內存基本全部被用來做 PageCache。

緩存命中率監控:

由於是否讀磁盤對 Kafka 的性能影響很大,所以我們需要監控 Linux 的 PageCache 緩存命中率,如果緩存命中率高,則說明消費者基本命中緩存。

詳細內容請閱讀文章:《Linux Page Cache 調優在 Kafka 中的應用》。

系統日誌:

我們需要對操作系統的錯誤日誌進行監控告警,及時發現一些硬件故障。

1.4.2 broker 服務監控

broker 服務的監控,主要是通過在 broker 服務啓動時指定 JMX 端口,然後通過實現一套指標採集程序去採集 JMX 指標。(服務端指標官網地址

**broker 級監控:**broker 進程、broker 入流量字節大小 / 記錄數、broker 出流量字節大小 / 記錄數、副本同步入流量、副本同步出流量、broker 間流量偏差、broker 連接數、broker 請求隊列數、broker 網絡空閒率、broker 生產延時、broker 消費延時、broker 生產請求數、broker 消費請求數、broker 上分佈 leader 個數、broker 上分佈副本個數、broker 上各磁盤流量、broker GC 等。

**topic 級監控:**topic 入流量字節大小 / 記錄數、topic 出流量字節大小 / 記錄數、無流量 topic、topic 流量突變(突增 / 突降)、topic 消費延時。

**partition 級監控:**分區入流量字節大小 / 記錄數、分區出流量字節大小 / 記錄數、topic 分區副本缺失、分區消費延遲記錄、分區 leader 切換、分區數據傾斜(生產消息時,如果指定了消息的 key 容易造成數據傾斜,這嚴重影響 Kafka 的服務性能)、分區存儲大小(可以治理單分區過大的 topic)。

**用戶級監控:**用戶出 / 入流量字節大小、用戶出 / 入流量被限制時間、用戶流量突變(突增 / 突降)。

**broker 服務日誌監控:**對 server 端打印的錯誤日誌進行監控告警,及時發現服務異常。

1.4.3. 客戶端監控

客戶端監控主要是自己實現一套指標上報程序,這個程序需要實現 

org.apache.kafka.common.metrics.MetricsReporter 接口。然後在生產者或者消費者的配置中加入配置項 metric.reporters,如下所示:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//ClientMetricsReporter類實現org.apache.kafka.common.metrics.MetricsReporter接口
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...

客戶端指標官網地址:

http://kafka.apache.org/21/documentation.html#selector_monitoring

http://kafka.apache.org/21/documentation.html#common_node_monitoring

http://kafka.apache.org/21/documentation.html#producer_monitoring

http://kafka.apache.org/21/documentation.html#producer_sender_monitoring

http://kafka.apache.org/21/documentation.html#consumer_monitoring

http://kafka.apache.org/21/documentation.html#consumer_fetch_monitoring

客戶端監控流程架構如下圖所示:

圖片

1.4.3.1 生產者客戶端監控

維度:用戶名稱、客戶端 ID、客戶端 IP、topic 名稱、集羣名稱、brokerIP;

指標:連接數、IO 等待時間、生產流量大小、生產記錄數、請求次數、請求延時、發送錯誤 / 重試次數等。

1.4.3.2 消費者客戶端監控

維度:用戶名稱、客戶端 ID、客戶端 IP、topic 名稱、集羣名稱、消費組、brokerIP、topic 分區;

指標:連接數、io 等待時間、消費流量大小、消費記錄數、消費延時、topic 分區消費延遲記錄等。

1.4.4 Zookeeper 監控

  1. Zookeeper 進程監控;

  2. Zookeeper 的 leader 切換監控;

  3. Zookeeper 服務的錯誤日誌監控;

1.4.5 全鏈路監控

當數據鏈路非常長的時候(比如:業務應用 -> 埋點 SDk-> 數據採集 ->Kafka-> 實時計算 -> 業務應用),我們定位問題通常需要經過多個團隊反覆溝通與排查才能發現問題到底出現在哪個環節,這樣排查問題效率比較低下。在這種情況下,我們就需要與上下游一起梳理整個鏈路的監控。出現問題時,第一時間定位問題出現在哪個環節,縮短問題定位與故障恢復時間。

1.5 資源隔離

1.5.1 相同集羣不同業務資源物理隔離

我們對所有集羣中不同對業務進行資源組物理隔離,避免各業務之間相互影響。在這裏,我們假設集羣有 4 個 broker 節點(Broker1/Broker2/Broker3/Broker4),2 個業務(業務 A / 業務 B),他們分別擁有 topic 分區分佈如下圖所示,兩個業務 topic 都分散在集羣的各個 broker 上,並且在磁盤層面也存在交叉。

試想一下,如果我們其中一個業務異常,比如流量突增,導致 broker 節點異常或者被打掛。那麼這時候另外一個業務也將受到影響,這樣將大大的影響了我們服務的可用性,造成故障,擴大了故障影響範圍。

圖片

針對這些痛點,我們可以對集羣中的業務進行物理資源隔離,各業務獨享資源,進行資源組劃分(這裏把 4 各 broker 劃分爲 Group1 和 Group2 兩個資源組)如下圖所示,不同業務的 topic 分佈在自己的資源組內,當其中一個業務異常時,不會波及另外一個業務,這樣就可以有效的縮小我們的故障範圍,提高服務可用性。

圖片

1.6 集羣歸類

我們把集羣根據業務特點進行拆分爲日誌集羣、監控集羣、計費集羣、搜索集羣、離線集羣、在線集羣等,不同場景業務放在不同集羣,避免不同業務相互影響。

1.7 擴容 / 縮容

1.7.1 topic 擴容分區

隨着 topic 數據量增長,我們最初創建的 topic 指定的分區個數可能已經無法滿足數量流量要求,所以我們需要對 topic 的分區進行擴展。擴容分區時需要考慮一下幾點:

必須保證 topic 分區 leader 與 follower 輪詢的分佈在資源組內所有 broker 上,讓流量分佈更加均衡,同時需要考慮相同分區不同副本跨機架分佈以提高容災能力;

當 topic 分區 leader 個數除以資源組節點個數有餘數時,需要把餘數分區 leader 優先考慮放入流量較低的 broker。

1.7.2 broker 上線

隨着業務量增多,數據量不斷增大,我們的集羣也需要進行 broker 節點擴容。關於擴容,我們需要實現以下幾點:

擴容智能評估:根據集羣負載,把是否需要擴容評估程序化、智能化;

智能擴容:當評估需要擴容後,把擴容流程以及流量均衡平臺化。

1.7.3 broker 下線

某些場景下,我們需要下線我們的 broker,主要包括以下幾個場景:

一些老化的服務器需要下線,實現節點下線平臺化;

服務器故障,broker 故障無法恢復,我們需要下線故障服務器,實現節點下線平臺化;

有更優配置的服務器替換已有 broker 節點,實現下線節點平臺化。

1.8 負載均衡

我們爲什麼需要負載均衡呢?首先,我們來看第一張圖,下圖是我們集羣某個資源組剛擴容後的流量分佈情況,流量無法自動的分攤到我們新擴容後的節點上。那麼這個時候需要我們手動去觸發數據遷移,把部分副本遷移至新節點上才能實現流量均衡。

圖片

下面,我們來看一下第二張圖。這張圖我們可以看出流量分佈非常不均衡,最低和最高流量偏差數倍以上。這和 Kafka 的架構特點有關,當集羣規模與數據量達到一定量後,必然出現當問題。這種情況下,我們也需要進行負載均衡。

圖片

我們再來看看第三張圖。這裏我們可以看出出流量只有部分節點突增,這就是 topic 分區在集羣內部不夠分散,集中分佈到了某幾個 broker 導致,這種情況我們也需要進行擴容分區和均衡。

圖片

我們比較理想的流量分佈應該如下圖所示,各節點間流量偏差非常小,這種情況下,既可以增強集羣扛住流量異常突增的能力又可以提升集羣整體資源利用率和服務穩定性,降低成本。

圖片

圖片

負載均衡我們需要實現以下效果:

1)生成副本遷移計劃以及執行遷移任務平臺化、自動化、智能化;

2)執行均衡後 broker 間流量比較均勻,且單個 topic 分區均勻分佈在所有 broker 節點上;

3)執行均衡後 broker 內部多塊磁盤間流量比較均衡;

要實現這個效果,我們需要開發一套自己的負載均衡工具,如對開源的 cruise control 進行二次開發;此工具的核心主要在生成遷移計劃的策略,遷移計劃的生成方案直接影響到最後集羣負載均衡的效果。參考內容:

1. linkedIn/cruise-control
2. Introduction to Kafka Cruise Control
3. Cloudera Cruise Control REST API Reference

cruise control 架構圖如下:

圖片

在生成遷移計劃時,我們需要考慮以下幾點:

1)選擇核心指標作爲生成遷移計劃的依據,比如出流量、入流量、機架、單 topic 分區分散性等;

2)優化用來生成遷移計劃的指標樣本,比如過濾流量突增 / 突降 / 掉零等異常樣本;

3)各資源組的遷移計劃需要使用的樣本全部爲資源組內部樣本,不涉及其他資源組,無交叉;

4)治理單分區過大 topic,讓 topic 分區分佈更分散,流量不集中在部分 broker,讓 topic 單分區數據量更小,這樣可以減少遷移的數據量,提升遷移速度;

5)已經均勻分散在資源組內的 topic,加入遷移黑名單,不做遷移,這樣可以減少遷移的數據量,提升遷移速度;

6)做 topic 治理,排除長期無流量 topic 對均衡的干擾;

7)新建 topic 或者 topic 分區擴容時,應讓所有分區輪詢分佈在所有 broker 節點,輪詢後餘數分區優先分佈流量較低的 broker;

8)擴容 broker 節點後開啓負載均衡時,優先把同一 broker 分配了同一大流量(流量大而不是存儲空間大,這裏可以認爲是每秒的吞吐量)topic 多個分區 leader 的,遷移一部分到新 broker 節點;

9)提交遷移任務時,同一批遷移計劃中的分區數據大小偏差應該儘可能小,這樣可以避免遷移任務中小分區遷移完成後長時間等待大分區的遷移,造成任務傾斜;

1.9 安全認證

是不是我們的集羣所有人都可以隨意訪問呢?當然不是,爲了集羣的安全,我們需要進行權限認證,屏蔽非法操作。主要包括以下幾個方面需要做安全認證:

(1) 生產者權限認證;
(2) 消費者權限認證;
(3) 指定數據目錄遷移安全認證;

官網地址:http://kafka.apache.org

1.10 集羣容災

跨機架容災:

官網地址:http://kafka.apache.org

**跨集羣 / 機房容災:**如果有異地雙活等業務場景時,可以參考 Kafka2.7 版本的 MirrorMaker 2.0。

GitHub 地址:https://github.com

精確 KIP 地址 :https://cwiki.apache.org

**ZooKeeper 集羣上 Kafka 元數據恢復:**我們會定期對 ZooKeeper 上的權限信息數據做備份處理,當集羣元數據異常時用於恢復。

1.11 參數 / 配置優化

**broker 服務參數優化:**這裏我只列舉部分影響性能的核心參數。

num.network.threads
#創建Processor處理網絡請求線程個數,建議設置爲broker當CPU核心數*2,這個值太低經常出現網絡空閒太低而缺失副本。
num.io.threads
#創建KafkaRequestHandler處理具體請求線程個數,建議設置爲broker磁盤個數*2
num.replica.fetchers
#建議設置爲CPU核心數/4,適當提高可以提升CPU利用率及follower同步leader數據當並行度。
compression.type
#建議採用lz4壓縮類型,壓縮可以提升CPU利用率同時可以減少網絡傳輸數據量。
queued.max.requests
#如果是生產環境,建議配置最少500以上,默認爲500。
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
#這幾個參數表示日誌數據刷新到磁盤的策略,應該保持默認配置,刷盤策略讓操作系統去完成,由操作系統來決定什麼時候把數據刷盤;
#如果設置來這個參數,可能對吞吐量影響非常大;
auto.leader.rebalance.enable
#表示是否開啓leader自動負載均衡,默認true;我們應該把這個參數設置爲false,因爲自動負載均衡不可控,可能影響集羣性能和穩定;

**生產優化:**這裏我只列舉部分影響性能的核心參數。

linger.ms
#客戶端生產消息等待多久時間才發送到服務端,單位:毫秒。和batch.size參數配合使用;適當調大可以提升吞吐量,但是如果客戶端如果down機有丟失數據風險;
batch.size
#客戶端發送到服務端消息批次大小,和linger.ms參數配合使用;適當調大可以提升吞吐量,但是如果客戶端如果down機有丟失數據風險;
compression.type
#建議採用lz4壓縮類型,具備較高的壓縮比及吞吐量;由於Kafka對CPU的要求並不高,所以,可以通過壓縮,充分利用CPU資源以提升網絡吞吐量;
buffer.memory
#客戶端緩衝區大小,如果topic比較大,且內存比較充足,可以適當調高這個參數,默認只爲33554432(32MB)
retries
#生產失敗後的重試次數,默認0,可以適當增加。當重試超過一定次數後,如果業務要求數據準確性較高,建議做容錯處理。
retry.backoff.ms
#生產失敗後,重試時間間隔,默認100ms,建議不要設置太大或者太小。

除了一些核心參數優化外,我們還需要考慮比如 topic 的分區個數和 topic 保留時間;如果分區個數太少,保留時間太長,但是寫入數據量非常大的話,可能造成以下問題:

1)topic 分區集中落在某幾個 broker 節點上,導致流量副本失衡;

2)導致 broker 節點內部某幾塊磁盤讀寫超負載,存儲被寫爆;

1.11.1 消費優化

消費最大的問題,並且經常出現的問題就是消費延時,拉歷史數據。當大量拉取歷史數據時將出現大量讀盤操作,污染 pagecache,這個將加重磁盤的負載,影響集羣性能和穩定;

可以怎樣減少或者避免大量消費延時呢?

當 topic 數據量非常大時,建議一個分區開啓一個線程去消費;

對 topic 消費延時添加監控告警,及時發現處理;

當 topic 數據可以丟棄時,遇到超大延時,比如單個分區延遲記錄超過千萬甚至數億,那麼可以重置 topic 的消費點位進行緊急處理;【此方案一般在極端場景才使用】

避免重置 topic 的分區 offset 到很早的位置,這可能造成拉取大量歷史數據;

1.11.2 Linux 服務器參數優化

我們需要對 Linux 的文件句柄、pagecache 等參數進行優化。可參考《Linux Page Cache 調優在 Kafka 中的應用》。

1.12. 硬件優化

磁盤優化

在條件允許的情況下,可以採用 SSD 固態硬盤替換 HDD 機械硬盤,解決機械盤 IO 性能較低的問題;如果沒有 SSD 固態硬盤,則可以對服務器上的多塊硬盤做硬 RAID(一般採用 RAID10),讓 broker 節點的 IO 負載更加均衡。如果是 HDD 機械硬盤,一個 broker 可以掛載多塊硬盤,比如 12 塊 * 4TB。

內存

由於 Kafka 屬於高頻讀寫型服務,而 Linux 的讀寫請求基本走的都是 Page Cache,所以單節點內存大一些對性能會有比較明顯的提升。一般選擇 256GB 或者更高。

網絡

提升網絡帶寬:在條件允許的情況下,網絡帶寬越大越好。因爲這樣網絡帶寬纔不會成爲性能瓶頸,最少也要達到萬兆網絡( 10Gb,網卡爲全雙工)才能具備相對較高的吞吐量。如果是單通道,網絡出流量與入流量之和的上限理論值是 1.25GB/s;如果是雙工雙通道,網絡出入流量理論值都可以達到 1.25GB/s。

網絡隔離打標:由於一個機房可能既部署有離線集羣(比如 HBase、Spark、Hadoop 等)又部署有實時集羣(如 Kafka)。那麼實時集羣和離線集羣掛載到同一個交換機下的服務器將出現競爭網絡帶寬的問題,離線集羣可能對實時集羣造成影響。所以我們需要進行交換機層面的隔離,讓離線機器和實時集羣不要掛載到相同的交換機下。即使有掛載到相同交換機下面的,我們也將進行網絡通行優先級(金、銀、銅、鐵)標記,當網絡帶寬緊張的時候,讓實時業務優先通行。

CPU

Kafka 的瓶頸不在 CPU,單節點一般有 32 核的 CPU 都足夠使用。

1.13. 平臺化

現在問題來了,前面我們提到很多監控、優化等手段;難道我們管理員或者業務用戶對集羣所有的操作都需要登錄集羣服務器嗎?答案當然是否定的,我們需要豐富的平臺化功能來支持。一方面是爲了提升我們操作的效率,另外一方面也是爲了提升集羣的穩定和降低出錯的可能。

配置管理

黑屏操作,每次修改 broker 的 server.properties 配置文件都沒有變更記錄可追溯,有時可能因爲有人修改了集羣配置導致一些故障,卻找不到相關記錄。如果我們把配置管理做到平臺上,每次變更都有跡可循,同時降低了變更出錯的風險。

滾動重啓

當我們需要做線上變更時,有時候需要對集羣對多個節點做滾動重啓,如果到命令行去操作,那效率將變得很低,而且需要人工去幹預,浪費人力。這個時候我們就需要把這種重複性的工作進行平臺化,提升我們的操作效率。

集羣管理

集羣管理主要是把原來在命令行的一系列操作做到平臺上,用戶和管理員不再需要黑屏操作 Kafka 集羣;這樣做主要有以下優點:

提升操作效率;

操作出錯概率更小,集羣更安全;

所有操作有跡可循,可以追溯;

集羣管理主要包括:broker 管理、topic 管理、生產 / 消費權限管理、用戶管理等

1.13.1 mock 功能

在平臺上爲用戶的 topic 提供生產樣例數據與消費抽樣的功能,用戶可以不用自己寫代碼也可以測試 topic 是否可以使用,權限是否正常;

在平臺上爲用戶的 topic 提供生產 / 消費權限驗證功能,讓用戶可以明確自己的賬號對某個 topic 有沒有讀寫權限;

1.13.2 權限管理

把用戶讀 / 寫權限管理等相關操作進行平臺化。

1.13.3 擴容 / 縮容

把 broker 節點上下線做到平臺上,所有的上線和下線節點不再需要操作命令行。

1.13.4 集羣治理

1)無流量 topic 的治理,對集羣中無流量 topic 進行清理,減少過多無用元數據對集羣造成的壓力;

2)topic 分區數據大小治理,把 topic 分區數據量過大的 topic(如單分區數據量超過 100GB / 天)進行梳理,看看是否需要擴容,避免數據集中在集羣部分節點上;

3)topic 分區數據傾斜治理,避免客戶端在生產消息的時候,指定消息的 key,但是 key 過於集中,消息只集中分佈在部分分區,導致數據傾斜;

4)topic 分區分散性治理,讓 topic 分區分佈在集羣儘可能多的 broker 上,這樣可以避免因 topic 流量突增,流量只集中到少數節點上的風險,也可以避免某個 broker 異常對 topic 影響非常大;

5)topic 分區消費延時治理;一般有延時消費較多的時候有兩種情況,一種是集羣性能下降,另外一種是業務方的消費併發度不夠,如果是消費者併發不夠的化應該與業務聯繫增加消費併發。

1.13.5 監控告警

1)把所有指標採集做成平臺可配置,提供統一的指標採集和指標展示及告警平臺,實現一體化監控;

2)把上下游業務進行關聯,做成全鏈路監控;

3)用戶可以配置 topic 或者分區流量延時、突變等監控告警;

1.13.6 業務大屏

業務大屏主要指標:集羣個數、節點個數、日入流量大小、日入流量記錄、日出流量大小、日出流量記錄、每秒入流量大小、每秒入流量記錄、每秒出流量大小、每秒出流量記錄、用戶個數、生產延時、消費延時、數據可靠性、服務可用性、數據存儲大小、資源組個數、topic 個數、分區個數、副本個數、消費組個數等指標。

1.13.7 流量限制

把用戶流量現在做到平臺,在平臺進行智能限流處理。

1.13.8 負載均衡

把自動負載均衡功能做到平臺,通過平臺進行調度和管理。

1.13.9 資源預算

當集羣達到一定規模,流量不斷增長,那麼集羣擴容機器從哪裏來呢?業務的資源預算,讓集羣裏面的多個業務根據自己在集羣中當流量去分攤整個集羣的硬件成本;當然,獨立集羣與獨立隔離的資源組,預算方式可以單獨計算。

1.14. 性能評估

1.14.1 單 broker 性能評估

我們做單 broker 性能評估的目的包括以下幾方面:

  1. 爲我們進行資源申請評估提供依據;

  2. 讓我們更瞭解集羣的讀寫能力及瓶頸在哪裏,針對瓶頸進行優化;

  3. 爲我們限流閾值設置提供依據;

  4. 爲我們評估什麼時候應該擴容提供依據;

1.14.2 topic 分區性能評估

  1. 爲我們創建 topic 時,評估應該指定多少分區合理提供依據;

  2. 爲我們 topic 的分區擴容評估提供依據;

1.14.3 單磁盤性能評估

  1. 爲我們瞭解磁盤的真正讀寫能力,爲我們選擇更合適 Kafka 的磁盤類型提供依據;

  2. 爲我們做磁盤流量告警閾值設置提供依據;

1.14.4 集羣規模限制摸底

1)我們需要了解單個集羣規模的上限或者是元數據規模的上限,探索相關信息對集羣性能和穩定性的影響;

2)根據摸底情況,評估集羣節點規模的合理範圍,及時預測風險,進行超大集羣的拆分等工作;

1.15 DNS+LVS 的網絡架構

當我們的集羣節點達到一定規模,比如單集羣數百個 broker 節點,那麼此時我們生產消費客戶端指定 bootstrap.servers 配置時,如果指定呢?是隨便選擇其中幾個 broker 配置還是全部都配上呢?

其實以上做法都不合適,如果只配置幾個 IP,當我們配置當幾個 broker 節點下線後,我們當應用將無法連接到 Kafka 集羣;如果配置所有 IP,那更不現實啦,幾百個 IP,那麼我們應該怎麼做呢?

**方案:**採用 DNS+LVS 網絡架構,最終生產者和消費者客戶端只需要配置域名就可以啦。需要注意的是,有新節點加入集羣時,需要添加映射;有節點下線時,需要從映射中踢掉,否則這批機器如果拿到其他的地方去使用,如果端口和 Kafka 的一樣的話,原來集羣部分請求將發送到這個已經下線的服務器上來,造成生產環境重點故障。

二、開源版本功能缺陷

RTMP 協議主要的特點有:多路複用,分包和應用層協議。以下將對這些特點進行詳細的描述。

2.1 副本遷移

無法實現增量遷移;【我們已經基於 2.1.1 版本源碼改造,實現了增量遷移】

無法實現併發遷移;【開源版本直到 2.6.0 才實現了併發遷移】

無法實現終止遷移;【我們已經基於 2.1.1 版本源碼改造,實現了終止副本遷移】【開源版本直到 2.6.0 才實現了暫停遷移,和終止遷移有些不一樣,不會回滾元數據】

當指定遷移數據目錄時,遷移過程中,如果把 topic 保留時間改短,topic 保留時間針對正在遷移 topic 分區不生效,topic 分區過期數據無法刪除;【開源版本 bug,目前還沒有修復】

當指定遷移數據目錄時,當遷移計劃爲以下場景時,整個遷移任務無法完成遷移,一直處於卡死狀態;【開源版本 bug,目前還沒有修復】

遷移過程中,如果有重啓 broker 節點,那個 broker 節點上的所有 leader 分區無法切換回來,導致節點流量全部轉移到其他節點,直到所有副本被遷移完畢後 leader 纔會切換回來;【開源版本 bug,目前還沒有修復】。

在原生的Kafka版本中存在以下指定數據目錄場景無法遷移完畢的情況,此版本我們也不決定修復次bug:
1.針對同一個topic分區,如果部分目標副本相比原副本是所屬broker發生變化,部分目標副本相比原副本是broker內部所屬數據目錄發生變化;
那麼副本所屬broker發生變化的那個目標副本可以正常遷移完畢,目標副本是在broker內部數據目錄發生變化的無法正常完成遷移;
但是舊副本依然可以正常提供生產、消費服務,並且不影響下一次遷移任務的提交,下一次遷移任務只需要把此topic分區的副本列表所屬broker列表變更後提交依然可以正常完成遷移,並且可以清理掉之前未完成的目標副本;
這裏假設topic yyj1的初始化副本分佈情況如下:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}
]
}
//遷移場景1:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}
]
}
//遷移場景2:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}
]
}
針對上述的topic yyj1的分佈分佈情況,此時如果我們的遷移計劃爲“遷移場景1”或遷移場景2“,那麼都將出現有副本無法遷移完畢的情況。
但是這並不影響舊副本處理生產、消費請求,並且我們可以正常提交其他的遷移任務。
爲了清理舊的未遷移完成的副本,我們只需要修改一次遷移計劃【新的目標副本列表和當前分區已分配副本列表完全不同即可】,再次提交遷移即可。
這裏,我們依然以上述的例子做遷移計劃修改如下:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}
]
}
這樣我們就可以正常完成遷移。

2.2 流量協議

限流粒度較粗,不夠靈活精準,不夠智能。

當前限流維度組合

/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

存在問題

當同一個 broker 上有多個用戶同時進行大量的生產和消費時,想要讓 broker 可以正常運行,那必須在做限流時讓所有的用戶流量閾值之和不超過 broker 的吞吐上限;如果超過 broker 上限,那麼 broker 就存在被打掛的風險;然而,即使用戶流量沒有達到 broker 的流量上限,但是,如果所有用戶流量集中到了某幾塊盤上,超過了磁盤的讀寫負載,也會導致所有生產、消費請求將被阻塞,broker 可能處於假死狀態。

解決方案

(1) 改造源碼,實現單個 broker 流量上限限制,只要流量達到 broker 上限立即進行限流處理,所有往這個 broker 寫的用戶都可以被限制住;或者對用戶進行優先級處理,放過高優先級的,限制低優先級的;

(2) 改造源碼,實現 broker 上單塊磁盤流量上限限制(很多時候都是流量集中到某幾塊磁盤上,導致沒有達到 broker 流量上限卻超過了單磁盤讀寫能力上限),只要磁盤流量達到上限,立即進行限流處理,所有往這個磁盤寫的用戶都可以被限制住;或者對用戶進行優先級處理,放過高優先級的,限制低優先級的;

(3) 改造源碼,實現 topic 維度限流以及對 topic 分區的禁寫功能;

(4) 改造源碼,實現用戶、broker、磁盤、topic 等維度組合精準限流;

三、kafka 發展趨勢

3.1 Kafka 社區迭代計劃

3.2 逐步棄用 ZooKeeper(KIP-500)

3.3 controller 與 broker 分離,引入 raft 協議作爲 controller 的仲裁機制(KIP-630)

3.4 分層存儲(KIP-405)

3.5 可以減少 topic 分區(KIP-694)

3.6 MirrorMaker2 精確一次(KIP-656)

3.7 下載與各版本特性說明

3.8 Kafka 所有 KIP 地址

四、如何貢獻社區

4.1 哪些點可以貢獻

http://kafka.apache.org/contributing

4.2 wiki 貢獻地址

https://cwiki.apache.org/confluence/dashboard.action#all-updates

4.3 issues 地址

1)https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-10444?filter=allopenissues

2)https://issues.apache.org/jira/secure/BrowseProjects.jspa?selectedCategory=all

4.4 主要 committers

http://kafka.apache.org/committers

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/4t4k6aO_1P_wSH6pU2geog