Go ETCD ClientV3 詳解
【導讀】etcd 有時會被用來做服務發現組件使用,替代 consul。本文介紹了 go 語言 etcd client 的用法。
如果您覺得有什麼不理解,或者覺得文章有欠缺的地方,請您點擊這裏 (https://github.com/helios741/myblog/issues/77) 提出。我會很感謝您的建議也會解答您的問題。
ETCD golang ClientV3 的基本使用
零、搭建單機的 ETCD
爲了演示,在 Linux 機器上搭建一個不通過 SSL 認證的單機 ETCD,安裝部署步驟如下:在 github 上找到對應的包,下載到機器上:
ETCD_VER=v3.4.4
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GITHUB_URL}
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version
通過後臺運行部署起來:
nohup ./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 &
不要使用默認的監聽地址,因爲默認的監聽的是 localhost,通過外部無法訪問。
安裝完可以通過./etcdctl version
查看 ETCD 的版本。
一、etcdctl 的基本使用
1.1 關於數據的 CRUD + Watch
新增一條數據
./etcdctl put "/school/class/name" "helios"
獲取一條數據
[root@dajiahao03 etcd]# ./etcdctl get "/school/class/name"
/school/class/name
helios
得到一組數據
[root@dajiahao03 etcd]# ./etcdctl get "/school/class/" --prefix
/school/class/name
helios1
/school/class/name1
helios
得到所有的 key
[root@dajiahao03 etcd]# ./etcdctl --prefix --keys-only=true get /
/school/class/name
/school/class/name1
刪除一條數據
[root@dajiahao03 etcd]# ./etcdctl del "/school/class/name2"
1
watch 的功能,這個功能要開兩個終端喲
# 第一個終端:
./etcdctl watch "/school/class" --prefix
# 第二個終端
[root@dajiahao03 etcd]# ./etcdctl put "/school/class/name2" "helios2"
OK
# 第一個終端的變化
[root@dajiahao03 etcd]# ./etcdctl watch "/school/class" --prefix
PUT
/school/class/name2
helios2
1.2 關於集羣的操作
查看集羣狀態(如果單機的可以不用指定 ENDPOINTS,如果是集羣的話,通過分號的形式加到 ENDPOINTS 後面)
[root@dajiahao03 etcd]# export ENDPOINTS="172.27.143.50:2379"
[root@dajiahao03 etcd]# ./etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| 172.27.143.50:2379 | 8e9e05c52164694d | 3.4.4 | 20 kB | true | false | 3 | 13 | 13 | |
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
查看集羣成員
[root@dajiahao03 etcd]# ./etcdctl --write-out=table --endpoints=$ENDPOINTS member list
+------------------+---------+---------+-----------------------+---------------------+------------+
| ID | STATUS | NAME | PEER ADDRS | CLIENT ADDRS | IS LEARNER |
+------------------+---------+---------+-----------------------+---------------------+------------+
| 8e9e05c52164694d | started | default | http://localhost:2380 | http://0.0.0.0:2379 | false |
+------------------+---------+---------+-----------------------+---------------------+------------+
刪除集羣成員
MEMBER_ID=8e9e05c52164694d
etcdctl --endpoints=$ENDPOINTS member remove ${MEMBER_ID}
添加成員
NEW_ETCD_
NEW_ETCD_HOST="172.27.140.172"
./etcdctl --endpoints=$ENDPOINTS member add ${NEW_ETCD_NAME} --peer-urls=http://${NEW_ETCD_HOST}:2380
1.3 關於集羣的備份和恢復
磁盤碎片整理
[root@dajiahao03 etcd]# ./etcdctl --endpoints=$ENDPOINTS defrag
Finished defragmenting etcd member[172.27.143.50:2379]
備份當前的 ETD 集羣
[root@dajiahao03 etcd]# ./etcdctl snapshot save snapshot.db
{"level":"info","ts":1583651900.406544,"caller":"snapshot/v3_snapshot.go:110","msg":"created temporary db file","path":"snapshot.db.part"}
{"level":"info","ts":1583651900.4077375,"caller":"snapshot/v3_snapshot.go:121","msg":"fetching snapshot","endpoint":"127.0.0.1:2379"}
{"level":"info","ts":1583651900.4105544,"caller":"snapshot/v3_snapshot.go:134","msg":"fetched snapshot","endpoint":"127.0.0.1:2379","took":0.003921237}
{"level":"info","ts":1583651900.410609,"caller":"snapshot/v3_snapshot.go:143","msg":"saved","path":"snapshot.db"}
Snapshot saved at snapshot.db
[root@dajiahao03 etcd]# ll snapshot.db
-rw------- 1 root root 20512 3月 8 15:18 snapshot.db
查看 snapshot 狀態
[root@dajiahao03 etcd]# ./etcdctl snapshot status snapshot.db
21c0c96e, 8, 11, 20 kB
從備份中恢復集羣
[root@dajiahao03 etcd]# ./etcdctl snapshot save snapshot.db
{"level":"info","ts":1583652044.0606484,"caller":"snapshot/v3_snapshot.go:110","msg":"created temporary db file","path":"snapshot.db.part"}
{"level":"info","ts":1583652044.0613058,"caller":"snapshot/v3_snapshot.go:121","msg":"fetching snapshot","endpoint":"127.0.0.1:2379"}
{"level":"info","ts":1583652044.0659368,"caller":"snapshot/v3_snapshot.go:134","msg":"fetched snapshot","endpoint":"127.0.0.1:2379","took":0.005182366}
{"level":"info","ts":1583652044.0660565,"caller":"snapshot/v3_snapshot.go:143","msg":"saved","path":"snapshot.db"}
Snapshot saved at snapshot.db
切換 leader
# 先看狀態
etcdctl endpoint --cluster=true status -w table
# move-leader
./etcdctl move-leader d6414a7c7c550d29
二、使用 etcd client 可能會出現的問題
通過go get go.etcd.io/etcd/clientv3
,如果覺得慢活着出現問題,可以手動的把 etcd 項目手動 clone 到指定目錄先:
mkfir -p ${GOPATH}/src/go.etcd.io
git clone git@github.com:etcd-io/etcd.git ${GOPATH}/src/go.etcd.io
在 build 的時候出現下圖的問題的話,可以參考這個博客,但是我嘗試了,好想不管用,如果不管用可以看我後面的終極方法。
經過了上述的還是不行,因爲我的 go 版本是 13.X,我就乾脆降到 12.X,就 work 了。步驟如下(國內鏡像地址:http://mirrors.ustc.edu.cn/golang/):
# 找到匹配的版本的go包(我的是mac的)
wget http://mirrors.ustc.edu.cn/golang/go1.12.4.darwin-amd64.tar.gz
tar -C /usr/local/bin -xzf /home/yourname/Downloads/go1.12.4.linux-amd64.tar.gz
# 修改bashrc重的PATH(我用的是zsh)
vim ~/.zshrc
export PATH=$PATH:/usr/local/bin/go/bin
source ~/.zshrc
三、etcd ClientV3 的使用
3.1 連接 ETCD
var (
config clientv3.Config
client *clientv3.Client
err error
)
// 客戶端配置
config = clientv3.Config{
Endpoints: []string{"172.27.43.50:2379"},
DialTimeout: 5 * time.Second,
}
// 建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
3.2 寫入數據到 ETCD
// 實例化一個用於操作ETCD的KV
kv = clientv3.NewKV(client)
if putResp, err = kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
fmt.Println(putResp.Header.Revision)
if putResp.PrevKv != nil {
fmt.Printf("prev Value: %s \n CreateRevision : %d \n ModRevision: %d \n Version: %d \n",
string(putResp.PrevKv.Value), putResp.PrevKv.CreateRevision, putResp.PrevKv.ModRevision, putResp.PrevKv.Version)
}
輸出爲:
20
prev Value: helios0
CreateRevision : 9
ModRevision: 19
Version: 11
這裏注意 kv.Put 的最後一個參數表示,返回上一次的數據,是個可選參數。在 etcd 的 client 中有很多這樣的可選參數,更多的參數可以參考這個文檔 clientv3 option
3.3 獲取 ETCD 裏面的數據
// 實例化一個用於操作ETCD的KV
kv = clientv3.NewKV(client)
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
fmt.Println(err)
return
}
// 輸出本次的Revision
fmt.Printf("Key is s %s \n Value is %s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)
輸出爲:
Key is s /school/class/students
Value is helios0
如果給 kv.Get 方法加上clientv3.WithPrefix()
就能查找出以某個爲前綴的所有 KV。
3.4 刪除 ETCD 裏面的數據
kv = clientv3.NewKV(client)
_, err = kv.Put(context.TODO(), "/school/class/students", "helios1")
if delResp, err = kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
if len(delResp.PrevKvs) != 0 {
for _, kvpair = range delResp.PrevKvs {
fmt.Printf("delete key is: %s \n Value: %s \n", string(kvpair.Key), string(kvpair.Value))
}
}
輸出爲:
delete key is: /school/class/students
Value: helios1
在 Delete 中還可以增加下面兩個 option:- clientv3.WithFromKey(): 從這個 key 開始 - clientv3.WithLimit(n): 先知刪除 n 條
3.4 對租約的操作
3.4.1 申請一個 10s 的租約,定時查看是否過期
// 申請一個租約
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println(err)
return
}
leaseId = leaseGrantResp.ID
// 獲得kv API子集
kv = clientv3.NewKV(client)
if _, err = kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
for {
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv過期了")
break
}
fmt.Println("還沒過期:", getResp.Kvs)
time.Sleep(2 * time.Second)
}
輸出:
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
還沒過期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
kv過期了
3.4.2 自動續租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}
go func() {
for {
select {
case keepResp = <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租約已經失效了")
goto END
} else { // 每秒會續租一次, 所以就會受到一次應答
fmt.Println("收到自動續租應答:", keepResp.ID)
}
}
}
END:
}()
3.5 watch 功能
對某一個 key 進行監聽 5s:
kv = clientv3.NewKV(client)
// 模擬KV的變化
go func() {
for {
_ , err = kv.Put(context.TODO(), "/school/class/students", "helios1")
_, err = kv.Delete(context.TODO(), "/school/class/students")
time.Sleep(1 * time.Second)
}
}()
// 先GET到當前的值,並監聽後續變化
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
fmt.Println(err)
return
}
// 現在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("當前值:", string(getResp.Kvs[0].Value))
}
// 獲得當前revision
watchStartRevision = getResp.Header.Revision + 1
// 創建一個watcher
watcher = clientv3.NewWatcher(client)
fmt.Println("從該版本向後監聽:", watchStartRevision)
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
cancelFunc()
})
watchRespChan = watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))
// 處理kv變化事件
for watchResp = range watchRespChan {
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改爲:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)
}
}
}
3.6 通過 op 方法代替 kv.Get/kv.Put/kv.Delete
putOp = clientv3.OpPut("/school/class/students", "helios")
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
panic(err)
}
fmt.Println("寫入Revision:", opResp.Put().Header.Revision)
getOp = clientv3.OpGet("/school/class/students")
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
panic(err)
}
fmt.Println("數據Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("數據value:", string(opResp.Get().Kvs[0].Value))
輸出爲:
寫入Revision: 46
數據Revision: 46
數據value: helios
3.7(選看) 通過 txn 實現分佈式鎖
ETCD 中的 txn 通過簡單的 "If-Then-Else" 實現了原子操作。
實現分佈式鎖主要分爲三個步驟:1. 上鎖,包括創建租約、自動續約、在租約時間內去搶一個 key 2. 搶到鎖後執行業務邏輯,沒有搶到退出 3. 釋放租約
接下來我們看代碼:
// 1. 上鎖
// 1.1 創建租約
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
panic(err)
}
leaseId = leaseGrantResp.ID
// 1.2 自動續約
// 創建一個可取消的租約,主要是爲了退出的時候能夠釋放
ctx, cancelFunc = context.WithCancel(context.TODO())
// 3. 釋放租約
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
panic(err)
}
// 續約應答
go func() {
for {
select {
case keepResp = <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租約已經失效了")
goto END
} else { // 每秒會續租一次, 所以就會受到一次應答
fmt.Println("收到自動續租應答:", keepResp.ID)
}
}
}
END:
}()
// 1.3 在租約時間內去搶鎖(etcd裏面的鎖就是一個key)
kv = clientv3.NewKV(client)
// 創建事物
txn = kv.Txn(context.TODO())
//if 不存在key, then 設置它, else 搶鎖失敗
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("lock"))
// 提交事務
if txnResp, err = txn.Commit(); err != nil {
panic(err)
}
if !txnResp.Succeeded {
fmt.Println("鎖被佔用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2. 搶到鎖後執行業務邏輯,沒有搶到退出
fmt.Println("處理任務")
time.Sleep(5 * time.Second)
// 3. 釋放鎖,步驟在上面的defer,當defer租約關掉的時候,對應的key被回收了
轉自:Helios
zhuanlan.zhihu.com/p/111800017
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/KRkxwkjuUeQ2wegyC7C0Mw