kafka-go 使用指南

Go 操作 Kafka 之 kafka-go

Go 社區中目前有三個比較常用的 kafka 客戶端庫 , 它們各有特點。

首先是 IBM/saram(這個庫已經由 Shopify 轉給了 IBM),之前我寫過一篇使用 sarama 操作 Kafka 的教程,相較於 sarama, kafka-go 更簡單、更易用。

segmentio/kafka-go 是純 Go 實現,提供了與 kafka 交互的低級別和高級別兩套 API,同時也支持 Context。

此外社區中另一個比較常用的 confluentinc/confluent-kafka-go,它是一個基於 cgo 的 librdkafka 包裝,在項目中使用它會引入對 C 庫的依賴。

準備 Kafka 環境

這裏推薦使用 Docker Compose 快速搭建一套本地開發環境。

以下docker-compose.yml文件用來搭建一套單節點 zookeeper 和單節點 kafka 環境,並且在8080端口提供kafka-ui管理界面。

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - kafka1
    environment:
      DYNAMIC_CONFIG_ENABLED: "TRUE"

參考資料

  • conduktor/kafka-stack-docker-compose

  • provectus/kafka-ui

將上述docker-compose.yml文件在本地保存,在同一目錄下執行以下命令啓動容器。

docker-compose up -d

容器啓動後,使用瀏覽器打開 127.0.0.1:8080 即可看到如下kafka-ui界面。

點擊頁面右側的 “Configure new cluster” 按鈕,配置 kafka 服務連接信息。

填寫完信息後,點擊頁面下方的 “Submit” 按鈕提交即可。

安裝 kafka-go

執行以下命令下載 kafka-go依賴。

go get github.com/segmentio/kafka-go

注意:kafka-go 需要 Go 1.15 或更高版本。

kafka-go 使用指南

kafka-go 提供了兩套與 Kafka 交互的 API。

通常建議直接使用高級別的交互 API。

Connection

Conn 類型是 kafka-go 包的核心。它代表與 Kafka broker 之間的連接。基於它實現了一套與 Kafka 交互的低級別 API。

發送消息

下面是連接至 Kafka 之後,使用 Conn 發送消息的代碼示例。

// writeByConn 基於Conn發送消息
func writeByConn() {
 topic := "my-topic"
 partition := 0

 // 連接至Kafka集羣的Leader節點
 conn, err := kafka.DialLeader(context.Background()"tcp""localhost:9092", topic, partition)
 if err != nil {
  log.Fatal("failed to dial leader:", err)
 }

 // 設置發送消息的超時時間
 conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

 // 發送消息
 _, err = conn.WriteMessages(
  kafka.Message{Value: []byte("one!")},
  kafka.Message{Value: []byte("two!")},
  kafka.Message{Value: []byte("three!")},
 )
 if err != nil {
  log.Fatal("failed to write messages:", err)
 }

 // 關閉連接
 if err := conn.Close(); err != nil {
  log.Fatal("failed to close writer:", err)
 }
}

消費消息

// readByConn 連接至kafka後接收消息
func readByConn() {
 // 指定要連接的topic和partition
 topic := "`my-topic`"
 partition := 0

 // 連接至Kafka的leader節點
 conn, err := kafka.DialLeader(context.Background()"tcp""localhost:9092", topic, partition)
 if err != nil {
  log.Fatal("failed to dial leader:", err)
 }

 // 設置讀取超時時間
 conn.SetReadDeadline(time.Now().Add(10 * time.Second))
 // 讀取一批消息,得到的batch是一系列消息的迭代器
 batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

 // 遍歷讀取消息
 b := make([]byte, 10e3) // 10KB max per message
 for {
  n, err := batch.Read(b)
  if err != nil {
   break
  }
  fmt.Println(string(b[:n]))
 }

 // 關閉batch
 if err := batch.Close(); err != nil {
  log.Fatal("failed to close batch:", err)
 }

 // 關閉連接
 if err := conn.Close(); err != nil {
  log.Fatal("failed to close connection:", err)
 }
}

使用batch.Read更高效一些,但是需要根據消息長度選擇合適的 buffer(上述代碼中的 b),如果傳入的 buffer 太小(消息裝不下)就會返回io.ErrShortBuffer錯誤。

如果不考慮內存分配的效率問題,也可以按以下代碼使用batch.ReadMessage讀取消息。

