超詳細 kafka 入門最佳實踐
認識 kafka
kafka 簡介
Kafka 是一個分佈式流媒體平臺,kafka 官網:http://kafka.apache.org/
-
(1)流媒體平臺有三個關鍵功能:
-
發佈和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
-
以容錯的持久方式存儲記錄流。
-
記錄發生時處理流。
-
(2)Kafka 通常用於兩大類應用:
-
構建可在系統或應用程序之間可靠獲取數據的實時流數據管道
-
構建轉換或響應數據流的實時流應用程序
-
(3)首先是幾個概念:
-
Kafka 作爲一個集羣運行在一個或多個可跨多個數據中心的服務器上。
-
Kafka 集羣以稱爲** topics 主題**的類別存儲記錄流。
-
每條記錄都包含一個鍵,一個值和一個時間戳。
-
(4)Kafka 有四個核心 API:
-
Producer API(生產者 API)允許應用程序發佈記錄流至一個或多個 kafka 的 topics(主題)。
-
Consumer API(消費者 API)允許應用程序訂閱一個或多個 topics(主題),並處理所產生的對他們記錄的數據流。
-
**Streams API(流 API)**允許應用程序充當流處理器,從一個或多個 topics(主題)消耗的輸入流,併產生一個輸出流至一個或多個輸出的 topics(主題),有效地變換所述輸入流,以輸出流。
-
Connector API(連接器 API)允許構建和運行 kafka topics(主題)連接到現有的應用程序或數據系統中重用生產者或消費者。例如,關係數據庫的連接器可能捕獲對錶的每個更改。
在 Kafka 中,客戶端和服務器之間的通信是通過簡單,高性能,語言無關的 TCP 協議完成的。此協議已版本化並保持與舊版本的向後兼容性。Kafka 提供 Java 客戶端,但客戶端有多種語言版本。
1.2 Topics 主題 和 partitions 分區
我們首先深入瞭解 Kafka 爲記錄流提供的核心抽象 - 主題 topics
一個 Topic 可以認爲是一類消息,每個 topic 將被分成多個 partition(區), 每個 partition 在存儲層面是 append log 文件
主題是發佈記錄的類別或訂閱源名稱。Kafka 的主題總是多用戶; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入它的數據。
對於每個主題,Kafka 羣集都維護一個如下所示的分區日誌:
每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的提交日誌中。分區中的記錄每個都分配了一個稱爲偏移的順序 ID 號,它唯一地標識分區中的每個記錄。
Kafka 集羣持久保存所有已發佈的記錄 - 無論是否已使用 - 使用可配置的保留期。例如,如果保留策略設置爲兩天,則在發佈記錄後的兩天內,它可供使用,之後將被丟棄以釋放空間。Kafka 的性能在數據大小方面實際上是恆定的,因此長時間存儲數據不是問題。
這些功能組合意味着 Kafka 消費者 consumers 非常 cheap - 他們可以來來往往對集羣或其他消費者沒有太大影響。例如,您可以使用我們的命令行工具 “tail” 任何主題的內容,而無需更改任何現有使用者所消耗的內容。
日誌中的分區有多種用途。首先,它們允許日誌擴展到超出適合單個服務器的大小。每個單獨的分區必須適合託管它的服務器,但主題可能有許多分區,因此它可以處理任意數量的數據。其次,它們充當了並行性的單位 - 更多的是它。
1.3 Distribution 分配
一個 Topic 的多個 partitions, 被分佈在 kafka 集羣中的多個 server 上; 每個 server(kafka 實例) 負責 partitions 中消息的讀寫操作; 此外 kafka 還可以配置 partitions 需要備份的個數 (replicas), 每個 partition 將會被備份到多臺機器上, 以提高可用性.
基於 replicated 方案, 那麼就意味着需要對多個備份進行調度; 每個 partition 都有一個 server 爲 "leader";leader 負責所有的讀寫操作, 如果 leader 失效, 那麼將會有其他 follower 來接管 (成爲新的 leader);follower 只是單調的和 leader 跟進, 同步消息即可.. 由此可見作爲 leader 的 server 承載了全部的請求壓力, 因此從集羣的整體考慮, 有多少個 partitions 就意味着有多少個 "leader",kafka 會將 "leader" 均衡的分散在每個實例上, 來確保整體的性能穩定。
1.4 Producers 生產者 和 Consumers 消費者
1.4.1 Producers 生產者
Producers 將數據發佈到指定的 topics 主題。同時 Producer 也能決定將此消息歸屬於哪個 partition; 比如基於 "round-robin" 方式或者通過其他的一些算法等。
1.4.2 Consumers
-
本質上 kafka 只支持 Topic. 每個 consumer 屬於一個 consumer group; 反過來說, 每個 group 中可以有多個 consumer. 發送到 Topic 的消息, 只會被訂閱此 Topic 的每個 group 中的一個 consumer 消費。
-
如果所有使用者實例具有相同的使用者組,則記錄將有效地在使用者實例上進行負載平衡。
-
如果所有消費者實例具有不同的消費者組,則每個記錄將廣播到所有消費者進程。
分析:兩個服務器 Kafka 羣集,託管四個分區(P0-P3),包含兩個使用者組。消費者組 A 有兩個消費者實例,B 組有四個消費者實例。
在 Kafka 中實現消費 consumption 的方式是通過在消費者實例上劃分日誌中的分區,以便每個實例在任何時間點都是分配的 “公平份額” 的獨佔消費者。維護組中成員資格的過程由 Kafka 協議動態處理。如果新實例加入該組,他們將從該組的其他成員接管一些分區; 如果實例死亡,其分區將分發給其餘實例。
Kafka 僅提供分區內記錄的總訂單,而不是主題中不同分區之間的記錄。對於大多數應用程序而言,按分區排序與按鍵分區數據的能力相結合就足夠了。但是,如果您需要對記錄進行總訂單,則可以使用僅包含一個分區的主題來實現,但這將意味着每個使用者組只有一個使用者進程。
1.5 Consumers kafka 確保
-
發送到 partitions 中的消息將會按照它接收的順序追加到日誌中。也就是說,如果記錄 M1 由與記錄 M2 相同的生成者發送,並且首先發送 M1,則 M1 將具有比 M2 更低的偏移並且在日誌中更早出現。
-
消費者實例按照它們存儲在日誌中的順序查看記錄。對於消費者而言, 它們消費消息的順序和日誌中消息順序一致。
-
如果 Topic 的 "replicationfactor" 爲 N, 那麼允許 N-1 個 kafka 實例失效,我們將容忍最多 N-1 個服務器故障,而不會丟失任何提交到日誌的記錄。
1.6 kafka 作爲消息系統
Kafka 的流概念與傳統的企業郵件系統相比如何?
(1)傳統消息系統
消息傳統上有兩種模型:queuing 排隊 and publish-subscribe 發佈 - 訂閱。在隊列中,消費者池可以從服務器讀取並且每個記錄轉到其中一個; 在發佈 - 訂閱中,記錄被廣播給所有消費者。這兩種模型中的每一種都有優點和缺點。排隊的優勢在於它允許您在多個消費者實例上劃分數據處理,從而可以擴展您的處理。不幸的是,一旦一個進程讀取它已經消失的數據,隊列就不是多用戶。發佈 - 訂閱允許您將數據廣播到多個進程,但由於每條消息都發送給每個訂閱者,因此無法進行擴展處理。
卡夫卡的消費者羣體概念概括了這兩個概念。與隊列一樣,使用者組允許您將處理劃分爲一組進程(使用者組的成員)。與發佈 - 訂閱一樣,Kafka 允許您向多個消費者組廣播消息。
(2)kafka 的優勢
Kafka 模型的優勢在於每個主題都具有這些屬性 - 它可以擴展處理並且也是多用戶 - 不需要選擇其中一個。
與傳統的消息系統相比,Kafka 具有更強的訂購保證。
傳統隊列在服務器上按順序保留記錄,如果多個消費者從隊列中消耗,則服務器按照存儲順序分發記錄。但是,雖然服務器按順序分發記錄,但是記錄是異步傳遞給消費者的,因此它們可能會在不同的消費者處出現故障。這實際上意味着在存在並行消耗的情況下丟失記錄的順序。消息傳遞系統通常通過具有 “獨佔消費者” 概念來解決這個問題,該概念只允許一個進程從隊列中消耗,但當然這意味着處理中沒有並行性。
kafka 做得更好。通過在主題中具有並行性概念 - 分區 - ,Kafka 能夠在消費者流程池中提供訂購保證和負載平衡。這是通過將主題中的分區分配給使用者組中的使用者來實現的,以便每個分區僅由該組中的一個使用者使用。通過這樣做,我們確保使用者是該分區的唯一讀者並按順序使用數據。由於有許多分區,這仍然可以平衡許多消費者實例的負載。但請注意,消費者組中的消費者實例不能超過分區。
1.7 kafka 作爲存儲系統
-
任何允許發佈與消費消息分離的消息的消息隊列實際上充當了正在進行的消息的存儲系統。Kafka 的不同之處在於它是一個非常好的存儲系統。
-
寫入 Kafka 的數據將寫入磁盤並進行復制以實現容錯。Kafka 允許生產者等待確認,以便在完全複製之前寫入不被認爲是完整的,並且即使寫入的服務器失敗也保證寫入仍然存在。
-
磁盤結構 Kafka 很好地使用了規模 - 無論服務器上有 50 KB 還是 50 TB 的持久數據,Kafka 都會執行相同的操作。
-
由於認真對待存儲並允許客戶端控制其讀取位置,您可以將 Kafka 視爲一種專用於高性能,低延遲提交日誌存儲,複製和傳播的專用分佈式文件系統。
1.8 kafka 用於流處理
-
僅僅讀取,寫入和存儲數據流是不夠的,目的是實現流的實時處理。
-
在 Kafka 中,流處理器是指從輸入主題獲取連續數據流,對此輸入執行某些處理以及生成連續數據流以輸出主題的任何內容。
-
例如,零售應用程序可能會接收銷售和發貨的輸入流,並輸出重新排序流和根據此數據計算的價格調整。
-
可以使用生產者和消費者 API 直接進行簡單處理。但是,對於更復雜的轉換,Kafka 提供了完全集成的 Streams API。這允許構建執行非平凡處理的應用程序,這些應用程序可以計算流的聚合或將流連接在一起。
-
此工具有助於解決此類應用程序面臨的難題:處理無序數據,在代碼更改時重新處理輸入,執行有狀態計算等。
-
流 API 構建在 Kafka 提供的核心原語上:它使用生產者和消費者 API 進行輸入,使用 Kafka 進行有狀態存儲,並在流處理器實例之間使用相同的組機制來實現容錯。
2、kafka 使用場景
2.1 消息 Messaging
Kafka 可以替代更傳統的消息代理。消息代理的使用有多種原因(將處理與數據生成器分離,緩衝未處理的消息等)。與大多數消息傳遞系統相比,Kafka 具有更好的吞吐量,內置分區,複製和容錯功能,這使其成爲大規模消息處理應用程序的理想解決方案。
根據經驗,消息傳遞的使用通常相對較低,但可能需要較低的端到端延遲,並且通常取決於 Kafka 提供的強大的耐用性保證。
在這個領域,Kafka 可與傳統的消息傳遞系統(如 ActiveMQ 或 RabbitMQ)相媲美。
2.2 網站活動跟蹤
Kafka 的原始用例是能夠將用戶活動跟蹤管道重建爲一組實時發佈 - 訂閱源。這意味着站點活動(頁面查看,搜索或用戶可能採取的其他操作)將發佈到中心主題,每個活動類型包含一個主題。這些源可用於訂購一系列用例,包括實時處理,實時監控以及加載到 Hadoop 或離線數據倉庫系統以進行脫機處理和報告。
活動跟蹤通常非常高,因爲爲每個用戶頁面視圖生成了許多活動消息。
2.3 度量 Metrics
Kafka 通常用於運營監控數據。這涉及從分佈式應用程序聚合統計信息以生成操作數據的集中式提要。
2.4 日誌聚合
許多人使用 Kafka 作爲日誌聚合解決方案的替代品。日誌聚合通常從服務器收集物理日誌文件,並將它們放在中央位置(可能是文件服務器或 HDFS)進行處理。Kafka 抽象出文件的細節,並將日誌或事件數據作爲消息流更清晰地抽象出來。這允許更低延遲的處理並更容易支持多個數據源和分佈式數據消耗。與 Scribe 或 Flume 等以日誌爲中心的系統相比,Kafka 提供了同樣出色的性能,由於複製而具有更強的耐用性保證,以及更低的端到端延遲。
2.5 流處理
許多 Kafka 用戶在處理由多個階段組成的管道時處理數據,其中原始輸入數據從 Kafka 主題中消費,然後聚合,豐富或以其他方式轉換爲新主題以供進一步消費或後續處理。
例如,用於推薦新聞文章的處理管道可以從 RSS 訂閱源抓取文章內容並將其發佈到 “文章” 主題; 進一步處理可能會對此內容進行規範化或重複數據刪除,並將已清理的文章內容發佈到新主題; 最終處理階段可能會嘗試向用戶推薦此內容。此類處理管道基於各個主題創建實時數據流的圖形。從 0.10.0.0 開始,這是一個輕量級但功能強大的流處理庫,名爲 Kafka Streams 在 Apache Kafka 中可用於執行如上所述的此類數據處理。除了 Kafka Streams 之外,其他開源流處理工具包括 Apache Storm 和 Apache Samza。
2.6 Event Sourcing
Event Sourcing 是一種應用程序設計風格,其中狀態更改記錄爲按時間排序的記錄序列。Kafka 對非常大的存儲日誌數據的支持使其成爲以這種風格構建的應用程序的出色後端。
2.7 提交日誌
Kafka 可以作爲分佈式系統的一種外部提交日誌。該日誌有助於在節點之間複製數據,並充當故障節點恢復其數據的重新同步機制。Kafka 中的日誌壓縮功能有助於支持此用法。在這種用法中,Kafka 類似於 Apache BookKeeper 項目。
3、kafka 安裝
3.1 下載安裝
到官網 http://kafka.apache.org/downloads.html 下載想要的版本。
注:由於 Kafka 控制檯腳本對於基於 Unix 和 Windows 的平臺是不同的,因此在 Windows 平臺上使用 bin\windows\ 而不是 bin/ 將腳本擴展名更改爲. bat。
[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[root@along ~]# cd /data/kafka_2.11-2.1.0/
3.2 配置啓動 zookeeper
kafka 正常運行,必須配置 zookeeper,否則無論是 kafka 集羣還是客戶端的生存者和消費者都無法正常的工作的;所以需要配置啓動 zookeeper 服務。
(1)zookeeper 需要 java 環境
[root@along ~]# yum -y install java-1.8.0
(2)這裏 kafka 下載包已經包括 zookeeper 服務,所以只需修改配置文件,啓動即可。
如果需要下載指定 zookeeper 版本;可以單獨去 zookeeper 官網 http://mirrors.shu.edu.cn/apache/zookeeper / 下載指定版本。
[root@along ~]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties
dataDir=/tmp/zookeeper #數據存儲目錄
clientPort=2181 #zookeeper端口
maxClientCnxns=0
注:可自行添加修改 zookeeper 配置
3.3 配置 kafka
(1)修改配置文件
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
注:可根據自己需求修改配置文件
broker.id:#唯一標識ID
listeners=PLAINTEXT://localhost:9092:#kafka服務監聽地址和端口
log.dirs:#日誌存儲目錄
zookeeper.connect:#指定zookeeper服務
(2)配置環境變量
[root@along ~]# vim /etc/profile.d/kafka.sh
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
[root@along ~]# source /etc/profile.d/kafka.sh
(3)配置服務啓動腳本
[root@along ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
source /etc/rc.d/init.d/functions
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
# See how we were called.
case "$1" in
start)
echo -n "Starting Kafka:"
/sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
echo " done."
exit 0
;;
stop)
echo -n "Stopping Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"
echo " done."
exit 0
;;
hardstop)
echo -n "Stopping (hard) Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
echo " done."
exit 0
;;
status)
c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
if [ "$c_pid" = "" ] ; then
echo "Stopped"
exit 3
else
echo "Running $c_pid"
exit 0
fi
;;
restart)
stop
start
;;
*)
echo "Usage: kafka {start|stop|hardstop|status|restart}"
exit 1
;;
esac
3.4 啓動 kafka 服務
(1)後臺啓動 zookeeper 服務
[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &
(2)啓動 kafka 服務
[root@along ~]# service kafka start
Starting kafka (via systemctl): [ OK ]
[root@along ~]# service kafka status
Running 86018
[root@along ~]# ss -nutl
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
tcp LISTEN 0 50 :::9092 :::*
tcp LISTEN 0 50 :::2181 :::*
4、kafka 使用簡單入門
4.1 創建主題 topics
創建一個名爲 “along” 的主題,它只包含一個分區,只有一個副本:
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".
如果我們運行 list topic 命令,我們現在可以看到該主題:
[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181
along
4.2 發送一些消息
Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其作爲消息發送到 Kafka 集羣。默認情況下,每行將作爲單獨的消息發送。
運行生產者,然後在控制檯中鍵入一些消息以發送到服務器。
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message
4.3 啓動消費者
Kafka 還有一個命令行使用者,它會將消息轉儲到標準輸出。
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
This is a message
This is another message
所有命令行工具都有其他選項; 運行不帶參數的命令將顯示更詳細地記錄它們的使用信息。
5、設置多代理 kafka 羣集
到目前爲止,我們一直在與一個 broker 運行,但這並不好玩。對於 Kafka,單個代理只是一個大小爲 1 的集羣,因此除了啓動一些代理實例之外沒有太多變化。但是爲了感受它,讓我們將我們的集羣擴展到三個節點(仍然在我們的本地機器上)。
5.1 準備配置文件
[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.11-2.1.0]# vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
注:該 broker.id 屬性是羣集中每個節點的唯一且永久的名稱。我們必須覆蓋端口和日誌目錄,因爲我們在同一臺機器上運行這些,並且我們希望讓所有代理嘗試在同一端口上註冊或覆蓋彼此的數據。
5.2 開啓集羣另 2 個 kafka 服務
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &
[root@along ~]# ss -nutl
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::*
tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
tcp LISTEN 0 50 ::ffff:127.0.0.1:9094 :::*
5.3 在集羣中進行操作
(1)現在創建一個複製因子爲 3 的新主題 my-replicated-topic
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
(2)在一個集羣中,運行 “describe topics” 命令查看哪個 broker 正在做什麼
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
#註釋:第一行給出了所有分區的摘要,每個附加行提供有關一個分區的信息。由於我們只有一個分區用於此主題,因此只有一行。
#“leader”是負責給定分區的所有讀取和寫入的節點。每個節點將成爲隨機選擇的分區部分的領導者。
#“replicas”是複製此分區日誌的節點列表,無論它們是否爲領導者,或者即使它們當前處於活動狀態。
# “isr”是“同步”複製品的集合。這是副本列表的子集,該列表當前處於活躍狀態並且已經被領導者捕獲。
#請注意,Leader: 2,在我的示例中,節點2 是該主題的唯一分區的Leader。
(3)可以在我們創建的原始主題上運行相同的命令,以查看它的位置
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along
Topic:along PartitionCount:1 ReplicationFactor:1 Configs:
Topic: along Partition: 0 Leader: 0 Replicas: 0 Isr: 0
(4)向我們的新主題發佈一些消息:
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
>^C
(5)現在讓我們使用這些消息:
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
5.4 測試集羣的容錯性
(1)現在讓我們測試一下容錯性。Broker 2 充當 leader 所以讓我們殺了它:
[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737
[root@along ~]# kill -9 106737
[root@along ~]# ss -nutl
tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::*
tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
(2)leader 已切換到其中一個從屬節點,節點 2 不再位於同步副本集中:
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
(3)即使最初接受寫入的 leader 已經失敗,這些消息仍可供消費:
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
6、使用 Kafka Connect 導入 / 導出數據
從控制檯寫入數據並將其寫回控制檯是一個方便的起點,但有時候可能希望使用其他來源的數據或將數據從 Kafka 導出到其他系統。對於許多系統,您可以使用 Kafka Connect 導入或導出數據,而不是編寫自定義集成代碼。
Kafka Connect 是 Kafka 附帶的工具,用於向 Kafka 導入和導出數據。它是一個可擴展的工具,運行連接器,實現與外部系統交互的自定義邏輯。在本快速入門中,我們將瞭解如何使用簡單的連接器運行 Kafka Connect,這些連接器將數據從文件導入 Kafka 主題並將數據從 Kafka 主題導出到文件。
(1)首先創建一些種子數據進行測試:
[root@along ~]# echo -e "foo\nbar" > test.txt
或者在Windows上:
> echo foo> test.txt
> echo bar>> test.txt
(2)接下來,啓動兩個以獨立模式運行的連接器,這意味着它們在單個本地專用進程中運行。提供三個配置文件作爲參數。
第一個始終是 Kafka Connect 流程的配置,包含常見配置,例如要連接的 Kafka 代理和數據的序列化格式。
其餘配置文件均指定要創建的連接器。這些文件包括唯一的連接器名稱,要實例化的連接器類以及連接器所需的任何其他配置。
[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:
... ...
#注:Kafka附帶的這些示例配置文件使用您之前啓動的默認本地羣集配置並創建兩個連接器:第一個是源連接器,它從輸入文件讀取行並生成每個Kafka主題,第二個是宿連接器從Kafka主題讀取消息並將每個消息生成爲輸出文件中的一行。
(3)驗證是否導入成功(另起終端)
在啓動過程中,您將看到許多日誌消息,包括一些指示正在實例化連接器的日誌消息。
① 一旦 Kafka Connect 進程啓動,源連接器應該開始從 test.txt 主題讀取行並將其生成到主題 connect-test,並且接收器連接器應該開始從主題讀取消息 connect-test 並將它們寫入文件 test.sink.txt。我們可以通過檢查輸出文件的內容來驗證數據是否已通過整個管道傳遞:
[root@along ~]# cat test.sink.txt
foo
bar
② 請注意,數據存儲在 Kafka 主題中 connect-test,因此我們還可以運行控制檯使用者來查看主題中的數據(或使用自定義使用者代碼來處理它):
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
(4)繼續追加數據,驗證
[root@along ~]# echo Another line>> test.txt
[root@along ~]# cat test.sink.txt
foo
bar
Another line
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"
作者:along 阿龍
出處:http://www.cnblogs.com/along21/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/0NlaVNmXSLpGafiNaVZ8gg