2 萬文字,一文搞懂 Kafka

Kafka 作爲消息隊列,已經被我們用在了各個行業,今天分享一下 Kafka 相關知識點。

1、爲什麼有消息系統


  1. 解耦合

  2. 異步處理 例如電商平臺,秒殺活動。一般流程會分爲:1: 風險控制、2:庫存鎖定、3:生成訂單、4:短信通知、5:更新數據通過消息系統將秒殺活動業務拆分開,將不急需處理的業務放在後面慢慢處理;流程改爲:1:風險控制、2:庫存鎖定、3:消息系統、4:生成訂單、5:短信通知、6:更新數據

  3. 流量的控制 3.1 網關在接受到請求後,就把請求放入到消息隊列裏面 3.2 後端的服務從消息隊列裏面獲取到請求,完成後續的秒殺處理流程。然後再給用戶返回結果。優點:控制了流量 缺點:會讓流程變慢

2、Kafka 核心概念

生產者:Producer 往 Kafka 集羣生成數據消費者:Consumer 往 Kafka 裏面去獲取數據,處理數據、消費數據 Kafka 的數據是由消費者自己去拉去 Kafka 裏面的數據主題:topic 分區:partition 默認一個 topic 有一個分區(partition),自己可設置多個分區(分區分散存儲在服務器不同節點上)

3、Kafka 的集羣架構

Kafka 集羣中,一個 kafka 服務器就是一個 broker Topic 只是邏輯上的概念,partition 在磁盤上就體現爲一個目錄 Consumer Group:消費組 消費數據的時候,都必須指定一個 group id,指定一個組的 id 假定程序 A 和程序 B 指定的 group id 號一樣,那麼兩個程序就屬於同一個消費組特殊: 比如,有一個主題 topicA 程序 A 去消費了這個 topicA,那麼程序 B 就不能再去消費 topicA(程序 A 和程序 B 屬於一個消費組) 再比如程序 A 已經消費了 topicA 裏面的數據,現在還是重新再次消費 topicA 的數據,是不可以的,但是重新指定一個 group id 號以後,可以消費。不同消費組之間沒有影響。消費組需自定義,消費者名稱程序自動生成(獨一無二)。Controller:Kafka 節點裏面的一個主節點。藉助 zookeeper

4、Kafka 磁盤順序寫保證寫數據性能

**kafka 寫數據:**順序寫,往磁盤上寫數據時,就是追加數據,沒有隨機寫的操作。經驗: 如果一個服務器磁盤達到一定的個數,磁盤也達到一定轉數,往磁盤裏面順序寫(追加寫)數據的速度和寫內存的速度差不多生產者生產消息,經過kafka服務先寫到os cache 內存中,然後經過sync順序寫到磁盤上

5、Kafka 零拷貝機制保證讀數據高性能

消費者讀取數據流程:

  1. 消費者發送請求給 kafka 服務

  2. kafka 服務去 os cache 緩存讀取數據(緩存沒有就去磁盤讀取數據)

  3. 從磁盤讀取了數據到 os cache 緩存中

  4. os cache 複製數據到 kafka 應用程序中

  5. kafka 將數據(複製)發送到 socket cache 中

  6. socket cache 通過網卡傳輸給消費者

kafka linux sendfile 技術 — 零拷貝

  1. 消費者發送請求給 kafka 服務 2.kafka 服務去 os cache 緩存讀取數據(緩存沒有就去磁盤讀取數據) 3. 從磁盤讀取了數據到 os cache 緩存中 4.os cache 直接將數據發送給網卡 5. 通過網卡將數據傳輸給消費者

6、Kafka 日誌分段保存

Kafka 中一個主題,一般會設置分區;比如創建了一個topic_a,然後創建的時候指定了這個主題有三個分區。其實在三臺服務器上,會創建三個目錄。服務器 1(kafka1)創建目錄 topic_a-0:。目錄下面是我們文件(存儲數據),kafka 數據就是 message,數據存儲在 log 文件裏。.log 結尾的就是日誌文件,在 kafka 中把數據文件就叫做日誌文件 。一個分區下面默認有 n 多個日誌文件(分段存儲),一個日誌文件默認 1G服務器 2(kafka2):創建目錄 topic_a-1: 服務器 3(kafka3):創建目錄 topic_a-2:

7、Kafka 二分查找定位數據

         Kafka 裏面每一條消息,都有自己的 offset(相對偏移量),存在物理磁盤上面,在 position Position:物理位置(磁盤上面哪個地方)也就是說一條消息就有兩個位置:offset:相對偏移量(相對位置)position:磁盤物理位置稀疏索引:         Kafka 中採用了稀疏索引的方式讀取索引,kafka 每當寫入了 4k 大小的日誌(.log),就往 index 裏寫入一個記錄索引。其中會採用二分查找

8、高併發網絡設計(先了解 NIO)

         網絡設計部分是 kafka 中設計最好的一個部分,這也是保證 Kafka 高併發、高性能的原因,對 kafka 進行調優,就得對 kafka 原理比較瞭解,尤其是網絡設計部分

Reactor 網絡設計模式 1:Reactor 網絡設計模式 2:Reactor 網絡設計模式 3:Kafka 超高併發網絡設計:

9、Kafka 冗餘副本保證高可用

在 kafka 裏面分區是有副本的,注:0.8 以前是沒有副本機制的。創建主題時,可以指定分區,也可以指定副本個數。副本是有角色的:leader partition:1、寫數據、讀數據操作都是從 leader partition 去操作的。2、會維護一個 ISR(in-sync- replica )列表,但是會根據一定的規則刪除 ISR 列表裏面的值 生產者發送來一個消息,消息首先要寫入到 leader partition 中 寫完了以後,還要把消息寫入到 ISR 列表裏面的其它分區,寫完後纔算這個消息提交 follower partition:從 leader partition 同步數據。

10、優秀架構思考 - 總結

Kafka — 高併發、高可用、高性能 高可用:多副本機制 高併發:網絡架構設計 三層架構:多 selector -> 多線程 -> 隊列的設計(NIO) 高性能:寫數據:

  1. 把數據先寫入到 OS Cache

  2. 寫到磁盤上面是順序寫,性能很高

讀數據:

  1. 根據稀疏索引,快速定位到要消費的數據

  2. 零拷貝機制 減少數據的拷貝 減少了應用程序與操作系統上下文切換

11、Kafka 生產環境搭建

11.1 需求場景分析

電商平臺,需要每天 10 億請求都要發送到 Kafka 集羣上面。二八反正,一般評估出來問題都不大。10 億請求 -> 24 過來的,一般情況下,每天的 12:00 到早上 8:00 這段時間其實是沒有多大的數據量的。80% 的請求是用的另外 16 小時的處理的。16 個小時處理 -> 8 億的請求。16 * 0.2 = 3 個小時 處理了 8 億請求的 80% 的數據

也就是說 6 億的數據是靠 3 個小時處理完的。我們簡單的算一下高峯期時候的 qps6億/3小時 =5.5萬/s qps=5.5萬

10 億請求 * 50kb = 46T 每天需要存儲 46T 的數據

一般情況下,我們都會設置兩個副本 46T * 2 = 92T  Kafka 裏面的數據是有保留的時間週期,保留最近 3 天的數據。92T * 3 天 = 276T 我這兒說的是 50kb 不是說一條消息就是 50kb 不是(把日誌合併了,多條日誌合併在一起),通常情況下,一條消息就幾 b,也有可能就是幾百字節。

11.2 物理機數量評估

1)首先分析一下是需要虛擬機還是物理機 像 Kafka mysql hadoop 這些集羣搭建的時候,我們生產裏面都是使用物理機。2)高峯期需要處理的請求總的請求每秒 5.5 萬個,其實一兩臺物理機絕對是可以抗住的。一般情況下,我們評估機器的時候,是按照高峯期的 4 倍的去評估。如果是 4 倍的話,大概我們集羣的能力要準備到 20 萬 qps。這樣子的集羣纔是比較安全的集羣。大概就需要 5 臺物理機。每臺承受 4 萬請求。

場景總結:搞定10億請求,高峯期5.5萬的qps,276T的數據,需要5臺物理機。

11.3 磁盤選擇

搞定 10 億請求,高峯期 5.5 萬的 qps,276T 的數據,需要 5 臺物理機。1)SSD 固態硬盤,還是需要普通的機械硬盤 **SSD 硬盤:性能比較好,但是價格貴 SAS 盤:某方面性能不是很好,但是比較便宜。SSD 硬盤性能比較好,指的是它隨機讀寫的性能比較好。適合 MySQL 這樣集羣。**但是其實他的順序寫的性能跟 SAS 盤差不多。kafka 的理解:就是用的順序寫。所以我們就用普通的【機械硬盤】就可以了。