for {
  msg, err := batch.ReadMessage()
  if err != nil {
    break
  }
  fmt.Println(string(msg.Value))
}

創建 topic

當 Kafka 關閉自動創建 topic 的設置時,可按如下方式創建 topic。

// createTopicByConn 創建topic
func createTopicByConn() {
 // 指定要創建的topic名稱
 topic := "my-topic"

 // 連接至任意kafka節點
 conn, err := kafka.Dial("tcp""localhost:9092")
 if err != nil {
  panic(err.Error())
 }
 defer conn.Close()

 // 獲取當前控制節點信息
 controller, err := conn.Controller()
 if err != nil {
  panic(err.Error())
 }
 var controllerConn *kafka.Conn
 // 連接至leader節點
 controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
 if err != nil {
  panic(err.Error())
 }
 defer controllerConn.Close()

 topicConfigs := []kafka.TopicConfig{
  {
   Topic:             topic,
   NumPartitions:     1,
   ReplicationFactor: 1,
  },
 }

 // 創建topic
 err = controllerConn.CreateTopics(topicConfigs...)
 if err != nil {
  panic(err.Error())
 }
}

通過非 leader 節點連接 leader 節點

下面的示例代碼演示瞭如何通過已有的非 leader 節點的 Conn,連接至 leader 節點。

