K8s apiserver watch 機制淺析

最近有一個業務需求,需要實現多集羣 watch 功能,多集羣的控制面 apiserver 需要在每個子集羣的資源發生改變後跟 k8s 一樣將資源的事件發送給客戶端。客戶端的 client-go 通過多集羣控制面的 kubeconfig 文件新建一個 informer 並 list and watch 所有的子集羣事件,從而在一個統一的控制面觀察和處理多個集羣的資源變化。因此抱着學習的目的,讀了幾遍 k8s 相關的源碼,感受頗深。

K8s 的 apiserver 是 k8s 所有組件的流量入口,其他的所有的組件包括 kube-controller-manager,kubelet,kube-scheduler 等通過 list-watch 機制向 apiserver 發起 list watch 請求,根據收到的事件處理後續的請求。watch 機制本質上是使客戶端和服務端建立長連接,並將服務端的變化實時發送給客戶端的方式減小服務端的壓力。

k8s 的 apiserver 實現了兩種長連接方式:Chunked transfer encoding(分塊傳輸編碼) 和 Websocket,其中基於 chunked 的方式是 apiserver 的默認配置。k8s 的 watch 機制的實現依賴 etcd v3 的 watch 機制,etcd v3 使用的是基於 HTTP/2 的 gRPC 協議,雙向流的 Watch API 設計,實現了連接多路複用。etcd 裏存儲的 key 的任何變化都會發送給客戶端。

下面我們以 1.24.3 版本的 k8s 源碼爲例,從兩個方面介紹 k8s 的 watch 機制。

作者:段朦, 中國移動雲能力中心軟件開發工程師,專注於雲原生領域。

01

kube­apiserver 對 etcd 的 list­watch 機制

說到 kube-apiserver 對 etcd 的 list-watch,不得不提到一個關鍵的 struct:cacher。爲了減輕 etcd 的壓力,kube-apiserver 本身對 etcd 實現了 list-watch 機制,將所有對象的最新狀態和最近的事件存放到 cacher 裏,所有外部組件對資源的訪問都經過 cacher。我們看下 cacher 的數據結構(爲了篇幅考慮,這裏保留了幾個關鍵的子結構):

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

type Cacher struct {
   // incoming 事件管道, 會被分發給所有的watchers
   incoming chan watchCacheEvent
   //storage 的底層實現
   storage storage.Interface
   // 對象類型
   objectType reflect.Type
   // watchCache 滑動窗口,維護了當前kind的所有的資源,和一個基於滑動窗口的最近的事件數組
   watchCache *watchCache
   // reflector list並watch etcd 並將事件和資源存到watchCache中
   reflector  *cache.Reflector
   // watchersBuffer 代表着所有client-go客戶端跟apiserver的連接
   watchersBuffer []*cacheWatcher
   ....
}

下面看下 cacher 的創建過程

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

