一文入門 Kafka

作者:ninetyhe,騰訊 CDG 後臺開發工程師

溫故而知新,反覆學習優秀的框架,定有所獲。因爲工作原因,需要用到 Kafka 的特殊場景,週末再次閱讀了 kafka 的資料,收穫不少。

kafka 由 LinkedIn 公司推出的一個高吞吐的分佈式消息系統,通俗的說就是一個基於發佈和訂閱的消息隊列,官網地址:

https://kafka.apache.org/intro

應用場景

Kafka 拓撲圖(多副本機制)

由上圖我們可以發現 Kafka 是分佈式,同時對於每一個分區都存在多副本,同時整個集羣的管理都通過 zookeeper 管理。

Kafka 核心組件

broker

Kafka 服務器,負責消息存儲和轉發;一 broker 就代表一個 kafka 節點。一個 broker 可以包含多個 topic

topic

消息類別,Kafka 按照 topic 來分類消息

partition

offset

Producer

生產者,負責向 Kafka Broker 發消息的客戶端

Consumer

消息消者,負責消費 Kafka Broker 中的消息

Consumer Group

消費者組,每個 Consumer 必須屬於一個 group;(注意的是 一個分區只能由組內一個消費者消費,消費者組之間互不影響。

Zookeeper

管理 kafka 集羣,負責存儲了集羣 broker、topic、partition 等 meta 數據存儲,同時也負責 broker 故障發現,partition leader 選舉,負載均衡等功能。

服務治理

既然 Kafka 是分佈式的發佈 / 訂閱系統,這樣如果做的集羣之間數據同步和一致性,kafka 是不是肯定不會丟消息呢?以及宕機的時候如果進行 Leader 選舉呢?

數據同步

在 Kafka 中的 Partition 有一個 leader 與多個 follower,producer 往某個 Partition 中寫入數據是,只會往 leader 中寫入數據,然後數據纔會被複制進其他的 Replica 中。而每一個 follower 可以理解成一個消費者,定期去 leader 去拉去消息。而只有數據同步了後,kafka 纔會給生產者返回一個 ACK 告知消息已經存儲落地了。

ISR

在 Kafka 中,爲了保證性能,Kafka 不會採用強一致性的方式來同步主從的數據。而是維護了一個:in-sync Replica 的列表,Leader 不需要等待所有 Follower 都完成同步,只要在 ISR 中的 Follower 完成數據同步就可以發送 ack 給生產者即可認爲消息同步完成。同時如果發現 ISR 裏面某一個 follower 落後太多的話,就會把它剔除。

具體流程如下:

上述的做法並無法保證 kafka 一定不丟消息。 雖然 Kafka 通過多副本機制中最大限度保證消息不會丟失,但是如果數據已經寫入系統 page cache 中但是還沒來得及刷入磁盤,此時突然機器宕機或者掉電,那消息自然而然的就會丟失。

Kafka 故障恢復

Kafka 通過 Zookeeper 連坐集羣的管理,所以這裏的選舉機制採用的是 Zab(zookeeper 使用)。

Kafka 爲什麼這麼快

順序寫磁盤

Kafka 採用了順序寫磁盤,而由於順序寫磁盤相對隨機寫,減少了尋地址的耗費時間。(在 Kafka 的每一個分區裏面消息是有序的。

Page Cache

Kafka 在 OS 系統方面使用了 Page Cache 而不是我們平常所用的 Buffer。Page Cache 其實不陌生,也不是什麼新鮮事物。

我們在 linux 上查看內存的時候,經常可以看到 buff/cache,兩者都是用來加速 IO 讀寫用的,而 cache 是作用於讀,也就是說,磁盤的內容可以讀到 cache 裏面這樣,應用程序讀磁盤就非常快;而 buff 是作用於寫,我們開發寫磁盤都是,一般如果寫入一個 buff 裏面再 flush 就非常快。而 kafka 正是把這兩者發揮了極致:Kafka 雖然是 scala 寫的,但是依舊在 Java 的虛擬機上運行,儘管如此,kafka 它還是儘量避開了 JVM 的限制,它利用了 Page cache 來存儲,這樣躲開了數據在 JVM 因爲 GC 而發生的 STW。另一方面也是 Page Cache 使得它實現了零拷貝,具體下面會講。

零拷貝

無論是優秀的 Netty 還是其他優秀的 Java 框架,基本都在零拷貝減少了 CPU 的上下文切換和磁盤的 IO。當然 Kafka 也不例外。零拷貝的概念具體這裏不作太詳細的複述,大致的給大家講一下這個概念。

傳統的一次應用程請求數據的過程

這裏大致可以發傳統的方式發生了 4 次拷貝,2 次 DMA 和 2 次 CPU,而 CPU 發生了 4 次的切換。_(DMA 簡單理解就是,在進行 I/O 設備和內存的數據傳輸的時候,數據搬運的工作全部交給 DMA 控制器,而 CPU 不再參與任何與數據搬運相關的事情)。

零拷貝的方式

通過優化我們可以發現,CPU 只發生了 2 次的上下文切換和 3 次數據拷貝。(linux 系統提供了系統事故調用函數 “sendfile()”,這樣系統調用,可以直接把內核緩衝區裏的數據拷貝到 socket 緩衝區裏,不再拷貝到用戶態)。

分區分段

我們上面也介紹過了,kafka 採取了分區的模式,而每一個分區又對應到一個物理分段,而查找的時候可以根據二分查找快速定位。這樣不僅提供了數據讀的查詢效率,也提供了並行操作的方式。

數據壓縮

Kafka 對數據提供了:Gzip 和 Snappy 壓縮協議等壓縮協議,對消息結構體進行了壓縮,一方面減少了帶寬,也減少了數據傳輸的消耗。

Kafka 安裝

安裝 JDK

由於使用壓縮包還需要自己配置環境變量,所以這裏推薦直接用 yum 安裝,熟悉查看目前 Java 的版本:

yum -y list Java*

安裝你想要的版本,這裏我是 1.8

yum install java-1.8.0-openjdk-devel.x86_64

查看是否安裝成功

Java -version

安裝 Zookeeper

首先需要去官網下載安裝包,然後解壓

tar -zxvf zookeeper-3.4.9.tar.gz

要做的就是將這個文件複製一份,並命名爲:zoo.cfg,然後在 zoo.cfg 中修改自己的配置即可

cp zoo_sample.cfg zoo.cfg
vim zoo.cfg

主要配置解釋如下

# zookeeper內部的基本單位,單位是毫秒,這個表示一個tickTime爲2000毫秒,在zookeeper的其他配置中,都是基於tickTime來做換算的
tickTime=2000
# 集羣中的follower服務器(F)與leader服務器(L)之間 初始連接 時能容忍的最多心跳數(tickTime的數量)。
initLimit=10
#syncLimit:集羣中的follower服務器(F)與leader服務器(L)之間 請求和應答 之間能容忍的最多心跳數(tickTime的數量)
syncLimit=5
# 數據存放文件夾,zookeeper運行過程中有兩個數據需要存儲,一個是快照數據(持久化數據)另一個是事務日誌
dataDir=/tmp/zookeeper
## 客戶端訪問端口
clientPort=2181

配置環境變量

vim ~/.bash_profile
export ZK=/usr/local/src/apache-zookeeper-3.7.0-bin
export PATH=$PATH:$ZK/bin
export PATH
// 啓動
zkServer.sh start

下面能看啓動成功

安裝 Kafka

下載 kafka

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka-2.8.0-src.tgz

安裝 kafka

 tar -xzvf kafka_2.12-2.0.0.tgz

配置環境變量

 export ZK=/usr/local/src/apache-zookeeper-3.7.0-bin
 export PATH=$PATH:$ZK/bin
 export KAFKA=/usr/local/src/kafka
 export PATH=$PATH:$KAFKA/bin

啓動 Kafka

 nohup kafka-server-start.sh 自己的配置文件路徑/server.properties &

大功告成!

參考資料

《深入理解 Kafka:核心設計實踐原理》

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/vzvmOXGcsX7rwY4J_--onw