conn, err := kafka.Dial("tcp""localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()
// 獲取當前控制節點信息
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer connLeader.Close(

獲取 topic 列表

conn, err := kafka.Dial("tcp""localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    panic(err.Error())
}

m := map[string]struct{}{}
// 遍歷所有分區取topic
for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
for k := range m {
    fmt.Println(k)
}

Reader

Reader是由 kafka-go 包提供的另一個概念,對於從單個主題 - 分區(topic-partition)消費消息這種典型場景,使用它能夠簡化代碼。Reader 還實現了自動重連和偏移量管理,並支持使用 Context 支持異步取消和超時的 API。

**注意:**當進程退出時,必須在 Reader 上調用 Close() 。Kafka 服務器需要一個優雅的斷開連接來阻止它繼續嘗試向已連接的客戶端發送消息。如果進程使用 SIGINT (shell 中的 Ctrl-C) 或 SIGTERM (如 docker stop 或 kubernetes start) 終止,那麼下面給出的示例不會調用 Close()。當同一 topic 上有新 Reader 連接時,可能導致延遲 (例如,新進程啓動或新容器運行)。在這種場景下應使用signal.Notify處理程序在進程關閉時關閉 Reader。

消費消息

下面的代碼演示瞭如何使用 Reader 連接至 Kafka 消費消息。

// readByReader 通過Reader接收消息
func readByReader() {
 // 創建Reader
 r := kafka.NewReader(kafka.ReaderConfig{
  Brokers:   []string{"localhost:9092""localhost:9093""localhost:9094"},
  Topic:     "topic-A",
  Partition: 0,
  MaxBytes:  10e6, // 10MB
 })
 r.SetOffset(42) // 設置Offset

 // 接收消息
 for {
  m, err := r.ReadMessage(context.Background())
  if err != nil {
   break
  }
  fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
 }

 // 程序退出前關閉Reader
 if err := r.Close(); err != nil {
  log.Fatal("failed to close reader:", err)
 }
}

消費者組

kafka-go支持消費者組,包括 broker 管理的 offset。要啓用消費者組,只需在 ReaderConfig 中指定 GroupID

使用消費者組時,ReadMessage 會自動提交偏移量。

// 創建一個reader,指定GroupID,從 topic-A 消費消息
r := kafka.NewReader(kafka.ReaderConfig{
 Brokers:  []string{"localhost:9092""localhost:9093""localhost:9094"},
 GroupID:  "consumer-group-id", // 指定消費者組id
 Topic:    "topic-A",
 MaxBytes: 10e6, // 10MB
})

// 接收消息
for {
 m, err := r.ReadMessage(context.Background())
 if err != nil {
  break
 }
 fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

// 程序退出前關閉Reader
if err := r.Close(); err != nil {
 log.Fatal("failed to close reader:", err)
}

在使用消費者組時會有以下限制:

顯式提交

kafka-go 也支持顯式提交。當需要顯式提交時不要調用 ReadMessage,而是調用 FetchMessage獲取消息,然後調用 CommitMessages 顯式提交。

ctx := context.Background()
for {
    // 獲取消息
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    // 處理消息
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    // 顯式提交
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}

在消費者組中提交消息時,具有給定主題 / 分區的最大偏移量的消息確定該分區的提交偏移量的值。例如,如果通過調用 FetchMessage 獲取了單個分區的偏移量爲 1、2 和 3 的消息,則使用偏移量爲 3 的消息調用 CommitMessages 也將導致該分區的偏移量爲 1 和 2 的消息被提交。

管理提交間隔

默認情況下,調用CommitMessages將同步向 Kafka 提交偏移量。爲了提高性能,可以在 ReaderConfig 中設置 CommitInterval 來定期向 Kafka 提交偏移。

// 創建一個reader從 topic-A 消費消息
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092""localhost:9093""localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // 每秒刷新一次提交給 Kafka
})

Writer

向 Kafka 發送消息,除了使用基於Conn的低級 API,kafka-go包還提供了更高級別的 Writer 類型。大多數情況下使用Writer即可滿足條件,它支持以下特性。

發送消息

// 創建一個writer 向topic-A發送消息
w := &kafka.Writer{
 Addr:         kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
 Topic:        "topic-A",
 Balancer:     &kafka.LeastBytes{}, // 指定分區的balancer模式爲最小字節分佈
 RequiredAcks: kafka.RequireAll,    // ack模式
 Async:        true,                // 異步
}

err := w.WriteMessages(context.Background(),
 kafka.Message{
  Key:   []byte("Key-A"),
  Value: []byte("Hello World!"),
 },
 kafka.Message{
  Key:   []byte("Key-B"),
  Value: []byte("One!"),
 },
 kafka.Message{
  Key:   []byte("Key-C"),
  Value: []byte("Two!"),
 },
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

創建不存在的 topic

如果給 Writer 配置了AllowAutoTopicCreation:true,那麼當發送消息至某個不存在的 topic 時,則會自動創建 topic。

w := &Writer{
    Addr:                   kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
    Topic:                  "topic-A",
    AllowAutoTopicCreation: true,  // 自動創建topic
}

messages := []kafka.Message{
    {
        Key:   []byte("Key-A"),
        Value: []byte("Hello World!"),
    },
    {
        Key:   []byte("Key-B"),
        Value: []byte("One!"),
    },
    {
        Key:   []byte("Key-C"),
        Value: []byte("Two!"),
    },
}

var err error
const retries = 3
// 重試3次
for i := 0; i < retries; i++ {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    err = w.WriteMessages(ctx, messages...)
    if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
        time.Sleep(time.Millisecond * 250)
        continue
    }

    if err != nil {
        log.Fatalf("unexpected error %v", err)
    }
    break
}

// 關閉Writer
if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

寫入多個 topic

通常,WriterConfig.Topic用於初始化單個 topic 的 Writer。通過去掉 WriterConfig 中的 Topic 配置,分別設置每條消息的message.topic,可以實現將消息發送至多個 topic。

w := &kafka.Writer{
 Addr:     kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
    // 注意: 當此處不設置Topic時,後續的每條消息都需要指定Topic
 Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    // 注意: 每條消息都需要指定一個 Topic, 否則就會報錯
 kafka.Message{
        Topic: "topic-A",
  Key:   []byte("Key-A"),
  Value: []byte("Hello World!"),
 },
 kafka.Message{
        Topic: "topic-B",
  Key:   []byte("Key-B"),
  Value: []byte("One!"),
 },
 kafka.Message{
        Topic: "topic-C",
  Key:   []byte("Key-C"),
  Value: []byte("Two!"),
 },
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

注意:Writer 中的 Topic 和 Message 中的 Topic 是互斥的,同一時刻有且只能設置一處。

其他配置

TLS

對於基本的 Conn 類型或在 Reader/Writer 配置中,可以在 Dialer 中設置 TLS 選項。如果 TLS 字段爲空,則它將不啓用 TLS 連接。

注意:不在 Conn/Reder/Writer 上配置 TLS,連接到啓用 TLS 的 Kafka 集羣,可能會出現io.ErrUnexpectedEOF錯誤。

Connection
dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},  // 指定TLS配置
}

conn, err := dialer.DialContext(ctx, "tcp""localhost:9093")
Reader
dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},  // 指定TLS配置
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092""localhost:9093""localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})
Writer

創建Writer時可以按如下方式指定 TLS 配置。

