分佈式架構之 Kafka

一、Kafka 是什麼?

Apache Kafka 是一個開源分佈式事件流平臺,被數千家公司用於高性能數據管道、流分析、數據集成和關鍵任務應用程序。

二、Kafka 有哪些應用場景?

三、Kafka 的架構圖是怎樣的?

四、Kafka 的優缺點有哪些?

1. 優點有哪些?

2. 缺點有哪些?

五、如何安裝 Kafka?

以 Linux 爲例,安裝 Kafka 非常簡單,一共三步如下所示:

1. 下載

wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz

2. 解壓

tar -xzf kafka_2.12-3.3.1.tgz

3. 啓動

cd kafka_2.12-3.3.1
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

六、YC-Framework 如何使用 Kafka?

1. 導入依賴

<dependency>
    <groupId>com.yc.framework</groupId>
    <artifactId>yc-common-kafka</artifactId>
</dependency>

2. 配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer: # 生產者
      retries: 3 # 設置大於0的值,則客戶端會將發送失敗的記錄重新發送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
      # RECORD
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
      # BATCH
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
      # TIME
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
      # COUNT
      # TIME | COUNT 有一個條件滿足時提交
      # COUNT_TIME
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後, 手動調用Acknowledgment.acknowledge()後提交
      # MANUAL
      # 手動調用Acknowledgment.acknowledge()後立即提交,一般使用這種
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

3. 生產者代碼

@RestController
public class KafkafProductor {
    private final static String TOPIC_NAME = "my-replicated-topic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
        return String.format("消息 %s 發送成功!", msg);
    }
}

4. 消費者代碼

@Component
public class KafkaConsumer {
    /**
     * @param record record
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     * @TopicPartition(topic = "topic2", partitions = "0",
     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * },concurrency = "6")
     * //concurrency就是同組下的消費者個數,就是併發消費數,必須小於等於分區總數
     */
    @KafkaListener(topics = "my-replicated-topic", groupId = "testGroup1")
    public void listenJihuGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup1 message: " + value);
        System.out.println("testGroup1 record: " + record);
        //手動提交offset,一般是提交一個banch,冪等性防止重複消息
        // === 每條消費完確認性能不好!
        ack.acknowledge();
    }
    //配置多個消費組
    @KafkaListener(topics = "my-replicated-topic", groupId = "testGroup2")
    public void listenJihuGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup2 message: " + value);
        System.out.println("testGroup2 record: " + record);
        //手動提交offset
        ack.acknowledge();
    }
}

相關示例代碼地址:
https://github.com/developers-youcong/yc-framework/tree/main/yc-example/yc-example-kafka

YC-Framework 官網:
https://framework.youcongtech.com/

YC-Framework Github 源代碼:
https://github.com/developers-youcong/yc-framework

YC-Framework Gitee 源代碼:
https://gitee.com/developers-youcong/yc-framework

以上源代碼均已開源,開源不易,如果對你有幫助,不妨給個 star,鼓勵一下!!!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NWbMRnMA0jKtkjNZVV1CYw