func NewCacherFromConfig(config Config) (*Cacher, error) {
      ...
   cacher := &Cacher{
      ...
      incoming:              make(chan watchCacheEvent, 100),
      ...
   }
      ...
    watchCache := newWatchCache(
      config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
   listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
   reflectorName := "storage/cacher.go:" + config.ResourcePrefix
   reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
   // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
   // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
   reflector.WatchListPageSize = storageWatchListPageSize
   cacher.watchCache = watchCache
   cacher.reflector = reflector
   go cacher.dispatchEvents() // 1
   cacher.stopWg.Add(1)
   go func() {
      defer cacher.stopWg.Done()
      defer cacher.terminateAllWatchers()
      wait.Until(
         func() {
            if !cacher.isStopped() {
               cacher.startCaching(stopCh)  // 2
            }
         }, time.Second, stopCh,
      )
   }()
   return cacher, nil
}

可以看到,在創建 cacher 的時候,也創建了 watchCache(用於保存事件和所有資源)和 reflactor(執行對 etcd 的 list-watch 並更新 watchCache)。創建 cacher 的時候同時開啓了兩個協程,註釋 1 處 cacher.dispatchEvents() 用於從 cacher 的 incoming 管道里獲取事件,並放到 cacheWatcher 的 input 裏。

處理邏輯可以看下面兩段代碼

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

func (c *Cacher) dispatchEvents() {
   ...
   for {
      select {
      case event, ok := <-c.incoming:
         if !ok {
            return
         }
         if event.Type != watch.Bookmark {
         // 從incoming通道中獲取事件,併發送給交給dispatchEvent方法處理
            c.dispatchEvent(&event)
         }
         lastProcessedResourceVersion = event.ResourceVersion
         metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
      ...
      case <-c.stopCh:
         return
      }
   }
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
   c.startDispatching(event)
   defer c.finishDispatching()
   if event.Type == watch.Bookmark {
      for _, watcher := range c.watchersBuffer {
         watcher.nonblockingAdd(event)
      }
   } else {
      wcEvent := *event
      setCachingObjects(&wcEvent, c.versioner)
      event = &wcEvent
      c.blockedWatchers = c.blockedWatchers[:0]
      // watchersBuffer 是一個數組,維護着所有client-go跟apiserver的watch連接,產生的cacheWatcher。
      for _, watcher := range c.watchersBuffer {
         if !watcher.nonblockingAdd(event) {
            c.blockedWatchers = append(c.blockedWatchers, watcher)
         }
      }
      ...
   }
}

 watchersBuffer 是一個數組,維護着所有 client-go 跟 apiserver 的 watch 連接產生的 cacheWatcher,因此 CacheWatcher 跟發起 watch 請求的 client-go 的客戶端是一對一的關係。當 apiserver 收到一個 etcd 的事件之後,會將這個事件發送到所有的 cacheWatcher 的 input channel 裏。

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
   select {
   case c.input <- event:
      return true
   default:
      return false
   }
}

cacherWatcher 的 struct 結構如下

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 

type cacheWatcher struct {
   input     chan *watchCacheEvent
   result    chan watch.Event
   done      chan struct{}
   filter    filterWithAttrsFunc
   stopped   bool
   forget    func()
   versioner storage.Versioner
   // The watcher will be closed by server after the deadline,
   // save it here to send bookmark events before that.
   deadline            time.Time
   allowWatchBookmarks bool
   // Object type of the cache watcher interests
   objectType reflect.Type
   // human readable identifier that helps assigning cacheWatcher
   // instance with request
   identifier string
}

可以看到,cacherWatcher 不用於存儲數據,只是實現了 watch 接口,並且維護了兩個 channel,input channel 用於獲取從 cacher 中的 incoming 通道中的事件,result channel 用於跟 client-go 的客戶端交互,客戶端的 informer 發起 watch 請求後,會從這個 chanel 裏獲取事件進行後續的處理。

註釋 2 處開啓了另外一個協程,cacher.startCaching(stopCh) ,實際上調用了 cacher 的 reflector 的 listAndWatch 方法,這裏的 reflector 跟 informer 的 reflector 一樣,list 方法是獲取 etcd 裏的所有資源並對 reflector 的 store 做一次整體的 replace 替換,這裏的 store 就是上面說的 watchCache,watchCache 實現了 store 接口,watch 方法是 watch etcd 的資源,並從 watcher 的 resultChan 裏拿到事件,根據事件的類型,調用 watchCache 的 add,update,或 delete 方法。startCaching 執行對 etcd 的 listAndWatch

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
    ...
   if err := c.reflector.ListAndWatch(stopChannel); err != nil {
      klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
   }
}

reflector 的 list 方法裏的 syncWith 方法將 list 得到的結果替換放到 watchCache 裏

staging/src/k8s.io/client-go/tools/cache/reflector.go 

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
   found := make([]interface{}, 0, len(items))
   for _, item := range items {
      found = append(found, item)
   }
   return r.store.Replace(found, resourceVersion)
}

reflector 的 list 方法裏的 watchHandler 函數傳入 watch etcd 得到的 watcher 和 store(即 watchCache),並根據 watcher 的 resultChan 通道里收到的事件類型執行 watchCache 相應的方法(Add,Delete,Update)。

staging/src/k8s.io/client-go/tools/cache/reflector.go

