Go ETCD ClientV3 詳解

【導讀】etcd 有時會被用來做服務發現組件使用,替代 consul。本文介紹了 go 語言 etcd client 的用法。

如果您覺得有什麼不理解,或者覺得文章有欠缺的地方,請您點擊這裏 (https://github.com/helios741/myblog/issues/77) 提出。我會很感謝您的建議也會解答您的問題。

ETCD golang ClientV3 的基本使用

零、搭建單機的 ETCD

爲了演示,在 Linux 機器上搭建一個不通過 SSL 認證的單機 ETCD,安裝部署步驟如下:在 github 上找到對應的包,下載到機器上:

ETCD_VER=v3.4.4

GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GITHUB_URL}

rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test

curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz

/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version

通過後臺運行部署起來:

nohup ./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 &

不要使用默認的監聽地址,因爲默認的監聽的是 localhost,通過外部無法訪問。

安裝完可以通過./etcdctl version查看 ETCD 的版本。

一、etcdctl 的基本使用

1.1 關於數據的 CRUD + Watch

新增一條數據

./etcdctl put "/school/class/name" "helios"

獲取一條數據

[root@dajiahao03 etcd]# ./etcdctl get "/school/class/name"
/school/class/name
helios

得到一組數據

[root@dajiahao03 etcd]# ./etcdctl get "/school/class/" --prefix
/school/class/name
helios1
/school/class/name1
helios

得到所有的 key

[root@dajiahao03 etcd]# ./etcdctl --prefix --keys-only=true get /
/school/class/name

/school/class/name1

刪除一條數據

[root@dajiahao03 etcd]# ./etcdctl del "/school/class/name2"
1

watch 的功能,這個功能要開兩個終端喲

# 第一個終端:
./etcdctl watch "/school/class" --prefix
# 第二個終端
[root@dajiahao03 etcd]# ./etcdctl put "/school/class/name2" "helios2"
OK
# 第一個終端的變化
[root@dajiahao03 etcd]# ./etcdctl watch "/school/class" --prefix
PUT
/school/class/name2
helios2

1.2 關於集羣的操作

查看集羣狀態(如果單機的可以不用指定 ENDPOINTS,如果是集羣的話,通過分號的形式加到 ENDPOINTS 後面)

[root@dajiahao03 etcd]# export ENDPOINTS="172.27.143.50:2379"
[root@dajiahao03 etcd]# ./etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
|      ENDPOINT      |        ID        | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| 172.27.143.50:2379 | 8e9e05c52164694d |   3.4.4 |   20 kB |      true |      false |         3 |         13 |                 13 |        |
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+

查看集羣成員

[root@dajiahao03 etcd]# ./etcdctl --write-out=table --endpoints=$ENDPOINTS member list
+------------------+---------+---------+-----------------------+---------------------+------------+
|        ID        | STATUS  |  NAME   |      PEER ADDRS       |    CLIENT ADDRS     | IS LEARNER |
+------------------+---------+---------+-----------------------+---------------------+------------+
| 8e9e05c52164694d | started | default | http://localhost:2380 | http://0.0.0.0:2379 |      false |
+------------------+---------+---------+-----------------------+---------------------+------------+

刪除集羣成員

MEMBER_ID=8e9e05c52164694d
etcdctl --endpoints=$ENDPOINTS member remove ${MEMBER_ID}

添加成員

NEW_ETCD_
NEW_ETCD_HOST="172.27.140.172"
./etcdctl --endpoints=$ENDPOINTS member add ${NEW_ETCD_NAME} --peer-urls=http://${NEW_ETCD_HOST}:2380

1.3 關於集羣的備份和恢復

磁盤碎片整理

[root@dajiahao03 etcd]# ./etcdctl --endpoints=$ENDPOINTS defrag
Finished defragmenting etcd member[172.27.143.50:2379]

備份當前的 ETD 集羣

[root@dajiahao03 etcd]# ./etcdctl snapshot save snapshot.db
{"level":"info","ts":1583651900.406544,"caller":"snapshot/v3_snapshot.go:110","msg":"created temporary db file","path":"snapshot.db.part"}
{"level":"info","ts":1583651900.4077375,"caller":"snapshot/v3_snapshot.go:121","msg":"fetching snapshot","endpoint":"127.0.0.1:2379"}
{"level":"info","ts":1583651900.4105544,"caller":"snapshot/v3_snapshot.go:134","msg":"fetched snapshot","endpoint":"127.0.0.1:2379","took":0.003921237}
{"level":"info","ts":1583651900.410609,"caller":"snapshot/v3_snapshot.go:143","msg":"saved","path":"snapshot.db"}
Snapshot saved at snapshot.db
[root@dajiahao03 etcd]# ll snapshot.db
-rw------- 1 root root 20512 3月   8 15:18 snapshot.db

查看 snapshot 狀態

[root@dajiahao03 etcd]# ./etcdctl snapshot status snapshot.db
21c0c96e, 8, 11, 20 kB

從備份中恢復集羣

