etcd watch:etcd 如何實現 watch 機制?

你好,我是 aoho,今天我和你分享的主題是 etcd watch:etcd  如何實現 watch 機制?

etcd v2 和 v3 版本之間的重要變化之一就是 watch 機制的優化。etcd v2 watch 機制採用的是基於 HTTP/1.x 協議的客戶端輪詢機制,歷史版本存儲則是通過滑動窗口。在大量的客戶端連接的場景或者集羣規模較大的場景,導致 etcd 服務端的擴展性和穩定性都無法保證。etcd v3 在此基礎上進行優化,滿足了 Kubernetes pods 部署和狀態管理等業務場景訴求。

watch 是監聽一個或一組 key,key 的任何變化都會發出消息。某種意義上講,etcd 就是發佈訂閱模式。

Watch 的用法

在具體將講解 Watch 的實現方式之前,我們先來體驗下如何使用 Watch。通過  etcdctl 命令行工具實現鍵值對的檢測:

$ etcdctl put hello aoho
$ etcdctl put hello boho
$ etcdctl watch hello -w=json --rev=1

{
 "Header"{
  "cluster_id": 14841639068965178418,
  "member_id": 10276657743932975437,
  "revision": 4,
  "raft_term"4
 },
 "Events"[{
  "kv"{
   "key""aGVsbG8=",
   "create_revision": 3,
   "mod_revision": 3,
   "version": 1,
   "value""YW9obw=="
  }
 }{
  "kv"{
   "key""aGVsbG8=",
   "create_revision": 3,
   "mod_revision": 4,
   "version": 2,
   "value""Ym9obw=="
  }
 }],
 "CompactRevision": 0,
 "Canceled": false,
 "Created"false
}

依次在命令行中輸入上面三條命令,前面兩條依次更新 hello 對應的值,第三條命令監測鍵爲 hello 的變化,並指定版本號從 1 開始。結果輸出了兩條 watch 事件。我們接着在另一個命令行繼續輸入如下的更新命令:

$ etcdctl put hello coho

可以看到前一個命令行輸出瞭如下的內容:

{
 "Header"{
  "cluster_id": 14841639068965178418,
  "member_id": 10276657743932975437,
  "revision": 5,
  "raft_term"4
 },
 "Events"[{
  "kv"{
   "key""aGVsbG8=",
   "create_revision": 3,
   "mod_revision": 5,
   "version": 3,
   "value""Y29obw=="
  }
 }],
 "CompactRevision": 0,
 "Canceled": false,
 "Created"false
}

命令行輸出的事件表明,鍵 hello 對應的鍵值對發生了更新,並輸出了事件的詳細信息。如上就是通過 etcdctl 客戶端工具實現 watch 指定的鍵值對功能。接着我們看下,clientv3 中是如何實現 watch 功能。

func testWatch() {
    s := newWatchableStore()

    w := s.NewWatchStream()

    w.Watch(start_key: foo, end_key: nil)

    w.Watch(start_key: bar, end_key: nil)

    for {
        consume := <- w.Chan()
    }
}

etcd 的 mvcc 模塊對外提供了兩種訪問鍵值對的實現,一種是鍵值存儲 kvstore,另一種是 watchableStore。它們都實現了 KV 接口,KV 接口的具體實現則是 store 結構體。在上面的實現中,我們先調用了 watchableStore。

當我們要使用 Watch 功能時,我們創建了一個 watchStream。創建出來的 w 可以監聽的鍵爲 hello,之後我們就可以消費 w.Chan() 返回的 channel。鍵爲 hello 的任何變化,都會通過這個 channel 發送給客戶端。

可以看到 watchStream 實現了在大量 kv 的變化中,過濾出當前所監聽的 key,將 key 的變化輸出。

watchableStore 存儲

在前面的課時已經介紹過 kvstore,這裏我們介紹 watchableStore 的實現。Watch 的實現是在 store 上封裝了一層叫做 watchableStore,重寫了 store 的 Write 方法。

// 位於 mvcc/watchable_store_txn.go:22
func (tw *watchableStoreTxnWrite) End() {
 changes := tw.Changes()
 if len(changes) == 0 {
  tw.TxnWrite.End()
  return
 }

 rev := tw.Rev() + 1
 evs := make([]mvccpb.Event, len(changes))
 for i, change := range changes {
  evs[i].Kv = &changes[i]
  if change.CreateRevision == 0 {
   evs[i].Type = mvccpb.DELETE
   evs[i].Kv.ModRevision = rev
  } else {
   evs[i].Type = mvccpb.PUT
  }
 }

 // end write txn under watchable store lock so the updates are visible
 // when asynchronous event posting checks the current store revision
 tw.s.mu.Lock()
 tw.s.notify(rev, evs)
 tw.TxnWrite.End()
 tw.s.mu.Unlock()
}

