etcd 實戰基礎篇

最近一直在看 etcd 相關的東西,爲了不 "白看",加深理解,隨即開啓此係列的輸出。

Etcd 是什麼

Etcd 是由 Go 編寫的。它是一個強一致性的分佈式鍵值存儲,提供一種可靠的方式來存儲需要由分佈式系統或者機器集羣訪問的數據。 同時 Etcd 各節點中的通信是通過 Raft 一致性算法來處理的。 有很多大型開源項目的底層都基於 Etcd,舉幾個比較有名的工業級項目: kubernetes、 CoreDNS、ROOK......

Etcd 的場景

和 redis 的區別

面試的時候可能有面試官喜歡問:

環境與說明

直接下載編譯好的二進制文件也好,還是自己下載源碼編譯運行,先開啓一個單節點服務就行。我本地使用 goreman 搭建了三個實例。

這裏稍微說明一下:PEER ADDRS 指的是向其他 etcd server 暴露的通信地址, 比如上圖 name=infra1 要調用 infra2, 調用的就是 http://127.0.0.1:22380。 而 CLIENT ADDRS 是對客戶端暴露的地址。比如接下來我們的客戶端連接的是 infra1,使用的就是 http://127.0.0.1:2379

目前網上的教程大多使用編譯好的 etcdctl 這樣的二進制文件,通過命令行來進行操作, 簡單直觀。比如: 

但是也會導致一個問題,你並不知道客戶端底層是如何運行的,這中間又涉及了哪些接口,對應的數據結構是什麼樣的。 所以爲了一步步深入 etcd,我們從代碼層面操作 etcd 客戶端。

初始化 etcd 客戶端

我們先初始化一個 etcd 客戶端。

var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

var cli *clientv3.Client

// 初始化etcd 客戶端
func init() {
	flag.Parse()
	var err error
	// 解析etcd的地址,編程[]string
	endpoints := strings.Split(*addr, ",")
	// 創建一個 etcd 的客戶端
	cli, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
		DialTimeout: 5 * time.Second})
	if err != nil {
		fmt.Printf("初始化客戶端失敗:%v\\n", err)
		log.Fatal(err)
	}
}

put 操作

命令行 etcdctl put key val 對應操作。

// 設置key
func PutKey(key string, value string) {
	var err error
	var resp *clientv3.PutResponse
	resp, err = cli.Put(context.Background(), key, value)
	if err != nil {
		fmt.Printf("設置 key 失敗:%v\\n", err)
		return
	}
	fmt.Printf("操作結果:%v\\n", resp)
}

除了簡單的設置,我們還有一種租約模式,也就是設置一個 key 的有效期,在有效期之內可以進行續租,如果沒續租到期就過期。 對應的命令行是分兩段:

etcdctl lease grant 200
// lease 326978bac638650a granted with TTL(200s)
etcdctl put hello world --lease=326978bac638650a

對應操作,

// 設置會過期的key
func PutKeyLease(key string, value string, ttl int64) {
	var err error
	var resp *clientv3.PutResponse

	// 創建一個租約對象
	var lease clientv3.Lease
	lease = clientv3.NewLease(cli)

	var leaseResp *clientv3.LeaseGrantResponse
	// 根據時間,生成一個租約
	leaseResp, err = lease.Grant(context.Background(), ttl)
	if err != nil {
		fmt.Printf("設置 租約 失敗:%v\\n", err)
	}

	resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		fmt.Printf("設置 key 失敗:%v\\n", err)
		return
	}
	fmt.Printf("操作結果:%v\\n", resp)
}

etcd 的租約模式,簡單的說, 當 Lease server 收到 client 請求,比如上面創建一個有效期 200 秒的請求,會通過 Raft 模塊完成日誌同步, 隨後 Apply 模塊的 Grant 接口執行日誌條目內容。這是後續我們要研究的,這裏略微提一下。

首先你得創建一個 Lease(租約),獲取到一個 Lease 唯一 id,然後 put 的時候帶上這個 id。當一個 key 指定一個 Lease 的時候, 底層最終是會把這個 key 關聯到 Lease 的內存集合中。所以本質上,一個 Lease 可以 關聯 n 個 key。而我們平常使用的緩存 key 設置過期時間,一般是把 key 和過期時間一對一綁定。可能有人還要問,Lease 到期了是如何刪除掉關聯的 key?

其實原理解釋起來也很簡單。Lease 在底層存儲的結構是堆。由一個異步的 G 專門負責的去淘汰過期的 Lease。定時從最小堆中取出已經到期的 Lease。 然後刪除 Lease 以及 刪除通過 LeaseId 關聯上此 Lease 的 key 列表。後面我們分析源碼的時候專門討論這塊。

這裏我還要說一點,你可以看到,不管是 put 一個普通的 key,還是一個帶有租約的 key,調用的都是同一個方法。

// 普通的
	resp, err = cli.Put(context.Background(), key, value)
// 租約
	resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))

// 源碼裏面
type OpOption func(*Op)

func WithLease(leaseID LeaseID) OpOption {
	return func(op *Op) { op.leaseID = leaseID }
}
func (op *Op) applyOpts(opts []OpOption) {
	for _, opt := range opts {
		opt(op)
	}
}

看出來了嗎?一個很常見的設計模式,裝飾器。

Get 操作

命令行 etcdctl get key 對應操作,

func GetKey(key string) {
	var err error
	var res *clientv3.GetResponse
	res, err = cli.Get(context.Background(), key)
	if err != nil {
		fmt.Printf("獲取 key 失敗 :%v\\n", err)
		return
	}
	fmt.Printf("key %v 的值是:%+v\\n", key, res)
}