[root@dajiahao03 etcd]# ./etcdctl snapshot save snapshot.db
{"level":"info","ts":1583652044.0606484,"caller":"snapshot/v3_snapshot.go:110","msg":"created temporary db file","path":"snapshot.db.part"}
{"level":"info","ts":1583652044.0613058,"caller":"snapshot/v3_snapshot.go:121","msg":"fetching snapshot","endpoint":"127.0.0.1:2379"}
{"level":"info","ts":1583652044.0659368,"caller":"snapshot/v3_snapshot.go:134","msg":"fetched snapshot","endpoint":"127.0.0.1:2379","took":0.005182366}
{"level":"info","ts":1583652044.0660565,"caller":"snapshot/v3_snapshot.go:143","msg":"saved","path":"snapshot.db"}
Snapshot saved at snapshot.db

切換 leader

# 先看狀態
etcdctl endpoint --cluster=true status  -w table
# move-leader
./etcdctl move-leader d6414a7c7c550d29

二、使用 etcd client 可能會出現的問題

通過go get go.etcd.io/etcd/clientv3,如果覺得慢活着出現問題,可以手動的把 etcd 項目手動 clone 到指定目錄先:

mkfir -p ${GOPATH}/src/go.etcd.io
git clone git@github.com:etcd-io/etcd.git ${GOPATH}/src/go.etcd.io

在 build 的時候出現下圖的問題的話,可以參考這個博客,但是我嘗試了,好想不管用,如果不管用可以看我後面的終極方法。

經過了上述的還是不行,因爲我的 go 版本是 13.X,我就乾脆降到 12.X,就 work 了。步驟如下(國內鏡像地址:http://mirrors.ustc.edu.cn/golang/):

# 找到匹配的版本的go包(我的是mac的)
wget http://mirrors.ustc.edu.cn/golang/go1.12.4.darwin-amd64.tar.gz
tar -C /usr/local/bin -xzf /home/yourname/Downloads/go1.12.4.linux-amd64.tar.gz
# 修改bashrc重的PATH(我用的是zsh)
vim ~/.zshrc
export PATH=$PATH:/usr/local/bin/go/bin
source ~/.zshrc

三、etcd ClientV3 的使用

3.1 連接 ETCD

var (
    config clientv3.Config
    client *clientv3.Client
    err error
)
// 客戶端配置
config = clientv3.Config{
    Endpoints: []string{"172.27.43.50:2379"},
    DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
    fmt.Println(err)
    return
}

3.2 寫入數據到 ETCD

// 實例化一個用於操作ETCD的KV
    kv = clientv3.NewKV(client)
    if putResp, err = kv.Put(context.TODO()"/school/class/students""helios0", clientv3.WithPrevKV()); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(putResp.Header.Revision)
    if putResp.PrevKv != nil {
        fmt.Printf("prev Value: %s \n CreateRevision : %d \n ModRevision: %d \n Version: %d \n",
            string(putResp.PrevKv.Value), putResp.PrevKv.CreateRevision, putResp.PrevKv.ModRevision, putResp.PrevKv.Version)
    }

輸出爲:

20
prev Value: helios0
 CreateRevision : 9
 ModRevision: 19
 Version: 11

這裏注意 kv.Put 的最後一個參數表示,返回上一次的數據,是個可選參數。在 etcd 的 client 中有很多這樣的可選參數,更多的參數可以參考這個文檔 clientv3 option

3.3 獲取 ETCD 裏面的數據

// 實例化一個用於操作ETCD的KV
kv = clientv3.NewKV(client)
if getResp, err = kv.Get(context.TODO()"/school/class/students"); err != nil {
    fmt.Println(err)
    return
}
// 輸出本次的Revision
fmt.Printf("Key is s %s \n Value is %s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)

輸出爲:

Key is s /school/class/students
Value is helios0

如果給 kv.Get 方法加上clientv3.WithPrefix()就能查找出以某個爲前綴的所有 KV。

3.4 刪除 ETCD 裏面的數據

kv = clientv3.NewKV(client)
_, err = kv.Put(context.TODO()"/school/class/students""helios1")

if delResp, err = kv.Delete(context.TODO()"/school/class/students", clientv3.WithPrevKV()); err != nil {
    fmt.Println(err)
    return
}
if len(delResp.PrevKvs) != 0 {
    for _, kvpair = range delResp.PrevKvs {
        fmt.Printf("delete key is: %s \n Value: %s \n", string(kvpair.Key), string(kvpair.Value))
    }
}

輸出爲:

delete key is: /school/class/students
 Value: helios1

在 Delete 中還可以增加下面兩個 option:- clientv3.WithFromKey(): 從這個 key 開始 - clientv3.WithLimit(n): 先知刪除 n 條

3.4 對租約的操作

3.4.1 申請一個 10s 的租約,定時查看是否過期

// 申請一個租約
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
    fmt.Println(err)
    return
}
leaseId = leaseGrantResp.ID

// 獲得kv API子集
kv = clientv3.NewKV(client)

if _, err = kv.Put(context.TODO()"/school/class/students""h", clientv3.WithLease(leaseId)); err != nil {
    fmt.Println(err)
    return
}

for {
    if getResp, err = kv.Get(context.TODO()"/school/class/students"); err != nil {
        fmt.Println(err)
        return
    }
    if getResp.Count == 0 {
        fmt.Println("kv過期了")
        break
    }
    fmt.Println("還沒過期:", getResp.Kvs)
    time.Sleep(2 * time.Second)
}

輸出:

還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
kv過期了

3.4.2 自動續租

if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
    fmt.Println(err)
    return
}
go func() {
    for {
        select {
        case keepResp = <- keepRespChan:
            if keepRespChan == nil {
                fmt.Println("租約已經失效了")
                goto END
            } else {    // 每秒會續租一次, 所以就會受到一次應答
                fmt.Println("收到自動續租應答:", keepResp.ID)
            }
        }
    }
END:
}()

