Go:基於 Redis Stream 構建可擴展事件流

事件流架構是獨立擴展某些組件的好方法。當提到事件流工具時,這方面的主流似乎是 Kafka,但也有一些其他實用的流工具,比如 NATS 流 / Jetstream, NSQ 或 Redis 流。

今天我將寫一些關於 Redis Stream 基本用法的筆記,我們將構建一個發佈者和一個消費者的例子,並使用運行在 docker 上的 Redis 服務器做本地測試。發佈者將使用 XADD 命令發送一些消息到 Redis 的持久化消息流中,消費者將使用 XREADGROUP 讀取流。

當事件在給定流中發佈時,通過使用 XREADGROUP,可以讓多個消費者執行相同的操作。這種機制使水平擴展消費者成爲可能。當我們添加一個消費者時,它將自己註冊到一個消費者組中,並且流中的消息將均勻地發送到組中的不同消費者。

構建示例

假設我們構建了一個分佈式系統,在這個系統中,將從外部接收一些”tickets“消息,當消息到達時,我們希望執行一些操作,例如解析消息的內容,調用 API 寫信息到數據庫。

本地 Redis

開始之前,我們需要一個 Redis 服務,我們在本地使用 docker 鏡像啓動 redis 服務:

docker run --name localredis -d redis redis-server --appendonly yes

這裏啓動本地 redis 服務,appendonly 參數的作用是設置將 redis 數據持久化。當有數據變化,就會寫副本到一個文件。如果 redis 服務重啓,消息數據會恢復。

Go 發佈者

讓我們開始編寫代碼,爲發佈者創建一個新的 go module。這個發佈者將簡單地使用 XADD 命令向 Redis 流發送一些消息。
初始化一個 Go module,創建一個 main.go 文件,並添加 redis 調用庫:

go mod init publisher
touch main.go
go get github.com/go-redis/redis

在 main.go 文件中,我們將連接 redis 並使用 ping 來檢查是否連接成功:

package main
import (
 "fmt"
 "log"
"github.com/go-redis/redis"
)
func main() {
  log.Println("Publisher started")
  redisClient := redis.NewClient(&redis.Options{
    Addr: fmt.Sprintf("%s:%s", "127.0.0.1", "6379"),
  })
  _, err := redisClient.Ping().Result()
  if err != nil {
    log.Fatal("Unable to connect to Redis", err)
  }
  log.Println("Connected to Redis server")
}

當執行 go run main.go,如果一切正常的話將看到如下日誌:

2021/08/25 20:54:21 Publisher started
2021/08/25 20:54:21 Connected to Redis server

現在我們創建一個 “tickets” 流然後添加一個消息:

func publishTicketReceivedEvent(client *redis.Client) error {
  log.Println("Publishing event to Redis")
  err := client.XAdd(&redis.XAddArgs{
    Stream:       "tickets",
    MaxLen:       0,
    MaxLenApprox: 0,
    ID:           "",
    Values: map[string]interface{}{
      "whatHappened": string("ticket received"),
      "ticketID":     int(rand.Intn(100000000)),
      "ticketData":   string("some ticket data"),
    },
  }).Err()
  return err
}

這裏函數將發送一條 “ticket received” 消息並附帶一個隨機 id 和一些數據。在主函數中,我們將以上函數在循環中多次使用看看會發生什麼?

for i := 0; i < 3000; i++ {
  err = publishTicketReceivedEvent(redisClient)
  if err != nil {
    log.Fatal(err)
  }
}

如果你執行 main.go 函數,將會打印 3000 多行日誌:

...
2021/08/25 21:08:38 Publishing event to Redis
2021/08/25 21:08:38 Publishing event to Redis
2021/08/25 21:08:38 Publishing event to Redis

我們進入 redis 容器內看看,在 redis 服務中,打開 redis-cli 客戶端,查看當前狀態:

docker exec -it localredis /bin/bash
redis-cli
127.0.0.1:6379> XINFO STREAM tickets

XINFO 是 redis 命令用於監控消息流或消費者組狀態。這裏我們將看到有 3000 條流數據:以下是第一條和最後一條數據:

1) "length"
 2) (integer) 3000
 3) "radix-tree-keys"
 4) (integer) 45
 5) "radix-tree-nodes"
 6) (integer) 111
 7) "last-generated-id"
 8) "1615061318123-0"
 9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1615061313111-0"
    2) 1) "whatHappened"
       2) "ticket received"
       3) "ticketID"
       4) "98498081"
       5) "ticketData"
       6) "some ticket data"
13) "last-entry"
14) 1) "1615061318123-0"
    2) 1) "whatHappened"
       2) "ticket received"
       3) "ticketID"
       4) "39114354"
       5) "ticketData"
       6) "some ticket data"

Ok,我們已經在名爲 “tickets” 流中發佈了一些消息,現在我們來構建一個消費者去消費流數據:

Go 消費者

和發佈者類似,首先需要創建一個新的 module:

go mod init consumer
touch main.go
go get github.com/go-redis/redis

在 main.go 要先連接 redis 服務:

package main
import (
  "fmt"
  "log
  "github.com/go-redis/redis"
)
func main() {
  log.Println("Consumer started")
  redisClient := redis.NewClient(&redis.Options{
    Addr: fmt.Sprintf("%s:%s", "127.0.0.1", "6379"),
  })
  _, err := redisClient.Ping().Result()
  if err != nil {
    log.Fatal("Unbale to connect to Redis", err)
  }
  log.Println("Connected to Redis server")
}

下面使用 XGROUPCREATE 來創建消費者組:

subject := "tickets"
consumersGroup := "tickets-consumer-group"
err = redisClient.XGroupCreate(subject, consumersGroup, "0").Err()
if err != nil {
log.Println(err)
}

現在可以使用 XREADGROUP 來監聽流中消息,並使用一個唯一 id 將消費者註冊到消費者組裏:
爲了生成唯一 id,將使用 xid 庫:

go get github.com/rs/xid

當接收到 ticket 消息時,將調用以下函數:

func handleNewTicket(ticketID string, ticketData string) error {
  log.Printf("Handling new ticket id : %s data %s\n", ticketID, ticketData)
  return nil
}

然後在 main.go 中創建一個無限循環,我們調用 XREADGROUP 並在 > 位置,表示從該組的第一個待處理消息開始,然後爲每個 ticket 調用 handNewTicket 函數,併發送 XACK 命令到 redis 服務通知消息已經被消費。

uniqueID := xid.New().String()
for {
  entries, err := redisClient.XReadGroup(&redis.XReadGroupArgs{
    Group:    consumersGroup,
    Consumer: uniqueID,
    Streams:  []string{subject, ">"},
    Count:    2,
    Block:    0,
    NoAck:    false,
  }).Result()
  if err != nil {
    log.Fatal(err)
  }
for i := 0; i < len(entries[0].Messages); i++ {
  messageID := entries[0].Messages[i].ID
  values := entries[0].Messages[i].Values
  eventDescription := fmt.Sprintf("%v", values["whatHappened"])
  ticketID := fmt.Sprintf("%v", values["ticketID"])
  ticketData := fmt.Sprintf("%v", values["ticketData"])
  if eventDescription == "ticket received" {
    err := handleNewTicket(ticketID, ticketData)
    if err != nil {
      log.Fatal(err)
    }
    redisClient.XAck(subject, consumersGroup, messageID)
  }
}

如果程序執行正常,將看到 3000 行的日誌:

...
2021/08/25 21:51:44 Handling new ticket id : 28377708 data some ticket data
2021/08/25 21:51:44 Handling new ticket id : 56451806 data some ticket data
2021/08/25 21:51:44 Handling new ticket id : 94132471 data some ticket data

您還應該注意到,該程序沒有退出,仍然在偵聽消息。這是因爲我們使用 BLOCK = 0 參數調用 XREADGROUP,這意味着程序執行將被阻塞無限長的時間,直到新消息到來。如果您打開第二個終端並再次運行發佈者,會看到消息同時被髮布和消費,這是一件好事。

在現實生活中,消費消息時,我們會對消息做比 log.Println() 更復雜的事情。我們可能調用外部 API,寫入數據庫或將數據導出到 S3 桶中等…… 當我們給消費者增加一些延遲時,會發生什麼?

假設我們仍然以同樣的速度發佈這 3000 條消息,但是我們在消費者中添加了一個 time.Sleep(100 * time.Millisecond),這將耗時超過 5 分鐘來消費這 3000 條消息… 除非我們同時運行多個消費者。好消息是,我們已經爲這種方法編寫了所有代碼。

如何擴展?

我們做一個快速實驗。如果我們將 time.Sleep(100 * time.Millisecond) 添加到消費者的 handNewTicket 函數,將會如何?【模擬數據處理耗時】

func handleNewTicket(ticketID string, ticketData string) error {
  log.Printf("Handling new ticket id : %s data %s\n", ticketID, ticketData)
   time.Sleep(100 * time.Millisecond)
   return nil
}

在消費者中打開 5 個終端運行 go run main.go:

然後在發佈者運行 go run main.go,你將看到 tickets 消息平均的發佈到各個消費者,所以消費者以固定的速度同時運行。幾秒鐘後,所有消費者都應該同時停止。

是不是很厲害?

只需幾行代碼,我們就擁有了一個帶有事件發佈者、事件流和任意數量消費者的分佈式系統。這種解決方案可以很容易地在 kubernetes 集羣上擴展,並允許我們處理潛在的巨大數據負載。
這個示例項目的源代碼可以在這裏找到:https://github.com/gmrdn/redis-streams-go
想了解更多關於 Redis Streams 的信息,查看 https://redis.io/topics/streams-intro

備註:文章部分結果在譯者本地運行測試。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/E9S4dAcMJXiecRoqk1odLQ