2)需要我們評估每臺服務器需要多少塊磁盤 5 臺服務器,一共需要 276T ,大約每臺服務器 需要存儲 60T 的數據。我們公司裏面服務器的配置用的是 11 塊硬盤,每個硬盤 7T。11 * 7T = 77T

77T * 5 臺服務器 = 385T。

場景總結:

搞定10億請求,需要5臺物理機,11(SAS) * 7T

11.4 內存評估

搞定 10 億請求,需要 5 臺物理機,11(SAS) * 7T

我們發現 kafka 讀寫數據的流程 都是基於 os cache, 換句話說假設咱們的 os cashe 無限大那麼整個 kafka 是不是相當於就是基於內存去操作,如果是基於內存去操作,性能肯定很好。內存是有限的。1) 儘可能多的內存資源要給 os cache 2) Kafka 的代碼用 核心的代碼用的是 scala 寫的,客戶端的代碼 java 寫的。都是基於 jvm。所以我們還要給一部分的內存給 jvm。Kafka 的設計,沒有把很多數據結構都放在 jvm 裏面。所以我們的這個 jvm 不需要太大的內存。根據經驗,給個 10G 就可以了

NameNode: jvm 裏面還放了元數據(幾十 G),JVM 一定要給得很大。比如給個 100G。

假設我們這個 10 請求的這個項目,一共會有 100 個 topic。100 topic * 5 partition * 2 = 1000 partition 一個 partition 其實就是物理機上面的一個目錄,這個目錄下面會有很多個. log 的文件。.log 就是存儲數據文件,默認情況下一個. log 文件的大小是 1G。我們如果要保證 1000 個 partition 的最新的. log 文件的數據 如果都在內存裏面,這個時候性能就是最好。1000 * 1G = 1000G 內存. 我們只需要把當前最新的這個 log 保證裏面的 25% 的最新的數據在內存裏面。250M * 1000 = 0.25 G* 1000 =250G 的內存。

250 內存 / 5 = 50G 內存 50G+10G = 60G 內存

64G 的內存,另外的 4G,操作系統本生是不是也需要內存。其實 Kafka 的 jvm 也可以不用給到 10G 這麼多。評估出來 64G 是可以的。當然如果能給到 128G 的內存的服務器,那就最好。

我剛剛評估的時候用的都是一個 topic 是 5 個 partition,但是如果是數據量比較大的 topic,可能會有 10 個 partition。

總結:搞定10億請求,需要5臺物理機,11(SAS) * 7T ,需要64G的內存(128G更好)

11.5 CPU 壓力評估

評估一下每臺服務器需要多少 cpu core(資源很有限)

我們評估需要多少個 cpu ,依據就是看我們的服務裏面有多少線程去跑。線程就是依託 cpu 去運行的。如果我們的線程比較多,但是 cpu core 比較少,這樣的話,我們的機器負載就會很高,性能不就不好。

評估一下,kafka 的一臺服務器 啓動以後會有多少線程?

Acceptor 線程 1 processor 線程 3 6~9 個線程 處理請求線程 8 個 32 個線程 定時清理的線程,拉取數據的線程,定時檢查 ISR 列表的機制 等等。所以大概一個 Kafka 的服務啓動起來以後,會有一百多個線程。

cpu core = 4 個,一遍來說,幾十個線程,就肯定把 cpu 打滿了。cpu core = 8 個,應該很輕鬆的能支持幾十個線程。如果我們的線程是 100 多個,或者差不多 200 個,那麼 8 個 cpu core 是搞不定的。所以我們這兒建議:CPU core = 16 個。如果可以的話,能有 32 個 cpu core 那就最好。

結論:kafka 集羣,最低也要給 16 個 cpu core,如果能給到 32 cpu core 那就更好。2cpu * 8 =16 cpu core 4cpu * 8 = 32 cpu core

總結:搞定10億請求,需要5臺物理機,11(SAS) * 7T ,需要64G的內存(128G更好),需要16個cpu core(32個更好)

11.6 網絡需求評估

評估我們需要什麼樣網卡?一般要麼是千兆的網卡(1G/s),還有的就是萬兆的網卡(10G/s)

