Kubernetes Go 週期執行器的實現

週期執行器主要用在同步網絡規則,如 iptables proixer、nettables proixer 中,一旦有新的網絡規則需要增加,就會週期性執行。

我們通過一張圖來快速瞭解一下 BoundedFrequencyRunner 的執行流程

BoundedFrequencyRunner

用於週期性的執行同步方法,並且提供了執行失敗進行重試,內部封裝了運行的限流器

type BoundedFrequencyRunner struct {
 minInterval time.Duration // 運行之間的最小時間間隔,避免多次執行
 maxInterval time.Duration // 運行之間的最大時間間隔

 run chan struct{} // 觸發運行的信號

 mu      sync.Mutex  // 保護對fn的運行和所有變更的互斥鎖
 fn      func()      // 要運行的函數,有外部傳入,Proxier傳入的是syncLoop
 lastRun time.Time   // 上次運行的時間,可以用於計算下一次可能執行的時間
 timer   timer       // 延遲運行的計時器
 limiter rateLimiter // 運行的速度限流器

 // 重試這部分在代碼裏面並沒有使用
 retry     chan struct{} // 重試信號量
 retryMu   sync.Mutex    // 保護 retryTime 的互斥鎖
 retryTime time.Time     // 重試的時間
}

我們來看一下他的構造方法

func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
 // 這裏的目的是保證timer已經準備好接收下一個信號
 // 如果實現沒有調用Reset直接開始監聽,有可能會多執行一次
 timer := &realTimer{timer: time.NewTimer(0)} 
 <-timer.C()                                  
 return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}

func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
 bfr := &BoundedFrequencyRunner{
  name:        name,
  fn:          fn,
  minInterval: minInterval,
  maxInterval: maxInterval,
  run:         make(chan struct{}, 1),
  retry:       make(chan struct{}, 1),
  timer:       timer,
 }
 
 if minInterval == 0 {
  // 沒有設置最小間隔時間則不需要限流器
  bfr.limiter = nullLimiter{}
 } else {
  // 通過最小時間間隔來計算QPS,初始化限流器
  qps := float32(time.Second) / float32(minInterval)
  bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
 }
 return bfr
}

Loop 循環執行方法

流程圖如下:

整個循環通過監聽信號來啓動運行方法

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
 // 第一次啓動的時候重置定時執行器
 bfr.timer.Reset(bfr.maxInterval)
 for {
  select {
  case <-stop: // 停止信號,則結束循環,這裏其實也可以用context來做
   bfr.stop()
   return
  case <-bfr.timer.C(): // 到達定時器出發的時間
   bfr.tryRun()
  case <-bfr.run: // 主動寫入了run信號進行處理
   bfr.tryRun()
  case <-bfr.retry: // 寫入了重試信號進行重試 
   bfr.doRetry()
  }
 }
}

tryRun 真正執行方法,會嘗試獲取限流器,如果已經被限流不允許執行則重新計算下一次可能執行的時間

func (bfr *BoundedFrequencyRunner) tryRun() {
 bfr.mu.Lock()
 defer bfr.mu.Unlock()

 // 限流器允許執行
 if bfr.limiter.TryAccept() {
  bfr.fn()
  bfr.lastRun = bfr.timer.Now()
  bfr.timer.Stop()
  bfr.timer.Reset(bfr.maxInterval)
  return
 }

 // 限流器沒有token了,不允許執行,則計算下一次可能執行的時間
 elapsed := bfr.timer.Since(bfr.lastRun)   // 上一次運行的時間到這一次運行時間的間隔
 nextPossible := bfr.minInterval - elapsed // 通過最小間隔計算下一次可能執行的時間
 nextScheduled := bfr.timer.Remaining()    // 上一次設置的下一次執行的時間
 // 下一次可能執行的時間小於下一次調度的時間,則用可能執行的時間進行替換
 if nextPossible < nextScheduled {
  nextScheduled = nextPossible
 }
 
 // 重置下一次執行的時間
 bfr.timer.Stop()
 bfr.timer.Reset(nextScheduled)
}

Run 主動觸發

run 信號的寫入,如果已經有數據在 run channel 中的話則會丟棄信號量,如果 Loop 循環沒有被啓動,同樣不會立刻執行方法

func (bfr *BoundedFrequencyRunner) Run() {
 select {
 case bfr.run <- struct{}{}:
 default:
 }
}

retry 在實現後並沒有真正使用,這裏不做分析

Timer 週期性觸發

realTimer 主要用於控制週期性執行

