KafKa 存儲機制

Kafka 是爲了解決大數據的實時日誌流而生的, 每天要處理的日誌量級在千億規模。對於日誌流的特點主要包括 :

  1. 數據實時產生 2. 海量數據存儲與處理

所以它必然要面臨分佈式系統遇到的高併發、高可用、高性能等三高問題。

對於 Kafka 的存儲需要保證以下幾點:

  1. 存儲的主要是消息流(可以是簡單的文本格式也可以是其他格式,對於 Broker 存儲來說,它並不關心數據本身) 2. 要支持海量數據的高效存儲、高持久化(保證重啓後數據不丟失) 3. 要支持海量數據的高效檢索(消費的時候可以通過 offset 或者時間戳高效查詢並處理) 4. 要保證數據的安全性和穩定性、故障轉移容錯性

kafka 存儲選型


從上圖性能測試的結果看出普通機械磁盤的順序 I/O 性能指標是 53.2M values/s,而內存的隨機 I/O 性能 指標是 36.7M values/s。由此似乎可以得出結論:磁盤的順序 I/O 性能要強於內存的隨機 I/O 性能。

另外,如果需要較高的存儲性能,必然是提高讀速度和寫速度:

  1. 提高讀速度:利用索引,來提高查詢速度,但是有了索引,大量寫操作都會維護索引,那麼會降低寫入效率。常見的如關係型數據庫:mysql 等 2. 提高寫速度:這種一般是採用日誌存儲, 通過順序追加寫的方式來提高寫入速度,因爲沒有索引,無法快速查詢,最嚴重的只能一行行遍歷讀取。常見的如大數據相關領域的基本都基於此方式來實現。

Kafka 存儲方案剖析

對於 Kafka 來說, 它主要用來處理海量數據流,這個場景的特點主要包括:

  1. 寫操作:寫併發要求非常高,基本得達到百萬級 TPS,順序追加寫日誌即可,無需考慮更新操作

  2. 讀操作:相對寫操作來說,比較簡單,只要能按照一定規則高效查詢即可(offset 或者時間戳)

對於寫操作來說,直接採用順序追加寫日誌的方式就可以滿足 Kafka 對於百萬 TPS 寫入效率要求。所以我們重點放在如何解決高效查詢這些日誌。Kafka 採用了稀疏哈希索引(底層基於 Hash Table 實現)的方式

把消息的 Offset 設計成一個有序的字段,這樣消息在日誌文件中也就有序存放了,也不需要額外引入哈 希表結構, 可以直接將消息劃分成若干個塊,對於每個塊,我們只需要索引當前塊的第一條消息的 Offset (類似二分查找算法的原理),即先根據 Offset 大小找到對應的塊, 然後再從塊中順序查找, 這樣就可以快速定位到要查找的消息。

由於生產者生產的消息會不斷追加到 log 文件末尾,爲防止 log 文件過大導致數據定位效率低下,Kafka 採取了分片和索引機制。 它將每個 Partition 分爲多個 Segment,每個 Segment 對應兩個文件:“.index” 索引文件和 “.log” 數據文件。這些文件位於同一文件下,該文件夾的命名規則爲:topic 名 - 分區號。例如,test 這個 topic 有三分分 區,則其對應的文件夾爲 test-0,test-1,test-2。

index 和 log 文件以當前 Segment 的第一條消息的 Offset 命名。下圖爲 index 文件和 log 文件的結構示意圖:

“.index” 文件存儲大量的索引信息               “.log” 文件存儲大量的數據,

索引文件中的元數據指向對應數據文件中 Message 的物理偏移量。

查看索引:./kafka-dump-log.sh --files /tmp/kafka-logs/test-1/00000000000000000000.index

kafka 存儲架構設計


從上分析我們可以知道: Kafka 最終的存儲實現方案, 即基於順序追加寫日誌 + 稀疏哈希索引。

Kafka 日誌存儲結構:

從上圖可以看出 Kafka 是基於「主題 + 分區 + 副本 + 分段 + 索引」的結構:

  1. kafka 中消息是以主題 Topic 爲基本單位進行歸類的,這裏的 Topic 是邏輯上的概念,實際上在磁盤存儲是根據分區 Partition 存儲的, 即每個 Topic 被分成多個 Partition,分區 Partition 的數量可以在主題 Topic 創建的時候進行指定。 2. Partition 分區主要是爲了解決 Kafka 存儲的水平擴展問題而設計的, 如果一個 Topic 的所有消息都只存儲到一個 Kafka Broker 上的話, 對於 Kafka 每秒寫入幾百萬消息的高併發系統來說,這個 Broker 肯定會出現瓶頸, 故障時候不好進行恢復,所以 Kafka 將 Topic 的消息劃分成多個 Partition, 然後均衡的分佈到整個 Kafka Broker 集羣中。 3. Partition 分區內每條消息都會被分配一個唯一的消息 id, 即我們通常所說的偏移量 Offset, 因此 kafka 只能保證每個分區內部有序性, 並不能保證全局有序性。 4. 然後每個 Partition 分區又被劃分成了多個 LogSegment,這是爲了防止 Log 日誌過大,Kafka 又引入了日誌分段 (LogSegment) 的概念,將 Log 切分爲多個 LogSegement,相當於一個巨型文件被平均分割爲一些相對較小的文件,這樣也便於消息的查找、維護和清理。這樣在做歷史數據清理的時候,直接刪除舊的 LogSegement 文件就可以了。 5. Log 日誌在物理上只是以文件夾的形式存儲,而每個 LogSegement 對應磁盤上的一個日誌文件和兩個索引文件,以及可能的其他文件(比如以 ".snapshot" 爲後綴的快照索引文件等)

kafka 日誌系統架構設計


再來研究 topic->partition 的關係

kafka 消息是按主題 Topic 爲基礎單位歸類的,各個 Topic 在邏輯上是獨立的,每個 Topic 又可以分爲一個或者多個 Partition,每條消息在發送的時候會根據分區規則被追加到指定的分區中,如下圖所示:

4 個分區的主題邏輯結構圖

日誌目錄佈局

Kafka 消息寫入到磁盤的日誌目錄佈局是怎樣的?Log 對應了一個命名爲 - 的文件夾。舉個例子,假設現在有一個名爲 “topic-order” 的 Topic,該 Topic 中 有 4 個 Partition,那麼在實際物理存儲上表現爲“topic-order-0”、“topic-order-1”、“topic-order-2”、 “topic-order-3” 這 4 個文件夾。

Log 中寫入消息是順序寫入的。但是隻有最後一個 LogSegement 才能執行寫入操作,之前的所有 LogSegement 都不能執行寫入操作。爲了更好理解這個概念,我們將最後一個 LogSegement 稱 爲 "activeSegement",即表示當前活躍的日誌分段。隨着消息的不斷寫入,當 activeSegement 滿足一定的條件時,就需要創建新的 activeSegement,之後再追加的消息會寫入新的 activeSegement。

爲了更高效的進行消息檢索,每個 LogSegment 中的日誌文件(以 “.log” 爲文件後綴)都有對應的幾個索引文件:偏移量索引文件(以 “.index” 爲文件後綴)、時間戳索引文件(以 “.timeindex” 爲文件後綴)、快照索引文件 (以 “.snapshot” 爲文件後綴)。其中每個 LogSegment 都有一個 Offset 來作爲 基準偏移量(baseOffset),用來表示當前 LogSegment 中第一條消息的 Offset。偏移量是一個 64 位的 Long 長整型數,日誌文件和這幾個索引文件都是根據基準偏移量(baseOffset)命名的,名稱固定爲 20 位數字,沒有達到的位數前面用 0 填充。比如第一個 LogSegment 的基準偏移量爲 0,對應的日誌文件 爲 00000000000000000000.log。

上面例子中 LogSegment 對應的基準位移是 12768089,也說明了當前 LogSegment 中的第一條消息的偏移量爲 12768089,同時可以說明前一個 LogSegment 中共有 12768089 條消息(偏移量從 0 至 12768089 的消息)。再如:

注意每個 LogSegment 中不只包含 “.log”、“.index”、“.timeindex” 這幾種文件,還可能包含 “.snapshot”、“.txnindex”、“leader-epoch-checkpoint”等文件, 以及 “.deleted”、“.cleaned”、 “.swap”等臨時文件。

