K8s apiserver watch 機制淺析
最近有一個業務需求,需要實現多集羣 watch 功能,多集羣的控制面 apiserver 需要在每個子集羣的資源發生改變後跟 k8s 一樣將資源的事件發送給客戶端。客戶端的 client-go 通過多集羣控制面的 kubeconfig 文件新建一個 informer 並 list and watch 所有的子集羣事件,從而在一個統一的控制面觀察和處理多個集羣的資源變化。因此抱着學習的目的,讀了幾遍 k8s 相關的源碼,感受頗深。
K8s 的 apiserver 是 k8s 所有組件的流量入口,其他的所有的組件包括 kube-controller-manager,kubelet,kube-scheduler 等通過 list-watch 機制向 apiserver 發起 list watch 請求,根據收到的事件處理後續的請求。watch 機制本質上是使客戶端和服務端建立長連接,並將服務端的變化實時發送給客戶端的方式減小服務端的壓力。
k8s 的 apiserver 實現了兩種長連接方式:Chunked transfer encoding(分塊傳輸編碼) 和 Websocket,其中基於 chunked 的方式是 apiserver 的默認配置。k8s 的 watch 機制的實現依賴 etcd v3 的 watch 機制,etcd v3 使用的是基於 HTTP/2 的 gRPC 協議,雙向流的 Watch API 設計,實現了連接多路複用。etcd 裏存儲的 key 的任何變化都會發送給客戶端。
下面我們以 1.24.3 版本的 k8s 源碼爲例,從兩個方面介紹 k8s 的 watch 機制。
作者:段朦, 中國移動雲能力中心軟件開發工程師,專注於雲原生領域。
01
kubeapiserver 對 etcd 的 listwatch 機制
說到 kube-apiserver 對 etcd 的 list-watch,不得不提到一個關鍵的 struct:cacher。爲了減輕 etcd 的壓力,kube-apiserver 本身對 etcd 實現了 list-watch 機制,將所有對象的最新狀態和最近的事件存放到 cacher 裏,所有外部組件對資源的訪問都經過 cacher。我們看下 cacher 的數據結構(爲了篇幅考慮,這裏保留了幾個關鍵的子結構):
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type Cacher struct {
// incoming 事件管道, 會被分發給所有的watchers
incoming chan watchCacheEvent
//storage 的底層實現
storage storage.Interface
// 對象類型
objectType reflect.Type
// watchCache 滑動窗口,維護了當前kind的所有的資源,和一個基於滑動窗口的最近的事件數組
watchCache *watchCache
// reflector list並watch etcd 並將事件和資源存到watchCache中
reflector *cache.Reflector
// watchersBuffer 代表着所有client-go客戶端跟apiserver的連接
watchersBuffer []*cacheWatcher
....
}
下面看下 cacher 的創建過程
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func NewCacherFromConfig(config Config) (*Cacher, error) {
...
cacher := &Cacher{
...
incoming: make(chan watchCacheEvent, 100),
...
}
...
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize
cacher.watchCache = watchCache
cacher.reflector = reflector
go cacher.dispatchEvents() // 1
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh) // 2
}
}, time.Second, stopCh,
)
}()
return cacher, nil
}
可以看到,在創建 cacher 的時候,也創建了 watchCache(用於保存事件和所有資源)和 reflactor(執行對 etcd 的 list-watch 並更新 watchCache)。創建 cacher 的時候同時開啓了兩個協程,註釋 1 處 cacher.dispatchEvents() 用於從 cacher 的 incoming 管道里獲取事件,並放到 cacheWatcher 的 input 裏。
處理邏輯可以看下面兩段代碼
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) dispatchEvents() {
...
for {
select {
case event, ok := <-c.incoming:
if !ok {
return
}
if event.Type != watch.Bookmark {
// 從incoming通道中獲取事件,併發送給交給dispatchEvent方法處理
c.dispatchEvent(&event)
}
lastProcessedResourceVersion = event.ResourceVersion
metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
...
case <-c.stopCh:
return
}
}
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
}
} else {
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
c.blockedWatchers = c.blockedWatchers[:0]
// watchersBuffer 是一個數組,維護着所有client-go跟apiserver的watch連接,產生的cacheWatcher。
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {
c.blockedWatchers = append(c.blockedWatchers, watcher)
}
}
...
}
}
watchersBuffer 是一個數組,維護着所有 client-go 跟 apiserver 的 watch 連接產生的 cacheWatcher,因此 CacheWatcher 跟發起 watch 請求的 client-go 的客戶端是一對一的關係。當 apiserver 收到一個 etcd 的事件之後,會將這個事件發送到所有的 cacheWatcher 的 input channel 裏。
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
select {
case c.input <- event:
return true
default:
return false
}
}
cacherWatcher 的 struct 結構如下
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type cacheWatcher struct {
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter filterWithAttrsFunc
stopped bool
forget func()
versioner storage.Versioner
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
deadline time.Time
allowWatchBookmarks bool
// Object type of the cache watcher interests
objectType reflect.Type
// human readable identifier that helps assigning cacheWatcher
// instance with request
identifier string
}
可以看到,cacherWatcher 不用於存儲數據,只是實現了 watch 接口,並且維護了兩個 channel,input channel 用於獲取從 cacher 中的 incoming 通道中的事件,result channel 用於跟 client-go 的客戶端交互,客戶端的 informer 發起 watch 請求後,會從這個 chanel 裏獲取事件進行後續的處理。
註釋 2 處開啓了另外一個協程,cacher.startCaching(stopCh) ,實際上調用了 cacher 的 reflector 的 listAndWatch 方法,這裏的 reflector 跟 informer 的 reflector 一樣,list 方法是獲取 etcd 裏的所有資源並對 reflector 的 store 做一次整體的 replace 替換,這裏的 store 就是上面說的 watchCache,watchCache 實現了 store 接口,watch 方法是 watch etcd 的資源,並從 watcher 的 resultChan 裏拿到事件,根據事件的類型,調用 watchCache 的 add,update,或 delete 方法。startCaching 執行對 etcd 的 listAndWatch
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
...
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
}
}
reflector 的 list 方法裏的 syncWith 方法將 list 得到的結果替換放到 watchCache 裏
staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
reflector 的 list 方法裏的 watchHandler 函數傳入 watch etcd 得到的 watcher 和 store(即 watchCache),並根據 watcher 的 resultChan 通道里收到的事件類型執行 watchCache 相應的方法(Add,Delete,Update)。
staging/src/k8s.io/client-go/tools/cache/reflector.go
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
}
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue
}
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
...
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
}
}
}
上文說到,reflector 執行 ListAndWatch 更新 watchCache 保存的資源數據,下面看下 watchCache 的 replace 和 add 方法,看下 reflector 是如何操作 watchCache 保存的資源的。
replace 執行了 watchCache 的 store 的 replace 方法,store 是 threadSafeMap 的實現,實際上更新了底層的 threadSafeMap,用於當前資源的所有實例。
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
...
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
...
}
add 方法同樣更新了底層了 threadSafeMap,同時執行了一個 processEvent 方法,上文說到 watchCache 維護了一個基於事件的數組 []*watchCacheEvent,數組採用滑動窗口算法,長度固定爲 100,processEvent 會一直更新這個數組,後面的事件會擠掉最前面的事件,代碼如下
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
...
if err := func() error {
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
w.updateCache(wcEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
...
return updateFunc(elem)
}(); err != nil {
return err
}
if w.eventHandler != nil {
w.eventHandler(wcEvent)
}
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
func (w *watchCache) updateCache(event *watchCacheEvent) {
w.resizeCacheLocked(event.RecordTime)
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
}
至此,cacher 創建時創建的兩個協程處理過程分析完了,我們做下簡單的總結,創建 cacher 的時候開啓了兩個協程,第一個協程從 cacher 的 incoming 通道里取出事件放到 cacheWatcher 的 input 通道里,而 cacheWatcher 是本地客戶端創建一個 watch 請求都會生成一個,這個我們下一章再說。另外一個協程主要做的事情就是 reflector 執行 listAndWatch 方法並更新 cacher 裏的 watchCache,具體的來說,就是更新 watchCache 裏的基於滑動窗口算法的事件數組和維護當前 kind 的資源的所有實例的 treadSafeMap。這裏還有兩個點我們沒有明確:1.cacher 是什麼時候及誰創建的 2.cacher 的 incoming 通道里的事件是哪裏來的,這個通道里的時間跟 reflector 的 listAndWatch 方法裏的執行對 etcd 的 watch 請求的 watcher 的通道里事件是否同步?帶着這些問題,我們繼續看下代碼,第一個問題,可以看到 apiserver 在創建 storage 的時候創建了 cacher,說明 apiserver 在 GVK 註冊到 apiserver 的時候就創建了相應資源的 cacher,這裏調用鏈太深,因此不貼代碼了。第二個問題,我們先看下 incoming 通道里事件是如何來的,注意這裏是 cacher 的 processEvent 方法處理的。
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
}
c.incoming <- *event
}
看下這個方法是哪裏用到的,由上面的 NewCacherFromConfig 方法可以看到是創建 cacher 的時候,創建 watchCache 的時候傳入的。watchCache 定義了一個 eventHandler 用於處理 listAndWatch 收到的事件,由上面的代碼 watchCache 的 processEvent 方法可以看到,在更新 watchCache 之後,會根據是否有 eventHandler 執行 eventHandler 的 func,即上面的 cacher 的 processEvent。至此,第二個問題也變的很清晰,cacher 的 incoming 通道里的事件是 watch etcd 收到的事件更新 watchCache 之後處理的。這一章講了 apiserver 對 etcd 的 list 和 watch 機制,apiserver 收到事件之後本身做了緩存,並將事件發送給 cacheWatcher 的 input 通道里,由 cacherWatcher 處理跟客戶端的連接,下一章我們講一下本地客戶端跟 apiserver 的 watch 機制的實現。
02
客戶端對 apiserver 的 watch 機制實現
apiserver 對 list 接口增加了一個 watch 參數,客戶端可以向 apiserver 通過增加一個 watch=true 參數發起 watch 請求,https://{host:6443}/apis/apps/v1/namespaces/default/deployments?watch=true
apiserver 的 hander 在解析到 watch 參數爲 true 時,進行 watch 請求的處理
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
if opts.Watch || forceWatch {
if rw == nil {
...
watcher, err := rw.Watch(ctx, &opts)
...
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
...
}
}
可以看到,當客戶端發起 watch 請求時,實際上調用了 watcher 的 watch 接口,這裏的 watcher 實際上是 watch 接口的實現,apiserver 根據 url 的路徑參數,針對不同的 watch 請求強轉爲不同類型的 watcher 實現,k8s 的內置資源大都繼承了 REST 結構體,他的底層 storage 就是 cacher,因此這裏實際上就是調用了 cacher 的 watch 方法,在看一下 serveWatch 的實現
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
...
server := &WatchServer{
Watching: watcher,
Scope: scope,
UseTextFraming: useTextFraming,
MediaType: mediaType,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
}
server.ServeHTTP(w, req)
}
創建了一個 watchServer,並執行 watchServer 的 ServeHTTP 方法,看一下 ServeHTTP 的實現
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
kind := s.Scope.Kind
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.MediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
utilruntime.HandleError(err)
s.Scope.err(errors.NewInternalError(err), w, req)
return
}
framer := s.Framer.NewFrameWriter(w)
if framer == nil {
// programmer error
err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
utilruntime.HandleError(err)
s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
var e streaming.Encoder
var memoryAllocator runtime.MemoryAllocator
if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
} else {
e = streaming.NewEncoder(framer, s.Encoder)
}
// ensure the connection times out
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
defer cleanup()
// begin the stream
w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
var unknown runtime.Unknown
internalEvent := &metav1.InternalEvent{}
outEvent := &metav1.WatchEvent{}
buf := &bytes.Buffer{}
ch := s.Watching.ResultChan()
done := req.Context().Done()
embeddedEncodeFn := s.EmbeddedEncoder.Encode
if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
}
embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
}
}
for {
select {
case <-done:
return
case <-timeoutCh:
return
case event, ok := <-ch:
if !ok {
// End of results.
return
}
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
obj := s.Fixup(event.Object)
if err := embeddedEncodeFn(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
return
}
// ContentType is not required here because we are defaulting to the serializer
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown
metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
*outEvent = metav1.WatchEvent{}
// create the external type directly and encode it. Clients will only recognize the serialization we provide.
// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
// and we get the benefit of using conversion functions which already have to stay in sync
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
// client disconnect.
return
}
if err := e.Encode(outEvent); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
// client disconnect.
return
}
if len(ch) == 0 {
flusher.Flush()
}
buf.Reset()
}
}
}
可以看到,這裏主要就是處理長連接發送給客戶端的事件,讀取 watcher 的 resultChan 裏的事件,持續不斷的放到 http response 的流當中,如果客戶端發起的是 websocket 請求,則直接處理 watcher 的 resultChan 裏的事件,如果是正常的 http 請求則需要修改請求頭建立 http 1.1 的長連接。
上面說到,客戶端發起 watch 請求時,apiserver 實際上調用的是 cacher 的 Watch 方法,下面看一下 Watch 方法
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
...
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)
...
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}
func() {
c.Lock()
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
go watcher.processInterval(ctx, cacheInterval, watchRV)
return watcher, nil
}
可以看到,當客戶端發起 watch 請求時,apiserver 調用 cacher 的 watch 方法的時候創建了 CacheWatcher,因此客戶端的 watch 請求和 cachWatcher 是一一對應的。cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV) 是指根據客戶端傳過來的 resourceVersion 獲取 watchCache 滑動窗口裏大於當前 resourceVersion 的事件,併發送給後續的協程 go watcher.processInterval(ctx, cacheInterval, watchRV) 處理,防止客戶端的 watch 連接斷開可能導致的事件丟失。go watcher.processInterval(ctx, cacheInterval, watchRV) 協程中將首次 watch 時滑動窗口中的事件和後續 watch input 通道中收到的事件放到 cacheWatcher 的 resultChan 裏。代碼如下
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
...
initEventCount := 0
/* 首次watch 獲取cacheInterval 的事件併發送到resultChan*/
for {
event, err := cacheInterval.Next()
if err != nil {
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
resourceVersion = event.ResourceVersion
initEventCount++
}
/* 後續建立watch連接之後,將input通道中的事件發送到resultChan*/
c.process(ctx, resourceVersion)
}
至此,k8s 的 apiserver 對 etcd 的 list 和 watch 以及對客戶端的 list watch 處理邏輯完成了閉環,我們可以用一張圖表示
參考文檔:
https://github.com/kubernetes/kubernetes
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/utFvCI-gHIDKBazuyqEncQ