Implementing Kafka Pub/Sub in Go with the go-queue Library

In this article we take a look at go-queue/kq, the Kafka component of go-queue. Message queues are indispensable in large microservice systems: they absorb traffic spikes, decouple services from one another, and enable asynchronous processing.

Kafka, Beanstalkd Pub/Sub framework.

https://github.com/zeromicro/go-queue/

02 Overview

Apache Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. With high throughput, built-in partitioning, data replication, and fault tolerance, it is well suited to large-scale message processing.

Some basic Kafka concepts and terms:

- Broker: a Kafka server node; a cluster consists of one or more brokers.
- Topic: a named category to which messages are published.
- Partition: each topic is split into ordered, append-only partitions; partitions are Kafka's unit of parallelism.
- Producer / Consumer: clients that write messages to, and read messages from, topics.
- Consumer Group: a set of consumers that share a topic's partitions; within a group, each partition is consumed by exactly one member.
- Offset: a message's position within its partition; consumers commit offsets to record how far they have read.

03 Usage

The go-queue library is the go-zero team's wrapper around message queues; it currently supports kafka, rabbitmq, stan, beanstalkd, and so on. For Kafka, go-queue builds a unified layer on top of the segmentio/kafka-go library using go-zero, focused on publish/subscribe, so developers can get up to speed quickly and spend more time on business logic.

The wrapper is very small: just three files, config.go, pusher.go, and queue.go.

config.go

package kq
import "github.com/zeromicro/go-zero/core/service"
const (
  firstOffset = "first"
  lastOffset  = "last"
)
type KqConf struct {
  service.ServiceConf
  Brokers     []string
  Group       string
  Topic       string
  Offset      string `json:",options=first|last,default=last"`
  Conns       int    `json:",default=1"`        // number of underlying kafka readers
  Consumers   int    `json:",default=8"`        // goroutines fetching messages per reader
  Processors  int    `json:",default=8"`        // goroutines processing fetched messages per reader
  MinBytes    int    `json:",default=10240"`    // 10K
  MaxBytes    int    `json:",default=10485760"` // 10M
  Username    string `json:",optional"`
  Password    string `json:",optional"`
  ForceCommit bool   `json:",default=true"`     // commit offsets even if the handler returns an error
}
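
In practice this struct is normally loaded from a YAML file with go-zero's conf package, which applies the defaults and validation declared in the json tags above. A minimal sketch, assuming a config.yaml whose keys mirror the field names (Name, Brokers, Group, Topic):

package main

import (
  "github.com/zeromicro/go-queue/kq"
  "github.com/zeromicro/go-zero/core/conf"
)

func main() {
  // a minimal sketch: config.yaml is a placeholder path;
  // conf.MustLoad fills in the tag defaults (Conns=1, Consumers=8, Processors=8, ...)
  var c kq.KqConf
  conf.MustLoad("config.yaml", &c)
  _ = c // passed to kq.MustNewQueue in a real service (see queue.go below)
}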

pusher.go

package kq
import (
  "context"
  "strconv"
  "time"
  "github.com/segmentio/kafka-go"
  "github.com/zeromicro/go-zero/core/executors"
  "github.com/zeromicro/go-zero/core/logx"
)
type (
  PushOption func(options *chunkOptions)
  Pusher struct {
    producer *kafka.Writer
    topic    string
    executor *executors.ChunkExecutor
  }
  chunkOptions struct {
    chunkSize     int
    flushInterval time.Duration
  }
)
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
  producer := &kafka.Writer{
    Addr:        kafka.TCP(addrs...),
    Topic:       topic,
    Balancer:    &kafka.LeastBytes{},
    Compression: kafka.Snappy,
  }
  pusher := &Pusher{
    producer: producer,
    topic:    topic,
  }
  pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
    chunk := make([]kafka.Message, len(tasks))
    for i := range tasks {
      chunk[i] = tasks[i].(kafka.Message)
    }
    if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
      logx.Error(err)
    }
  }, newOptions(opts)...)
  return pusher
}
func (p *Pusher) Close() error {
  if p.executor != nil {
    p.executor.Flush()
  }
  return p.producer.Close()
}
func (p *Pusher) Name() string {
  return p.topic
}
func (p *Pusher) Push(v string) error {
  msg := kafka.Message{
    Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
    Value: []byte(v),
  }
  if p.executor != nil {
    return p.executor.Add(msg, len(v))
  } else {
    return p.producer.WriteMessages(context.Background(), msg)
  }
}
func WithChunkSize(chunkSize int) PushOption {
  return func(options *chunkOptions) {
    options.chunkSize = chunkSize
  }
}
func WithFlushInterval(interval time.Duration) PushOption {
  return func(options *chunkOptions) {
    options.flushInterval = interval
  }
}
func newOptions(opts []PushOption) []executors.ChunkOption {
  var options chunkOptions
  for _, opt := range opts {
    opt(&options)
  }
  var chunkOpts []executors.ChunkOption
  if options.chunkSize > 0 {
    chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
  }
  if options.flushInterval > 0 {
    chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
  }
  return chunkOpts
}

This code implements a Kafka producer that pushes messages to a Kafka cluster, using the github.com/segmentio/kafka-go library to talk to Kafka. NewPusher creates a kafka.Writer instance for writing messages. The Pusher struct carries an executor field, an executors.ChunkExecutor that batches messages into chunks before sending them to Kafka. Push enqueues a message (keyed by the current nanosecond timestamp): if executor is non-nil the message is added to the executor for batching, otherwise it is written to Kafka immediately. WithChunkSize and WithFlushInterval configure the executor's batch size and flush interval.
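
For reference, here is a minimal sketch of using this Pusher, assuming the version of kq shown above; the broker address and topic name are placeholders:

package main

import "github.com/zeromicro/go-queue/kq"

func main() {
  // a minimal sketch: broker address and topic name are placeholders
  pusher := kq.NewPusher([]string{"127.0.0.1:9092"}, "order-events")
  defer pusher.Close() // flushes any batched messages before closing the writer

  if err := pusher.Push(`{"orderId": "1"}`); err != nil {
    panic(err)
  }
}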

However, NewPusher takes only the addrs and topic parameters. Against a Kafka cluster that requires a username and password, the connection fails with errors such as: unexpected EOF: broker appears to be expecting TLS. We need to adapt the method ourselves; the version below additionally imports crypto/tls and github.com/segmentio/kafka-go/sasl/scram.

type (
  PushOption func(options *chunkOptions)
  PushConf   struct {
    Brokers  []string
    Topic    string
    Username string
    Password string
    Tls      bool
  }
)
func NewPusher(c PushConf, opts ...PushOption) *Pusher {
  producer := &kafka.Writer{
    Addr:        kafka.TCP(c.Brokers...),
    Topic:       c.Topic,
    Balancer:    &kafka.LeastBytes{},
    Compression: kafka.Snappy,
  }
  if c.Username != "" && c.Password != "" {
    mechanism, err := scram.Mechanism(scram.SHA512, c.Username, c.Password)
    // fail fast on invalid credentials instead of silently discarding the error
    logx.Must(err)
    transport := &kafka.Transport{SASL: mechanism}
    if c.Tls {
      transport.TLS = &tls.Config{}
    }
    producer.Transport = transport
  }
  pusher := &Pusher{
    producer: producer,
    topic:    c.Topic,
  }
  pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
    chunk := make([]kafka.Message, len(tasks))
    for i := range tasks {
      chunk[i] = tasks[i].(kafka.Message)
    }
    if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
      logx.Error(err)
    }
  }, newOptions(opts)...)
  return pusher
}
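
With this change, a pusher for a SASL/SCRAM-protected cluster can be created as in the following sketch; the import path, brokers, topic, and credentials are all placeholders, since the modified kq package lives in your own module:

package main

import "your/module/kq" // placeholder: the locally modified kq package

func main() {
  // a minimal sketch: brokers, topic and credentials are placeholders
  pusher := kq.NewPusher(kq.PushConf{
    Brokers:  []string{"kafka-1:9092", "kafka-2:9092"},
    Topic:    "order-events",
    Username: "user",
    Password: "secret",
    Tls:      true,
  })
  defer pusher.Close()

  if err := pusher.Push(`{"orderId": "1"}`); err != nil {
    panic(err)
  }
}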

queue.go

package kq
import (
  "context"
  "io"
  "log"
  "time"
  "github.com/segmentio/kafka-go"
  _ "github.com/segmentio/kafka-go/gzip"
  _ "github.com/segmentio/kafka-go/lz4"
  "github.com/segmentio/kafka-go/sasl/plain"
  _ "github.com/segmentio/kafka-go/snappy"
  "github.com/zeromicro/go-zero/core/logx"
  "github.com/zeromicro/go-zero/core/queue"
  "github.com/zeromicro/go-zero/core/service"
  "github.com/zeromicro/go-zero/core/stat"
  "github.com/zeromicro/go-zero/core/threading"
  "github.com/zeromicro/go-zero/core/timex"
)
const (
  defaultCommitInterval = time.Second
  defaultMaxWait        = time.Second
  defaultQueueCapacity  = 1000
)
type (
  ConsumeHandle func(key, value string) error
  ConsumeErrorHandler func(msg kafka.Message, err error)
  ConsumeHandler interface {
    Consume(key, value string) error
  }
  queueOptions struct {
    commitInterval time.Duration
    queueCapacity  int
    maxWait        time.Duration
    metrics        *stat.Metrics
    errorHandler   ConsumeErrorHandler
  }
  QueueOption func(*queueOptions)
  kafkaQueue struct {
    c                KqConf
    consumer         *kafka.Reader
    handler          ConsumeHandler
    channel          chan kafka.Message
    producerRoutines *threading.RoutineGroup
    consumerRoutines *threading.RoutineGroup
    metrics          *stat.Metrics
    errorHandler     ConsumeErrorHandler
  }
  kafkaQueues struct {
    queues []queue.MessageQueue
    group  *service.ServiceGroup
  }
)
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
  q, err := NewQueue(c, handler, opts...)
  if err != nil {
    log.Fatal(err)
  }
  return q
}
func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
  if err := c.SetUp(); err != nil {
    return nil, err
  }
  var options queueOptions
  for _, opt := range opts {
    opt(&options)
  }
  ensureQueueOptions(c, &options)
  if c.Conns < 1 {
    c.Conns = 1
  }
  q := kafkaQueues{
    group: service.NewServiceGroup(),
  }
  for i := 0; i < c.Conns; i++ {
    q.queues = append(q.queues, newKafkaQueue(c, handler, options))
  }
  return q, nil
}
func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
  var offset int64
  if c.Offset == firstOffset {
    offset = kafka.FirstOffset
  } else {
    offset = kafka.LastOffset
  }
  readerConfig := kafka.ReaderConfig{
    Brokers:        c.Brokers,
    GroupID:        c.Group,
    Topic:          c.Topic,
    StartOffset:    offset,
    MinBytes:       c.MinBytes, // 10KB
    MaxBytes:       c.MaxBytes, // 10MB
    MaxWait:        options.maxWait,
    CommitInterval: options.commitInterval,
    QueueCapacity:  options.queueCapacity,
  }
  if len(c.Username) > 0 && len(c.Password) > 0 {
    readerConfig.Dialer = &kafka.Dialer{
      SASLMechanism: plain.Mechanism{
        Username: c.Username,
        Password: c.Password,
      },
    }
  }
  consumer := kafka.NewReader(readerConfig)
  return &kafkaQueue{
    c:                c,
    consumer:         consumer,
    handler:          handler,
    channel:          make(chan kafka.Message),
    producerRoutines: threading.NewRoutineGroup(),
    consumerRoutines: threading.NewRoutineGroup(),
    metrics:          options.metrics,
    errorHandler:     options.errorHandler,
  }
}
func (q *kafkaQueue) Start() {
  q.startConsumers()
  q.startProducers()
  q.producerRoutines.Wait() // fetch goroutines exit once the reader is closed
  close(q.channel)          // closing the channel lets the processing goroutines drain and exit
  q.consumerRoutines.Wait()
}
func (q *kafkaQueue) Stop() {
  q.consumer.Close()
  logx.Close()
}
func (q *kafkaQueue) consumeOne(key, val string) error {
  startTime := timex.Now()
  err := q.handler.Consume(key, val)
  q.metrics.Add(stat.Task{
    Duration: timex.Since(startTime),
  })
  return err
}
func (q *kafkaQueue) startConsumers() {
  for i := 0; i < q.c.Processors; i++ {
    q.consumerRoutines.Run(func() {
      for msg := range q.channel {
        if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
          if q.errorHandler != nil {
            q.errorHandler(msg, err)
          }
          if !q.c.ForceCommit {
            continue
          }
        }
        if err := q.consumer.CommitMessages(context.Background(), msg); err != nil {
          logx.Errorf("commit failed, error: %v", err)
        }
      }
    })
  }
}
func (q *kafkaQueue) startProducers() {
  for i := 0; i < q.c.Consumers; i++ {
    q.producerRoutines.Run(func() {
      for {
        msg, err := q.consumer.FetchMessage(context.Background())
        // io.EOF means consumer closed
        // io.ErrClosedPipe means committing messages on the consumer,
        // kafka will refire the messages on uncommitted messages, ignore
        if err == io.EOF || err == io.ErrClosedPipe {
          return
        }
        if err != nil {
          logx.Errorf("Error on reading message, %q", err.Error())
          continue
        }
        q.channel <- msg
      }
    })
  }
}
func (q kafkaQueues) Start() {
  for _, each := range q.queues {
    q.group.Add(each)
  }
  q.group.Start()
}
func (q kafkaQueues) Stop() {
  q.group.Stop()
}
func WithCommitInterval(interval time.Duration) QueueOption {
  return func(options *queueOptions) {
    options.commitInterval = interval
  }
}
func WithQueueCapacity(queueCapacity int) QueueOption {
  return func(options *queueOptions) {
    options.queueCapacity = queueCapacity
  }
}
func WithHandle(handle ConsumeHandle) ConsumeHandler {
  return innerConsumeHandler{
    handle: handle,
  }
}
func WithMaxWait(wait time.Duration) QueueOption {
  return func(options *queueOptions) {
    options.maxWait = wait
  }
}
func WithMetrics(metrics *stat.Metrics) QueueOption {
  return func(options *queueOptions) {
    options.metrics = metrics
  }
}
func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption {
  return func(options *queueOptions) {
    options.errorHandler = errorHandler
  }
}
type innerConsumeHandler struct {
  handle ConsumeHandle
}
func (ch innerConsumeHandler) Consume(k, v string) error {
  return ch.handle(k, v)
}
func ensureQueueOptions(c KqConf, options *queueOptions) {
  if options.commitInterval == 0 {
    options.commitInterval = defaultCommitInterval
  }
  if options.queueCapacity == 0 {
    options.queueCapacity = defaultQueueCapacity
  }
  if options.maxWait == 0 {
    options.maxWait = defaultMaxWait
  }
  if options.metrics == nil {
    options.metrics = stat.NewMetrics(c.Name)
  }
  if options.errorHandler == nil {
    options.errorHandler = func(msg kafka.Message, err error) {
      logx.Errorf("consume: %s, error: %v", string(msg.Value), err)
    }
  }
}

This code is the consumer side: a Kafka-based message queue implementation. The kafkaQueue struct represents a single Kafka queue, holding a kafka.Reader and a message handler. Start launches the fetching and processing goroutines; Stop closes the reader. Note the slightly inverted naming: startProducers spawns c.Consumers goroutines that fetch messages from Kafka and feed the internal channel, while startConsumers spawns c.Processors goroutines that take messages off the channel, invoke the handler, and commit offsets (with ForceCommit enabled, the default, offsets are committed even when the handler returns an error). WithCommitInterval, WithQueueCapacity, WithMaxWait, WithMetrics, and WithErrorHandler configure the consumer options. MustNewQueue and NewQueue create a new message queue instance.

MustNewQueue exits the process if creation fails, whereas NewQueue returns the error. The kafkaQueues struct aggregates multiple kafkaQueue instances, one per configured connection (Conns); its Start starts all of them and its Stop stops all of them.

Note that NewQueue requires a ConsumeHandler parameter, an interface used to process messages, defined as follows:

ConsumeHandler interface {
    Consume(key, value string) error
}

Any type that implements ConsumeHandler can be passed to NewQueue, which lets you plug in custom message-processing logic; alternatively, wrap a plain function with WithHandle, as in the sketch below.
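
A minimal consumer sketch, assuming a local broker and placeholder group/topic names. Note that the defaults declared in the json tags only apply when KqConf is loaded through go-zero's conf package, so a hand-built literal must set Consumers and Processors explicitly:

package main

import (
  "github.com/zeromicro/go-queue/kq"
  "github.com/zeromicro/go-zero/core/logx"
  "github.com/zeromicro/go-zero/core/service"
)

func main() {
  // a minimal sketch: broker, group and topic are placeholders
  c := kq.KqConf{
    ServiceConf: service.ServiceConf{Name: "order-consumer"},
    Brokers:     []string{"127.0.0.1:9092"},
    Group:       "order-group",
    Topic:       "order-events",
    Conns:       1,
    Consumers:   8,
    Processors:  8,
    MinBytes:    10240,
    MaxBytes:    10485760,
  }
  // wrap a plain function with WithHandle instead of defining a ConsumeHandler type
  q := kq.MustNewQueue(c, kq.WithHandle(func(key, value string) error {
    logx.Infof("key: %s, value: %s", key, value)
    return nil
  }))
  defer q.Stop()
  q.Start() // blocks until the queue is stopped
}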

Unlike the producer, the consumer code already supports username/password (plain SASL); if you need TLS, you have to add it yourself.
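
A hedged sketch of what that change inside newKafkaQueue could look like, assuming a Tls bool field is added to KqConf and crypto/tls is imported:

// replaces the existing SASL block in newKafkaQueue;
// assumes a `Tls bool` field has been added to KqConf
if len(c.Username) > 0 && len(c.Password) > 0 {
  dialer := &kafka.Dialer{
    SASLMechanism: plain.Mechanism{
      Username: c.Username,
      Password: c.Password,
    },
  }
  if c.Tls {
    dialer.TLS = &tls.Config{} // enable TLS on the consumer's connections
  }
  readerConfig.Dialer = dialer
}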

04 Summary

The go-queue wrapper is very simple and includes some useful optimizations. If all you need is Kafka publish/subscribe, it is very convenient to use directly, or to copy the code and tweak it to your needs.

Source: https://mp.weixin.qq.com/s/x1KIbn9NeLyKTISzWCPIdA