消費者消費的時候,會將提交的位移保存在 Kafka 內部的主題__consumer_offsets 中,下面來看一個整體的日誌目錄結構圖:

磁盤數據存儲


我們知道 Kafka 是依賴文件系統來存儲和緩存消息,以及典型的順序追加寫日誌操作,另外它使用操作系統的 PageCache 來減少對磁盤 I/O 操作,即將磁盤的數據緩存到內存中,把對磁盤的訪問轉變爲對內存的訪問。

在 Kafka 中,大量使用了 PageCache, 這也是 Kafka 能實現高吞吐的重要因素之一, 當一個進程準備讀取磁盤上的文件內容時,操作系統會先查看待讀取的數據頁是否在 PageCache 中,如果命中則直接返回數據,從而避免了對磁盤的 I/O 操作;如果沒有命中,操作系統則會向磁盤發起讀取請求並將讀取的數據頁存入 PageCache 中,之後再將數據返回給進程。同樣,如果一個進程需要將數據寫入磁盤,那麼操作系統也會檢查數據頁是否在頁緩存中,如果不存在,則 PageCache 中添加相應的數據頁,最後將數據寫入對應的數據頁。被修改過後的數據頁也就變成了髒頁,操作系統會在合適的時間把髒頁中的數據寫入磁盤,以保持數據的一致性。

除了消息順序追加寫日誌、PageCache 以外, kafka 還使用了零拷貝(Zero-Copy)技術來進一步提 升系統性能, 如下圖所示:

消息從生產到寫入磁盤的整體過程如下圖所示:

可靠性


可靠性相關的問題:

我發消息的時候,需要等 ack 嘛? 我發了消息之後,消費者一定會收到嘛? 遇到各種故障時,我的消息會不會丟? 消費者側會收到多條消息嘛?消費者 svr 重啓後消息會丟失嘛?

Kafka 從拓撲上分有如下角色:

Consumer: 消費者,一般以 API 形式存在於各個業務 svr 中 Producer: 生產者,一般以 API 形式存在於各個業務 svr 中 Kafka broker: kafka 集羣中的服務器,topic 裏的消息數據存在上面

Producer 採用發送 push 的方式將消息發到 broker 上,broker 存儲後。由 consumer 採用 pull 模式訂閱並消費消息

Producer 的可靠性保證

回答生產者的可靠性保證,即回答:

  1. 發消息之後有沒有 ack

  2. 發消息收到 ack 後,是不是消息就不會丟失了而 Kafka 通過配置來指定 producer 生產者在發送消息時的 ack 策略:

# -1(全量同步確認,強可靠性保證)
Request.required.acks= -1
# 1(leader 確認收到, 默認)
Request.required.acks = 1
# 0(不確認,但是吞吐量大)
Request.required.acks = 0

kafka 配置爲 CP 系統

如果想實現 kafka 配置爲 CP(Consistency & Partition tolerance) 系統, 配置需要如下:

request.required.acks=-1
min.insync.replicas = ${N/2 + 1}     N: follower的個數
unclean.leader.election.enable = false

如圖所示,在 acks=-1 的情況下,新消息只有被 ISR 中的所有 follower(f1 和 f2, f3) 都從 leader 複製過 去纔會回 ack, ack 後,無論那種機器故障情況 (全部或部分), 寫入的 msg4,都不會丟失, 消息狀態滿足 一致性 C 要求。

正常情況下,所有 follower 複製完成後,leader 回 producer ack。

異常情況下,如果當數據發送到 leader 後部分副本 (f1 和 f2 同步), leader 掛了?此時任何 follower 都有可能變成新的 leader, producer 端會得到返回異常,producer 端會重新發送數據,但這樣數據可能會重複 (但不會丟失), 暫不考慮數據重複的情況。

min.insync.replicas 參數用於保證當前集羣中處於正常同步狀態的副本 follower 數量,當實際值小於配置值時,集羣停止服務。如果配置爲 N/2+1, 即多一半的數量,則在滿足此條件下,通過算法保證強一致性。當不滿足配置數時,犧牲可用性即停服。

