Kubernetes Go 週期執行器的實現
週期執行器主要用在同步網絡規則,如 iptables proixer、nettables proixer 中,一旦有新的網絡規則需要增加,就會週期性執行。
我們通過一張圖來快速瞭解一下 BoundedFrequencyRunner 的執行流程
BoundedFrequencyRunner
用於週期性的執行同步方法,並且提供了執行失敗進行重試,內部封裝了運行的限流器
type BoundedFrequencyRunner struct {
minInterval time.Duration // 運行之間的最小時間間隔,避免多次執行
maxInterval time.Duration // 運行之間的最大時間間隔
run chan struct{} // 觸發運行的信號
mu sync.Mutex // 保護對fn的運行和所有變更的互斥鎖
fn func() // 要運行的函數,有外部傳入,Proxier傳入的是syncLoop
lastRun time.Time // 上次運行的時間,可以用於計算下一次可能執行的時間
timer timer // 延遲運行的計時器
limiter rateLimiter // 運行的速度限流器
// 重試這部分在代碼裏面並沒有使用
retry chan struct{} // 重試信號量
retryMu sync.Mutex // 保護 retryTime 的互斥鎖
retryTime time.Time // 重試的時間
}
我們來看一下他的構造方法
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
// 這裏的目的是保證timer已經準備好接收下一個信號
// 如果實現沒有調用Reset直接開始監聽,有可能會多執行一次
timer := &realTimer{timer: time.NewTimer(0)}
<-timer.C()
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
bfr := &BoundedFrequencyRunner{
name: name,
fn: fn,
minInterval: minInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 1),
retry: make(chan struct{}, 1),
timer: timer,
}
if minInterval == 0 {
// 沒有設置最小間隔時間則不需要限流器
bfr.limiter = nullLimiter{}
} else {
// 通過最小時間間隔來計算QPS,初始化限流器
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
}
return bfr
}
Loop 循環執行方法
流程圖如下:
整個循環通過監聽信號來啓動運行方法
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
// 第一次啓動的時候重置定時執行器
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop: // 停止信號,則結束循環,這裏其實也可以用context來做
bfr.stop()
return
case <-bfr.timer.C(): // 到達定時器出發的時間
bfr.tryRun()
case <-bfr.run: // 主動寫入了run信號進行處理
bfr.tryRun()
case <-bfr.retry: // 寫入了重試信號進行重試
bfr.doRetry()
}
}
}
tryRun 真正執行方法,會嘗試獲取限流器,如果已經被限流不允許執行則重新計算下一次可能執行的時間
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
// 限流器允許執行
if bfr.limiter.TryAccept() {
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
return
}
// 限流器沒有token了,不允許執行,則計算下一次可能執行的時間
elapsed := bfr.timer.Since(bfr.lastRun) // 上一次運行的時間到這一次運行時間的間隔
nextPossible := bfr.minInterval - elapsed // 通過最小間隔計算下一次可能執行的時間
nextScheduled := bfr.timer.Remaining() // 上一次設置的下一次執行的時間
// 下一次可能執行的時間小於下一次調度的時間,則用可能執行的時間進行替換
if nextPossible < nextScheduled {
nextScheduled = nextPossible
}
// 重置下一次執行的時間
bfr.timer.Stop()
bfr.timer.Reset(nextScheduled)
}
Run 主動觸發
run 信號的寫入,如果已經有數據在 run channel 中的話則會丟棄信號量,如果 Loop 循環沒有被啓動,同樣不會立刻執行方法
func (bfr *BoundedFrequencyRunner) Run() {
select {
case bfr.run <- struct{}{}:
default:
}
}
retry 在實現後並沒有真正使用,這裏不做分析
Timer 週期性觸發
realTimer 主要用於控制週期性執行
type realTimer struct {
timer *time.Timer
next time.Time // 用作計時,可以返回下一次執行的時間
}
// 用於週期性的時鐘
func (rt *realTimer) C() <-chan time.Time {
return rt.timer.C
}
// 下一次執行的時間
func (rt *realTimer) Remaining() time.Duration {
return rt.next.Sub(time.Now())
}
// 重置下次執行的時間
func (rt *realTimer) Reset(d time.Duration) bool {
rt.next = time.Now().Add(d)
return rt.timer.Reset(d)
}
其他的方法是將 time 直接進行暴露,沒有做特殊封裝
runtime 包爲什麼要使用 go:linkname?
因爲 runtime 包內不是所有功能都會暴露成公共的 API,但是又希望在內部模塊之間實現信息的共享,所以通過 go:linkname 這種方式來進行導出
注意到這個是由於在 NewTimer 的包中看到 startTime 沒有方法體,而是把實現放在了 runtime 包中,然後通過 go:linkname 來進行鏈接
package time
func NewTimer(d Duration) *Timer {
// ...
startTimer(&t.r)
return t
}
func startTimer(*runtimeTimer)
runtime 包內的具體實現
package runtime
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
限流器的實現
先看一下整體流程:
RateLimiter 爲抽象出的限流器接口,來限制執行的併發數
type RateLimiter interface {
PassiveRateLimiter
}
type PassiveRateLimiter interface {
// 嘗試是否可以獲取token
TryAccept() bool
}
限流器的構造
type tokenBucketPassiveRateLimiter struct {
limiter *rate.Limiter
qps float32
clock clock.PassiveClock
}
func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
}
func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
return &tokenBucketPassiveRateLimiter{
limiter: limiter,
qps: qps,
clock: c,
}
}
嘗試獲取執行的令牌,獲取成功纔可以執行
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}
func (lim *Limiter) AllowN(t time.Time, n int) bool {
return lim.reserveN(t, n, 0).ok
}
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
defer lim.mu.Unlock()
// 未限流則直接返回成功
if lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: t,
}
} else if lim.limit == 0 {
// 如果併發度設置爲0但是允許n個併發則返回ok
var ok bool
if lim.burst >= n {
ok = true
lim.burst -= n
}
return Reservation{
ok: ok,
lim: lim,
tokens: lim.burst,
timeToAct: t,
}
}
//獲取當前桶中有的令牌
t, tokens := lim.advance(t)
// 計算剩餘的令牌是否夠用
tokens -= float64(n)
// 如果不夠用則計算需要等待的時間
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 看併發度是否超過最大限度,並且是否進行等待
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// 返回限流的結果
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = t.Add(waitDuration)
lim.last = t
lim.tokens = tokens
lim.lastEvent = r.timeToAct
}
return r
}
advance 來計算令牌數,超過最大值則丟棄多餘的令牌
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
// 更新最後獲取的時間,方便下次進行計算
last := lim.last
if t.Before(last) {
last = t
}
// 計算經過這段時間產生的令牌書
elapsed := t.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
// 如果token已經溢出,那麼就設置成burst的值,丟掉多餘的tokens
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return t, tokens
}
回顧
最後回顧一下整個流程
-
Loop循環不斷監聽run和timer的信號量。 -
獲取信號量後嘗試執行邏輯,如果限流器沒有 token 則不執行
-
執行成功後重置 timer,進入下一次循環。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WKz0nFtJXDJeNjzlgaGWww