Go go-queue 庫實現 kafka 的發佈 - 訂閱
這篇文章,我們來了解下 go-queue/kq (kafka)。消息隊列對於大型微服務系統是必不可少的,主要是用來解決削峯、降低服務之間的耦合度以及異步能力。
Kafka, Beanstalkd Pub/Sub framework.
https://github.com/zeromicro/go-queue/
02 概述
Apache Kafka 是一個快速、可擴展的、高吞吐、可容錯的分佈式發佈訂閱消息系統。其具有高吞吐量、內置分區、支持數據副本和容錯的特性,適合在大規模消息處理場景中使用。
Kafka 中的一些基本概念和名詞解釋:
-
Producer(生產者):發送消息到 Kafka 的應用程序或服務。
-
Consumer(消費者):從 Kafka 讀取消息的應用程序或服務。
-
Broker(代理或節點):Kafka 的服務器實例,負責存儲數據並處理客戶端的讀寫請求。
-
Topic(主題):消息的分類或者通道。生產者向主題發送消息,消費者從主題讀取消息。
-
Partition(分區):爲了實現並行處理和增加系統的吞吐量,主題可以被分割成一個或多個分區。
-
Offset(偏移量):在分區中,每條消息都有一個唯一的 ID,稱爲偏移量。它幫助消費者跟蹤它已經讀取的消息。
-
Replica(副本):爲了增加數據的持久性和可用性,分區可以被複制到多個節點上。
-
Leader:在副本中的一個特定的角色。所有的讀寫請求都由 leader 處理。其他的副本稱爲 follower,它們複製 leader 的數據。
-
Consumer Group(消費者組):一組消費者,可以協同工作分攤消息消費的任務。每條消息只被組內的一個消費者處理。
-
ZooKeeper:一個分佈式的配置和同步服務,Kafka 使用它來管理集羣中的代理和選舉 leader。
03 使用
go-queue 庫是由 go-zero 團隊針對於消息隊列的封裝,目前支持 kafka、rabbitmq、stan、beanstalkd 等。 go-queue 在 segmentio/kafka-go 庫的基礎上,使用 go-zero 進行了上層統一封裝,核心針對於 kafka 的發佈 / 訂閱,讓開發人員更容易上手,將更多時間聚焦在開發業務上。
其封裝非常簡單,僅僅 3 個文件:config.go、pusher.go、queue.go
config.go
package kq
import "github.com/zeromicro/go-zero/core/service"
const (
firstOffset = "first"
lastOffset = "last"
)
type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
}
-
Brokers: kafka 的地址,支持多個地址
-
Group: 消費者組
-
Topic: 訂閱的 Topic 主題
-
Offset: 消費的偏移量,first 從頭開始消費,last 從最後開始消費
-
Conns: 連接數,默認爲 1,一個 kafka queue 對應可對應多個 consumer,Conns 對應 kafka queue 數量,可以同時初始化多個 kafka queue
-
Consumers: go-queue 內部是起多個 goroutine 從 kafka 中獲取信息寫入進程內的 channel,這個參數是控制此處的 goroutine 數量(並不是真正消費時的併發 goroutine 數量)
-
Processors: 當 Consumers 中的多個 goroutine 將 kafka 消息拉取到進程內部的 channel 後,我們要真正消費消息寫入我們自己邏輯,go-queue 內部通過此參數控制當前消費的併發 goroutine 數量.
-
MinBytes: fetch 每次從 kafka 拉取的最小字節數,如果不夠這個字節數就等待
-
MaxBytes: fetch 每次從 kafka 拉取的最大字節數,消息的大小還需要受到 broker 的 message.max.bytes 限制, 以及 topic 的 max.message.bytes 的限制
-
Username: kafka 的用戶名
-
Password: kafka 的密碼
-
ForceCommit: 是否強制提交,默認爲 true,每次消費都會提交
pusher.go
package kq
import (
"context"
"strconv"
"time"
"github.com/segmentio/kafka-go"
"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
)
type (
PushOption func(options *chunkOptions)
Pusher struct {
produer *kafka.Writer
topic string
executor *executors.ChunkExecutor
}
chunkOptions struct {
chunkSize int
flushInterval time.Duration
}
)
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
producer := &kafka.Writer{
Addr: kafka.TCP(addrs...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Compression: kafka.Snappy,
}
pusher := &Pusher{
produer: producer,
topic: topic,
}
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
chunk := make([]kafka.Message, len(tasks))
for i := range tasks {
chunk[i] = tasks[i].(kafka.Message)
}
if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
logx.Error(err)
}
}, newOptions(opts)...)
return pusher
}
func (p *Pusher) Close() error {
if p.executor != nil {
p.executor.Flush()
}
return p.produer.Close()
}
func (p *Pusher) Name() string {
return p.topic
}
func (p *Pusher) Push(v string) error {
msg := kafka.Message{
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
Value: []byte(v),
}
if p.executor != nil {
return p.executor.Add(msg, len(v))
} else {
return p.produer.WriteMessages(context.Background(), msg)
}
}
func WithChunkSize(chunkSize int) PushOption {
return func(options *chunkOptions) {
options.chunkSize = chunkSize
}
}
func WithFlushInterval(interval time.Duration) PushOption {
return func(options *chunkOptions) {
options.flushInterval = interval
}
}
func newOptions(opts []PushOption) []executors.ChunkOption {
var options chunkOptions
for _, opt := range opts {
opt(&options)
}
var chunkOpts []executors.ChunkOption
if options.chunkSize > 0 {
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
}
if options.flushInterval > 0 {
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
}
return chunkOpts
}
這段代碼是一個 Kafka 的生產者,用於將消息推送到 Kafka 集羣中。它使用了 github.com/segmentio/kafka-go 庫來與 Kafka 進行交互。在 NewPusher 函數中,它創建了一個 kafka.Writer 實例,用於將消息寫入 Kafka。 Pusher 結構體包含了一個 executor 字段,它是一個 executors.ChunkExecutor 類型的實例,用於將消息分塊並批量發送到 Kafka。Push 方法用於將消息推送到 Kafka,如果 executor 不爲空,則將消息添加到 executor 中,否則直接將消息寫入 Kafka。WithChunkSize 和 WithFlushInterval 函數用於設置 executor 的參數。
然而,NewPusher 只有 addrs 和 topic 兩個參數,對於有用戶名和密碼的 kafka 來說,會出現連接不上的情況,例如:unexpected EOF: broker appears to be expecting TLS。 需要我們對當前方法進行改造。
type (
PushOption func(options *chunkOptions)
PushConf struct {
Brokers []string
Topic string
Username string
Password string
Tls bool
}
)
func NewPusher(c PushConf, opts ...PushOption) *Pusher {
producer := &kafka.Writer{
Addr: kafka.TCP(c.Brokers...),
Topic: c.Topic,
Balancer: &kafka.LeastBytes{},
Compression: kafka.Snappy,
}
if c.Username != "" && c.Password != "" {
mechanism, _ := scram.Mechanism(scram.SHA512, c.Username, c.Password)
if c.Tls {
producer.Transport = &kafka.Transport{
SASL: mechanism,
TLS: &tls.Config{},
}
} else {
producer.Transport = &kafka.Transport{
SASL: mechanism,
}
}
}
pusher := &Pusher{
produer: producer,
topic: c.Topic,
}
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
chunk := make([]kafka.Message, len(tasks))
for i := range tasks {
chunk[i] = tasks[i].(kafka.Message)
}
if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
logx.Error(err)
}
}, newOptions(opts)...)
return pusher
}
queue.go
package kq
import (
"context"
"io"
"log"
"time"
"github.com/segmentio/kafka-go"
_ "github.com/segmentio/kafka-go/gzip"
_ "github.com/segmentio/kafka-go/lz4"
"github.com/segmentio/kafka-go/sasl/plain"
_ "github.com/segmentio/kafka-go/snappy"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/queue"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
)
const (
defaultCommitInterval = time.Second
defaultMaxWait = time.Second
defaultQueueCapacity = 1000
)
type (
ConsumeHandle func(key, value string) error
ConsumeErrorHandler func(msg kafka.Message, err error)
ConsumeHandler interface {
Consume(key, value string) error
}
queueOptions struct {
commitInterval time.Duration
queueCapacity int
maxWait time.Duration
metrics *stat.Metrics
errorHandler ConsumeErrorHandler
}
QueueOption func(*queueOptions)
kafkaQueue struct {
c KqConf
consumer *kafka.Reader
handler ConsumeHandler
channel chan kafka.Message
producerRoutines *threading.RoutineGroup
consumerRoutines *threading.RoutineGroup
metrics *stat.Metrics
errorHandler ConsumeErrorHandler
}
kafkaQueues struct {
queues []queue.MessageQueue
group *service.ServiceGroup
}
)
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
q, err := NewQueue(c, handler, opts...)
if err != nil {
log.Fatal(err)
}
return q
}
func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
if err := c.SetUp(); err != nil {
return nil, err
}
var options queueOptions
for _, opt := range opts {
opt(&options)
}
ensureQueueOptions(c, &options)
if c.Conns < 1 {
c.Conns = 1
}
q := kafkaQueues{
group: service.NewServiceGroup(),
}
for i := 0; i < c.Conns; i++ {
q.queues = append(q.queues, newKafkaQueue(c, handler, options))
}
return q, nil
}
func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
var offset int64
if c.Offset == firstOffset {
offset = kafka.FirstOffset
} else {
offset = kafka.LastOffset
}
readerConfig := kafka.ReaderConfig{
Brokers: c.Brokers,
GroupID: c.Group,
Topic: c.Topic,
StartOffset: offset,
MinBytes: c.MinBytes, // 10KB
MaxBytes: c.MaxBytes, // 10MB
MaxWait: options.maxWait,
CommitInterval: options.commitInterval,
QueueCapacity: options.queueCapacity,
}
if len(c.Username) > 0 && len(c.Password) > 0 {
readerConfig.Dialer = &kafka.Dialer{
SASLMechanism: plain.Mechanism{
Username: c.Username,
Password: c.Password,
},
}
}
consumer := kafka.NewReader(readerConfig)
return &kafkaQueue{
c: c,
consumer: consumer,
handler: handler,
channel: make(chan kafka.Message),
producerRoutines: threading.NewRoutineGroup(),
consumerRoutines: threading.NewRoutineGroup(),
metrics: options.metrics,
errorHandler: options.errorHandler,
}
}
func (q *kafkaQueue) Start() {
q.startConsumers()
q.startProducers()
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()
}
func (q *kafkaQueue) Stop() {
q.consumer.Close()
logx.Close()
}
func (q *kafkaQueue) consumeOne(key, val string) error {
startTime := timex.Now()
err := q.handler.Consume(key, val)
q.metrics.Add(stat.Task{
Duration: timex.Since(startTime),
})
return err
}
func (q *kafkaQueue) startConsumers() {
for i := 0; i < q.c.Processors; i++ {
q.consumerRoutines.Run(func() {
for msg := range q.channel {
if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
if q.errorHandler != nil {
q.errorHandler(msg, err)
}
if !q.c.ForceCommit {
continue
}
}
if err := q.consumer.CommitMessages(context.Background(), msg); err != nil {
logx.Errorf("commit failed, error: %v", err)
}
}
})
}
}
func (q *kafkaQueue) startProducers() {
for i := 0; i < q.c.Consumers; i++ {
q.producerRoutines.Run(func() {
for {
msg, err := q.consumer.FetchMessage(context.Background())
// io.EOF means consumer closed
// io.ErrClosedPipe means committing messages on the consumer,
// kafka will refire the messages on uncommitted messages, ignore
if err == io.EOF || err == io.ErrClosedPipe {
return
}
if err != nil {
logx.Errorf("Error on reading message, %q", err.Error())
continue
}
q.channel <- msg
}
})
}
}
func (q kafkaQueues) Start() {
for _, each := range q.queues {
q.group.Add(each)
}
q.group.Start()
}
func (q kafkaQueues) Stop() {
q.group.Stop()
}
func WithCommitInterval(interval time.Duration) QueueOption {
return func(options *queueOptions) {
options.commitInterval = interval
}
}
func WithQueueCapacity(queueCapacity int) QueueOption {
return func(options *queueOptions) {
options.queueCapacity = queueCapacity
}
}
func WithHandle(handle ConsumeHandle) ConsumeHandler {
return innerConsumeHandler{
handle: handle,
}
}
func WithMaxWait(wait time.Duration) QueueOption {
return func(options *queueOptions) {
options.maxWait = wait
}
}
func WithMetrics(metrics *stat.Metrics) QueueOption {
return func(options *queueOptions) {
options.metrics = metrics
}
}
func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption {
return func(options *queueOptions) {
options.errorHandler = errorHandler
}
}
type innerConsumeHandler struct {
handle ConsumeHandle
}
func (ch innerConsumeHandler) Consume(k, v string) error {
return ch.handle(k, v)
}
func ensureQueueOptions(c KqConf, options *queueOptions) {
if options.commitInterval == 0 {
options.commitInterval = defaultCommitInterval
}
if options.queueCapacity == 0 {
options.queueCapacity = defaultQueueCapacity
}
if options.maxWait == 0 {
options.maxWait = defaultMaxWait
}
if options.metrics == nil {
options.metrics = stat.NewMetrics(c.Name)
}
if options.errorHandler == nil {
options.errorHandler = func(msg kafka.Message, err error) {
logx.Errorf("consume: %s, error: %v", string(msg.Value), err)
}
}
}
這段代碼是一個基於 Kafka 的消息隊列實現,包含了生產者和消費者的實現。其中,kafkaQueue 結構體表示一個 Kafka 隊列,包含了一個 Kafka 消費者和一個消息處理器。Start 函數用於啓動消費者和生產者,Stop 函數用於停止消費者。WithCommitInterval、WithQueueCapacity、WithMaxWait、WithMetrics 和 WithErrorHandler 函數用於設置消費者的參數。 MustNewQueue 和 NewQueue 函數用於創建一個新的消息隊列實例。
MustNewQueue 函數在創建失敗時會直接退出程序,而 NewQueue 函數會返回一個錯誤。kafkaQueues 結構體表示多個 Kafka 隊列,包含了多個 kafkaQueue 實例。Start 函數用於啓動所有的 Kafka 隊列,Stop 函數用於停止所有的 Kafka 隊列。
-
commitInterval : 提交給 kafka broker 間隔時間,默認是 1s
-
queueCapacity:kafka 內部隊列長度
-
maxWait:從 kafka 批量獲取數據時,等待新數據到來的最大時間。
-
metrics:上報消費每個消息消費時間,默認會內部初始化,一般也不需要指定
值得注意的是,初始化函數 NewQueue 需要傳入參數 ConsumeHandler,這個參數是一個接口,用於處理消息。這個接口的定義如下:
ConsumeHandler interface {
Consume(key, value string) error
}
只要實現了 ConsumeHandler 接口,就可以作爲參數傳入 NewQueue 函數,這樣就可以自定義消息處理邏輯了。
與生產者不同,消費者代碼已經有了用戶名和密碼,如果需要使用 tls,則需要自行加上。
04 總結
go-queue 的庫封裝的非常簡單,且做了一些優化,如果只是需要使用到 kafka 的發佈 / 訂閱功能,我想直接拿來用是非常方便的,或者是把代碼複製下來簡單改動下。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/x1KIbn9NeLyKTISzWCPIdA