etcd 怎麼對 key 設置過期時間

  1. 租約是什麼

我們都知道 Redis 可以通過 expire 命令對 key 設置過期時間,來實現緩存的 ttl,etcd 同樣有一種特性可以對 key 設置過期時間,也就是租約(Lease)。不過相較來說,兩者的適用場景並不相同,etcd 的 Lease 廣泛的用在服務註冊與保活上,redis 則主要用於淘汰緩存。下面介紹一下 etcd 的 Lease 機制,會從使用方式,以及實現原理來逐步探究。

  1. 使用案例

首先通過一個案例簡單介紹它的使用方式。

package main

import (
  "context"
  "log"
  "os"
  "os/signal"
  "syscall"
  "time"

  clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {

  key := "linugo-lease"

  cli, err := clientv3.New(clientv3.Config{
    Endpoints: []string{"127.0.0.1:23790"},
    DialTimeout: time.Second,
  })
  if err != nil {
    log.Fatal("new client err: ", err)
  }

  //首先創建一個Lease並通過Grant方法申請一個租約,設置ttl爲20秒,沒有續約的話,該租約會在20s後消失
  ls := clientv3.NewLease(cli)
  grantResp, err := ls.Grant(context.TODO(), 20)
  if err != nil {
    log.Fatal("grant err: ", err)
  }
  log.Printf("grant id: %x\n", grantResp.ID)

  //接下來插入一個鍵值對綁定該租約,該鍵值對會隨着租約的到期而相應被刪除
  putResp, err := cli.Put(context.TODO(), key, "value", clientv3.WithLease(grantResp.ID))
  if err != nil {
    log.Fatal("put err: ", err)
  }

  log.Printf("create version: %v\n", putResp.Header.Revision)
  //通過KeepAliveOnce方法對該租約進行續期,每隔5s會將該租約續期到初始的20s
  go func() {
    for {
      time.Sleep(time.Second * 5)
      resp, err := ls.KeepAliveOnce(context.TODO(), grantResp.ID)
      if err != nil {
        log.Println("keep alive once err: ", err)
        break
      }
      log.Println("keep alive: ", resp.TTL)
    }
  }()

  sigC := make(chan os.Signal, 1)
  signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
  s := <-sigC
  log.Println("exit with: ", s.String())
}

我們可以通過上述方式實現某個服務模塊的保活,可以將節點的地址註冊到 etcd 中,並綁定適當時長的租約,定時進行續約操作,若節點宕機,超過了租約時長,etcd 中該節點的信息就會被移除掉,實現服務的自動摘除,通常配合 etcd 的 watch 特性來做到實時的感知。

v3 版的客戶端接口除了上述的 Grant,KeepAliveOnce 方法,還包括了一些其他重要的方法如 Revoke 刪除某個租約,TimeToLive 查看某個租約剩餘時長等。

etcd 服務端面向租約對客戶端服務的有 5 個接口,分別對 client 端的方法給予了實現。本次主要對服務端的實現方法進行分析。

type LeaseServer interface {
  //對應客戶端的Grant方法,創建租約
  LeaseGrant(context.Context, *LeaseGrantRequest) (*LeaseGrantResponse, error)
  //刪除某個租約
  LeaseRevoke(context.Context, *LeaseRevokeRequest) (*LeaseRevokeResponse, error)
  //租約某個續期
  LeaseKeepAlive(Lease_LeaseKeepAliveServer) error
  //租約剩餘時長查詢
  LeaseTimeToLive(context.Context, *LeaseTimeToLiveRequest) (*LeaseTimeToLiveResponse, error)
  //查看所有租約
  LeaseLeases(context.Context, *LeaseLeasesRequest) (*LeaseLeasesResponse, error)
}
  1. 初始化

在 etcd 啓動時候,會初始化一個 lessor,lessor 內部存儲了所有有關租約的信息,包括租約 ID,到期時間,租約綁定的鍵值對等;lessor 實現了一系列接口,是租約功能的具體實現邏輯,包括 Grant(創建),Revoke(撤銷),Renew(續租) 等。

type lessor struct {
  mu sync.RWMutex
  demotec chan struct{}
  //存放所有有效的lease信息,key爲leaseID,value包括該租約的ID,ttl,lease綁定的key等信息
  leaseMap map[LeaseID]*Lease
  //便於查找lease的一個數據結構,基於最小堆實現,可以將快到期的租約放到隊頭,檢查是否過期時候,只需要檢查隊頭即可
  leaseExpiredNotifier *LeaseExpiredNotifier
  //用於實時更新lease的剩餘時間
  leaseCheckpointHeap LeaseQueue
  //用戶存放的key與lease的綁定關係,通過key可以找到租約
  itemMap map[LeaseItem]LeaseID
  ......
  //過期的lease會被放到該chan中,被消費者清理
  expiredC chan []*Lease
  ......
}

在 lessor 被初始化後,同時會啓動一個 goroutine,用於頻繁的檢查是否有過期的 lease 以及更新 lease 剩餘時間。lease 的這些檢查是集羣的 leader 節點做的,包括更新剩餘的時間,維護 lease 的最小堆,到期時候撤銷 lease。而 follower 節點只用於響應 leader 節點的存儲、更新或撤銷 lease 請求。

func (le *lessor) runLoop() {
  defer close(le.doneC)
  for {
    //檢查是否有過期的lease
    le.revokeExpiredLeases()
    //checkpoint機制檢查並更新lease的剩餘時間
    le.checkpointScheduledLeases()
    //每500毫秒檢查一次
    select {
    case <-time.After(500 * time.Millisecond):
    case <-le.stopC:
      return
    }
  }
}

爲了涵蓋大部分場景,我們假設一個三節點的 etcd 集羣的場景,通過上面的案例代碼對其中的一個 follower 節點發起請求,並針對內部的運作流程進行分析。

  1. 創建 lease

當 v3 客戶端調用 Grant 方法時候,會對應到 server 端 LeaseServer 的 LeaseGrant 方法,該方法會經過一系列的中間步驟(鑑權等)到達 etcdServer 包裝實現的 LeaseGrant 方法,該方法會調用 raft 模塊並封裝一個 Lease 的提案並進行數據同步流程。由於此時節點是 follower,會將請求轉交給 leader 進行處理,leader 接到請求後會將該提案封裝成一個日誌,並廣播到 follower 節點,follower 節點執行提案消息,並回復給 leader 節點。

在 follower 節點執行提案內容時候,會解析出該請求是一個創建 lease 的請求,該流程是在 apply 模塊執行的。apply 模塊會調用自己包裝好的 LeaseGrant 方法。

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
  op := "unknown"
  ar := &applyResult{}
  ......

  // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
  switch {
  ......
  case r.LeaseGrant != nil:
    op = "LeaseGrant"
    ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
  ......
  default:
    a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
  }
  return ar
}