我們都知道,etcd 從 v3 開始,底層實現了 MVCC 機制。所以在 etcd 中的 key 是存在多個歷史版本的。 

我們會在命令行中操作 etcdctl get hello --rev=?,比如

 

 可以看到,不同版本的 key("hello") 的值是不一樣的。

// 獲取指定版本的key
func GetKeyByVersion(key string, version int64) {
	var err error
	var res *clientv3.GetResponse
	res, err = cli.Get(context.Background(), key, clientv3.WithRev(version))
	if err != nil {
		fmt.Printf("刪除 key:%v 失敗:%v", key, err)
		return
	}
	fmt.Printf("請求key:%v,請求版本:%v,獲取結果:%+v\\n", key, version, res)

一樣的套路。我們也可以運行這段代碼演示一下。

src.GetKeyByVersion("hello", 20)
src.GetKeyByVersion("hello", 21)

 其他參數暫時忽略,主要看 Kvs 裏面的結果。

Watch 操作

命令行 ./etcdctl watch hello

爲了避免客戶端的反覆輪詢, etcd 提供了 event 機制。客戶端可以訂閱一系列的 event ,用於 watch 某些 key 。 當這些被 watch 的 key 更新時, etcd 就會通知客戶端。

// 監聽key 變動
func WatchKey(key string) {
	var watch clientv3.WatchChan
	watch = cli.Watch(context.Background(), key)
	for {
		res := <-watch
		fmt.Printf("key:%v變動通知:%+v\\n", key, res)
		fmt.Printf("值:%+v\\n", *res.Events[0])
	}

}

從上面這段代碼看出,watch 最終是通過 channel 的方式來進行通知的。

// 開啓一個 G
// go src.WatchKey("hello")

然後我們運行這段程序,在命令行上操作 hello 這個 key。

./etcdctl lease grant 30
lease 326978bac638651e granted with TTL(30s)
./etcdctl put hello world-age --lease=326978bac638651e

可以看到接收到兩個事件,一個是 put,一個是租約到期 delete。

總結

以下是這篇文章全部代碼。

package src

import (
	"context"
	"flag"
	"fmt"
	"github.com/coreos/etcd/clientv3"
	"log"
	"strings"
	"time"
)

var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

var cli *clientv3.Client

// 初始化etcd 客戶端
func init() {
	flag.Parse()
	var err error
	// 解析etcd的地址,編程[]string
	endpoints := strings.Split(*addr, ",")
	// 創建一個 etcd 的客戶端
	cli, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
		DialTimeout: 5 * time.Second})
	if err != nil {
		fmt.Printf("初始化客戶端失敗:%v\\n", err)
		log.Fatal(err)
	}
}

// 設置key
func PutKey(key string, value string) {
	var err error
	var resp *clientv3.PutResponse
	resp, err = cli.Put(context.Background(), key, value)
	if err != nil {
		fmt.Printf("設置 key 失敗:%v\\n", err)
		return
	}
	fmt.Printf("操作結果:%v\\n", resp)
}

// 設置會過期的key
func PutKeyLease(key string, value string, ttl int64) {
	var err error
	var resp *clientv3.PutResponse

	// 創建一個租約對象
	var lease clientv3.Lease
	lease = clientv3.NewLease(cli)

	var leaseResp *clientv3.LeaseGrantResponse
	// 根據時間,生成一個租約
	leaseResp, err = lease.Grant(context.Background(), ttl)
	if err != nil {
		fmt.Printf("設置 租約 失敗:%v\\n", err)
	}

	resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		fmt.Printf("設置 key 失敗:%v\\n", err)
		return
	}
	fmt.Printf("操作結果:%v\\n", resp)
}

// 獲取key
func GetKey(key string) {
	var err error
	var res *clientv3.GetResponse
	res, err = cli.Get(context.Background(), key)
	if err != nil {
		fmt.Printf("獲取 key 失敗 :%v\\n", err)
		return
	}
	fmt.Printf("key %v 的值是:%+v\\n", key, res)
}

// 獲取指定版本的key
func GetKeyByVersion(key string, version int64) {
	var err error
	var res *clientv3.GetResponse
	res, err = cli.Get(context.Background(), key, clientv3.WithRev(version))
	if err != nil {
		fmt.Printf("刪除 key:%v 失敗:%v", key, err)
		return
	}
	fmt.Printf("請求key:%v,請求版本:%v,獲取結果:%+v\\n", key, version, res)
}

// 刪除key
func DeleteKey(key string) {
	var err error
	var res *clientv3.DeleteResponse
	res, err = cli.Delete(context.Background(), key)
	if err != nil {
		fmt.Printf("刪除 key:%v 失敗:%v", key, err)
		return
	}
	fmt.Printf("操作結果:%+v\\n", res)
}

// 監聽key 變動
func WatchKey(key string) {
	var watch clientv3.WatchChan
	watch = cli.Watch(context.Background(), key, clientv3.WithRev(21))
	for {
		res := <-watch
		fmt.Printf("key:%v變動通知:%+v\\n", key, res)
		fmt.Printf("值:%+v\\n", *res.Events[0])
	}

}

這篇文章主要介紹了 etcd 這個分佈式存儲工具,包括它的應用場景以及實戰基本的操作。 上面其實還有很多的實例沒有寫出來,一個是因爲懶,沒必要一個個演示一遍,另一個原因是留給你們自行實現。 我們以這個爲開始,一步步敲開 etcd 的大門。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Z9flTpcP4I1sAqxwzV_7NQ