Go:基於 Redis Stream 構建可擴展事件流
事件流架構是獨立擴展某些組件的好方法。當提到事件流工具時,這方面的主流似乎是 Kafka,但也有一些其他實用的流工具,比如 NATS 流 / Jetstream, NSQ 或 Redis 流。
今天我將寫一些關於 Redis Stream 基本用法的筆記,我們將構建一個發佈者和一個消費者的例子,並使用運行在 docker 上的 Redis 服務器做本地測試。發佈者將使用 XADD 命令發送一些消息到 Redis 的持久化消息流中,消費者將使用 XREADGROUP 讀取流。
當事件在給定流中發佈時,通過使用 XREADGROUP,可以讓多個消費者執行相同的操作。這種機制使水平擴展消費者成爲可能。當我們添加一個消費者時,它將自己註冊到一個消費者組中,並且流中的消息將均勻地發送到組中的不同消費者。
構建示例
假設我們構建了一個分佈式系統,在這個系統中,將從外部接收一些”tickets“消息,當消息到達時,我們希望執行一些操作,例如解析消息的內容,調用 API 寫信息到數據庫。
本地 Redis
開始之前,我們需要一個 Redis 服務,我們在本地使用 docker 鏡像啓動 redis 服務:
docker run --name localredis -d redis redis-server --appendonly yes
這裏啓動本地 redis 服務,appendonly 參數的作用是設置將 redis 數據持久化。當有數據變化,就會寫副本到一個文件。如果 redis 服務重啓,消息數據會恢復。
Go 發佈者
讓我們開始編寫代碼,爲發佈者創建一個新的 go module。這個發佈者將簡單地使用 XADD 命令向 Redis 流發送一些消息。
初始化一個 Go module,創建一個 main.go 文件,並添加 redis 調用庫:
go mod init publisher
touch main.go
go get github.com/go-redis/redis
在 main.go 文件中,我們將連接 redis 並使用 ping 來檢查是否連接成功:
package main
import (
"fmt"
"log"
"github.com/go-redis/redis"
)
func main() {
log.Println("Publisher started")
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", "127.0.0.1", "6379"),
})
_, err := redisClient.Ping().Result()
if err != nil {
log.Fatal("Unable to connect to Redis", err)
}
log.Println("Connected to Redis server")
}
當執行 go run main.go,如果一切正常的話將看到如下日誌:
2021/08/25 20:54:21 Publisher started
2021/08/25 20:54:21 Connected to Redis server
現在我們創建一個 “tickets” 流然後添加一個消息:
func publishTicketReceivedEvent(client *redis.Client) error {
log.Println("Publishing event to Redis")
err := client.XAdd(&redis.XAddArgs{
Stream: "tickets",
MaxLen: 0,
MaxLenApprox: 0,
ID: "",
Values: map[string]interface{}{
"whatHappened": string("ticket received"),
"ticketID": int(rand.Intn(100000000)),
"ticketData": string("some ticket data"),
},
}).Err()
return err
}
這裏函數將發送一條 “ticket received” 消息並附帶一個隨機 id 和一些數據。在主函數中,我們將以上函數在循環中多次使用看看會發生什麼?
for i := 0; i < 3000; i++ {
err = publishTicketReceivedEvent(redisClient)
if err != nil {
log.Fatal(err)
}
}
如果你執行 main.go 函數,將會打印 3000 多行日誌:
...
2021/08/25 21:08:38 Publishing event to Redis
2021/08/25 21:08:38 Publishing event to Redis
2021/08/25 21:08:38 Publishing event to Redis
我們進入 redis 容器內看看,在 redis 服務中,打開 redis-cli 客戶端,查看當前狀態:
docker exec -it localredis /bin/bash
redis-cli
127.0.0.1:6379> XINFO STREAM tickets
XINFO 是 redis 命令用於監控消息流或消費者組狀態。這裏我們將看到有 3000 條流數據:以下是第一條和最後一條數據:
1) "length"
2) (integer) 3000
3) "radix-tree-keys"
4) (integer) 45
5) "radix-tree-nodes"
6) (integer) 111
7) "last-generated-id"
8) "1615061318123-0"
9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1615061313111-0"
2) 1) "whatHappened"
2) "ticket received"
3) "ticketID"
4) "98498081"
5) "ticketData"
6) "some ticket data"
13) "last-entry"
14) 1) "1615061318123-0"
2) 1) "whatHappened"
2) "ticket received"
3) "ticketID"
4) "39114354"
5) "ticketData"
6) "some ticket data"
Ok,我們已經在名爲 “tickets” 流中發佈了一些消息,現在我們來構建一個消費者去消費流數據:
Go 消費者
和發佈者類似,首先需要創建一個新的 module:
go mod init consumer
touch main.go
go get github.com/go-redis/redis
在 main.go 要先連接 redis 服務:
package main
import (
"fmt"
"log
"github.com/go-redis/redis"
)
func main() {
log.Println("Consumer started")
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", "127.0.0.1", "6379"),
})
_, err := redisClient.Ping().Result()
if err != nil {
log.Fatal("Unbale to connect to Redis", err)
}
log.Println("Connected to Redis server")
}
下面使用 XGROUPCREATE 來創建消費者組:
subject := "tickets"
consumersGroup := "tickets-consumer-group"
err = redisClient.XGroupCreate(subject, consumersGroup, "0").Err()
if err != nil {
log.Println(err)
}
現在可以使用 XREADGROUP 來監聽流中消息,並使用一個唯一 id 將消費者註冊到消費者組裏:
爲了生成唯一 id,將使用 xid 庫:
go get github.com/rs/xid
當接收到 ticket 消息時,將調用以下函數:
func handleNewTicket(ticketID string, ticketData string) error {
log.Printf("Handling new ticket id : %s data %s\n", ticketID, ticketData)
return nil
}
然後在 main.go 中創建一個無限循環,我們調用 XREADGROUP 並在 > 位置,表示從該組的第一個待處理消息開始,然後爲每個 ticket 調用 handNewTicket 函數,併發送 XACK 命令到 redis 服務通知消息已經被消費。
uniqueID := xid.New().String()
for {
entries, err := redisClient.XReadGroup(&redis.XReadGroupArgs{
Group: consumersGroup,
Consumer: uniqueID,
Streams: []string{subject, ">"},
Count: 2,
Block: 0,
NoAck: false,
}).Result()
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(entries[0].Messages); i++ {
messageID := entries[0].Messages[i].ID
values := entries[0].Messages[i].Values
eventDescription := fmt.Sprintf("%v", values["whatHappened"])
ticketID := fmt.Sprintf("%v", values["ticketID"])
ticketData := fmt.Sprintf("%v", values["ticketData"])
if eventDescription == "ticket received" {
err := handleNewTicket(ticketID, ticketData)
if err != nil {
log.Fatal(err)
}
redisClient.XAck(subject, consumersGroup, messageID)
}
}
如果程序執行正常,將看到 3000 行的日誌:
...
2021/08/25 21:51:44 Handling new ticket id : 28377708 data some ticket data
2021/08/25 21:51:44 Handling new ticket id : 56451806 data some ticket data
2021/08/25 21:51:44 Handling new ticket id : 94132471 data some ticket data
您還應該注意到,該程序沒有退出,仍然在偵聽消息。這是因爲我們使用 BLOCK = 0 參數調用 XREADGROUP,這意味着程序執行將被阻塞無限長的時間,直到新消息到來。如果您打開第二個終端並再次運行發佈者,會看到消息同時被髮布和消費,這是一件好事。
在現實生活中,消費消息時,我們會對消息做比 log.Println() 更復雜的事情。我們可能調用外部 API,寫入數據庫或將數據導出到 S3 桶中等…… 當我們給消費者增加一些延遲時,會發生什麼?
假設我們仍然以同樣的速度發佈這 3000 條消息,但是我們在消費者中添加了一個 time.Sleep(100 * time.Millisecond),這將耗時超過 5 分鐘來消費這 3000 條消息… 除非我們同時運行多個消費者。好消息是,我們已經爲這種方法編寫了所有代碼。
如何擴展?
我們做一個快速實驗。如果我們將 time.Sleep(100 * time.Millisecond) 添加到消費者的 handNewTicket 函數,將會如何?【模擬數據處理耗時】
func handleNewTicket(ticketID string, ticketData string) error {
log.Printf("Handling new ticket id : %s data %s\n", ticketID, ticketData)
time.Sleep(100 * time.Millisecond)
return nil
}
在消費者中打開 5 個終端運行 go run main.go:
然後在發佈者運行 go run main.go,你將看到 tickets 消息平均的發佈到各個消費者,所以消費者以固定的速度同時運行。幾秒鐘後,所有消費者都應該同時停止。
是不是很厲害?
只需幾行代碼,我們就擁有了一個帶有事件發佈者、事件流和任意數量消費者的分佈式系統。這種解決方案可以很容易地在 kubernetes 集羣上擴展,並允許我們處理潛在的巨大數據負載。
這個示例項目的源代碼可以在這裏找到:https://github.com/gmrdn/redis-streams-go
想了解更多關於 Redis Streams 的信息,查看 https://redis.io/topics/streams-intro
備註:文章部分結果在譯者本地運行測試。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/E9S4dAcMJXiecRoqk1odLQ