etcd 通信接口:客戶端 API 實踐與核心方法

你好,我是 aoho,今天我和你分享的主題是通信接口:客戶端 API 實踐與核心方法。

我們在前面介紹了 etcd 的整體架構。學習客戶端與 etcd 服務端的通信以及 etcd 集羣節點的內部通信接口對於我們更好地使用和掌握 etcd 組件很有幫助,也是所必需瞭解的內容。本文我們將會介紹 etcd 的 gRPC 通信接口以及客戶端的實踐。

etcd clientv3 客戶端

etcd 客戶端 clientv3 接入的示例將會以 Go 客戶端爲主,讀者需要準備好基本的開發環境。

首先是 etcd clientv3 的初始化,我們根據指定的 etcd 節點,建立客戶端與 etcd 集羣的連接。

    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })

如上的代碼實例化了一個 client,這裏需要傳入的兩個參數:

etcd 客戶端初始化

解決完包依賴之後,我們初始化 etcd 客戶端。客戶端初始化代碼如下所示:

// client_init_test.go
package client

import (
 "context"
 "fmt"
 "go.etcd.io/etcd/clientv3"
 "testing"
 "time"
)
// 測試客戶端連接
func TestEtcdClientInit(t *testing.T) {

 var (
  config clientv3.Config
  client *clientv3.Client
  err    error
 )
 // 客戶端配置
 config = clientv3.Config{
  // 節點配置
  Endpoints:   []string{"localhost:2379"},
  DialTimeout: 5 * time.Second,
 }
 // 建立連接
 if client, err = clientv3.New(config); err != nil {
  fmt.Println(err)
 } else {
  // 輸出集羣信息
  fmt.Println(client.Cluster.MemberList(context.TODO()))
 }
 client.Close()
}

如上的代碼,預期的執行結果如下:

=== RUN   TestEtcdClientInit
&{cluster_id:14841639068965178418 member_id:10276657743932975437 raft_term:3  [ID:10276657743932975437 name:"default" peerURLs:"http://localhost:2380" clientURLs:"http://0.0.0.0:2379" ] {} [] 0} <nil>
--- PASS: TestEtcdClientInit (0.08s)
PASS

可以看到 clientv3 與 etcd Server 的節點 localhost:2379 成功建立了連接,並且輸出了集羣的信息,下面我們就可以對 etcd 進行操作了。

client 定義

接着我們來看一下 client 的定義:

type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance

    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
}

注意,這裏顯示的都是可導出的模塊結構字段,代表了客戶端能夠使用的幾大核心模塊,其具體功能介紹如下:

proto3

etcd v3 的通信基於 gRPC,proto 文件是定義服務端和客戶端通訊接口的標準。包括:

gRPC 推薦使用 proto3 消息格式,在進行核心 API 的學習之前,我們需要對 proto3 的基本語法有初步的瞭解。proto3 是原有 Protocol Buffer 2(被稱爲 proto2) 的升級版本,刪除了一部分特性,優化了對移動設備的支持。

gRPC 服務

發送到 etcd 服務器的每個 API 請求都是一個 gRPC 遠程過程調用。etcd3 中的 RPC 接口定義根據功能分類到服務中。

處理 etcd 鍵值的重要服務包括:

請求和響應

etcd3 中的所有 RPC 都遵循相同的格式。每個 RPC 都有一個函數名,該函數將 NameRequest 作爲參數並返回 NameResponse 作爲響應。例如,這是 Range RPC 描述:

service KV {
  Range(RangeRequest) returns (RangeResponse)
  ...
}

響應頭

etcd API 的所有響應都有一個附加的響應標頭,其中包括響應的羣集元數據:

message ResponseHeader {
  uint64 cluster_id = 1;
  uint64 member_id = 2;
  int64 revision = 3;
  uint64 raft_term = 4;
}

應用服務可以通過 Cluster_ID 和 Member_ID 字段來確保,當前與之通信的正是預期的那個集羣或者成員。

應用服務可以使用修訂號字段來知悉當前鍵值存儲庫最新的修訂號。當應用程序指定歷史修訂版以進行時程查詢並希望在請求時知道最新修訂版時,此功能特別有用。

應用服務可以使用 Raft_Term 來檢測集羣何時完成一個新的 leader 選舉。

下面開始介紹 etcd 中這幾個重要的服務和接口。

KV 存儲

kv 對象的實例獲取通過如下的方式:

kv  := clientev3.NewKV(client)

我們來看一下 kv 接口的具體定義:

type KV interface {

    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

    // 檢索 keys.
    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

    // 刪除 key,可以使用 WithRange(end)[key, end) 的方式
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

    // 壓縮給定版本之前的 KV 歷史
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

    // 指定某種沒有事務的操作
    Do(ctx context.Context, op Op) (OpResponse, error)

    // Txn 創建一個事務
    Txn(ctx context.Context) Txn
}

從 KV 對象的定義我們可知,它就是一個接口對象,包含幾個主要的 kv 操作方法:

kv 存儲 put