func watchHandler(start time.Time,
   w watch.Interface,
   store Store,
   expectedType reflect.Type,
   expectedGVK *schema.GroupVersionKind,
   name string,
   expectedTypeName string,
   setLastSyncResourceVersion func(string),
   clock clock.Clock,
   errc chan error,
   stopCh <-chan struct{},
) error {
   eventCount := 0
   // Stopping the watcher should be idempotent and if we return from this function there's no way
   // we're coming back in with the same watch interface.
   defer w.Stop()
loop:
   for {
      select {
      case <-stopCh:
         return errorStopRequested
      case err := <-errc:
         return err
      case event, ok := <-w.ResultChan():
         if !ok {
            break loop
         }
         if event.Type == watch.Error {
            return apierrors.FromObject(event.Object)
         }
         if expectedType != nil {
            if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
               utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
               continue
            }
         }
         if expectedGVK != nil {
            if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
               utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
               continue
            }
         }
         meta, err := meta.Accessor(event.Object)
         if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
            continue
         }
         resourceVersion := meta.GetResourceVersion()
         switch event.Type {
         case watch.Added:
            err := store.Add(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
            }
         ...  
         setLastSyncResourceVersion(resourceVersion)
         if rvu, ok := store.(ResourceVersionUpdater); ok {
            rvu.UpdateResourceVersion(resourceVersion)
         }
         eventCount++
      }
   }
}

上文說到,reflector 執行 ListAndWatch 更新 watchCache 保存的資源數據,下面看下 watchCache 的 replace 和 add 方法,看下 reflector 是如何操作 watchCache 保存的資源的。

replace 執行了 watchCache 的 store 的 replace 方法,store 是 threadSafeMap 的實現,實際上更新了底層的 threadSafeMap,用於當前資源的所有實例。

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
   ... 
   if err := w.store.Replace(toReplace, resourceVersion); err != nil {
      return err
   }
   ...
}

add 方法同樣更新了底層了 threadSafeMap,同時執行了一個 processEvent 方法,上文說到 watchCache 維護了一個基於事件的數組 []*watchCacheEvent,數組採用滑動窗口算法,長度固定爲 100,processEvent 會一直更新這個數組,後面的事件會擠掉最前面的事件,代碼如下

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go 

func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
   ...
   if err := func() error {
      // TODO: We should consider moving this lock below after the watchCacheEvent
      // is created. In such situation, the only problematic scenario is Replace(
      // happening after getting object from store and before acquiring a lock.
      // Maybe introduce another lock for this purpose.
      w.Lock()
      defer w.Unlock()
      w.updateCache(wcEvent)
      w.resourceVersion = resourceVersion
      defer w.cond.Broadcast()
    ...
      return updateFunc(elem)
   }(); err != nil {
      return err
   }
   if w.eventHandler != nil {
     w.eventHandler(wcEvent)
   }
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go 

func (w *watchCache) updateCache(event *watchCacheEvent) {
   w.resizeCacheLocked(event.RecordTime)
   if w.isCacheFullLocked() {
      // Cache is full - remove the oldest element.
      w.startIndex++
   }
   w.cache[w.endIndex%w.capacity] = event
   w.endIndex++
}

至此,cacher 創建時創建的兩個協程處理過程分析完了,我們做下簡單的總結,創建 cacher 的時候開啓了兩個協程,第一個協程從 cacher 的 incoming 通道里取出事件放到 cacheWatcher 的 input 通道里,而 cacheWatcher 是本地客戶端創建一個 watch 請求都會生成一個,這個我們下一章再說。另外一個協程主要做的事情就是 reflector 執行 listAndWatch 方法並更新 cacher 裏的 watchCache,具體的來說,就是更新 watchCache 裏的基於滑動窗口算法的事件數組和維護當前 kind 的資源的所有實例的 treadSafeMap。這裏還有兩個點我們沒有明確:1.cacher 是什麼時候及誰創建的 2.cacher 的 incoming 通道里的事件是哪裏來的,這個通道里的時間跟 reflector 的 listAndWatch 方法裏的執行對 etcd 的 watch 請求的 watcher 的通道里事件是否同步?帶着這些問題,我們繼續看下代碼,第一個問題,可以看到 apiserver 在創建 storage 的時候創建了 cacher,說明 apiserver 在 GVK 註冊到 apiserver 的時候就創建了相應資源的 cacher,這裏調用鏈太深,因此不貼代碼了。第二個問題,我們先看下 incoming 通道里事件是如何來的,注意這裏是 cacher 的 processEvent 方法處理的。

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 

func (c *Cacher) processEvent(event *watchCacheEvent) {
   if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
      // Monitor if this gets backed up, and how much.
      klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
   }
   c.incoming <- *event
}