w := kafka.Writer{
    Addr: kafka.TCP("localhost:9092""localhost:9093""localhost:9094"), 
    Topic:   "topic-A",
    Balancer: &kafka.Hash{},
    Transport: &kafka.Transport{
        TLS: &tls.Config{},  // 指定TLS配置
      },
    }

SASL

可以在Dialer上指定一個選項以使用 SASL 身份驗證。Dialer可以直接用來打開一個 Conn,也可以通過它們各自的配置傳遞給一個 ReaderWriter。如果 SASLMechanism字段爲 nil,則不會使用 SASL 進行身份驗證。

SASL 身份驗證類型
明文
mechanism := plain.Mechanism{
    Username: "username",
    Password: "password",
}
SCRAM
mechanism, err := scram.Mechanism(scram.SHA512, "username""password")
if err != nil {
    panic(err)
}
Connection
mechanism, err := scram.Mechanism(scram.SHA512, "username""password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

conn, err := dialer.DialContext(ctx, "tcp""localhost:9093")
Reader
mechanism, err := scram.Mechanism(scram.SHA512, "username""password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092","localhost:9093""localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})
Writer
mechanism, err := scram.Mechanism(scram.SHA512, "username""password")
if err != nil {
    panic(err)
}

// Transport 負責管理連接池和其他資源,
// 通常最好的使用方式是創建後在應用程序中共享使用它們。
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

w := kafka.Writer{
 Addr:      kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
 Topic:     "topic-A",
 Balancer:  &kafka.Hash{},
 Transport: sharedTransport,
}
Client
mechanism, err := scram.Mechanism(scram.SHA512, "username""password")
if err != nil {
    panic(err)
}

// Transport 負責管理連接池和其他資源,
// 通常最好的使用方式是創建後在應用程序中共享使用它們。
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

client := &kafka.Client{
    Addr:      kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
    Timeout:   10 * time.Second,
    Transport: sharedTransport,
}

Balancer

kafka-go實現了多種負載均衡策略。特別是當你從其他 Kafka 庫遷移過來時,你可以按如下說明選擇合適的 Balancer 實現。

Sarama

如果從 sarama 切換過來,並且需要 / 希望使用相同的算法進行消息分區,則可以使用kafka.Hashkafka.ReferenceHash

w := &kafka.Writer{
 Addr:     kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
 Topic:    "topic-A",
 Balancer: &kafka.Hash{},
}
librdkafka 和 confluent-kafka-go

kafka.CRC32Balancerlibrdkafka默認的consistent_random策略表現一致。

w := &kafka.Writer{
 Addr:     kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
 Topic:    "topic-A",
 Balancer: kafka.CRC32Balancer{},
}
Java

使用kafka.Murmur2Balancer可以獲得與默認 Java 客戶端相同的策略。

w := &kafka.Writer{
 Addr:     kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
 Topic:    "topic-A",
 Balancer: kafka.Murmur2Balancer{},
}

Compression

可以通過設置Compression字段在 Writer 上啓用壓縮:

w := &kafka.Writer{
 Addr:        kafka.TCP("localhost:9092""localhost:9093""localhost:9094"),
 Topic:       "topic-A",
 Compression: kafka.Snappy,
}

Reader 將通過檢查消息屬性來確定消費的消息是否被壓縮。

Logging

想要記錄 Reader/Writer 類型的操作,可以在創建時配置日誌記錄器。

kafka-go 中的Logger是一個接口類型。

type Logger interface {
 Printf(string, ...interface{})
}

並且提供了一個LoggerFunc類型,幫我們實現了Logger接口。

type LoggerFunc func(string, ...interface{})

func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
Reader

藉助kafka.LoggerFunc我們可以自定義一個Logger

// 自定義一個Logger
func logf(msg string, a ...interface{}) {
 fmt.Printf(msg, a...)
 fmt.Println()
}

r := kafka.NewReader(kafka.ReaderConfig{
 Brokers:     []string{"localhost:9092""localhost:9093""localhost:9094"},
 Topic:       "q1mi-topic",
 Partition:   0,
 Logger:      kafka.LoggerFunc(logf),
 ErrorLogger: kafka.LoggerFunc(logf),
})
Writer

也可以直接使用第三方日誌庫,例如下面示例代碼中使用了 zap 日誌庫。

w := &kafka.Writer{
 Addr:        kafka.TCP("localhost:9092"),
 Topic:       "q1mi-topic",
 Logger:      kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
 ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wWeugBLcaMZ--A0f-rFgYw