LeaseGrant 方法是對 lessor 實現的 Grant 方法的進一步封裝。

func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
  l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
  resp := &pb.LeaseGrantResponse{}
  if err == nil {
    resp.ID = int64(l.ID)
    resp.TTL = l.TTL()
    resp.Header = newHeader(a.s)
  }
  return resp, err
}

lessor 通過 Grant 方法將 lease 封裝並存入到自己的 leaseMap,並經 lease 持久化到 boltdb。

func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
  ......
  //封裝lease
  l := &Lease{
    ID: id,
    ttl: ttl,
    //用於存放該lease綁定的key,用於在lease過期時刪除key
    itemSet: make(map[LeaseItem]struct{}),
    revokec: make(chan struct{}),
  }
  ......
  //如果是leader節點,則刷新lease的到期時間
  if le.isPrimary() {
    l.refresh(0)
  } else {
    //follower節點中沒有存儲lease的到期時間
    l.forever()
  }

  le.leaseMap[id] = l
  //lease信息持久化
  l.persistTo(le.b)
  //如果是leader節點,就將lease信息放到最小堆中
  if le.isPrimary() {
    item := &LeaseWithTime{id: l.ID, time: l.expiry}
    le.leaseExpiredNotifier.RegisterOrUpdate(item)
    le.scheduleCheckpointIfNeeded(l)
  }

  return l, nil
}