看下這個方法是哪裏用到的,由上面的 NewCacherFromConfig 方法可以看到是創建 cacher 的時候,創建 watchCache 的時候傳入的。watchCache 定義了一個 eventHandler 用於處理 listAndWatch 收到的事件,由上面的代碼 watchCache 的 processEvent 方法可以看到,在更新 watchCache 之後,會根據是否有 eventHandler 執行 eventHandler 的 func,即上面的 cacher 的 processEvent。至此,第二個問題也變的很清晰,cacher 的 incoming 通道里的事件是 watch etcd 收到的事件更新 watchCache 之後處理的。這一章講了 apiserver 對 etcd 的 list 和 watch 機制,apiserver 收到事件之後本身做了緩存,並將事件發送給 cacheWatcher 的 input 通道里,由 cacherWatcher 處理跟客戶端的連接,下一章我們講一下本地客戶端跟 apiserver 的 watch 機制的實現。

02

客戶端對 apiserver 的 watch 機制實現

    apiserver 對 list 接口增加了一個 watch 參數,客戶端可以向 apiserver 通過增加一個 watch=true 參數發起 watch 請求,https://{host:6443}/apis/apps/v1/namespaces/default/deployments?watch=true

apiserver 的 hander 在解析到 watch 參數爲 true 時,進行 watch 請求的處理

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go 

func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
   return func(w http.ResponseWriter, req *http.Request) {
      ...
      if opts.Watch || forceWatch {
         if rw == nil {
         ...
         watcher, err := rw.Watch(ctx, &opts)
         ...
         metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
            serveWatch(watcher, scope, outputMediaType, req, w, timeout)
         })
         return
      }
      ...
   }
}

可以看到,當客戶端發起 watch 請求時,實際上調用了 watcher 的 watch 接口,這裏的 watcher 實際上是 watch 接口的實現,apiserver 根據 url 的路徑參數,針對不同的 watch 請求強轉爲不同類型的 watcher 實現,k8s 的內置資源大都繼承了 REST 結構體,他的底層 storage 就是 cacher,因此這裏實際上就是調用了 cacher 的 watch 方法,在看一下 serveWatch 的實現

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go 

func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
   ...
   server := &WatchServer{
      Watching: watcher,
      Scope:    scope,
      UseTextFraming:  useTextFraming,
      MediaType:       mediaType,
      Framer:          framer,
      Encoder:         encoder,
      EmbeddedEncoder: embeddedEncoder,
      Fixup: func(obj runtime.Object) runtime.Object {
         result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
         if err != nil {
            utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
            return obj
         }
         // When we are transformed to a table, use the table options as the state for whether we
         // should print headers - on watch, we only want to print table headers on the first object
         // and omit them on subsequent events.
         if tableOptions, ok := options.(*metav1.TableOptions); ok {
            tableOptions.NoHeaders = true
         }
         return result
      },
      TimeoutFactory: &realTimeoutFactory{timeout},
   }
   server.ServeHTTP(w, req)
}

創建了一個 watchServer,並執行 watchServer 的 ServeHTTP 方法,看一下 ServeHTTP 的實現

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go 

