etcd 實戰基礎篇
最近一直在看 etcd 相關的東西,爲了不 "白看",加深理解,隨即開啓此係列的輸出。
Etcd 是什麼
Etcd 是由 Go 編寫的。它是一個強一致性的分佈式鍵值存儲,提供一種可靠的方式來存儲需要由分佈式系統或者機器集羣訪問的數據。 同時 Etcd 各節點中的通信是通過 Raft 一致性算法來處理的。 有很多大型開源項目的底層都基於 Etcd,舉幾個比較有名的工業級項目: kubernetes、 CoreDNS、ROOK......
Etcd 的場景
-
服務發現。(可以把服務存儲到某個 prefix 開頭的 key 中,然後消費端或者服務信息以調用, 同時消費者也可以通過 watch 獲得 key 的變化)
-
消息分佈和訂閱
-
分佈式鎖
-
Leader 選舉
-
分佈式隊列
-
負載均衡
-
......
和 redis 的區別
面試的時候可能有面試官喜歡問:
-
redis 的數據類型更豐富 (string, hash, set ,zset, list),etcd 僅僅就是 key-val。
-
etcd 的底層是 Raft 算法,可以保證數據的強一致性。而 redis 數據複製上是主備異步複製,只能最終一致性。
-
讀寫性能上,因爲 etcd 保證強一致性,所以會比 redis 差。
-
存儲方面,etcd 使用的是持久化存儲 boltdb,而 redis 的方案是可持久化的 aof/rdb 。
環境與說明
直接下載編譯好的二進制文件也好,還是自己下載源碼編譯運行,先開啓一個單節點服務就行。我本地使用 goreman 搭建了三個實例。
這裏稍微說明一下:PEER ADDRS
指的是向其他 etcd server
暴露的通信地址, 比如上圖 name=infra1
要調用 infra2
, 調用的就是 http://127.0.0.1:22380
。 而 CLIENT ADDRS
是對客戶端暴露的地址。比如接下來我們的客戶端連接的是 infra1
,使用的就是 http://127.0.0.1:2379
。
目前網上的教程大多使用編譯好的 etcdctl 這樣的二進制文件,通過命令行來進行操作, 簡單直觀。比如:
但是也會導致一個問題,你並不知道客戶端底層是如何運行的,這中間又涉及了哪些接口,對應的數據結構是什麼樣的。 所以爲了一步步深入 etcd,我們從代碼層面操作 etcd 客戶端。
初始化 etcd 客戶端
我們先初始化一個 etcd 客戶端。
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
var cli *clientv3.Client
// 初始化etcd 客戶端
func init() {
flag.Parse()
var err error
// 解析etcd的地址,編程[]string
endpoints := strings.Split(*addr, ",")
// 創建一個 etcd 的客戶端
cli, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})
if err != nil {
fmt.Printf("初始化客戶端失敗:%v\\n", err)
log.Fatal(err)
}
}
put 操作
命令行 etcdctl put key val 對應操作。
// 設置key
func PutKey(key string, value string) {
var err error
var resp *clientv3.PutResponse
resp, err = cli.Put(context.Background(), key, value)
if err != nil {
fmt.Printf("設置 key 失敗:%v\\n", err)
return
}
fmt.Printf("操作結果:%v\\n", resp)
}
除了簡單的設置,我們還有一種租約模式,也就是設置一個 key 的有效期,在有效期之內可以進行續租,如果沒續租到期就過期。 對應的命令行是分兩段:
etcdctl lease grant 200
// lease 326978bac638650a granted with TTL(200s)
etcdctl put hello world --lease=326978bac638650a
對應操作,
// 設置會過期的key
func PutKeyLease(key string, value string, ttl int64) {
var err error
var resp *clientv3.PutResponse
// 創建一個租約對象
var lease clientv3.Lease
lease = clientv3.NewLease(cli)
var leaseResp *clientv3.LeaseGrantResponse
// 根據時間,生成一個租約
leaseResp, err = lease.Grant(context.Background(), ttl)
if err != nil {
fmt.Printf("設置 租約 失敗:%v\\n", err)
}
resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Printf("設置 key 失敗:%v\\n", err)
return
}
fmt.Printf("操作結果:%v\\n", resp)
}
etcd 的租約模式,簡單的說, 當 Lease server 收到 client 請求,比如上面創建一個有效期 200 秒的請求,會通過 Raft 模塊完成日誌同步, 隨後 Apply 模塊的 Grant 接口執行日誌條目內容。這是後續我們要研究的,這裏略微提一下。
首先你得創建一個 Lease(租約),獲取到一個 Lease 唯一 id,然後 put 的時候帶上這個 id。當一個 key 指定一個 Lease 的時候, 底層最終是會把這個 key 關聯到 Lease 的內存集合中。所以本質上,一個 Lease 可以 關聯 n 個 key。而我們平常使用的緩存 key 設置過期時間,一般是把 key 和過期時間一對一綁定。可能有人還要問,Lease 到期了是如何刪除掉關聯的 key?
其實原理解釋起來也很簡單。Lease 在底層存儲的結構是堆。由一個異步的 G 專門負責的去淘汰過期的 Lease。定時從最小堆中取出已經到期的 Lease。 然後刪除 Lease 以及 刪除通過 LeaseId 關聯上此 Lease 的 key 列表。後面我們分析源碼的時候專門討論這塊。
這裏我還要說一點,你可以看到,不管是 put 一個普通的 key,還是一個帶有租約的 key,調用的都是同一個方法。
// 普通的
resp, err = cli.Put(context.Background(), key, value)
// 租約
resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
// 源碼裏面
type OpOption func(*Op)
func WithLease(leaseID LeaseID) OpOption {
return func(op *Op) { op.leaseID = leaseID }
}
func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
}
}
看出來了嗎?一個很常見的設計模式,裝飾器。
Get 操作
命令行 etcdctl get key
對應操作,
func GetKey(key string) {
var err error
var res *clientv3.GetResponse
res, err = cli.Get(context.Background(), key)
if err != nil {
fmt.Printf("獲取 key 失敗 :%v\\n", err)
return
}
fmt.Printf("key %v 的值是:%+v\\n", key, res)
}
我們都知道,etcd 從 v3 開始,底層實現了 MVCC 機制。所以在 etcd 中的 key 是存在多個歷史版本的。
我們會在命令行中操作 etcdctl get hello --rev=?
,比如
可以看到,不同版本的 key("hello") 的值是不一樣的。
// 獲取指定版本的key
func GetKeyByVersion(key string, version int64) {
var err error
var res *clientv3.GetResponse
res, err = cli.Get(context.Background(), key, clientv3.WithRev(version))
if err != nil {
fmt.Printf("刪除 key:%v 失敗:%v", key, err)
return
}
fmt.Printf("請求key:%v,請求版本:%v,獲取結果:%+v\\n", key, version, res)
一樣的套路。我們也可以運行這段代碼演示一下。
src.GetKeyByVersion("hello", 20)
src.GetKeyByVersion("hello", 21)
Watch 操作
命令行 ./etcdctl watch hello
爲了避免客戶端的反覆輪詢, etcd 提供了 event 機制。客戶端可以訂閱一系列的 event ,用於 watch 某些 key 。 當這些被 watch 的 key 更新時, etcd 就會通知客戶端。
// 監聽key 變動
func WatchKey(key string) {
var watch clientv3.WatchChan
watch = cli.Watch(context.Background(), key)
for {
res := <-watch
fmt.Printf("key:%v變動通知:%+v\\n", key, res)
fmt.Printf("值:%+v\\n", *res.Events[0])
}
}
從上面這段代碼看出,watch 最終是通過 channel 的方式來進行通知的。
// 開啓一個 G
// go src.WatchKey("hello")
然後我們運行這段程序,在命令行上操作 hello 這個 key。
./etcdctl lease grant 30
lease 326978bac638651e granted with TTL(30s)
./etcdctl put hello world-age --lease=326978bac638651e
可以看到接收到兩個事件,一個是 put,一個是租約到期 delete。
總結
以下是這篇文章全部代碼。
package src
import (
"context"
"flag"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"strings"
"time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
var cli *clientv3.Client
// 初始化etcd 客戶端
func init() {
flag.Parse()
var err error
// 解析etcd的地址,編程[]string
endpoints := strings.Split(*addr, ",")
// 創建一個 etcd 的客戶端
cli, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
DialTimeout: 5 * time.Second})
if err != nil {
fmt.Printf("初始化客戶端失敗:%v\\n", err)
log.Fatal(err)
}
}
// 設置key
func PutKey(key string, value string) {
var err error
var resp *clientv3.PutResponse
resp, err = cli.Put(context.Background(), key, value)
if err != nil {
fmt.Printf("設置 key 失敗:%v\\n", err)
return
}
fmt.Printf("操作結果:%v\\n", resp)
}
// 設置會過期的key
func PutKeyLease(key string, value string, ttl int64) {
var err error
var resp *clientv3.PutResponse
// 創建一個租約對象
var lease clientv3.Lease
lease = clientv3.NewLease(cli)
var leaseResp *clientv3.LeaseGrantResponse
// 根據時間,生成一個租約
leaseResp, err = lease.Grant(context.Background(), ttl)
if err != nil {
fmt.Printf("設置 租約 失敗:%v\\n", err)
}
resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Printf("設置 key 失敗:%v\\n", err)
return
}
fmt.Printf("操作結果:%v\\n", resp)
}
// 獲取key
func GetKey(key string) {
var err error
var res *clientv3.GetResponse
res, err = cli.Get(context.Background(), key)
if err != nil {
fmt.Printf("獲取 key 失敗 :%v\\n", err)
return
}
fmt.Printf("key %v 的值是:%+v\\n", key, res)
}
// 獲取指定版本的key
func GetKeyByVersion(key string, version int64) {
var err error
var res *clientv3.GetResponse
res, err = cli.Get(context.Background(), key, clientv3.WithRev(version))
if err != nil {
fmt.Printf("刪除 key:%v 失敗:%v", key, err)
return
}
fmt.Printf("請求key:%v,請求版本:%v,獲取結果:%+v\\n", key, version, res)
}
// 刪除key
func DeleteKey(key string) {
var err error
var res *clientv3.DeleteResponse
res, err = cli.Delete(context.Background(), key)
if err != nil {
fmt.Printf("刪除 key:%v 失敗:%v", key, err)
return
}
fmt.Printf("操作結果:%+v\\n", res)
}
// 監聽key 變動
func WatchKey(key string) {
var watch clientv3.WatchChan
watch = cli.Watch(context.Background(), key, clientv3.WithRev(21))
for {
res := <-watch
fmt.Printf("key:%v變動通知:%+v\\n", key, res)
fmt.Printf("值:%+v\\n", *res.Events[0])
}
}
這篇文章主要介紹了 etcd 這個分佈式存儲工具,包括它的應用場景以及實戰基本的操作。 上面其實還有很多的實例沒有寫出來,一個是因爲懶,沒必要一個個演示一遍,另一個原因是留給你們自行實現。 我們以這個爲開始,一步步敲開 etcd 的大門。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Z9flTpcP4I1sAqxwzV_7NQ