如果選舉 f3 爲新 leader, 則可能會發生消息截斷,因爲 f3 還未同步 msg4 的數據。Kafka 的通 unclean.leader.election.enable 來控制在這種情況下,是否可以選舉 f3 爲 leader。舊版本中默認爲 true, 在某個版本下已默認爲 false,避免這種情況下消息截斷的出現。

通過 ack 和 min.insync.replicas 和 unclean.leader.election.enable 的配合,保證在 kafka 配置爲 CP 系統時,要麼不工作,要麼得到 ack 後,消息不會丟失且消息狀態一致。

kafka 配置爲 AP 系統

如果想實現 kafka 配置爲 AP(Availability & Partition tolerance) 系統:

request.required.acks=1 min.insync.replicas = 1 unclean.leader.election.enable = false

當配置爲 acks=1 時,即 leader 接收消息後回 ack,這時會出現消息丟失的問題:如果 leader 接受到了 第 4 條消息,此時還沒有同步到 follower 中,leader 機器掛了,其中一個 follower 被選爲 leader, 則第 4 條消息丟失了。當然這個也需要 unclean.leader.election.enable 參數配置爲 false 來配合。但是 leader 回 ack 的情況下,follower 未同步的概率會大大提升。

通過 producer 策略的配置和 kafka 集羣通用參數的配置,可以針對自己的業務系統特點來進行合理的參數配置,在通訊性能和消息可靠性下尋得某種平衡。

Broker 的可靠性保證

消息通過 producer 發送到 broker 之後,還會遇到很多問題:

Partition leader 寫入成功, follower 什麼時候同步? Leader 寫入成功,消費者什麼時候能讀到這條消息? Leader 寫入成功後,leader 重啓,重啓後消息狀態還正常嘛? Leader 重啓,如何選舉新的 leader?

這些問題集中在, 消息落到 broker 後,集羣通過何種機制來保證不同副本建的消息狀態一致性。

LEO 和 HW 簡單介紹

LEO:LogEndOffset 的縮寫,表示每個 partition 的 log 最後一條 Message 的位置。 HW: HighWaterMark 的縮寫,是指 consumer 能夠看到的此 partition 的位置。 取一個 partition 對應的 ISR 中最小的 LEO 作爲 HW,consumer 最多隻能消費到 HW 所在的位置。

下面具體分析一下 ISR 集合和 HW、LEO 的關係。

假設某分區的 ISR 集合中有 3 個副本,即一個 leader 副本和 2 個 follower 副本,此時分區的 LEO 和 HW 都分別爲 3 。消息 3 和消息 4 從生產者出發之後先被存入 leader 副本。

在消息被寫入 leader 副本之後,follower 副本會發送拉取請求來拉取消息 3 和消息 4 進行消息同步。

在同步過程中不同的副本同步的效率不盡相同,在某一時刻 follower1 完全跟上了 leader 副本而 follower2 只同步了消息 3,如此 leader 副本的 LEO 爲 5,follower1 的 LEO 爲 5,follower2 的 LEO 爲 4,那麼當前分區的 HW 取最小值 4,此時消費者可以消費到 offset0 至 3 之間的消息。

當所有副本都成功寫入消息 3 和消息 4 之後,整個分區的 HW 和 LEO 都變爲 5,因此消費者可以消費到 offset 爲 4 的消息了。

由此可見,HW 用於標識消費者可以讀取的最大消息位置,LEO 用於標識消息追加到文件的最後位置。 如果消息發送成功,不代表消費者可以消費這條消息。

Consumer 的可靠性策略

Consumer 的可靠性策略集中在 consumer 的投遞語義上,即:

何時消費,消費到什麼? 按消費是否會丟? 消費是否會重複?

這些語義場景,可以通過 kafka 消費者的而部分參數進行配置,簡單來說有以下 3 中場景:

AutoCommit(at most once, commit 後掛,實際會丟)

enable.auto.commit = true auto.commit.interval.ms

