kafka 原理看這一篇就夠了
爲何使用消息隊列
異步。 接口方式實現多個系統協作,如圖 A 系統作爲用戶請求接收方,需要調用多個系統的接口,這些接口還有可能是在 A 系統裏同步調用,所以最後的接口耗時是多個系統接口耗時的總和;mq 方式則可以異步發送消息給 mq,mq 再發送給其他多個系統,多個系統並行且異步的接收消息。當然,mq 方式實現有一個前提是用戶的請求不需要立即返回請求結果,例如用戶發送一個查詢請求就不適合 mq 方式。mq 方式多用於傳遞事件,如發送優惠券、秒殺等。
削峯。 用戶的請求大部分都集中在固定的時間段,而在晚間凌晨或者用戶使用低峯期基本沒什麼請求。所以 mq 的削峯就是爲了將高峯期的請求泄洪一部分到低峯期。
解耦。 接口方式發送消息,發送者調用接口,接收者提供接口,此時發送者作爲消息生產者(如圖中的 A 系統)作爲主動的一方需要適配上游的各個類型的接口,它們的傳輸協議、參數、返回值等可能都不一樣,同時各個接收方還不能拒收消息,這些都會帶來極大的工作量;mq 方式發送消息,消息發送者變成了上游,現在只需要將統一格式的消息發送給 mq,由 mq 來控制消息的存儲、容災以及消息是否送達等。消息接收者則遵守消息的統一格式即可,如果不想接收消息可以取消訂閱。這樣就達到了生產者和消費者之間的解耦效果。
kafka 的總體架構
Producer: 消息的生產者,即消息的入口。
Broker: kafka 的一個實例,一臺 kafka 服務器上會有一個或多個實例。多臺 kafka 服務構成了 kafka 的集羣。
Topic: 消息的主題,生產者按照主題發送消息,消費者按照主題接收消息,一個 Broker 可以有多個主題。
Partition: Topic 的分區,一個 Topic 可以有多個分區,分區越多可以並行處理消息的能力越強。同一個 Topic 上的不同分區消息是不重複的,Partition 的本質是文件夾。
Replication: Partition 的副本,副本用來做數據備份。副本分爲主分區副本(Leader)和從分區副本(Follower),它們不能同時出現在一個 Broker 上。主分區副本負責消息的接收並寫入,從分區副本不接收生產者發來的消息,它的唯一職責就是從主分區副本同步過來消息。當主分區副本掛掉的時候,會在從分區副本中選出一個新的 Leader 作爲主分區副本。kafka 中一個 Partition 的最大副本數量是 10 個,且副本數量不能大於 Broker 的數量。
Consumer: 消息的消費者,即消息的出口。
Consumer Group: 多個消費者組成一個消費組,消費組之間可以重複消費消息。同一個消費組的某一個 Partition 不能同時被多個消費者消費。
Zookeeper: kafka 集羣依賴 Zookeeper 保存集羣的元信息,以保證 kafka 集羣的可靠性。kafka 從 2.8 版本以後使用其內部的 Quorum 控制器來代替 Zookeeper。
生產者寫數據
生產者發送消息給 Leader 分區副本,並順序寫入到磁盤文件,然後 Follower 分區副本從 Leader 分區副本 poll 消息以保證數據是最新的。kafka 將消息寫入哪個分區有幾下幾個原則:
-
生產者指定了分區,寫入對應的分區
-
生產者沒有指定分區,但設置了數據庫的 key,根據 key 的 hash 值算出一個分區
-
生產者既沒有指定分區,也沒有設置 key,輪詢出一個分區
topic 本質是一個目錄,而 topic 又是由一些 Partition Logs(分區日誌) 組成。消息採用 hash 取模的分區算法有序的寫入到 Partitionp Log 上。
producer 在將消息寫入 partition 之前會先在內存中緩存,累計到一定量後(按數量、按時間間隔或按數據大小),再批量寫入。
一般一條消息大概 1~10kB,推薦不要超過 1MB。
kafka 默認數據保留 7 天時間。如果數據量大可以修改配置(log.retention.hours)將時間縮短。
消費者讀數據
與生產者一樣,消費者主動的從 Leader 分區副本拉取消息。每成功拉取一條 partition 的消息,partition 的消息遊標卡尺(offset)就會加 1。
partition 裏的 offset 默認配置是從最新一條開始消費,也可以配置 from beginning 從 0 開始消費。
在同一個消費組裏,消費者和 partition 的關係是 1:1 或者 1:n,不能出現消費者與 partition 是 n:1 的情況,意思是同一個消費組裏消費者數量要小於等於 parition 的數量。因爲不這樣做就會造成多個消費者共享一個 offset,從而就不能保證一個 partition 內的消息的順序性,也會造成消息被重複消費的安全問題,這是一種不穩定的重複消費。
如果想要穩定的重複消費同一條消息,可以設置兩個消費組。兩個組內的消費者消費同一個 partition 時,offset 是相互獨立的。
消息的有序性
想要保證消息被消費的有序性,有以下兩個方法:
-
一個 topic 只設置一個 partition。缺點是消費組裏只能有一個消費者消費,不適用高併發場景。
-
producer 將需要保證順序的消息發送到同一個 partition。兩種方式指定:1、指定 partition;2、不指定 partition,根據 key 的 hash 值運算後得到 partition。
消息的可靠性
kafka 的數據是可持久化的寫在 Partition Log 文件裏。每個 topic 都可以設置副本數量。副本數量決定了有幾個 broker 來存放寫入的數據。
consumer 和 partition 數量的關係是:partition 數 >= 同一個消費組裏的 consumer 數。因爲一個 partition 只能被同一個消費組的一個 consumer 消費(但一個 consumer 可以消費多個 partition)。這是爲了消息在一個 partition 裏的順序讀。
生產端消息可靠性
分區副本
所有的讀寫請求都發往 leader 副本所在的 broker,follower 副本不處理客戶端請求,它唯一的任務就是從 leader 副本異步拉取消息。
Kafka 默認的副本因子是 3,即每個分區只有 1 個 leader 副本和 2 個 follower 副本。
同步副本 (In-sync replicas)
ISR 同步副本機制是用來判斷 follower 是否同步了 leader 的最新數據。
ISR 列表保存了與 leader 已經同步的副本,leader 自己是長期存在於 ISR 列表。當 follower 副本超過設定的時間間隔(replica.lag.time.max.ms)沒有和 leader 同步,就會被踢出 ISR 列表,反之則不會被踢出。
acks 參數(生產者配置)
acks 參數,表示有多少的分區副本收到消息,才能認爲消息是寫入成功的。
- acks=0。不需要副本收到消息,producer 就能收到 broker 的響應。該模式吞吐量高,但安全性低,容易丟消息。
- acks=1(默認)。只要 leader 副本接收到了消息並寫入到磁盤,producer 就能收到 broker 的響應。需要注意的是這種模式依然會有丟消息的安全問題。例如,當 leader 副本收到消息以後還沒來得及同步副本到 follower 就宕機了,此時 producer 已經收到了成功的響應,但 follower 變爲新的 leader 時還未將最新的那條消息同步過來。
- acks=all(或 - 1)。只有 ISR 列表裏的所有分區副本都收到消息,producer 才能收到 broker 的響應。該模式延遲最高。
acks=all 模式下,有一個最小副本配置(min.insync.replicas)。該配置默認值是 1,只在 acks=all 時生效。該參數控制消息最少被多少個副本寫入纔算成功寫入。即 ISR 列表的副本最小數量。因爲 ISR 列表始終要有 leader 副本,所以如果該配置默認是 1,實際上是起不到副本作用的,所以該配置最好配置爲大於 1 的數。
當 leader 副本宕機時,acks=all 模式下,會在 ISR 列表中選舉一個新的 broker 作爲 leader。
-
增大 min.insync.replicas。可以增加數據的可靠性。
-
減小 min.insync.replicas。可以增加系統的可用性。
消費端消息可靠性
要想實現消費端的消息可靠性,必須抓住兩點:
-
保證消息到達的狀態(offset)和本地事務的狀態保持一致。
-
保證消費的冪等性。
要想保證消費端消息的可靠性,首先必須保證提交 offset 和提交本地事務要麼一起成功,要麼一起失敗。我們以自動提交 offset 和手動提交 offset 分別舉例說明。
-
自動提交 offset。消息到達消費客戶端,不論本地事務是否提交成功,offset 都會自動提交。一旦本地事務提交失敗,就會造成消息丟失的問題。
-
手動提交 offset。有三種方法:
-
第一種方式是消費端 KafkaListener 不配置本地事務,業務代碼執行完後數據入庫,最後再提交 offset,即使 offset 提交失敗,只要保證業務代碼的冪等性,消息重複消費也可以接受。
-
第二種方式是消費端 KafkaListener 配置本地事務,將 offset 的值寫道數據庫裏和業務數據一起提交,這樣就將業務數據和 offset 做了綁定關係,在消費一開始就根據業務 id 和 offset 判斷消息是否消費過,如果沒有消費過才執行業務代碼。
-
第三種方式是前兩種方式的結合,這種方式不需要將 offset 入庫。該方法在消費端 KafkaListener 配置本地事務,先執行業務代碼最後執行 offset 提交,這樣業務代碼失敗就不會執行提交 offset 的代碼;而如果是最後提交 offset 失敗,本地事務也會回滾。
-
在實際的運用中,考慮到數據庫事務相對性能較差,可以把本地事務和 offset 的綁定關係用緩存來保存。
kafka 優化
kafka 削峯的幾種方法:
-
增加分區。增加分區數可以提高消息並行處理的能力。當然也會增加集羣的維護成本,需要權衡。
-
使用消費組。使用消費組可以讓多個消費者並行消費一個 partition 的消息,因爲每個消費組在同一個 partition 的 offset 不是共享的。但是爲了避免重複消費消息,需要爲不同消費組上的多個消費者指定所消費消息的 key。
-
增加副本數。可以提高 kafka 的吞吐量,提升 kafka 的可靠性和容錯性。
此外,修改一些 kafka 配置參數也能達到一定的優化效果。例如,
-
爲了減少每次發送 / 拉取消息的次數,可以提高消息發送 / 拉取的消息數量 / 數據大小的閾值,或者增加時間間隔。減少消息發送 / 拉取的次數意味着一次發送 / 拉取的量比較大,所以還要注意提高會話超時、拉取超時的時間間隔,以免觸發 rebalance。
-
減小並行度(concurrency)。當 concurrency=3 時,就會有 4*3=12 個 Consumer 線程,12 個 Listener 線程。減小 concurrency 可以減少客戶端線程數量。
kafka 和 rocketmq
目前消息隊列用的比較多的就是 kafka 和 rocketmq 了。我們可以比較一下這兩種消息隊列的優缺點。
- 適用場景
topic 較多時推薦使用 rocketmq;topic 少時 kafka 性能更佳。因爲 kafka 一個 topic 一個 partition 文件,rocketmq 是多個 topic 一個文件。
kafka 適合日誌處理、大數據領域;rocketmq 適合業務處理。
- 性能
kafka 的 tps 在百萬條 / 秒;rocketmq 大約 10 萬條 / 秒。
- 可靠性
kafka 異步刷盤,異步副本;recketmq 異步 / 同步刷盤,異步 / 同步副本。
- 支持隊列數量
kafka 單機最大支持 64 個隊列 / 分區,增加分區性能降低嚴重;rocketmq 單機最大支持 5w 隊列,性能穩定。
- 消息順序性
kafka 在同一個 partition 下支持消息順序性,但如果一臺 broker 宕機會打亂順序;rocketmq 支持消息順序性,一臺 broker 宕機消息會發送失敗,但順序性依然可以保證。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/8MJiISheeampw7pSW8t2ww