func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   kind := s.Scope.Kind
   if wsstream.IsWebSocketRequest(req) {
      w.Header().Set("Content-Type", s.MediaType)
      websocket.Handler(s.HandleWS).ServeHTTP(w, req)
      return
   }
   flusher, ok := w.(http.Flusher)
   if !ok {
      err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
      utilruntime.HandleError(err)
      s.Scope.err(errors.NewInternalError(err), w, req)
      return
   }
   framer := s.Framer.NewFrameWriter(w)
   if framer == nil {
      // programmer error
      err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
      utilruntime.HandleError(err)
      s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
      return
   }
   var e streaming.Encoder
   var memoryAllocator runtime.MemoryAllocator
   if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
      memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
      defer runtime.AllocatorPool.Put(memoryAllocator)
      e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
   } else {
      e = streaming.NewEncoder(framer, s.Encoder)
   }
   // ensure the connection times out
   timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
   defer cleanup()
   // begin the stream
   w.Header().Set("Content-Type", s.MediaType)
   w.Header().Set("Transfer-Encoding", "chunked")
   w.WriteHeader(http.StatusOK)
   flusher.Flush()
   var unknown runtime.Unknown
   internalEvent := &metav1.InternalEvent{}
   outEvent := &metav1.WatchEvent{}
   buf := &bytes.Buffer{}
   ch := s.Watching.ResultChan()
   done := req.Context().Done()
   embeddedEncodeFn := s.EmbeddedEncoder.Encode
   if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
      if memoryAllocator == nil {
         // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
         // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
         memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
         defer runtime.AllocatorPool.Put(memoryAllocator)
      }
      embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
         return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
      }
   }
   for {
      select {
      case <-done:
         return
      case <-timeoutCh:
         return
      case event, ok := <-ch:
         if !ok {
            // End of results.
            return
         }
         metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
         obj := s.Fixup(event.Object)
         if err := embeddedEncodeFn(obj, buf); err != nil {
            // unexpected error
            utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
            return
         }
         // ContentType is not required here because we are defaulting to the serializer
         // type
         unknown.Raw = buf.Bytes()
         event.Object = &unknown
         metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
         *outEvent = metav1.WatchEvent{}
         // create the external type directly and encode it.  Clients will only recognize the serialization we provide.
         // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
         // and we get the benefit of using conversion functions which already have to stay in sync
         *internalEvent = metav1.InternalEvent(event)
         err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
         if err != nil {
            utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
            // client disconnect.
            return
         }
         if err := e.Encode(outEvent); err != nil {
            utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
            // client disconnect.
            return
         }
         if len(ch) == 0 {
            flusher.Flush()
         }
         buf.Reset()
      }
   }
}

可以看到,這裏主要就是處理長連接發送給客戶端的事件,讀取 watcher 的 resultChan 裏的事件,持續不斷的放到 http response 的流當中,如果客戶端發起的是 websocket 請求,則直接處理 watcher 的 resultChan 裏的事件,如果是正常的 http 請求則需要修改請求頭建立 http 1.1 的長連接。

上面說到,客戶端發起 watch 請求時,apiserver 實際上調用的是 cacher 的 Watch 方法,下面看一下 Watch 方法

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
   ...
   watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)
   ...
   cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
   if err != nil {
      // To match the uncached watch implementation, once we have passed authn/authz/admission,
      // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
      // rather than a directly returned error.
      return newErrWatcher(err), nil
   }
   func() {
      c.Lock()
      defer c.Unlock()
      // Update watcher.forget function once we can compute it.
      watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
      c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
      // Add it to the queue only when the client support watch bookmarks.
      if watcher.allowWatchBookmarks {
         c.bookmarkWatchers.addWatcher(watcher)
      }
      c.watcherIdx++
   }()
   go watcher.processInterval(ctx, cacheInterval, watchRV)
   return watcher, nil
}

可以看到,當客戶端發起 watch 請求時,apiserver 調用 cacher 的 watch 方法的時候創建了 CacheWatcher,因此客戶端的 watch 請求和 cachWatcher 是一一對應的。cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV) 是指根據客戶端傳過來的 resourceVersion 獲取 watchCache 滑動窗口裏大於當前 resourceVersion 的事件,併發送給後續的協程 go watcher.processInterval(ctx, cacheInterval, watchRV) 處理,防止客戶端的 watch 連接斷開可能導致的事件丟失。go watcher.processInterval(ctx, cacheInterval, watchRV) 協程中將首次 watch 時滑動窗口中的事件和後續 watch input 通道中收到的事件放到 cacheWatcher 的 resultChan 裏。代碼如下

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go 

func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
   ...
   initEventCount := 0
   /* 首次watch 獲取cacheInterval 的事件併發送到resultChan*/
   for {
      event, err := cacheInterval.Next()
      if err != nil {
         klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
         return
      }
      if event == nil {
         break
      }
      c.sendWatchCacheEvent(event)
      resourceVersion = event.ResourceVersion
      initEventCount++
   }
   /* 後續建立watch連接之後,將input通道中的事件發送到resultChan*/
   c.process(ctx, resourceVersion)
}

至此,k8s 的 apiserver 對 etcd 的 list 和 watch 以及對客戶端的 list watch 處理邏輯完成了閉環,我們可以用一張圖表示

參考文檔:

https://github.com/kubernetes/kubernetes

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/utFvCI-gHIDKBazuyqEncQ