分佈式架構之 Kafka
一、Kafka 是什麼?
Apache Kafka 是一個開源分佈式事件流平臺,被數千家公司用於高性能數據管道、流分析、數據集成和關鍵任務應用程序。
二、Kafka 有哪些應用場景?
-
- 消息系統;
-
- 網站活動追蹤;
-
- 運營指標;
-
- 日誌收集;
-
- 流處理;
-
- 事件採集。
三、Kafka 的架構圖是怎樣的?
四、Kafka 的優缺點有哪些?
1. 優點有哪些?
-
(1)性能卓越,單機寫入 TPS 約在百萬條 / 秒,最大的優點,就是吞吐量高。
-
(2)時效性:ms 級。
-
(3)可用性:非常高,kafka 是分佈式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用。
-
(4)消費者採用 Pull 方式獲取消息, 消息有序, 通過控制能夠保證所有消息被消費且僅被消費一次。
-
(5)有優秀的第三方 Kafka Web 管理界面 Kafka-Manager。
-
(6)在日誌領域比較成熟,被多家公司和多個開源項目使用。
-
(7)功能支持:功能較爲簡單,主要支持簡單的 MQ 功能,在大數據領域的實時計算以及日誌採集被大規模使用。
2. 缺點有哪些?
-
(1)消息丟失。
-
(2)消息重複。
-
(3)消息亂序。
-
(4)消息堆積。
五、如何安裝 Kafka?
以 Linux 爲例,安裝 Kafka 非常簡單,一共三步如下所示:
1. 下載
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
2. 解壓
tar -xzf kafka_2.12-3.3.1.tgz
3. 啓動
cd kafka_2.12-3.3.1
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
六、YC-Framework 如何使用 Kafka?
1. 導入依賴
<dependency>
<groupId>com.yc.framework</groupId>
<artifactId>yc-common-kafka</artifactId>
</dependency>
2. 配置文件
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer: # 生產者
retries: 3 # 設置大於0的值,則客戶端會將發送失敗的記錄重新發送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
# RECORD
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
# BATCH
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
# TIME
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
# COUNT
# TIME | COUNT 有一個條件滿足時提交
# COUNT_TIME
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後, 手動調用Acknowledgment.acknowledge()後提交
# MANUAL
# 手動調用Acknowledgment.acknowledge()後立即提交,一般使用這種
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
3. 生產者代碼
@RestController
public class KafkafProductor {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String send(@RequestParam("msg") String msg) {
kafkaTemplate.send(TOPIC_NAME, "key", msg);
return String.format("消息 %s 發送成功!", msg);
}
}
4. 消費者代碼
@Component
public class KafkaConsumer {
/**
* @param record record
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同組下的消費者個數,就是併發消費數,必須小於等於分區總數
*/
@KafkaListener(topics = "my-replicated-topic", groupId = "testGroup1")
public void listenJihuGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("testGroup1 message: " + value);
System.out.println("testGroup1 record: " + record);
//手動提交offset,一般是提交一個banch,冪等性防止重複消息
// === 每條消費完確認性能不好!
ack.acknowledge();
}
//配置多個消費組
@KafkaListener(topics = "my-replicated-topic", groupId = "testGroup2")
public void listenJihuGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("testGroup2 message: " + value);
System.out.println("testGroup2 record: " + record);
//手動提交offset
ack.acknowledge();
}
}
相關示例代碼地址:
https://github.com/developers-youcong/yc-framework/tree/main/yc-example/yc-example-kafka
YC-Framework 官網:
https://framework.youcongtech.com/
YC-Framework Github 源代碼:
https://github.com/developers-youcong/yc-framework
YC-Framework Gitee 源代碼:
https://gitee.com/developers-youcong/yc-framework
以上源代碼均已開源,開源不易,如果對你有幫助,不妨給個 star,鼓勵一下!!!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/NWbMRnMA0jKtkjNZVV1CYw