etcd 通信接口:客戶端 API 實踐與核心方法
你好,我是 aoho,今天我和你分享的主題是通信接口:客戶端 API 實踐與核心方法。
我們在前面介紹了 etcd 的整體架構。學習客戶端與 etcd 服務端的通信以及 etcd 集羣節點的內部通信接口對於我們更好地使用和掌握 etcd 組件很有幫助,也是所必需瞭解的內容。本文我們將會介紹 etcd 的 gRPC 通信接口以及客戶端的實踐。
etcd clientv3 客戶端
etcd 客戶端 clientv3 接入的示例將會以 Go 客戶端爲主,讀者需要準備好基本的開發環境。
首先是 etcd clientv3 的初始化,我們根據指定的 etcd 節點,建立客戶端與 etcd 集羣的連接。
cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
如上的代碼實例化了一個 client,這裏需要傳入的兩個參數:
-
Endpoints:etcd 的多個節點服務地址,因爲我是單點本機測試,所以只傳 1 個。
-
DialTimeout:創建 client 的首次連接超時,這裏傳了 5 秒,如果 5 秒都沒有連接成功就會返回 err;值得注意的是,一旦 client 創建成功,我們就不用再關心後續底層連接的狀態了,client 內部會重連。
etcd 客戶端初始化
解決完包依賴之後,我們初始化 etcd 客戶端。客戶端初始化代碼如下所示:
// client_init_test.go
package client
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"testing"
"time"
)
// 測試客戶端連接
func TestEtcdClientInit(t *testing.T) {
var (
config clientv3.Config
client *clientv3.Client
err error
)
// 客戶端配置
config = clientv3.Config{
// 節點配置
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
} else {
// 輸出集羣信息
fmt.Println(client.Cluster.MemberList(context.TODO()))
}
client.Close()
}
如上的代碼,預期的執行結果如下:
=== RUN TestEtcdClientInit
&{cluster_id:14841639068965178418 member_id:10276657743932975437 raft_term:3 [ID:10276657743932975437 name:"default" peerURLs:"http://localhost:2380" clientURLs:"http://0.0.0.0:2379" ] {} [] 0} <nil>
--- PASS: TestEtcdClientInit (0.08s)
PASS
可以看到 clientv3 與 etcd Server 的節點 localhost:2379 成功建立了連接,並且輸出了集羣的信息,下面我們就可以對 etcd 進行操作了。
client 定義
接着我們來看一下 client 的定義:
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
}
注意,這裏顯示的都是可導出的模塊結構字段,代表了客戶端能夠使用的幾大核心模塊,其具體功能介紹如下:
-
Cluster:向集羣裏增加 etcd 服務端節點之類,屬於管理員操作。
-
KV:我們主要使用的功能,即操作 K-V。
-
Lease:租約相關操作,比如申請一個 TTL=10 秒的租約。
-
Watcher:觀察訂閱,從而監聽最新的數據變化。
-
Auth:管理 etcd 的用戶和權限,屬於管理員操作。
-
Maintenance:維護 etcd,比如主動遷移 etcd 的 leader 節點,屬於管理員操作。
proto3
etcd v3 的通信基於 gRPC,proto 文件是定義服務端和客戶端通訊接口的標準。包括:
-
客戶端該傳什麼樣的參數
-
服務端該返回什麼參數
-
客戶端該怎麼調用
-
是阻塞還是非阻塞
-
是同步還是異步。
gRPC 推薦使用 proto3 消息格式,在進行核心 API 的學習之前,我們需要對 proto3 的基本語法有初步的瞭解。proto3 是原有 Protocol Buffer 2(被稱爲 proto2) 的升級版本,刪除了一部分特性,優化了對移動設備的支持。
gRPC 服務
發送到 etcd 服務器的每個 API 請求都是一個 gRPC 遠程過程調用。etcd3 中的 RPC 接口定義根據功能分類到服務中。
處理 etcd 鍵值的重要服務包括:
-
KV 服務,創建、更新、獲取和刪除鍵值對。
-
監視,監視鍵的更改。
-
租約,消耗客戶端保持活動消息的基元。
-
鎖,etcd 提供分佈式共享鎖的支持。
-
選舉,暴露客戶端選舉機制。
請求和響應
etcd3 中的所有 RPC 都遵循相同的格式。每個 RPC 都有一個函數名,該函數將 NameRequest 作爲參數並返回 NameResponse 作爲響應。例如,這是 Range RPC 描述:
service KV {
Range(RangeRequest) returns (RangeResponse)
...
}
響應頭
etcd API 的所有響應都有一個附加的響應標頭,其中包括響應的羣集元數據:
message ResponseHeader {
uint64 cluster_id = 1;
uint64 member_id = 2;
int64 revision = 3;
uint64 raft_term = 4;
}
-
Cluster_ID - 產生響應的集羣的 ID。
-
Member_ID - 產生響應的成員的 ID。
-
Revision - 產生響應時鍵值存儲的修訂版本號。
-
Raft_Term - 產生響應時,成員的 Raft 稱謂。
應用服務可以通過 Cluster_ID 和 Member_ID 字段來確保,當前與之通信的正是預期的那個集羣或者成員。
應用服務可以使用修訂號字段來知悉當前鍵值存儲庫最新的修訂號。當應用程序指定歷史修訂版以進行時程查詢並希望在請求時知道最新修訂版時,此功能特別有用。
應用服務可以使用 Raft_Term 來檢測集羣何時完成一個新的 leader 選舉。
下面開始介紹 etcd 中這幾個重要的服務和接口。
KV 存儲
kv 對象的實例獲取通過如下的方式:
kv := clientev3.NewKV(client)
我們來看一下 kv 接口的具體定義:
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// 檢索 keys.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// 刪除 key,可以使用 WithRange(end), [key, end) 的方式
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// 壓縮給定版本之前的 KV 歷史
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// 指定某種沒有事務的操作
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn 創建一個事務
Txn(ctx context.Context) Txn
}
從 KV 對象的定義我們可知,它就是一個接口對象,包含幾個主要的 kv 操作方法:
kv 存儲 put
put 的定義如下:
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
其中的參數:
-
ctx: Context 包對象,是用來跟蹤上下文的,比如超時控制
-
key: 存儲對象的 key
-
val: 存儲對象的 value
-
opts: 可變參數,額外選項
Put 將一個鍵值對放入 etcd 中。請注意,鍵值可以是純字節數組,字符串是該字節數組的不可變表示形式。要獲取字節字符串,請執行 string([] byte {0x10,0x20})
。
put 的使用方法如下所示:
putResp, err := kv.Put(context.TODO(),"aa", "hello-world!")
kv 查詢 get
現在可以對存儲的數據進行取值了。默認情況下,Get 將返回 “key” 對應的值。
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
OpOption 爲可選的函數傳參,傳參爲 WithRange(end)
時,Get 將返回 [key,end)範圍內的鍵;傳參爲 WithFromKey()
時,Get 返回大於或等於 key 的鍵;當通過 rev> 0 傳遞 WithRev(rev)
時,Get 查詢給定修訂版本的鍵;如果壓縮了所查找的修訂版本,則返回請求失敗,並顯示 ErrCompacted。傳遞 WithLimit(limit)
時,返回的 key 數量受 limit 限制;傳參爲 WithSort
時,將對鍵進行排序。對應的使用方法如下:
getResp, err := kv.Get(context.TODO(), "aa")
從以上數據的存儲和取值,我們知道 put 返回 PutResponse、get 返回 GetResponse,注意:不同的 KV 操作對應不同的 response 結構,定義如下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
我們分別來看一看 PutResponse 和 GetResponse 映射的 RangeResponse 結構的定義:
type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,`
}
//Header 裏保存的主要是本次更新的 revision 信息
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,`
}
Kvs 字段,保存了本次 Get 查詢到的所有 kv 對,我們繼續看一下 mvccpb.KeyValue 對象的定義:
type KeyValue struct {
Key []byte `protobuf:"bytes,1,opt,`
// create_revision 是當前 key 的最後創建版本
CreateRevision int64 `protobuf:"varint,2,opt,`
// mod_revision 是指當前 key 的最新修訂版本
ModRevision int64 `protobuf:"varint,3,opt,`
// key 的版本,每次更新都會增加版本號
Version int64 `protobuf:"varint,4,opt,`
Value []byte `protobuf:"bytes,5,opt,`
// 綁定了 key 的租期 Id,當 lease 爲 0 ,則表明沒有綁定 key;租期過期,則會刪除 key
Lease int64 `protobuf:"varint,6,opt,`
}
至於 RangeResponse.More 和 Count,當我們使用 withLimit() 選項進行 Get 時會發揮作用,相當於分頁查詢。
接下來,我們通過一個特別的 Get 選項,獲取 aa 目錄下的所有子目錄:
rangeResp, err := kv.Get(context.TODO(), "/aa", clientv3.WithPrefix())
WithPrefix()
用於查找以 /aa
爲前綴的所有 key,因此可以模擬出查找子目錄的效果。
我們知道 etcd 是一個有序的 kv 存儲,因此 /aa
爲前綴的 key 總是順序排列在一起。
withPrefix 實際上會轉化爲範圍查詢,它根據前綴 /aa
生成了一個 key range,[“/aa/”, “/aa0”),這是因爲比 /
大的字符是 0
,所以以 /aa0
作爲範圍的末尾,就可以掃描到所有的 /aa/
打頭的 key 了。
KV 操作實踐
鍵值對的操作是 etcd 中最基本、最常用的功能,主要包括讀、寫、刪除三種基本的操作。在 etcd 中定義了 kv 接口,用來對外提供這些操作,下面我們進行具體的測試:
package client
import (
"context"
"fmt"
"github.com/google/uuid"
"go.etcd.io/etcd/clientv3"
"testing"
"time"
)
func TestKV(t *testing.T) {
rootContext := context.Background()
// 客戶端初始化
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 2 * time.Second,
})
// etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
if cli == nil || err == context.DeadlineExceeded {
// handle errors
fmt.Println(err)
panic("invalid connection!")
}
// 客戶端斷開連接
defer cli.Close()
// 初始化 kv
kvc := clientv3.NewKV(cli)
//獲取值
ctx, cancelFunc := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
response, err := kvc.Get(ctx, "cc")
cancelFunc()
if err != nil {
fmt.Println(err)
}
kvs := response.Kvs
// 輸出獲取的 key
if len(kvs) > 0 {
fmt.Printf("last value is :%s\r\n", string(kvs[0].Value))
} else {
fmt.Printf("empty key for %s\n", "cc")
}
//設置值
uuid := uuid.New().String()
fmt.Printf("new value is :%s\r\n", uuid)
ctx2, cancelFunc2 := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
_, err = kvc.Put(ctx2, "cc", uuid)
// 設置成功之後,將該 key 對應的鍵值刪除
if delRes, err := kvc.Delete(ctx2, "cc"); err != nil {
fmt.Println(err)
} else {
fmt.Printf("delete %s for %t\n", "cc", delRes.Deleted > 0)
}
cancelFunc2()
if err != nil {
fmt.Println(err)
}
}
如上的測試用例,主要是針對 kv 的操作,依次獲取 key,即 Get(),對應 etcd 底層實現的 range 接口;其次是寫入鍵值對,即 put 操作;最後刪除剛剛寫入的鍵值對。預期的執行結果如下所示:
=== RUN Test
empty key for cc
new value is: 41e1362a-28a7-4ac9-abf5-fe1474d93f84
delete cc for true
--- PASS: Test (0.11s)
PASS
可以看到,剛開始 etcd 並沒有存儲鍵 cc
的值,隨後寫入新的鍵值對並測試將其刪除。
其他通信接口
其他常用的接口還有 Transaction、Compact、watch、Lease、Lock 等。我們依次看看這些接口的定義。
事務 Transaction
Txn 方法在單個事務中處理多個請求。txn 請求增加鍵值存儲的修訂版本併爲每個完成的請求生成帶有相同修訂版本的事件。etcd 不容許在一個 txn 中多次修改同一個 key。Txn 接口定義如下:
rpc Txn(TxnRequest) returns (TxnResponse) {}
Compact 方法
Compact 方法壓縮 etcd 鍵值對存儲中的事件歷史。鍵值對存儲應該定期壓縮,否則事件歷史會無限制的持續增長。
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
請求的消息體是 CompactionRequest, CompactionRequest 壓縮鍵值對存儲到給定修訂版本。所有修訂版本比壓縮修訂版本小的鍵都將被刪除
watch
Watch API 提供了一個基於事件的接口,用於異步監視鍵的更改。etcd3 監視程序通過從給定的修訂版本(當前版本或歷史版本)持續監視 key 更改,並將 key 更新流回客戶端。
在 rpc.proto 中 Watch service 定義如下:
service Watch {
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}
Watch 觀察將要發生或者已經發生的事件。輸入和輸出都是流; 輸入流用於創建和取消觀察,而輸出流發送事件。一個觀察 RPC 可以在一次性在多個 key 範圍上觀察,併爲多個觀察流化事件。整個事件歷史可以從最後壓縮修訂版本開始觀察。WatchService 只有一個 Watch 方法。
Lease service
Lease service 提供租約的支持。Lease 是一種檢測客戶端存活狀況的機制。羣集授予具有生存時間的租約。如果 etcd 羣集在給定的 TTL 時間內未收到 keepAlive,則租約到期。
爲了將租約綁定到鍵值存儲中,每個 key 最多可以附加一個租約。當租約到期或被撤銷時,該租約所附的所有 key 都將被刪除。每個過期的密鑰都會在事件歷史記錄中生成一個刪除事件。
在 rpc.proto 中 Lease service 定義的接口如下:
service Lease {
rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {}
rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {}
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}
rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {}
}
-
LeaseGrant 創建一個租約
-
LeaseRevoke 撤銷一個租約
-
LeaseKeepAlive 用於維持租約
-
LeaseTimeToLive 獲取租約信息
Lock service
Lock service 提供分佈式共享鎖的支持。Lock service 以 gRPC 接口的方式暴露客戶端鎖機制。在 v3lock.proto 中 Lock service 定義如下:
service Lock {
rpc Lock(LockRequest) returns (LockResponse) {}
rpc Unlock(UnlockRequest) returns (UnlockResponse) {}
}
-
Lock 方法,在給定命令鎖上獲得分佈式共享鎖。
-
Unlock 使用 Lock 返回的 key 並釋放對鎖的持有。
小結
本文主要介紹了 etcd 的 gRPC 通信接口以及 clientv3 客戶端的實踐,主要包括鍵值對操作(增刪改查)、watch、Lease、鎖和 Compact 等接口。通過本課時的學習,瞭解 etcd 客戶端的使用以及常用功能的接口定義,對於我們在日常工作中能夠得心應手的使用 etcd 實現相應的功能能夠很有幫助。
當然,本文限於篇幅,只是介紹了常用的幾個通信接口,如果你對其他的接口還有疑問,歡迎在留言區提出。
aoho 求索 aoho 求索是一個分享服務端開發技術的公衆號,主要涉及 Java、Golang 等語言,介紹微服務架構和高併發相關的實踐。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/6OLML6lrTV69Ofxx8y0GQQ