高峯期的時候 每秒會有5.5萬的請求湧入,5.5/5 = 大約是每臺服務器會有1萬個請求湧入。
我們之前說的,
10000 * 50kb = 488M  也就是每條服務器,每秒要接受488M的數據。數據還要有副本,副本之間的同步
也是走的網絡的請求。488 * 2 = 976m/s
說明一下:
   很多公司的數據,一個請求裏面是沒有50kb這麼大的,我們公司是因爲主機在生產端封裝了數據
   然後把多條數據合併在一起了,所以我們的一個請求才會有這麼大。
   
說明一下:
   一般情況下,網卡的帶寬是達不到極限的,如果是千兆的網卡,我們能用的一般就是700M左右。
   但是如果最好的情況,我們還是使用萬兆的網卡。
   如果使用的是萬兆的,那就是很輕鬆。

11.7 集羣規劃

請求量 規劃物理機的個數 分析磁盤的個數,選擇使用什麼樣的磁盤 內存 cpu core 網卡就是告訴大家,以後要是公司裏面有什麼需求,進行資源的評估,服務器的評估,大家按照我的思路去評估

一條消息的大小 50kb -> 1kb 500byte 1Mip 主機名 192.168.0.100 hadoop1 192.168.0.101 hadoop2 192.168.0.102 hadoop3

主機的規劃:kafka 集羣架構的時候:主從式的架構:controller -> 通過 zk 集羣來管理整個集羣的元數據。

  1. zookeeper 集羣 hadoop1 hadoop2 hadoop3

  2. kafka 集羣 理論上來講,我們不應該把 kafka 的服務於 zk 的服務安裝在一起。但是我們這兒服務器有限。所以我們 kafka 集羣也是安裝在 hadoop1 haadoop2 hadoop3

12、kafka 運維

12.1 常見運維工具介紹

KafkaManager — 頁面管理工具

12.2 常見運維命令

場景一:topic 數據量太大,要增加 topic 數

一開始創建主題的時候,數據量不大,給的分區數不多。

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6
kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,ha

broker id:

hadoop1:0 hadoop2:1 hadoop3:2 假設一個 partition 有三個副本:partition0:a,b,c

a:leader partition b,c:follower partition

ISR:{a,b,c}如果一個follower分區 超過10秒 沒有向leader partition去拉取數據,那麼這個分區就從ISR列表裏面移除。

場景二:核心 topic 增加副本因子

如果對核心業務數據需要增加副本因子 vim test.json 腳本,將下面一行 json 腳本保存