put 的定義如下:

    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

其中的參數:

Put 將一個鍵值對放入 etcd 中。請注意,鍵值可以是純字節數組,字符串是該字節數組的不可變表示形式。要獲取字節字符串,請執行 string([] byte {0x10,0x20})

put 的使用方法如下所示:

putResp, err := kv.Put(context.TODO(),"aa""hello-world!")

kv 查詢 get

現在可以對存儲的數據進行取值了。默認情況下,Get 將返回 “key” 對應的值。

Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

OpOption 爲可選的函數傳參,傳參爲 WithRange(end) 時,Get 將返回 [key,end)範圍內的鍵;傳參爲 WithFromKey() 時,Get 返回大於或等於 key 的鍵;當通過 rev> 0 傳遞 WithRev(rev) 時,Get 查詢給定修訂版本的鍵;如果壓縮了所查找的修訂版本,則返回請求失敗,並顯示 ErrCompacted。傳遞 WithLimit(limit) 時,返回的 key 數量受 limit 限制;傳參爲 WithSort 時,將對鍵進行排序。對應的使用方法如下:

getResp, err := kv.Get(context.TODO()"aa")

從以上數據的存儲和取值,我們知道 put 返回 PutResponse、get 返回 GetResponse,注意:不同的 KV 操作對應不同的 response 結構,定義如下:

type (
    CompactResponse pb.CompactionResponse
    PutResponse     pb.PutResponse
    GetResponse     pb.RangeResponse
    DeleteResponse  pb.DeleteRangeResponse
    TxnResponse     pb.TxnResponse
)

我們分別來看一看 PutResponse 和 GetResponse 映射的 RangeResponse 結構的定義:

type PutResponse struct {
    Header *ResponseHeader `protobuf:"bytes,1,opt,`
    // if prev_kv is set in the request, the previous key-value pair will be returned.
    PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,`
}
//Header 裏保存的主要是本次更新的 revision 信息

type RangeResponse struct {
    Header *ResponseHeader `protobuf:"bytes,1,opt,`
    // kvs is the list of key-value pairs matched by the range request.
    // kvs is empty when count is requested.
    Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,`
    // more indicates if there are more keys to return in the requested range.
    More bool `protobuf:"varint,3,opt,`
    // count is set to the number of keys within the range when requested.
    Count int64 `protobuf:"varint,4,opt,`
}

Kvs 字段,保存了本次 Get 查詢到的所有 kv 對,我們繼續看一下 mvccpb.KeyValue 對象的定義:

type KeyValue struct {

    Key []byte `protobuf:"bytes,1,opt,`
    // create_revision 是當前 key 的最後創建版本
    CreateRevision int64 `protobuf:"varint,2,opt,`
    // mod_revision 是指當前 key 的最新修訂版本
    ModRevision int64 `protobuf:"varint,3,opt,`
    // key 的版本,每次更新都會增加版本號
    Version int64 `protobuf:"varint,4,opt,`

    Value []byte `protobuf:"bytes,5,opt,`

    // 綁定了 key 的租期 Id,當 lease 爲 0 ,則表明沒有綁定 key;租期過期,則會刪除 key
    Lease int64 `protobuf:"varint,6,opt,`
}

至於 RangeResponse.More 和 Count,當我們使用 withLimit() 選項進行 Get 時會發揮作用,相當於分頁查詢。

接下來,我們通過一個特別的 Get 選項,獲取 aa 目錄下的所有子目錄:

rangeResp, err := kv.Get(context.TODO()"/aa", clientv3.WithPrefix())

WithPrefix() 用於查找以 /aa 爲前綴的所有 key,因此可以模擬出查找子目錄的效果。

我們知道 etcd 是一個有序的 kv 存儲,因此 /aa 爲前綴的 key 總是順序排列在一起。

withPrefix 實際上會轉化爲範圍查詢,它根據前綴 /aa 生成了一個 key range,[“/aa/”, “/aa0”),這是因爲比 / 大的字符是 0,所以以 /aa0 作爲範圍的末尾,就可以掃描到所有的 /aa/ 打頭的 key 了。

KV 操作實踐

鍵值對的操作是 etcd 中最基本、最常用的功能,主要包括讀、寫、刪除三種基本的操作。在 etcd 中定義了 kv 接口,用來對外提供這些操作,下面我們進行具體的測試:

package client

import (
 "context"
 "fmt"
 "github.com/google/uuid"
 "go.etcd.io/etcd/clientv3"
 "testing"
 "time"
)

