etcd 怎麼對 key 設置過期時間
- 租約是什麼
我們都知道 Redis 可以通過 expire 命令對 key 設置過期時間,來實現緩存的 ttl,etcd 同樣有一種特性可以對 key 設置過期時間,也就是租約(Lease)。不過相較來說,兩者的適用場景並不相同,etcd 的 Lease 廣泛的用在服務註冊與保活上,redis 則主要用於淘汰緩存。下面介紹一下 etcd 的 Lease 機制,會從使用方式,以及實現原理來逐步探究。
- 使用案例
首先通過一個案例簡單介紹它的使用方式。
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
func main() {
key := "linugo-lease"
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:23790"},
DialTimeout: time.Second,
})
if err != nil {
log.Fatal("new client err: ", err)
}
//首先創建一個Lease並通過Grant方法申請一個租約,設置ttl爲20秒,沒有續約的話,該租約會在20s後消失
ls := clientv3.NewLease(cli)
grantResp, err := ls.Grant(context.TODO(), 20)
if err != nil {
log.Fatal("grant err: ", err)
}
log.Printf("grant id: %x\n", grantResp.ID)
//接下來插入一個鍵值對綁定該租約,該鍵值對會隨着租約的到期而相應被刪除
putResp, err := cli.Put(context.TODO(), key, "value", clientv3.WithLease(grantResp.ID))
if err != nil {
log.Fatal("put err: ", err)
}
log.Printf("create version: %v\n", putResp.Header.Revision)
//通過KeepAliveOnce方法對該租約進行續期,每隔5s會將該租約續期到初始的20s
go func() {
for {
time.Sleep(time.Second * 5)
resp, err := ls.KeepAliveOnce(context.TODO(), grantResp.ID)
if err != nil {
log.Println("keep alive once err: ", err)
break
}
log.Println("keep alive: ", resp.TTL)
}
}()
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
s := <-sigC
log.Println("exit with: ", s.String())
}
我們可以通過上述方式實現某個服務模塊的保活,可以將節點的地址註冊到 etcd 中,並綁定適當時長的租約,定時進行續約操作,若節點宕機,超過了租約時長,etcd 中該節點的信息就會被移除掉,實現服務的自動摘除,通常配合 etcd 的 watch 特性來做到實時的感知。
v3 版的客戶端接口除了上述的 Grant,KeepAliveOnce 方法,還包括了一些其他重要的方法如 Revoke 刪除某個租約,TimeToLive 查看某個租約剩餘時長等。
etcd 服務端面向租約對客戶端服務的有 5 個接口,分別對 client 端的方法給予了實現。本次主要對服務端的實現方法進行分析。
type LeaseServer interface {
//對應客戶端的Grant方法,創建租約
LeaseGrant(context.Context, *LeaseGrantRequest) (*LeaseGrantResponse, error)
//刪除某個租約
LeaseRevoke(context.Context, *LeaseRevokeRequest) (*LeaseRevokeResponse, error)
//租約某個續期
LeaseKeepAlive(Lease_LeaseKeepAliveServer) error
//租約剩餘時長查詢
LeaseTimeToLive(context.Context, *LeaseTimeToLiveRequest) (*LeaseTimeToLiveResponse, error)
//查看所有租約
LeaseLeases(context.Context, *LeaseLeasesRequest) (*LeaseLeasesResponse, error)
}
- 初始化
在 etcd 啓動時候,會初始化一個 lessor,lessor 內部存儲了所有有關租約的信息,包括租約 ID,到期時間,租約綁定的鍵值對等;lessor 實現了一系列接口,是租約功能的具體實現邏輯,包括 Grant(創建),Revoke(撤銷),Renew(續租) 等。
type lessor struct {
mu sync.RWMutex
demotec chan struct{}
//存放所有有效的lease信息,key爲leaseID,value包括該租約的ID,ttl,lease綁定的key等信息
leaseMap map[LeaseID]*Lease
//便於查找lease的一個數據結構,基於最小堆實現,可以將快到期的租約放到隊頭,檢查是否過期時候,只需要檢查隊頭即可
leaseExpiredNotifier *LeaseExpiredNotifier
//用於實時更新lease的剩餘時間
leaseCheckpointHeap LeaseQueue
//用戶存放的key與lease的綁定關係,通過key可以找到租約
itemMap map[LeaseItem]LeaseID
......
//過期的lease會被放到該chan中,被消費者清理
expiredC chan []*Lease
......
}
在 lessor 被初始化後,同時會啓動一個 goroutine,用於頻繁的檢查是否有過期的 lease 以及更新 lease 剩餘時間。lease 的這些檢查是集羣的 leader 節點做的,包括更新剩餘的時間,維護 lease 的最小堆,到期時候撤銷 lease。而 follower 節點只用於響應 leader 節點的存儲、更新或撤銷 lease 請求。
func (le *lessor) runLoop() {
defer close(le.doneC)
for {
//檢查是否有過期的lease
le.revokeExpiredLeases()
//checkpoint機制檢查並更新lease的剩餘時間
le.checkpointScheduledLeases()
//每500毫秒檢查一次
select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
}
爲了涵蓋大部分場景,我們假設一個三節點的 etcd 集羣的場景,通過上面的案例代碼對其中的一個 follower 節點發起請求,並針對內部的運作流程進行分析。
- 創建 lease
當 v3 客戶端調用 Grant 方法時候,會對應到 server 端 LeaseServer 的 LeaseGrant 方法,該方法會經過一系列的中間步驟(鑑權等)到達 etcdServer 包裝實現的 LeaseGrant 方法,該方法會調用 raft 模塊並封裝一個 Lease 的提案並進行數據同步流程。由於此時節點是 follower,會將請求轉交給 leader 進行處理,leader 接到請求後會將該提案封裝成一個日誌,並廣播到 follower 節點,follower 節點執行提案消息,並回復給 leader 節點。
在 follower 節點執行提案內容時候,會解析出該請求是一個創建 lease 的請求,該流程是在 apply 模塊執行的。apply 模塊會調用自己包裝好的 LeaseGrant 方法。
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
op := "unknown"
ar := &applyResult{}
......
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
......
case r.LeaseGrant != nil:
op = "LeaseGrant"
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
......
default:
a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
}
return ar
}
LeaseGrant 方法是對 lessor 實現的 Grant 方法的進一步封裝。
func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
resp := &pb.LeaseGrantResponse{}
if err == nil {
resp.ID = int64(l.ID)
resp.TTL = l.TTL()
resp.Header = newHeader(a.s)
}
return resp, err
}
lessor 通過 Grant 方法將 lease 封裝並存入到自己的 leaseMap,並經 lease 持久化到 boltdb。
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
......
//封裝lease
l := &Lease{
ID: id,
ttl: ttl,
//用於存放該lease綁定的key,用於在lease過期時刪除key
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
......
//如果是leader節點,則刷新lease的到期時間
if le.isPrimary() {
l.refresh(0)
} else {
//follower節點中沒有存儲lease的到期時間
l.forever()
}
le.leaseMap[id] = l
//lease信息持久化
l.persistTo(le.b)
//如果是leader節點,就將lease信息放到最小堆中
if le.isPrimary() {
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}
return l, nil
}
type Lease struct {
ID LeaseID
ttl int64
//剩餘ttl,用於checkpoint機制
remainingTTL int64
expiryMu sync.RWMutex
// lease的過期時間,只會存在於leader的lessor中
expiry time.Time
mu sync.RWMutex
//該lesase對應key的set集合
itemSet map[LeaseItem]struct{}
//用於通知的chan,當該lease過期,chan會關閉
revokec chan struct{}
}
- 綁定
lease 創建好之後,就可以通過 Put 指令創建一個數據並與 lease 進行綁定。在 Put 時候,put 的 value 字段中會有一個 leaseID,並存到了 boltDB。這樣可以在 etcd 停掉之後,再次啓動時可以根據持久化存儲來恢復 lease 與數據的對應關係。
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
......
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
//lease字段
Lease: int64(leaseID),
}
//....持久化等操作
//attach操作
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
tw.trace.Step("attach lease to kv pair")
}
lessor 的 Attach 操作會將 lease 與 key 兩者進行綁定並存到自身的 itemMap 以及 lease 的 itemSet 中。
func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
......
l := le.leaseMap[id]
l.mu.Lock()
for _, it := range items {
//存到lease的itemSet
l.itemSet[it] = struct{}{}
//存到lessor的itemMap中
le.itemMap[it] = id
}
l.mu.Unlock()
return nil
}
- 續約
客戶端提供的 keepAlive 方法用於 lease 進行續租,每次調用都會使得 lease 的剩餘時間回到初始時候設定的剩餘時間。由於 lease 的一些檢查以及維護都是由 leader 節點維持,所以當我們發送請求到 follower 時,會直接將請求重定向到 leader 節點。
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
//發送到follower會返回ErrNotPrimary的錯誤
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
......
for cctx.Err() == nil && err != nil {
//獲取leader節點
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
//通過http接口請求到leader的keeplaive接口
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
time.Sleep(50 * time.Millisecond)
}
......
return -1, ErrCanceled
}
到達 Leader 節點之後會通過 Renew 更新該 lease 的剩餘時間,過期時間以及最小堆中的 lease。
func (le *lessor) Renew(id LeaseID) (int64, error) {
le.mu.RLock()
if !le.isPrimary() {
le.mu.RUnlock()
return -1, ErrNotPrimary
}
demotec := le.demotec
l := le.leaseMap[id]
if l == nil {
le.mu.RUnlock()
return -1, ErrLeaseNotFound
}
//當cp(checkpoint方法,需要通過raft做數據同步的方法)不爲空而且剩餘時間大於0時爲true
clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
le.mu.RUnlock()
//如果lease過期
if l.expired() {
select {
case <-l.revokec: //revoke時候會直接返回
return -1, ErrLeaseNotFound
// The expired lease might fail to be revoked if the primary changes.
// The caller will retry on ErrNotPrimary.
case <-demotec:
return -1, ErrNotPrimary
case <-le.stopC:
return -1, ErrNotPrimary
}
}
if clearRemainingTTL {
//通過checkpoint方法同步到各個節點lease的剩餘時間
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
}
le.mu.Lock()
l.refresh(0)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
//更新最小堆中的lease
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.mu.Unlock()
return l.ttl, nil
}
- 撤銷
撤銷操作可以由兩種方式觸發,一種是通過客戶端直接調用 Revoke 方法被動觸發,一種是 leader 節點檢測到 lease 過期時候的主動觸發。被動觸發相對簡單,follower 節點收到請求後直接調用 raft 模塊同步該請求,各個節點收到請求後通過 lessor 主動刪除該 lease(刪除並沒有直接刪除 leaseMap 中的 lease,而是關閉對應 revokec),以及刪除綁定在上面的 key。
func (le *lessor) Revoke(id LeaseID) error {
le.mu.Lock()
l := le.leaseMap[id]
//關閉用於通知撤銷的管道
defer close(l.revokec)
le.mu.Unlock()
if le.rd == nil {
return nil
}
txn := le.rd()
//Keys方法會將lease中itemSet的key取出
keys := l.Keys()
sort.StringSlice(keys).Sort()
//刪除lease綁定的key
for _, key := range keys {
txn.DeleteRange([]byte(key), nil)
}
le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
//刪除boltdb持久化的lease
le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
txn.End()
return nil
}
主動觸發則通過創建 lessor 時候啓動的異步協程 runLoop(),每 500ms 輪詢調用 revokeExpiredLeases 來檢查是否過期。
func (le *lessor) revokeExpiredLeases() {
var ls []*Lease
// rate limit
revokeLimit := leaseRevokeRate / 2
le.mu.RLock()
//如果是leader節點
if le.isPrimary() {
//在leaseExpiredNotifier最小堆中找到過期的lease
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()
if len(ls) != 0 {
select {
case <-le.stopC:
return
case le.expiredC <- ls://將過期的lease發送到expireC中
default:
}
}
}
在 etcd 啓動時候,會另外啓動一個異步 run 協程,會訂閱該 expireC,收到消息後發起一個 Revoke 提案並進行同步操作。
//leassor通過ExpiredLeasesC方法把expiredC暴露出來
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
return le.expiredC
}
//etcd啓動的異步run協程
func (s *EtcdServer) run() {
......
var expiredLeaseC <-chan []*lease.Lease
if s.lessor != nil {
expiredLeaseC = s.lessor.ExpiredLeasesC()
}
for{
select{
case leases := <-expiredLeaseC://接到過期消息
s.GoAttach(func() {
for _, lease := range leases {
......
lid := lease.ID
s.GoAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
//調用revoke方法
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
......
<-c
})
}
})
......
//其他case操作
}
}
}
- 小結
爲了保持數據的一致性,lease 的創建,刪除,checkpoint 等都需要經過 raft 模塊進行同步,而在續約階段則直接通過 http 請求發送到 leader 節點,所有的維護與檢查工作都在 leader 節點,大體可以用下圖來表示。由於作者對 raft 模塊理解不夠深入,所以一筆帶過。
9.Reference
-
etcdV3.5.0 源碼 - https://github.com/etcd-io/etcd/tree/v3.5.0
-
etcd 如何實現租約 - etcd 原理與實踐
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/-Flu-x_4eQZv4MHpVcw9lw