配置如上的 consumer 收到消息就返回正確給 brocker, 但是如果業務邏輯沒有走完中斷了,實際上這個消息沒有消費成功。這種場景適用於可靠性要求不高的業務。其中 auto.commit.interval.ms 代表了自動提交的間隔。比如設置爲 1s 提交 1 次,那麼在 1s 內的故障重啓,會從當前消費 offset 進行重新消費時,1s 內未提交但是已經消費的 msg, 會被重新消費到。

手動 Commit(at least once, commit 前掛,就會重複, 重啓還會丟)

enable.auto.commit = false

配置爲手動提交的場景下,業務開發者需要在消費消息到消息業務邏輯處理整個流程完成後進行手動提交。如果在流程未處理結束時發生重啓,則之前消費到未提交的消息會重新消費到,即消息顯然會投遞多次。此處應用與業務邏輯明顯實現了冪等的場景下使用。

特別應關注到在 golang 中 sarama 庫的幾個參數的配置:

sarama.offset.initial (oldest, newest) offsets.retention.minutes

intitial = oldest 代表消費可以訪問到的 topic 裏的最早的消息,大於 commit 的位置,但是小於 HW。 同時也受到 broker 上消息保留時間的影響和位移保留時間的影響。不能保證一定能消費到 topic 起始位置的消息。

如果設置爲 newest 則代表訪問 commit 位置的下一條消息。如果發生 consumer 重啓且 autocommit 沒有設置爲 false, 則之前的消息會發生丟失,再也消費不到了。在業務環境特別不穩定或非持久化 consumer 實例的場景下,應特別注意。 一般情況下, offsets.retention.minutes 爲 1440s。

Exactly once, 很難,需要 msg 持久化和 commit 是原子的

消息投遞且僅投遞一次的語義是很難實現的。首先要消費消息並且提交保證不會重複投遞,其次提交前 要完成整體的業務邏輯關於消息的處理。在 kafka 本身沒有提供此場景語義接口的情況下,這幾乎是不 可能有效實現的。一般的解決方案,也是進行原子性的消息存儲,業務邏輯異步慢慢的從存儲中取出消息進行處理。

消費組 Reblance


消費者組

消費組指的是多個消費者組成起來的一個組,它們共同消費 topic 的所有消息,並且一個 topic 的一個 partition 只能被一個 consumer 消費。其實 reblance 就是爲了 kafka 對提升消費效率做的優化,規定了 一個 ConsumerGroup 下的所有 consumer 均勻分配訂閱 Topic 的每個分區。

例如:某 Group 下有 20 個 consumer 實例,它訂閱了一個具有 100 個 partition 的 Topic 。正常情況下,kafka 會爲每個 Consumer 平均的分配 5 個分區。這個分配的過程就是 Rebalance。

rebalance 的影響

每次 reblance 會把所有的消費者重新分配監聽 topic,會產生一定影響

首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。如果你瞭解 JVM 的垃圾回收 機制,你一定聽過萬物靜止的收集方式,即著名的 stop the world,簡稱 STW。在 STW 期間,所 有應用線程都會停止工作,表現爲整個應用程序僵在那邊一動不動。Rebalance 過程也和這個類似,在 Rebalance 過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 完成。這是 Rebalance 爲人詬病的一個方面。

其次,目前 Rebalance 的設計是所有 Consumer 實例共同參與,全部重新分配所有分區。其實更 高效的做法是儘量減少分配方案的變動。例如實例 A 之前負責消費分區 1、2、3,那麼 Rebalance 之後,如果可能的話,最好還是讓實例 A 繼續消費分區 1、2、3,而不是被重新分配 其他的分區。這樣的話,實例 A 連接這些分區所在 Broker 的 TCP 連接就可以繼續用,不用重新創建連接其他 Broker 的 Socket 資源。

最後,Rebalance 實在是太慢了。曾經,有個國外用戶的 Group 內有幾百個 Consumer 實例,成 功 Rebalance 一次要幾個小時!這完全是不能忍受的。最悲劇的是,目前社區對此無能爲力,至 少現在還沒有特別好的解決方案。所謂 “本事大不如不攤上”,也許最好的解決方案就是避免 Rebalance 的發生吧。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OjVpJ70KZjm5J3jw0YWcfw