type watchableStoreTxnWrite struct {
 TxnWrite
 s *watchableStore
}

func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
 return &watchableStoreTxnWrite{s.store.Write(trace), s}
}

通過 MVCC 中介紹,store 的任何寫操作,都需要 Write 方法返回的 TxnWrite。所以這裏重寫 Write 方法意味着任何寫操作都會經過 watchableStore。從上面的代碼不難看出,watchableStoreTxnWrite 在事務提交時,先將本次變更 changes 打包成 Event,然後調用 notify 來將變更通知出去。最後真正提交事務 TxnWrite.End()。

Watch 負責了註冊、管理以及觸發 Watcher 的功能。我們先來看一下這個結構體的各個字段:

// 位於 mvcc/watchable_store.go:47
type watchableStore struct {
 *store

 // 同步讀寫鎖
 mu sync.RWMutex

 // 被阻塞在 watch channel 中的 watcherBatch
 victims []watcherBatch
 victimc chan struct{}

 // 未同步的 watchers
 unsynced watcherGroup

 // 已同步的 watchers
 synced watcherGroup

 stopc chan struct{}
 wg    sync.WaitGroup
}

每一個 watchableStore 其實都組合了來自 store 結構體的字段和方法,除此之外,還有兩個 watcherGroup 類型的字段,watcherGroup 管理多個 watcher,能夠根據 key 快速找到監聽該 key 的一個或多個 watcher。其中 unsynced 用於存儲未同步完成的實例,synced 用於存儲已經同步完成的實例。

根據 watchableStore 的定義,我們可以描述 Watch 監聽的過程。

watchableStore 收到了所有 key 的變更後,將這些 key 交給 synced(watchGroup),synced 能夠快速地從所有 key 中找到監聽的 key。將這些 key 發送給對應的 watcher,這些 watcher 再通過 chan 將變更信息發送出去。

synced 是怎麼快速找到符合條件的 key 呢?etcd 中使用了 map 和 adt(紅黑樹)來實現。

不單獨使用 map 是因爲 watch 可以監聽一個範圍的 key。如果只監聽一個 key:

watch(start_key: foo, end_key: nil)

則對應的存儲爲 map[key]*watcher。這樣可以根據 key 快速找到對應的 watcher,etcd 也是這樣做的。但對於一組 key 呢?

watch(start_key: foo, end_key: fop)

這裏我監聽了從 foo->fop 之間的所有 key,理論上這些 key 的數目是無限的,所以無法再使用 map。比如:key=fooac 也屬於監聽範圍。etcd 用 adt 來存儲這種 key。

// 位於 mvcc/watcher_group.go:147
// watcherGroup 是由一系列範圍 watcher 組織起來的 watchers
type watcherGroup struct {
 // keyWatchers has the watchers that watch on a single key
 keyWatchers watcherSetByKey
 // ranges has the watchers that watch a range; it is sorted by interval
 ranges adt.IntervalTree
 // watchers is the set of all watchers
 watchers watcherSet
}

adt 的實現這裏不做介紹,只用知道 adt 能夠根據 key=fooac 快速地找到所屬範圍 foo->fop。在找到 watcher 後,調用 watcher 的 send() 方法,將變更的 Event 發送出去。

syncWatchers 同步監聽

在初始化一個新的 watchableStore 時,etcd 會創建一個用於同步 watcherGroup 的 Goroutine,在 syncWatchersLoop 這個循環中會每隔 100ms 調用一次 syncWatchers 方法,將所有未通知的事件通知給所有的監聽者,這可以說是整個模塊的核心:

// 位於 mvcc/watchable_store.go:334
func (s *watchableStore) syncWatchers() int {
 s.mu.Lock()
 defer s.mu.Unlock()

 if s.unsynced.size() == 0 {
  return 0
 }

 s.store.revMu.RLock()
 defer s.store.revMu.RUnlock()

 // in order to find key-value pairs from unsynced watchers, we need to
 // find min revision index, and these revisions can be used to
 // query the backend store of key-value pairs
 curRev := s.store.currentRev
 compactionRev := s.store.compactMainRev

 wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
 minBytes, maxBytes := newRevBytes(), newRevBytes()
 revToBytes(revision{main: minRev}, minBytes)
 revToBytes(revision{main: curRev + 1}, maxBytes)

 // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
 // values are actual key-value pairs in backend.
 tx := s.store.b.ReadTx()
 tx.RLock()
 revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 var evs []mvccpb.Event
 evs = kvsToEvents(s.store.lg, wg, revs, vs)
 tx.RUnlock()

 var victims watcherBatch
 wb := newWatcherBatch(wg, evs)
 for w := range wg.watchers {
  w.minRev = curRev + 1

  eb, ok := wb[w]
  if !ok {
   // bring un-notified watcher to synced
   s.synced.add(w)
   s.unsynced.delete(w)
   continue
  }

  if eb.moreRev != 0 {
   w.minRev = eb.moreRev
  }

  if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
   pendingEventsGauge.Add(float64(len(eb.evs)))
  } else {
   if victims == nil {
    victims = make(watcherBatch)
   }
   w.victim = true
  }

  if w.victim {
   victims[w] = eb
  } else {
   if eb.moreRev != 0 {
    // stay unsynced; more to read
    continue
   }
   s.synced.add(w)
  }
  s.unsynced.delete(w)
 }
 s.addVictim(victims)

 vsz := 0
 for _, v := range s.victims {
  vsz += len(v)
 }
 slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

 return s.unsynced.size()
}

簡化後的 syncWatchers 方法中總共做了三件事情,首先是根據當前的版本從未同步的 watcherGroup 中選出一些待處理的任務,然後從 BoltDB 中取當前版本範圍內的數據變更並將它們轉換成事件,事件和 watcherGroup 在打包之後會通過 send 方法發送到每一個 watcher 對應的 Channel 中。

客戶端監聽事件

客戶端監聽鍵值對時,調用的正是 Watch 方法,Watch 在 stream 中創建一個新的 watcher,並返回對應的 WatchID。

// 位於 mvcc/watcher.go:108
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
 // 防止出現 ket>= end 的錯誤範圍情況
 if len(end) != 0 && bytes.Compare(key, end) != -1 {
  return -1, ErrEmptyWatcherRange
 }

 ws.mu.Lock()
 defer ws.mu.Unlock()
 if ws.closed {
  return -1, ErrEmptyWatcherRange
 }

 if id == AutoWatchID {
  for ws.watchers[ws.nextID] != nil {
   ws.nextID++
  }
  id = ws.nextID
  ws.nextID++
 } else if _, ok := ws.watchers[id]; ok {
  return -1, ErrWatcherDuplicateID
 }

 w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

 ws.cancels[id] = c
 ws.watchers[id] = w
 return id, nil
}

AutoWatchID 是 WatchStream 中傳遞的觀察者 ID。當用戶沒有提供可用的 ID 時,如果有傳遞該值,etcd 將自動分配一個 ID。如果傳遞的 ID 已經存在,則會返回 ErrWatcherDuplicateID 錯誤。watchable_store.go 中的 watch 實現是監聽的具體實現,實現代碼如下:

// 位於 mvcc/watchable_store.go:120
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
 // 構建 watcher
 wa := &watcher{
  key:    key,
  end:    end,
  minRev: startRev,
  id:     id,
  ch:     ch,
  fcs:    fcs,
 }

 s.mu.Lock()
 s.revMu.RLock()
 synced := startRev > s.store.currentRev || startRev == 0
 if synced {
  wa.minRev = s.store.currentRev + 1
  if startRev > wa.minRev {
   wa.minRev = startRev
  }
 }
 if synced {
  s.synced.add(wa)
 } else {
  slowWatcherGauge.Inc()
  s.unsynced.add(wa)
 }
 s.revMu.RUnlock()
 s.mu.Unlock()
 // prometheus 的指標增加
 watcherGauge.Inc()

 return wa, func() { s.cancelWatcher(wa) }
}

對 watchableStore 進行操作之前,需要加鎖。當 etcd 收到客戶端的 watch 請求,如果請求攜帶了 revision 參數,則比較請求的 revision 和 store 當前的 revision,如果大於當前 revision,則放入 synced 組中,否則放入 unsynced 組。

服務端處理監聽

當 etcd 服務啓動時,會在服務端運行一個用於處理監聽事件的 watchServer gRPC 服務,客戶端的 Watch 請求最終都會被轉發到這個服務的 Watch 函數中:

// 位於 etcdserver/api/v3rpc/watch.go:140
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 sws := serverWatchStream{
  lg: ws.lg,

  clusterID: ws.clusterID,
  memberID:  ws.memberID,

  maxRequestBytes: ws.maxRequestBytes,

  sg:        ws.sg,
  watchable: ws.watchable,
  ag:        ws.ag,

  gRPCStream:  stream,
  watchStream: ws.watchable.NewWatchStream(),
  // chan for sending control response like watcher created and canceled.
  ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),

  progress: make(map[mvcc.WatchID]bool),
  prevKV:   make(map[mvcc.WatchID]bool),
  fragment: make(map[mvcc.WatchID]bool),

  closec: make(chan struct{}),
 }

 sws.wg.Add(1)
 go func() {
  sws.sendLoop()
  sws.wg.Done()
 }()

 errc := make(chan error, 1)
 // Ideally recvLoop would also use sws.wg to signal its completion
 // but when stream.Context().Done() is closed, the stream's recv
 // may continue to block since it uses a different context, leading to
 // deadlock when calling sws.close().
 go func() {
  if rerr := sws.recvLoop(); rerr != nil {
   if isClientCtxErr(stream.Context().Err(), rerr) {
    sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
   } else {
    sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
    streamFailures.WithLabelValues("receive""watch").Inc()
   }
   errc <- rerr
  }
 }()

 select {
 case err = <-errc:
  close(sws.ctrlStream)

 case <-stream.Context().Done():
  err = stream.Context().Err()
  // the only server-side cancellation is noleader for now.
  if err == context.Canceled {
   err = rpctypes.ErrGRPCNoLeader
  }
 }

 sws.close()
 return err
}

當客戶端想要通過 Watch 結果監聽某一個 Key 或者一個範圍的變動,在每一次客戶端調用服務端上述方式都會創建兩個 Goroutine,其中一個協程會負責向監聽者發送數據變動的事件,另一個協程會負責處理客戶端發來的事件。

服務端 recvLoop

recvLoop 協程主要用來負責處理客戶端發來的事件。

// 位於 etcdserver/api/v3rpc/watch.go:216
func (sws *serverWatchStream) recvLoop() error {
 for {
  req, err := sws.gRPCStream.Recv()
  if err == io.EOF {
   return nil
  }
  if err != nil {
   return err
  }

  switch uv := req.RequestUnion.(type) {
  case *pb.WatchRequest_CreateRequest:
   if uv.CreateRequest == nil {
    break
   }

   creq := uv.CreateRequest
   if len(creq.Key) == 0 {
    // \x00 is the smallest key
    creq.Key = []byte{0}
   }
   if len(creq.RangeEnd) == 0 {
    // force nil since watchstream.Watch distinguishes
    // between nil and []byte{} for single key / >=
    creq.RangeEnd = nil
   }
   if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
    // support  >= key queries
    creq.RangeEnd = []byte{}
   }

   if !sws.isWatchPermitted(creq) {
    wr := &pb.WatchResponse{
     Header:       sws.newResponseHeader(sws.watchStream.Rev()),
     WatchId:      creq.WatchId,
     Canceled:     true,
     Created:      true,
     CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
    }

    select {
    case sws.ctrlStream <- wr:
    case <-sws.closec:
    }
    return nil
   }

   filters := FiltersFromRequest(creq)

   wsrev := sws.watchStream.Rev()
   rev := creq.StartRevision
   if rev == 0 {
    rev = wsrev + 1
   }
   id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
   if err == nil {
    sws.mu.Lock()
    if creq.ProgressNotify {
     sws.progress[id] = true
    }
    if creq.PrevKv {
     sws.prevKV[id] = true
    }
    if creq.Fragment {
     sws.fragment[id] = true
    }
    sws.mu.Unlock()
   }
   wr := &pb.WatchResponse{
    Header:   sws.newResponseHeader(wsrev),
    WatchId:  int64(id),
    Created:  true,
    Canceled: err != nil,
   }
   if err != nil {
    wr.CancelReason = err.Error()
   }
   select {
   case sws.ctrlStream <- wr:
   case <-sws.closec:
    return nil
   }

  case *pb.WatchRequest_CancelRequest:
   if uv.CancelRequest != nil {
    id := uv.CancelRequest.WatchId
    err := sws.watchStream.Cancel(mvcc.WatchID(id))
    if err == nil {
     sws.ctrlStream <- &pb.WatchResponse{
      Header:   sws.newResponseHeader(sws.watchStream.Rev()),
      WatchId:  id,
      Canceled: true,
     }
     sws.mu.Lock()
     delete(sws.progress, mvcc.WatchID(id))
     delete(sws.prevKV, mvcc.WatchID(id))
     delete(sws.fragment, mvcc.WatchID(id))
     sws.mu.Unlock()
    }
   }
  case *pb.WatchRequest_ProgressRequest:
   if uv.ProgressRequest != nil {
    sws.ctrlStream <- &pb.WatchResponse{
     Header:  sws.newResponseHeader(sws.watchStream.Rev()),
     WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
    }
   }
  default:
   // we probably should not shutdown the entire stream when
   // receive an valid command.
   // so just do nothing instead.
   continue
  }
 }
}