{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

執行上面 json 腳本:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

場景三:負載不均衡的 topic,手動遷移 vi topics-to-move.json

{“topics”: [{“topic”: “test01”}{“topic”: “test02”}], “version”: 1} // 把你所有的topic都寫在這裏
kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate

          把你所有的包括新加入的 broker 機器都寫在這裏,就會說是把所有的 partition 均勻的分散在各個 broker 上,包括新進來的 broker 此時會生成一個遷移方案,可以保存到一個文件裏去:expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

這種數據遷移操作一定要在晚上低峯的時候來做,因爲他會在機器之間遷移數據,非常的佔用帶寬資源–generate: 根據給予的 Topic 列表和 Broker 列表生成遷移計劃。generate 並不會真正進行消息遷移,而是將消息遷移計劃計算出來,供 execute 命令使用。–execute: 根據給予的消息遷移計劃進行遷移。–verify: 檢查消息是否已經遷移完成。

場景四:如果某個 broker leader partition 過多

正常情況下,我們的 leader partition 在服務器之間是負載均衡。hadoop1 4 hadoop2 1 hadoop3 1

現在各個業務方可以自行申請創建 topic,分區數量都是自動分配和後續動態調整的, kafka 本身會自動把 leader partition 均勻分散在各個機器上,這樣可以保證每臺機器的讀寫吞吐量都是均勻的 但是也有例外,那就是如果某些 broker 宕機,會導致 leader partition 過於集中在其他少部分幾臺 broker 上, 這會導致少數幾臺 broker 的讀寫請求壓力過高,其他宕機的 broker 重啓之後都是 folloer partition,讀寫請求很低, 造成集羣負載不均衡有一個參數,auto.leader.rebalance.enable,默認是 true, 每隔 300 秒(leader.imbalance.check.interval.seconds)檢查 leader 負載是否平衡 如果一臺 broker 上的不均衡的 leader 超過了 10%,leader.imbalance.per.broker.percentage, 就會對這個 broker 進行選舉 配置參數:auto.leader.rebalance.enable 默認是 true leader.imbalance.per.broker.percentage: 每個 broker 允許的不平衡的 leader 的比率。如果每個 broker 超過了這個值,控制器會觸發 leader 的平衡。這個值表示百分比。10% leader.imbalance.check.interval.seconds:默認值 300 秒

13、Kafka 生產者

13.1 生產者發送消息原理

13.2 生產者發送消息原理—基礎案例演示

13.3 如何提升吞吐量

如何提升吞吐量:參數一:buffer.memory:設置發送消息的緩衝區,默認值是 33554432,就是 32MB 參數二:compression.type:默認是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不錯的,壓縮之後可以減小數據量,提升吞吐量,但是會加大 producer 端的 cpu 開銷 參數三:batch.size:設置 batch 的大小,如果 batch 太小,會導致頻繁網絡請求,吞吐量下降;如果 batch 太大,會導致一條消息需要等待很久才能被髮送出去,而且會讓內存緩衝區有很大壓力,過多數據緩衝在內存裏,默認值是:16384,就是 16kb,也就是一個 batch 滿了 16kb 就發送出去,一般在實際生產環境,這個 batch 的值可以增大一些來提升吞吐量,如果一個批次設置大了,會有延遲。一般根據一條消息大小來設置。如果我們消息比較少。配合使用的參數 linger.ms,這個值默認是 0,意思就是消息必須立即被髮送,但是這是不對的,一般設置一個 100 毫秒之類的,這樣的話就是說,這個消息被髮送出去後進入一個 batch,如果 100 毫秒內,這個 batch 滿了 16kb,自然就會發送出去。

13.4 如何處理異常

  1. LeaderNotAvailableException:這個就是如果某臺機器掛了,此時 leader 副本不可用,會導致你寫入失敗,要等待其他 follower 副本切換爲 leader 副本之後,才能繼續寫入,此時可以重試發送即可;如果說你平時重啓 kafka 的 broker 進程,肯定會導致 leader 切換,一定會導致你寫入報錯,是 LeaderNotAvailableException。

  2. NotControllerException:這個也是同理,如果說 Controller 所在 Broker 掛了,那麼此時會有問題,需要等待 Controller 重新選舉,此時也是一樣就是重試即可。

  3. NetworkException:網絡異常 timeout a. 配置 retries 參數,他會自動重試的 b. 但是如果重試幾次之後還是不行,就會提供 Exception 給我們來處理了, 我們獲取到異常以後,再對這個消息進行單獨處理。我們會有備用的鏈路。發送不成功的消息發送到 Redis 或者寫到文件系統中,甚至是丟棄。

13.5 重試機制

重試會帶來一些問題:

  1. 消息會重複有的時候一些 leader 切換之類的問題,需要進行重試,設置 retries 即可,但是消息重試會導致, 重複發送的問題,比如說網絡抖動一下導致他以爲沒成功,就重試了,其實人家都成功了.

  2. 消息亂序消息重試是可能導致消息的亂序的,因爲可能排在你後面的消息都發送出去了。所以可以使用 "max.in.flight.requests.per.connection" 參數設置爲 1, 這樣可以保證 producer 同一時間只能發送一條消息。兩次重試的間隔默認是 100 毫秒,用 "retry.backoff.ms" 來進行設置 基本上在開發過程中,靠重試機制基本就可以搞定 95% 的異常問題。

13.6 ACK 參數詳解

producer 端設置的 request.required.acks=0;只要請求已發送出去,就算是發送完了,不關心有沒有寫成功。性能很好,如果是對一些日誌進行分析,可以承受丟數據的情況,用這個參數,性能會很好。request.required.acks=1;發送一條消息,當 leader partition 寫入成功以後,纔算寫入成功。不過這種方式也有丟數據的可能。request.required.acks=-1;需要 ISR 列表裏面,所有副本都寫完以後,這條消息纔算寫入成功。ISR:1 個副本。1 leader partition 1 follower partition kafka 服務端:min.insync.replicas:1, 如果我們不設置的話,默認這個值是 1 一個 leader partition 會維護一個 ISR 列表,這個值就是限制 ISR 列表裏面 至少得有幾個副本,比如這個值是 2,那麼當 ISR 列表裏面只有一個副本的時候。往這個分區插入數據的時候會報錯。設計一個不丟數據的方案:數據不丟失的方案:1) 分區副本 >=2 2)acks = -1 3)min.insync.replicas >=2 還有可能就是發送有異常:對異常進行處理