type Lease struct {
  ID LeaseID
  ttl int64
   //剩餘ttl,用於checkpoint機制
  remainingTTL int64 
  expiryMu sync.RWMutex
  // lease的過期時間,只會存在於leader的lessor中
  expiry time.Time
  mu sync.RWMutex
  //該lesase對應key的set集合
  itemSet map[LeaseItem]struct{}
  //用於通知的chan,當該lease過期,chan會關閉
  revokec chan struct{}
}
  1. 綁定

lease 創建好之後,就可以通過 Put 指令創建一個數據並與 lease 進行綁定。在 Put 時候,put 的 value 字段中會有一個 leaseID,並存到了 boltDB。這樣可以在 etcd 停掉之後,再次啓動時可以根據持久化存儲來恢復 lease 與數據的對應關係。

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
  ......
  kv := mvccpb.KeyValue{
    Key: key,
    Value: value,
    CreateRevision: c,
    ModRevision: rev,
    Version: ver,
    //lease字段
    Lease: int64(leaseID),
  }
  //....持久化等操作
  //attach操作
  if leaseID != lease.NoLease {
    if tw.s.le == nil {
      panic("no lessor to attach lease")
    }
    err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
    if err != nil {
      panic("unexpected error from lease Attach")
    }
  }
  tw.trace.Step("attach lease to kv pair")
}

lessor 的 Attach 操作會將 lease 與 key 兩者進行綁定並存到自身的 itemMap 以及 lease 的 itemSet 中。

func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
  ......
  l := le.leaseMap[id]
  l.mu.Lock()
  for _, it := range items {
    //存到lease的itemSet
    l.itemSet[it] = struct{}{}
    //存到lessor的itemMap中
    le.itemMap[it] = id
  }
  l.mu.Unlock()
  return nil
}
  1. 續約

客戶端提供的 keepAlive 方法用於 lease 進行續租,每次調用都會使得 lease 的剩餘時間回到初始時候設定的剩餘時間。由於 lease 的一些檢查以及維護都是由 leader 節點維持,所以當我們發送請求到 follower 時,會直接將請求重定向到 leader 節點。

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
     //發送到follower會返回ErrNotPrimary的錯誤
    ttl, err := s.lessor.Renew(id)
  if err == nil { // already requested to primary lessor(leader)
    return ttl, nil
  }
  ......
  for cctx.Err() == nil && err != nil {
    //獲取leader節點
    leader, lerr := s.waitLeader(cctx)
    if lerr != nil {
      return -1, lerr
    }
    for _, url := range leader.PeerURLs {
      lurl := url + leasehttp.LeasePrefix
      //通過http接口請求到leader的keeplaive接口
      ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
      if err == nil || err == lease.ErrLeaseNotFound {
        return ttl, err
      }
    }
    time.Sleep(50 * time.Millisecond)
  }
  ......
  return -1, ErrCanceled
}

到達 Leader 節點之後會通過 Renew 更新該 lease 的剩餘時間,過期時間以及最小堆中的 lease。

func (le *lessor) Renew(id LeaseID) (int64, error) {
  le.mu.RLock()
  if !le.isPrimary() {
    le.mu.RUnlock()
    return -1, ErrNotPrimary
  }

  demotec := le.demotec

  l := le.leaseMap[id]
  if l == nil {
    le.mu.RUnlock()
    return -1, ErrLeaseNotFound
  }
  //當cp(checkpoint方法,需要通過raft做數據同步的方法)不爲空而且剩餘時間大於0時爲true
  clearRemainingTTL := le.cp != nil && l.remainingTTL > 0

  le.mu.RUnlock()
  //如果lease過期
  if l.expired() {
    select {
    case <-l.revokec: //revoke時候會直接返回
      return -1, ErrLeaseNotFound
    // The expired lease might fail to be revoked if the primary changes.
    // The caller will retry on ErrNotPrimary.
    case <-demotec:
      return -1, ErrNotPrimary
    case <-le.stopC:
      return -1, ErrNotPrimary
    }
  }

  if clearRemainingTTL {
    //通過checkpoint方法同步到各個節點lease的剩餘時間
    le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
  }

  le.mu.Lock()
  l.refresh(0)
  item := &LeaseWithTime{id: l.ID, time: l.expiry}
  //更新最小堆中的lease
  le.leaseExpiredNotifier.RegisterOrUpdate(item)
  le.mu.Unlock()
  return l.ttl, nil
}
  1. 撤銷