在用於處理客戶端的 recvLoop 方法中調用了 mvcc 模塊暴露出的 watchStream.Watch 方法,該方法會返回一個可以用於取消監聽事件的 watchID;當 gRPC 流已經結束後者出現錯誤時,當前的循環就會返回,兩個 Goroutine 也都會結束。

服務端 sendLoop

如果出現了更新或者刪除事件,就會被髮送到 watchStream 持有的 Channel 中,而 sendLoop 會通過 select 來監聽多個 Channel 中的數據並將接收到的數據封裝成 pb.WatchResponse 結構並通過 gRPC 流發送給客戶端:

// 位於 etcdserver/api/v3rpc/watch.go:332
func (sws *serverWatchStream) sendLoop() {
 // watch ids that are currently active
for {
  select {
  case wresp, ok := <-sws.watchStream.Chan():
   evs := wresp.Events
   events := make([]*mvccpb.Event, len(evs))
   for i := range evs {
    events[i] = &evs[i]   }

   canceled := wresp.CompactRevision != 0
   wr := &pb.WatchResponse{
    Header:          sws.newResponseHeader(wresp.Revision),
    WatchId:         int64(wresp.WatchID),
    Events:          events,
    CompactRevision: wresp.CompactRevision,
    Canceled:        canceled,
   }

   sws.gRPCStream.Send(wr)

  case c, ok := <-sws.ctrlStream: // ...
  case <-progressTicker.C: // ...
  case <-sws.closec:
   return
  }
 }
}

對於每一個 Watch 請求來說,watchServer 會根據請求創建兩個用於處理當前請求的 Goroutine,這兩個協程會與更底層的 mvcc 模塊協作提供監聽和回調功能:

到這裏,我們對於 Watch 功能的介紹就差不多結束了,從對外提供的接口到底層的使用的數據結構以及具體實現,其他與 Watch 功能相關的話題可以直接閱讀 etcd 的源代碼瞭解更加細節的實現。

Watch 異常場景

上述是正常流程,但是會有很多不正常的情況發生。可以知道,消息都是通過一個 Chan 發送出去,但如果消費者消費速度慢,Chan 就容易堆積。Chan 的空間不可能無限大,那就必然會有滿的時候,滿了後該怎麼辦呢?

接下來就要討論前面小結所提及的 unsynced、victims 數組的作用。首先思考下 Chan 什麼時候會滿呢?

var (
 // chanBufLen is the length of the buffered chan
 // for sending out watched events.
 // TODO: find a good buf value. 1024 is just a random one that
 // seems to be reasonable.
 chanBufLen = 1024

 // maxWatchersPerSync is the number of watchers to sync in a single batch
 maxWatchersPerSync = 512
)

代碼中 Chan 的長度是 1024。不過這也是一個隨機值,只是沒有現在更好的選擇。

chan 一旦滿了,會發生以下操作:

// 位於 mvcc/watchable_store.go:438
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 var victim watcherBatch
 for w, eb := range newWatcherBatch(&s.synced, evs) {
  if eb.revs != 1 {
   s.store.lg.Panic(
    "unexpected multiple revisions in watch notification",
    zap.Int("number-of-revisions", eb.revs),
   )
  }
  if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
   pendingEventsGauge.Add(float64(len(eb.evs)))
  } else {
   // move slow watcher to victims
   w.minRev = rev + 1
   if victim == nil {
    victim = make(watcherBatch)
   }
   w.victim = true
   victim[w] = eb
   s.synced.delete(w)
   slowWatcherGauge.Inc()
  }
 }
 s.addVictim(victim)
}