type realTimer struct {
 timer *time.Timer
 next  time.Time // 用作計時,可以返回下一次執行的時間
}

// 用於週期性的時鐘
func (rt *realTimer) C() <-chan time.Time {
 return rt.timer.C
}

// 下一次執行的時間
func (rt *realTimer) Remaining() time.Duration {
 return rt.next.Sub(time.Now())
}

// 重置下次執行的時間
func (rt *realTimer) Reset(d time.Duration) bool {
 rt.next = time.Now().Add(d)
 return rt.timer.Reset(d)
}

其他的方法是將 time 直接進行暴露,沒有做特殊封裝

runtime 包爲什麼要使用 go:linkname?

因爲 runtime 包內不是所有功能都會暴露成公共的 API,但是又希望在內部模塊之間實現信息的共享,所以通過 go:linkname 這種方式來進行導出

注意到這個是由於在 NewTimer 的包中看到 startTime 沒有方法體,而是把實現放在了 runtime 包中,然後通過 go:linkname 來進行鏈接

package time

func NewTimer(d Duration) *Timer {
 // ...
 startTimer(&t.r)
 return t
}

func startTimer(*runtimeTimer)

runtime 包內的具體實現

package runtime 

//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
 if raceenabled {
  racerelease(unsafe.Pointer(t))
 }
 addtimer(t)
}

限流器的實現

先看一下整體流程:

RateLimiter 爲抽象出的限流器接口,來限制執行的併發數

type RateLimiter interface {
 PassiveRateLimiter
}

type PassiveRateLimiter interface {
 // 嘗試是否可以獲取token
 TryAccept() bool
}

限流器的構造

type tokenBucketPassiveRateLimiter struct {
 limiter *rate.Limiter
 qps     float32
 clock   clock.PassiveClock
}

func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
 limiter := rate.NewLimiter(rate.Limit(qps), burst)
 return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
}

func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
 return &tokenBucketPassiveRateLimiter{
  limiter: limiter,
  qps:     qps,
  clock:   c,
 }
}

嘗試獲取執行的令牌,獲取成功纔可以執行

func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
 return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}

func (lim *Limiter) AllowN(t time.Time, n int) bool {
 return lim.reserveN(t, n, 0).ok
}

func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
 lim.mu.Lock()
 defer lim.mu.Unlock()
 // 未限流則直接返回成功
 if lim.limit == Inf {
  return Reservation{
   ok:        true,
   lim:       lim,
   tokens:    n,
   timeToAct: t,
  }
 } else if lim.limit == 0 {
  // 如果併發度設置爲0但是允許n個併發則返回ok
  var ok bool
  if lim.burst >= n {
   ok = true
   lim.burst -= n
  }
  return Reservation{
   ok:        ok,
   lim:       lim,
   tokens:    lim.burst,
   timeToAct: t,
  }
 }

 //獲取當前桶中有的令牌
 t, tokens := lim.advance(t)

 // 計算剩餘的令牌是否夠用
 tokens -= float64(n)

 // 如果不夠用則計算需要等待的時間
 var waitDuration time.Duration
 if tokens < 0 {
  waitDuration = lim.limit.durationFromTokens(-tokens)
 }

 // 看併發度是否超過最大限度,並且是否進行等待
 ok := n <= lim.burst && waitDuration <= maxFutureReserve

 // 返回限流的結果
 r := Reservation{
  ok:    ok,
  lim:   lim,
  limit: lim.limit,
 }
 if ok {
  r.tokens = n
  r.timeToAct = t.Add(waitDuration)
  lim.last = t
  lim.tokens = tokens
  lim.lastEvent = r.timeToAct
 }

 return r
}

advance 來計算令牌數,超過最大值則丟棄多餘的令牌

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
 // 更新最後獲取的時間,方便下次進行計算
 last := lim.last
 if t.Before(last) {
  last = t
 }

 // 計算經過這段時間產生的令牌書
 elapsed := t.Sub(last)
 delta := lim.limit.tokensFromDuration(elapsed)
 tokens := lim.tokens + delta

 // 如果token已經溢出,那麼就設置成burst的值,丟掉多餘的tokens
 if burst := float64(lim.burst); tokens > burst {
  tokens = burst
 }
 return t, tokens
}

回顧

最後回顧一下整個流程

  1. Loop 循環不斷監聽 runtimer的信號量。

  2. 獲取信號量後嘗試執行邏輯,如果限流器沒有 token 則不執行

  3. 執行成功後重置 timer,進入下一次循環。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WKz0nFtJXDJeNjzlgaGWww