撤銷操作可以由兩種方式觸發,一種是通過客戶端直接調用 Revoke 方法被動觸發,一種是 leader 節點檢測到 lease 過期時候的主動觸發。被動觸發相對簡單,follower 節點收到請求後直接調用 raft 模塊同步該請求,各個節點收到請求後通過 lessor 主動刪除該 lease(刪除並沒有直接刪除 leaseMap 中的 lease,而是關閉對應 revokec),以及刪除綁定在上面的 key。

func (le *lessor) Revoke(id LeaseID) error {
  le.mu.Lock()
  l := le.leaseMap[id]
  //關閉用於通知撤銷的管道
  defer close(l.revokec)
  le.mu.Unlock()

  if le.rd == nil {
    return nil
  }

  txn := le.rd()
  //Keys方法會將lease中itemSet的key取出
  keys := l.Keys()
  sort.StringSlice(keys).Sort()
  //刪除lease綁定的key
  for _, key := range keys {
    txn.DeleteRange([]byte(key), nil)
  }

  le.mu.Lock()
  defer le.mu.Unlock()
  delete(le.leaseMap, l.ID)
  //刪除boltdb持久化的lease
  le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))

  txn.End()
  return nil
}

主動觸發則通過創建 lessor 時候啓動的異步協程 runLoop(),每 500ms 輪詢調用 revokeExpiredLeases 來檢查是否過期。

func (le *lessor) revokeExpiredLeases() {
  var ls []*Lease
  // rate limit
  revokeLimit := leaseRevokeRate / 2
  le.mu.RLock()
    //如果是leader節點
  if le.isPrimary() {
        //在leaseExpiredNotifier最小堆中找到過期的lease
    ls = le.findExpiredLeases(revokeLimit)
  }
  le.mu.RUnlock()
  if len(ls) !={
    select {
    case <-le.stopC:
      return
    case le.expiredC <- ls://將過期的lease發送到expireC中
    default:
    }
  }
}

在 etcd 啓動時候,會另外啓動一個異步 run 協程,會訂閱該 expireC,收到消息後發起一個 Revoke 提案並進行同步操作。

//leassor通過ExpiredLeasesC方法把expiredC暴露出來
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
  return le.expiredC
}

//etcd啓動的異步run協程
func (s *EtcdServer) run() {
  ......
  var expiredLeaseC <-chan []*lease.Lease
  if s.lessor != nil {
    expiredLeaseC = s.lessor.ExpiredLeasesC()
  }
  for{
    select{
      case leases := <-expiredLeaseC://接到過期消息
          s.GoAttach(func() {
           for _, lease := range leases {
             ......
             lid := lease.ID
             s.GoAttach(func() {
                ctx := s.authStore.WithRoot(s.ctx)
                //調用revoke方法
                _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
                ......
                <-c
              })
        }
      })
        ......
            //其他case操作
        }
    }
}
  1. 小結

爲了保持數據的一致性,lease 的創建,刪除,checkpoint 等都需要經過 raft 模塊進行同步,而在續約階段則直接通過 http 請求發送到 leader 節點,所有的維護與檢查工作都在 leader 節點,大體可以用下圖來表示。由於作者對 raft 模塊理解不夠深入,所以一筆帶過。

9.Reference

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-Flu-x_4eQZv4MHpVcw9lw