notify 通知一個事實,即在給定修訂版中的給定事件只是發生在監視事件鍵的觀察者身上。watcher 會記錄當前的 Revision,並將自身標記爲受損的。此次的變更操作會被保存到 watchableStore 的 victims 中。同時該 watcher 會被從 synced 踢出。

假設此時有一個寫操作:foo=f1。而正好 Chan 此時剛滿,則監聽 foo 的 watcher 將從 synced 中踢出,同時 foo=f1 被保存到 victims 中。

接下來對 foo 的任何變更,該 watcher 都不會記錄。那這些消息就都丟掉了嗎?當然不是,watcher 變成受損狀態時記錄下了當時的 Revision,這個很重要。

syncVictimsLoop 清除 victims

在上面的場景中,我們知道,隊列滿時,當時變更的 Event 被放入了 victims 中。這個協程就會試圖清除這個 Event。怎麼清除呢?協程會不斷嘗試讓 watcher 發送這個 Event,一旦隊列不滿,watcher 將這個 Event 發出後。該 watcher 就被劃入了 unsycned 中,同時不再是受損狀態。

// 位於 mvcc/watchable_store.go:246
// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
 defer s.wg.Done()

 for {
  for s.moveVictims() != 0 {
   // try to update all victim watchers
  }
  s.mu.RLock()
  isEmpty := len(s.victims) == 0
  s.mu.RUnlock()

  var tickc <-chan time.Time
  if !isEmpty {
   tickc = time.After(10 * time.Millisecond)
  }

  select {
  case <-tickc:
  case <-s.victimc:
  case <-s.stopc:
   return
  }
 }
}

// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
 s.mu.Lock()
 victims := s.victims
 s.victims = nil
 s.mu.Unlock()

 var newVictim watcherBatch
 for _, wb := range victims {
  // try to send responses again
  for w, eb := range wb {
   // watcher has observed the store up to, but not including, w.minRev
   rev := w.minRev - 1
   if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
    pendingEventsGauge.Add(float64(len(eb.evs)))
   } else {
    if newVictim == nil {
     newVictim = make(watcherBatch)
    }
    newVictim[w] = eb
    continue
   }
   moved++
  }

  // assign completed victim watchers to unsync/sync
  s.mu.Lock()
  s.store.revMu.RLock()
  curRev := s.store.currentRev
  for w, eb := range wb {
   if newVictim != nil && newVictim[w] != nil {
    // couldn't send watch response; stays victim
    continue
   }
   w.victim = false
   if eb.moreRev != 0 {
    w.minRev = eb.moreRev
   }
   if w.minRev <= curRev {
    s.unsynced.add(w)
   } else {
    slowWatcherGauge.Dec()
    s.synced.add(w)
   }
  }
  s.store.revMu.RUnlock()
  s.mu.Unlock()
 }

 if len(newVictim) > 0 {
  s.mu.Lock()
  s.victims = append(s.victims, newVictim)
  s.mu.Unlock()
 }

 return moved
}

此時 syncWatchersLoop 協程就開始起作用。由於在受損狀態下,這個 watcher 已經錯過了很多消息。爲了追回進度,協程會根據 watcher 保存的 Revision,找出受損之後所有的消息,將關於 foo 的消息全部給 watcher,當 watcher 將這些消息都發送出去後。watcher 就脫離了 unsynced,成爲了 synced。

至此就解決了 Chan 滿導致的問題。同時也闡明瞭 Watch 的設計實現。

小結

watch 可以用來監聽一個或一組 key,key 的任何變化都會發出事件消息。某種意義上講,etcd 也是一種發佈訂閱模式。

我們通過介紹 watch 的用法,引入對 etcd watch 機制實現的分析和講解。watchableStore 負責了註冊、管理以及觸發 Watcher 的功能。watchableStore 將 watcher 劃分爲 synced 、unsynced 以及異常狀態下的 victim 三類。在 etcd 啓動時,WatchableKV 模塊啓動了 syncWatchersLoop 和 syncVictimsLoop 異步 goroutine,用以負責不同場景下的事件推送,並提供了事件重試機制,保證事件都能發送出去給到客戶端。

aoho 求索 aoho 求索是一個分享服務端開發技術的公衆號,主要涉及 Java、Golang 等語言,介紹微服務架構和高併發相關的實踐。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/iw3ML2z3137rDSQnQNiBmw