func TestKV(t *testing.T) {
 rootContext := context.Background()
 // 客戶端初始化
 cli, err := clientv3.New(clientv3.Config{
  Endpoints:   []string{"localhost:2379"},
  DialTimeout: 2 * time.Second,
 })
 // etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
 if cli == nil || err == context.DeadlineExceeded {
  // handle errors
  fmt.Println(err)
  panic("invalid connection!")
 }
 // 客戶端斷開連接
 defer cli.Close()
 // 初始化 kv
 kvc := clientv3.NewKV(cli)
 //獲取值
 ctx, cancelFunc := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
 response, err := kvc.Get(ctx, "cc")
 cancelFunc()
 if err != nil {
  fmt.Println(err)
 }
 kvs := response.Kvs
 // 輸出獲取的 key
 if len(kvs) > 0 {
  fmt.Printf("last value is :%s\r\n", string(kvs[0].Value))
 } else {
  fmt.Printf("empty key for %s\n""cc")
 }
 //設置值
 uuid := uuid.New().String()
 fmt.Printf("new value is :%s\r\n", uuid)
 ctx2, cancelFunc2 := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
 _, err = kvc.Put(ctx2, "cc", uuid)
 // 設置成功之後,將該 key 對應的鍵值刪除
 if delRes, err := kvc.Delete(ctx2, "cc"); err != nil {
  fmt.Println(err)
 } else {
  fmt.Printf("delete %s for %t\n""cc", delRes.Deleted > 0)
 }
 cancelFunc2()
 if err != nil {
  fmt.Println(err)
 }
}

如上的測試用例,主要是針對 kv 的操作,依次獲取 key,即 Get(),對應 etcd 底層實現的 range 接口;其次是寫入鍵值對,即 put 操作;最後刪除剛剛寫入的鍵值對。預期的執行結果如下所示:

=== RUN   Test
empty key for cc
new value is: 41e1362a-28a7-4ac9-abf5-fe1474d93f84
delete cc for true
--- PASS: Test (0.11s)
PASS

可以看到,剛開始 etcd 並沒有存儲鍵 cc 的值,隨後寫入新的鍵值對並測試將其刪除。

其他通信接口

其他常用的接口還有 Transaction、Compact、watch、Lease、Lock 等。我們依次看看這些接口的定義。

事務 Transaction

Txn 方法在單個事務中處理多個請求。txn 請求增加鍵值存儲的修訂版本併爲每個完成的請求生成帶有相同修訂版本的事件。etcd 不容許在一個 txn 中多次修改同一個 key。Txn 接口定義如下:

rpc Txn(TxnRequest) returns (TxnResponse) {}

Compact 方法

Compact 方法壓縮 etcd 鍵值對存儲中的事件歷史。鍵值對存儲應該定期壓縮,否則事件歷史會無限制的持續增長。

rpc Compact(CompactionRequest) returns (CompactionResponse) {}

請求的消息體是 CompactionRequest, CompactionRequest 壓縮鍵值對存儲到給定修訂版本。所有修訂版本比壓縮修訂版本小的鍵都將被刪除

watch

Watch API 提供了一個基於事件的接口,用於異步監視鍵的更改。etcd3 監視程序通過從給定的修訂版本(當前版本或歷史版本)持續監視 key 更改,並將 key 更新流回客戶端。

在 rpc.proto 中 Watch service 定義如下:

service Watch {
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}

Watch 觀察將要發生或者已經發生的事件。輸入和輸出都是流; 輸入流用於創建和取消觀察,而輸出流發送事件。一個觀察 RPC 可以在一次性在多個 key 範圍上觀察,併爲多個觀察流化事件。整個事件歷史可以從最後壓縮修訂版本開始觀察。WatchService 只有一個 Watch 方法。

Lease service

Lease service 提供租約的支持。Lease 是一種檢測客戶端存活狀況的機制。羣集授予具有生存時間的租約。如果 etcd 羣集在給定的 TTL 時間內未收到 keepAlive,則租約到期。

爲了將租約綁定到鍵值存儲中,每個 key 最多可以附加一個租約。當租約到期或被撤銷時,該租約所附的所有 key 都將被刪除。每個過期的密鑰都會在事件歷史記錄中生成一個刪除事件。

在 rpc.proto 中 Lease service 定義的接口如下:

service Lease {

  rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {}

  rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {}

  rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}

  rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {}
}

Lock service

Lock service 提供分佈式共享鎖的支持。Lock service 以 gRPC 接口的方式暴露客戶端鎖機制。在 v3lock.proto 中 Lock service 定義如下:

service Lock {

  rpc Lock(LockRequest) returns (LockResponse) {}

  rpc Unlock(UnlockRequest) returns (UnlockResponse) {}
}

小結

本文主要介紹了 etcd 的 gRPC 通信接口以及 clientv3 客戶端的實踐,主要包括鍵值對操作(增刪改查)、watch、Lease、鎖和 Compact 等接口。通過本課時的學習,瞭解 etcd 客戶端的使用以及常用功能的接口定義,對於我們在日常工作中能夠得心應手的使用 etcd 實現相應的功能能夠很有幫助。

當然,本文限於篇幅,只是介紹了常用的幾個通信接口,如果你對其他的接口還有疑問,歡迎在留言區提出。

aoho 求索 aoho 求索是一個分享服務端開發技術的公衆號,主要涉及 Java、Golang 等語言,介紹微服務架構和高併發相關的實踐。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6OLML6lrTV69Ofxx8y0GQQ