13.7 自定義分區

分區:1、沒有設置 key 我們的消息就會被輪訓的發送到不同的分區。2、設置了 keykafka 自帶的分區器,會根據 key 計算出來一個 hash 值,這個 hash 值會對應某一個分區。如果 key 相同的,那麼 hash 值必然相同,key 相同的值,必然是會被髮送到同一個分區。但是有些比較特殊的時候,我們就需要自定義分區

public class HotDataPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List partitionInfoList = cluster.availablePartitionsForTopic(topic);
//獲取到分區的個數 0,1,2
int partitionCount = partitionInfoList.size();
//最後一個分區
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}
}

如何使用:配置上這個類即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);

13.8 綜合案例演示

14.1 消費組概念 groupid 相同就屬於同一個消費組 1)每個 consumer 都要屬於一個 consumer.group,就是一個消費組,topic 的一個分區只會分配給 一個消費組下的一個 consumer 來處理,每個 consumer 可能會分配多個分區,也有可能某個 consumer 沒有分配到任何分區 2)如果想要實現一個廣播的效果,那隻需要使用不同的 group id 去消費就可以。topicA: partition0、partition1 groupA:consumer1: 消費 partition0 consuemr2: 消費 partition1 consuemr3: 消費不到數據 groupB: consuemr3: 消費到 partition0 和 partition1 3)如果 consumer group 中某個消費者掛了,此時會自動把分配給他的分區交給其他的消費者,如果他又重啓了,那麼又會把一些分區重新交還給他

14、Kafka 消費者

14.1 消費組概念

groupid 相同就屬於同一個消費組 1)每個 consumer 都要屬於一個 consumer.group,就是一個消費組,topic 的一個分區只會分配給 一個消費組下的一個 consumer 來處理,每個 consumer 可能會分配多個分區,也有可能某個 consumer 沒有分配到任何分區 2)如果想要實現一個廣播的效果,那隻需要使用不同的 group id 去消費就可以。topicA: partition0、partition1 groupA:consumer1: 消費 partition0 consuemr2: 消費 partition1 consuemr3: 消費不到數據 groupB: consuemr3: 消費到 partition0 和 partition1 3)如果 consumer group 中某個消費者掛了,此時會自動把分配給他的分區交給其他的消費者,如果他又重啓了,那麼又會把一些分區重新交還給他

14.2 基礎案例演示

14.3 偏移量管理

  1. 每個 consumer 內存裏數據結構保存對每個 topic 的每個分區的消費 offset,定期會提交 offset,老版本是寫入 zk,但是那樣高併發請求 zk 是不合理的架構設計,zk 是做分佈式系統的協調的,輕量級的元數據存儲,不能負責高併發讀寫,作爲數據存儲。

  2. 現在新的版本提交 offset 發送給 kafka 內部 topic:__consumer_offsets,提交過去的時候, key 是 group.id+topic + 分區號,value 就是當前 offset 的值,每隔一段時間,kafka 內部會對這個 topic 進行 compact(合併),也就是每個 group.id+topic + 分區號就保留最新數據。

  3. __consumer_offsets 可能會接收高併發的請求,所以默認分區 50 個 (leader partitiron -> 50 kafka),這樣如果你的 kafka 部署了一個大的集羣,比如有 50 臺機器,就可以用 50 臺機器來抗 offset 提交的請求壓力. 消費者 -> broker 端的數據 message -> 磁盤 -> offset 順序遞增 從哪兒開始消費?-> offset 消費者(offset)

14.4 偏移量監控工具介紹

  1. web 頁面管理的一個管理軟件 (kafka Manager) 修改 bin/kafka-run-class.sh 腳本,第一行增加 JMX_PORT=9988 重啓 kafka 進程

  2. 另一個軟件:主要監控的 consumer 的偏移量。就是一個 jar 包 java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –offsetStorage kafka \(根據版本:偏移量存在 kafka 就填 kafka,存在 zookeeper 就填 zookeeper) –zk hadoop1:2181 –port 9004 –refresh 15.seconds –retain 2.days。

14.5 消費異常感知

