一款支持自動流水線和客戶端緩存的 Go 語言 Redis 客戶端
Rueidis 是一款高性能的 Go 語言 Redis 客戶端,它支持自動流水線操作和服務端輔助客戶端緩存等功能。Rueidis 的目標是提供一個簡單易用、性能卓越的 Redis 客戶端庫,以滿足 Go 開發者的各種需求。
主要功能
-
自動流水線操作,提升非阻塞命令的執行效率
-
服務端輔助客戶端緩存,大幅降低延遲和提高吞吐量
-
支持泛型對象映射和客戶端緩存
-
提供緩存策略模式(Cache-Aside pattern)的實現
-
支持分佈式鎖和客戶端緩存
-
提供用於編寫測試的 Rueidis 模擬庫
-
集成 OpenTelemetry,方便監控和追蹤
-
支持 Pub/Sub、分片 Pub/Sub 和 Streams
-
兼容 Redis Cluster、Sentinel、RedisJSON、RedisBloom、RediSearch、RedisTimeseries 等
-
提供不依賴 Redis Stack 的概率數據結構
快速入門
package main
import (
"context"
"github.com/redis/rueidis"
)
func main() {
client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
// SET key val NX
err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
// HGETALL hm
hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
}
命令構建器
Rueidis 提供了簡單易用的命令構建器,方便開發者構建 Redis 命令。
client.B().Set().Key("key").Value("val").Nx().Build()
流水線操作
自動流水線操作
Rueidis 會自動將併發執行的非阻塞 Redis 命令進行流水線操作,以減少網絡往返次數和系統調用次數,從而提高吞吐量。
func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
}
})
}
手動流水線操作
除了自動流水線操作外,Rueidis 還支持手動流水線操作。
cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
if err := resp.Error(); err != nil {
panic(err)
}
}
服務端輔助客戶端緩存
Rueidis 默認開啓了服務端輔助客戶端緩存的 opt-in 模式,開發者可以通過 DoCache() 或 DoMultiCache() 方法使用該功能。
client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))
上下文取消
client.Do()、client.DoMulti()、client.DoCache() 和 client.DoMultiCache() 方法都支持上下文取消,如果上下文被取消或超時,這些方法會提前返回。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded
發佈 / 訂閱
Rueidis 提供了 client.Receive() 方法來接收來自 Redis 頻道的信息。
err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
// 處理接收到的信息
})
CAS 事務
Rueidis 支持 CAS 事務,即 WATCH + MULTI + EXEC 操作。
client.Dedicated(func(c rueidis.DedicatedClient) error {
c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
c.DoMulti(
ctx,
c.B().Multi().Build(),
c.B().Set().Key("k1").Value("1").Build(),
c.B().Set().Key("k2").Value("2").Build(),
c.B().Exec().Build(),
)
return nil
})
Lua 腳本
Rueidis 提供了 NewLuaScript 和 NewLuaScriptReadOnly 方法來創建 Lua 腳本。
script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
流式讀取
Rueidis 提供了 client.DoStream() 和 client.DoMultiStream() 方法來流式讀取 Redis 的響應數據。
s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
n, err := s.WriteTo(io.Discard)
if rueidis.IsRedisNil(err) {
// ...
}
}
內存消耗
Rueidis 的每個底層連接都會分配一個環形緩衝區用於流水線操作,緩衝區的大小由 ClientOption.RingScaleEachConn 選項控制,默認值爲 10,即每個環形緩衝區的大小爲 2^10 字節。
如果 Rueidis 連接過多,可能會佔用大量內存。在這種情況下,可以考慮將 ClientOption.RingScaleEachConn 選項的值減小到 8 或 9,但這樣做可能會降低吞吐量。
創建 Redis 客戶端
可以使用 NewClient 函數創建 Rueidis 客戶端,並指定各種選項。
// 連接到單個 Redis 節點
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
// 連接到 Redis 集羣
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ShuffleInit: true,
})
Redis URL
可以使用 ParseURL 或 MustParseURL 函數解析 Redis URL,並將其轉換爲 ClientOption 結構體。
// 連接到 Redis 集羣
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
任意命令
如果需要構建 Rueidis 命令構建器中沒有提供的 Redis 命令,可以使用 client.B().Arbitrary() 方法。
// 這將生成 [ANY CMD k1 k2 a1 a2] 命令
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()
處理 JSON、原始字節數組和向量相似度搜索
Rueidis 命令構建器將所有參數都視爲 Redis 字符串,這意味着開發者可以將 []byte 直接存儲到 Redis 中,而無需進行轉換。
client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()
命令響應解析
Rueidis 提供了一系列方法來解析 Redis 命令的響應數據,例如 ToString()、AsInt64()、ToArray() 等。
// GET 命令
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
使用 DecodeSliceOfJSON 解析 JSON 數組
DecodeSliceOfJSON 函數可以將 Redis 響應中的 JSON 數組解析爲 Go 結構體切片。
type User struct {
Name string `json:"name"`
}
// ...
var users []*User
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
return err
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/6BTRDyjSQIHtLo3Azk8qmQ