3.5 watch 功能

對某一個 key 進行監聽 5s:

kv = clientv3.NewKV(client)

// 模擬KV的變化
go func() {
    for {
        _ , err = kv.Put(context.TODO()"/school/class/students""helios1")
        _, err = kv.Delete(context.TODO()"/school/class/students")
        time.Sleep(1 * time.Second)
    }
}()

// 先GET到當前的值,並監聽後續變化
if getResp, err = kv.Get(context.TODO()"/school/class/students"); err != nil {
    fmt.Println(err)
    return
}

// 現在key是存在的
if len(getResp.Kvs) != 0 {
    fmt.Println("當前值:", string(getResp.Kvs[0].Value))
}

// 獲得當前revision
watchStartRevision = getResp.Header.Revision + 1
// 創建一個watcher
watcher = clientv3.NewWatcher(client)
fmt.Println("從該版本向後監聽:", watchStartRevision)

ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
    cancelFunc()
})

watchRespChan = watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))
// 處理kv變化事件
for watchResp = range watchRespChan {
    for _, event = range watchResp.Events {
        switch event.Type {
        case mvccpb.PUT:
            fmt.Println("修改爲:", string(event.Kv.Value)"Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
        case mvccpb.DELETE:
            fmt.Println("刪除了""Revision:", event.Kv.ModRevision)
        }
    }
}

3.6 通過 op 方法代替 kv.Get/kv.Put/kv.Delete

putOp = clientv3.OpPut("/school/class/students""helios")

if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
    panic(err)
}
fmt.Println("寫入Revision:", opResp.Put().Header.Revision)

getOp = clientv3.OpGet("/school/class/students")

if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
    panic(err)
}
fmt.Println("數據Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("數據value:", string(opResp.Get().Kvs[0].Value))

輸出爲:

寫入Revision: 46
數據Revision: 46
數據value: helios

3.7(選看) 通過 txn 實現分佈式鎖

ETCD 中的 txn 通過簡單的 "If-Then-Else" 實現了原子操作。

實現分佈式鎖主要分爲三個步驟:1. 上鎖,包括創建租約、自動續約、在租約時間內去搶一個 key 2. 搶到鎖後執行業務邏輯,沒有搶到退出 3. 釋放租約

接下來我們看代碼:

// 1. 上鎖
// 1.1 創建租約
lease = clientv3.NewLease(client)

if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
    panic(err)
}
leaseId = leaseGrantResp.ID

// 1.2 自動續約
// 創建一個可取消的租約,主要是爲了退出的時候能夠釋放
ctx, cancelFunc = context.WithCancel(context.TODO())

// 3. 釋放租約
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)

if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
    panic(err)
}
// 續約應答
go func() {
    for {
        select {
        case keepResp = <- keepRespChan:
            if keepRespChan == nil {
                fmt.Println("租約已經失效了")
                goto END
            } else {    // 每秒會續租一次, 所以就會受到一次應答
                fmt.Println("收到自動續租應答:", keepResp.ID)
            }
        }
    }
END:
}()

// 1.3 在租約時間內去搶鎖(etcd裏面的鎖就是一個key)
kv = clientv3.NewKV(client)

// 創建事物
txn = kv.Txn(context.TODO())

//if 不存在key, then 設置它, else 搶鎖失敗
txn.If(clientv3.Compare(clientv3.CreateRevision("lock")"=", 0)).
    Then(clientv3.OpPut("lock""g", clientv3.WithLease(leaseId))).
    Else(clientv3.OpGet("lock"))

// 提交事務
if txnResp, err = txn.Commit(); err != nil {
    panic(err)
}

if !txnResp.Succeeded {
    fmt.Println("鎖被佔用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
    return
}

// 2. 搶到鎖後執行業務邏輯,沒有搶到退出
fmt.Println("處理任務")
time.Sleep(5 * time.Second)

// 3. 釋放鎖,步驟在上面的defer,當defer租約關掉的時候,對應的key被回收了

轉自:Helios

zhuanlan.zhihu.com/p/111800017

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KRkxwkjuUeQ2wegyC7C0Mw