heartbeat.interval.ms:consumer 心跳時間間隔,必須得與 coordinator 保持心跳才能知道 consumer 是否故障了, 然後如果故障之後,就會通過心跳下發 rebalance 的指令給其他的 consumer 通知他們進行 rebalance 的操作 session.timeout.ms:kafka 多長時間感知不到一個 consumer 就認爲他故障了,默認是 10 秒 max.poll.interval.ms:如果在兩次 poll 操作之間,超過了這個時間,那麼就會認爲這個 consume 處理能力太弱了,會被踢出消費組,分區分配給別人去消費,一般來說結合業務處理的性能來設置就可以了。

14.6 核心參數解釋

fetch.max.bytes:獲取一條消息最大的字節數,一般建議設置大一些,默認是 1M 其實我們在之前多個地方都見到過這個類似的參數,意思就是說一條信息最大能多大?

  1. Producer 發送的數據,一條消息最大多大, -> 10M

  2. Broker 存儲數據,一條消息最大能接受多大 -> 10M

  3. Consumer max.poll.records: 一次 poll 返回消息的最大條數,默認是 500 條 connection.max.idle.ms:consumer 跟 broker 的 socket 連接如果空閒超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立 socket 連接,這個建議設置爲 - 1,不要去回收 enable.auto.commit: 開啓自動提交偏移量 auto.commit.interval.ms: 每隔多久提交一次偏移量,默認值 5000 毫秒 _consumer_offset auto.offset.reset:earliest 當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,從頭開始消費 topica -> partition0:1000 partitino1:2000 latest 當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據 none topic 各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常

14.7 綜合案例演示

引入案例:二手電商平臺(歡樂送),根據用戶消費的金額,對用戶星星進行累計。訂單系統(生產者) -> Kafka 集羣裏面發送了消息。會員系統(消費者) -> Kafak 集羣裏面消費消息,對消息進行處理。

14.8 group coordinator 原理

面試題:消費者是如何實現 rebalance 的?— 根據 coordinator 實現

  1. 什麼是 coordinator 每個 consumer group 都會選擇一個 broker 作爲自己的 coordinator,他是負責監控這個消費組裏的各個消費者的心跳,以及判斷是否宕機,然後開啓 rebalance 的

  2. 如何選擇 coordinator 機器 首先對 groupId 進行 hash(數字),接着對__consumer_offsets 的分區數量取模,默認是 50,_consumer_offsets 的分區數可以通過 offsets.topic.num.partitions 來設置,找到分區以後,這個分區所在的 broker 機器就是 coordinator 機器。比如說:groupId,“myconsumer_group” -> hash 值(數字)-> 對 50 取模 -> 8 __consumer_offsets 這個主題的 8 號分區在哪臺 broker 上面,那一臺就是 coordinator 就知道這個 consumer group 下的所有的消費者提交 offset 的時候是往哪個分區去提交 offset,

  3. 運行流程 1)每個 consumer 都發送 JoinGroup 請求到 Coordinator, 2)然後 Coordinator 從一個 consumer group 中選擇一個 consumer 作爲 leader, 3)把 consumer group 情況發送給這個 leader, 4)接着這個 leader 會負責制定消費方案, 5)通過 SyncGroup 發給 Coordinator 6)接着 Coordinator 就把消費方案下發給各個 consumer,他們會從指定的分區的 leader broker 開始進行 socket 連接以及消費消息

14.9 rebalance 策略

consumer group 靠 coordinator 實現了 Rebalance

這裏有三種 rebalance 的策略:range、round-robin、sticky

比如我們消費的一個主題有 12 個分區:p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11 假設我們的消費者組裏面有三個消費者

  1. range 策略 range 策略就是按照 partiton 的序號範圍 p0~3 consumer1 p4~7 consumer2 p8~11 consumer3 默認就是這個策略;

  2. round-robin 策略 就是輪詢分配 consumer1:0,3,6,9 consumer2:1,4,7,10 consumer3:2,5,8,11 但是前面的這兩個方案有個問題:12 -> 2 每個消費者會消費 6 個分區

假設 consuemr1 掛了: p0-5 分配給 consumer2,p6-11 分配給 consumer3 這樣的話,原本在 consumer2 上的的 p6,p7 分區就被分配到了 consumer3 上。

  1. sticky 策略 最新的一個 sticky 策略,就是說盡可能保證在 rebalance 的時候,讓原本屬於這個 consumer 的分區還是屬於他們,然後把多餘的分區再均勻分配過去,這樣儘可能維持原來的分區分配的策略

consumer1:0-3 consumer2: 4-7 consumer3: 8-11 假設 consumer3 掛了 consumer1:0-3,+8,9 consumer2: 4-7,+10,11

15、Broker 管理

15.1 Leo、hw 含義

  1. Kafka 的核心原理

  2. 如何去評估一個集羣資源

  3. 搭建了一套 kafka 集羣 -》 介紹了簡單的一些運維管理的操作。

  4. 生產者(使用,核心的參數)

  5. 消費者(原理,使用的,核心參數)

  6. broker 內部的一些原理

核心的概念:LEO,HW LEO:是跟 offset 偏移量有關係。

LEO:在 kafka 裏面,無論 leader partition 還是 follower partition 統一都稱作副本(replica)。

每次 partition 接收到一條消息,都會更新自己的 LEO,也就是 log end offset,LEO 其實就是最新的 offset + 1

HW:高水位 LEO 有一個很重要的功能就是更新 HW,如果 follower 和 leader 的 LEO 同步了,此時 HW 就可以更新 HW 之前的數據對消費者是可見,消息屬於 commit 狀態。HW 之後的消息消費者消費不到。

15.2 Leo 更新

15.3 hw 更新

15.4 controller 如何管理整個集羣

1: 競爭 controller 的 /controller/id 2:controller 服務監聽的目錄:/broker/ids/ 用來感知 broker 上下線 /broker/topics/ 創建主題,我們當時創建主題命令,提供的參數,ZK 地址。/admin/reassign_partitions 分區重分配 ……

15.5 延時任務

kafka 的延遲調度機制(擴展知識) 我們先看一下 kafka 裏面哪些地方需要有任務要進行延遲調度。第一類延時的任務:比如說 producer 的 acks=-1,必須等待 leader 和 follower 都寫完才能返回響應。有一個超時時間,默認是 30 秒(request.timeout.ms)。所以需要在寫入一條數據到 leader 磁盤之後,就必須有一個延時任務,到期時間是 30 秒延時任務 放到 DelayedOperationPurgatory(延時管理器)中。假如在 30 秒之前如果所有 follower 都寫入副本到本地磁盤了,那麼這個任務就會被自動觸發甦醒,就可以返回響應結果給客戶端了, 否則的話,這個延時任務自己指定了最多是 30 秒到期,如果到了超時時間都沒等到,就直接超時返回異常。第二類延時的任務:follower 往 leader 拉取消息的時候,如果發現是空的,此時會創建一個延時拉取任務 延時時間到了之後(比如到了 100ms),就給 follower 返回一個空的數據,然後 follower 再次發送請求讀取消息, 但是如果延時的過程中 (還沒到 100ms),leader 寫入了消息,這個任務就會自動甦醒,自動執行拉取任務。

海量的延時任務,需要去調度。

15.6 時間輪機制

  1. 什麼會有要設計時間輪?Kafka 內部有很多延時任務,沒有基於 JDK Timer 來實現,那個插入和刪除任務的時間複雜度是 O(nlogn), 而是基於了自己寫的時間輪來實現的,時間複雜度是 O(1),依靠時間輪機制,延時任務插入和刪除,O(1)

  2. 時間輪是什麼?其實時間輪說白其實就是一個數組。tickMs: 時間輪間隔 1ms wheelSize:時間輪大小 20 interval:timckMS * whellSize,一個時間輪的總的時間跨度。20ms currentTime:當時時間的指針。a: 因爲時間輪是一個數組,所以要獲取裏面數據的時候,靠的是 index,時間複雜度是 O(1) b: 數組某個位置上對應的任務,用的是雙向鏈表存儲的,往雙向鏈表裏面插入,刪除任務,時間複雜度也是 O(1) 舉例:插入一個 8ms 以後要執行的任務 19ms 3. 多層級的時間輪 比如:要插入一個 110 毫秒以後運行的任務。tickMs: 時間輪間隔 20ms wheelSize:時間輪大小 20 interval:timckMS * whellSize,一個時間輪的總的時間跨度。20ms currentTime:當時時間的指針。第一層時間輪:1ms * 20 第二層時間輪:20ms * 20 第三層時間輪:400ms * 20

轉自:數據社

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/V1KSIrIURCsrX7fCmq2NtQ