難以駕馭的 Go timer,一文帶你參透計時器的奧祕

在實際的應用工程中,我們常常會需要多久後,或定時去做某個事情。甚至在分析標準庫 context 的父子級傳播時,都能見到等待多久後自動觸發取消事件的蹤影。

而在 Go 語言中,能夠完成這類運行的功能訴求就是標準庫 time,在具體的功能範疇上我們稱其爲 “計時器 “,是一個非常具有價值的一個模塊。在這篇文章中我們將對其做進一步的分析和研討。

什麼是 timer

可以控制時間,確保應用程序中的某段代碼在某個時刻運行。在 Go 語言中可以單次執行,也可以循環執行。

最常見的方式就是引用標準庫 time 去做一些事情,普通開發者經常使用到的標準庫代碼是:

1time.Now().Unix()
2
3

上述代碼可用於獲取當前時間的 Unix 時間戳,而在內部的具體實現上提供了 TimeTimer 以及 Ticker 的各類配套方法。

timer 基本特性

Timer

演示代碼:

1func main() {
2 timer := time.NewTimer(2 * time.Second)
3 <-timer.C
4 fmt.Println("我的腦子真的進煎魚了!")
5}
6
7

輸出結果:

1// 等待兩秒...
2我的腦子真的進煎魚了!
3
4

我們可以通過 time.NewTimer 方法定時在 2 秒進行程序的執行。而其還有個變種的用法,在做 channel 的源碼剖析時有發現

 1func main() {
 2 v := make(chan struct{})
 3 timer := time.AfterFunc(2*time.Second, func() {
 4  fmt.Println("我想在這個點喫煎魚!")
 5  v <- struct{}{}
 6 })
 7 defer timer.Stop()
 8 <-v
 9}
10
11

在等待 2 秒後,會立即調用 time.AfterFunc 所對應的匿名方法。在時間上我們也可以指定對應的具體時間,達到異步的定時執行等訴求。

Ticker

演示代碼:

 1func main() {
 2 ticker := time.NewTicker(time.Second)
 3 defer ticker.Stop()
 4 done := make(chan bool)
 5 go func() {
 6  time.Sleep(10 * time.Second)
 7  done <- true
 8 }()
 9 for {
10  select {
11  case <-done:
12   fmt.Println("Done!")
13   return
14  case t := <-ticker.C:
15   fmt.Println("炸煎魚: ", t.Unix())
16  }
17 }
18}
19
20

輸出結果:

1// 每隔一秒輸出一次
2炸煎魚:  1611666168
3炸煎魚:  1611666169
4炸煎魚:  1611666170
5炸煎魚:  1611666171
6...
7
8

我們通過 time.NewTicker 方法設定每 1 秒執行一次方法,因此在 for-select 中,我們會每 1 秒就可以自動 “炸一條煎魚”,真是快樂極了。

而由於我們在 goroutine 中通過 sleep 方法的設定了 done 變量的輸入,因此在 10 秒後就會結束炸煎魚的循環輸出,最終退出。

最小堆:四叉堆

在 Go 語言中,內置計時器的數據結構都會涉及到最小四叉堆,如下圖所示:

整體來講就是父節點一定比其子節點小,子節點之間沒有任何關係和大小的要求。

數據結構

在 Go 語言中每個計時器運行時的基本單元是 runtime.timer

 1type timer struct {
 2 pp puintptr
 3
 4 when   int64
 5 period int64
 6 f      func(interface{}, uintptr)
 7 arg    interface{}
 8 seq    uintptr
 9 nextwhen int64
10 status uint32
11}
12
13

但這類基本單元都不會是對用戶端暴露的結構體,在對外上我們直觀見的最多的是 time.NewTimer 所創建的 Timer 結構體:

1type Timer struct {
2 C <-chan Time
3 r runtimeTimer
4}
5
6

同時在計時器運行模式上自 Go1.14 起發生了變更,runtime.timer 改爲將每個 timer 均存儲在對應的處理器 P 中

1type p struct {
2 ...
3 timersLock mutex
4 timers []*timer
5 ...
6}
7
8

在處理器 P 上,timers 字段就是一個以最小四叉堆形式存儲的媒介。在時序上,需要立刻執行,或說需要越早執行的,就越排在堆的越上面:

實現原理

在瞭解了計時器的基本特性和數據結構後,我們進一步展開,一層層剖析其原理,看看其是何物。在 Go 語言中,計時器在運行時涉及十種狀態處理,分別涉及增、刪、改以及重置等操作。

計時器所包含的狀態如下:

這時候可能就會有小夥伴疑惑,各種啓動、刪除、停止、啓動是指代的是什麼意思?爲什麼會涉及到 P 的管理?

創建計時器

接下來我們依然是從 NewTimerNewTicker 方法開始入手:

 1func NewTimer(d Duration) *Timer {
 2 c := make(chan Time, 1)
 3 t := &Timer{
 4  C: c,
 5  r: runtimeTimer{
 6   when: when(d),
 7   f:    sendTime,
 8   arg:  c,
 9  },
10 }
11 startTimer(&t.r)
12 return t
13}
14
15

在該方法中,其主要包含如下動作:

NewTicker 方法與 NewTimer 類似,主要是增加了 period 字段:

 1func NewTicker(d Duration) *Ticker {
 2 c := make(chan Time, 1)
 3 t := &Ticker{
 4  C: c,
 5  r: runtimeTimer{
 6   when:   when(d),
 7   period: int64(d),
 8   f:      sendTime,
 9   arg:    c,
10  },
11 }
12 startTimer(&t.r)
13 return t
14}
15
16

Ticker 結構體中,period 字段用於表示計時器再次被喚醒的時間,可以便於做輪詢觸發。

啓動計時器

在前面調用 NewTimerNewTicker 方法時,會將新創建的新計時器 timer 加入到創建 timer 的 P 的最小堆中:

 1func addtimer(t *timer) {
 2 if t.when < 0 {
 3  t.when = maxWhen
 4 }
 5 if t.status != timerNoStatus {
 6  throw("addtimer called with initialized timer")
 7 }
 8 t.status = timerWaiting
 9
10 when := t.when
11
12 pp := getg().m.p.ptr()
13 lock(&pp.timersLock)
14 cleantimers(pp)
15 doaddtimer(pp, t)
16 unlock(&pp.timersLock)
17
18 wakeNetPoller(when)
19}
20
21

停止計時器

在計時器的運轉中,一般會調用 timer.Stop() 方法來停止 / 終止 / 刪除計時器。雖然說法多樣。但大家的真實目的是一樣的,就是讓這個 timer 從輪詢器中消失,也就是從處理器 P 的堆中移除 timer

 1func deltimer(t *timer) bool {
 2 for {
 3  switch s := atomic.Load(&t.status); s {
 4  case timerWaiting, timerModifiedLater:
 5   // timerWaiting/timerModifiedLater -> timerDeleted
 6   ...
 7  case timerModifiedEarlier:
 8      // timerModifiedEarlier -> timerModifying -> timerDeleted
 9   ...
10  case timerDeleted, timerRemoving, timerRemoved:
11      // timerDeleted/timerRemoving/timerRemoved 
12   return false
13  case timerRunning, timerMoving:
14      // timerRunning/timerMoving
15   osyield()
16  case timerNoStatus:
17   return false
18  case timerModifying:
19   osyield()
20  default:
21   badTimer()
22  }
23 }
24}
25
26

但移除也不是直接一個 delete 就完事的,其在真正的刪除方法 deltimer 中遵循了基本的規則處理:

  1. timerWaiting/timerModifiedLater -> timerDeleted。

  2. timerModifiedEarlier -> timerModifying -> timerDeleted。

  3. timerDeleted/timerRemoving/timerRemoved -> 無需變更,已經滿足條件。

  4. timerRunning/timerMoving/timerModifying -> 正在執行、移動中,無法停止,等待下一次狀態檢查再處理。

  5. timerNoStatus -> 無法停止,不滿足條件。

上述五個基本流轉邏輯就覆蓋了 runtimer.deltimer 方法了,若有進一步需求的可通過傳送門詳細閱讀。

修改 / 重置計時器

在應用程序的調度中,有時候因爲邏輯產生了變更,我們需要重置計時器。這時候一般會調用 timer.Reset() 方法來重新設置 Duration 值。

其表面對應的是 resetTimer 方法,但實際與修改計時器的 modtimer 方法是共用的:

1func resettimer(t *timer, when int64) bool {
2 return modtimer(t, when, t.period, t.f, t.arg, t.seq)
3}
4
5

因此在這節中我們可以將重置和修改計時器放在一起分析。修改計時器,本質上是需要變更現有計時器,而在 Go 語言的計時器中是需要遵循基本規則,因此 modtimer 遵循下述規則處理:

  1. timerWaiting    -> timerModifying -> timerModifiedXX

  2. timerModifiedXX -> timerModifying -> timerModifiedYY

  3. timerNoStatus   -> timerModifying -> timerWaiting

  4. timerRemoved    -> timerModifying -> timerWaiting

  5. timerDeleted    -> timerModifying -> timerModifiedXX

  6. timerRunning    -> 等待狀態改變,纔可以進行下一步

  7. timerMoving     -> 等待狀態改變,纔可以進行下一步

  8. timerRemoving   -> 等待狀態改變,纔可以進行下一步

  9. timerModifying  -> 等待狀態改變,纔可以進行下一步

 1func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
 2 ...
 3 if wasRemoved {
 4  t.when = when
 5  pp := getg().m.p.ptr()
 6  lock(&pp.timersLock)
 7  doaddtimer(pp, t)
 8  unlock(&pp.timersLock)
 9  
10  releasem(mp)
11  wakeNetPoller(when)
12 } else {
13  t.nextwhen = when
14  newStatus := uint32(timerModifiedLater)
15  if when < t.when {
16   newStatus = timerModifiedEarlier
17  }
18  ...
19  releasem(mp)
20
21  if newStatus == timerModifiedEarlier {
22   wakeNetPoller(when)
23  }
24 }
25
26 return pending
27}
28
29

在完成了計時器的狀態處理後,會分爲兩種情況處理:

觸發計時器

在前面有提到 Go1.14 後,Go Timer 都已經歸屬到各個處理器 P 中去了,因此計時器的觸發分爲了兩個部分:

調度器觸發

調度器的觸發一共分兩種情況,一種是在調度循環的時候調用 checkTimers 方法進行計時器的觸發:

 1func schedule() {
 2 _g_ := getg()
 3
 4top:
 5 pp := _g_.m.p.ptr()
 6 pp.preempt = false
 7
 8 // 處理調度時的計時器觸發
 9 checkTimers(pp, 0)
10 ...
11
12 execute(gp, inheritTime)
13}
14
15

另外一種是當前處理器 P 沒有可執行的 Timer,且沒有可執行的 G。那麼按照調度模型,就會去竊取其他計時器和 G:

 1func findrunnable() (gp *g, inheritTime bool) {
 2 _g_ := getg()
 3
 4top:
 5 _p_ := _g_.m.p.ptr()
 6 ...
 7 now, pollUntil, _ := checkTimers(_p_, 0)
 8 ...
 9}
10
11

調度系統在計時器處不深究,我們進一步剖析具體觸發計時器的 checkTimers 方法:

 1func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
 2 if atomic.Load(&pp.adjustTimers) == 0 {
 3  next := int64(atomic.Load64(&pp.timer0When))
 4  if next == 0 {
 5   return now, 0, false
 6  }
 7  if now == 0 {
 8   now = nanotime()
 9  }
10  if now < next {
11   if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
12    return now, next, false
13   }
14  }
15 }
16
17 lock(&pp.timersLock)
18
19 adjusttimers(pp)
20    ...
21}
22
23
 1func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
 2 ...
 3 rnow = now
 4 if len(pp.timers) > 0 {
 5  if rnow == 0 {
 6   rnow = nanotime()
 7  }
 8  for len(pp.timers) > 0 {
 9   if tw := runtimer(pp, rnow); tw != 0 {
10    if tw > 0 {
11     pollUntil = tw
12    }
13    break
14   }
15   ran = true
16  }
17 }
18 ...
19}
20
21

在前面調整了 timers 切片中的最小堆的排序後,將會調用 runtimer 方法去真正運行所需要執行的 timer,完成觸計時器的發。

 1func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
 2 ...
 3 if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
 4  clearDeletedTimers(pp)
 5 }
 6
 7 unlock(&pp.timersLock)
 8
 9 return rnow, pollUntil, ran
10}
11
12

在最後掃尾階段,如果當前 G 的處理器與調用 checkTimers 方法所傳入的處理器一致,並且處理器中 timerDeleted 狀態的計時器數量是處理器 P 堆中的計時器的 1/4 以上,則調用 clearDeletedTimers 方法對已爲刪除狀態的的計時器進行清理。

系統監控觸發

即使是通過每次調度器調度和竊取的時候觸發,但畢竟是具有一定的隨機和不確定性。

因此係統監控觸發依然是一個兜底保障,在 Go 語言中 runtime.sysmon 方法承擔了這一個責任,存在觸發計時器的邏輯:

 1func sysmon() {
 2 ...
 3 for {
 4  ...
 5  next, _ := timeSleepUntil()
 6
 7  if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
 8   lock(&sched.lock)
 9   if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
10    if next > now {
11     ...
12     next, _ = timeSleepUntil()
13     lock(&sched.lock)
14     atomic.Store(&sched.sysmonwait, 0)
15     noteclear(&sched.sysmonnote)
16    }
17    idle = 0
18    delay = 20
19   }
20   unlock(&sched.lock)
21  }
22  ...
23 }
24}
25
26

在每次進行系統監控時,都會在流程上調用 timeSleepUntil 方法去獲取下一個計時器應觸發的時間,以及保存該計時器已打開的計時器堆的 P。

在獲取完畢後會馬上檢查當前是否存在 GC,若是正在 STW 則獲取調度互斥鎖。若發現下一個計時器的觸發時間已經過去,則重新調用 timeSleepUntil 獲取下一個計時器的時間和相應 P 的地址。

 1func sysmon() {
 2 ...
 3 for {
 4  ...
 5  lock(&sched.sysmonlock)
 6  {
 7   now1 := nanotime()
 8   if now1-now > 50*1000 /* 50µs */ {
 9    next, _ = timeSleepUntil()
10   }
11   now = now1
12  }
13  ...
14 }
15}
16
17

檢查 sched.sysmonlock 所花費的時間是否超過 50µs。若是,則有可能前面所獲取的下一個計時器觸發時間已過期,因此重新調用 timeSleepUntil 方法再次獲取。

 1func sysmon() {
 2 ...
 3 for {
 4  ...
 5  lastpoll := int64(atomic.Load64(&sched.lastpoll))
 6  if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
 7   atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
 8   list := netpoll(0) // non-blocking - returns list of goroutines
 9   if !list.empty() {
10    incidlelocked(-1)
11    injectglist(&list)
12    incidlelocked(1)
13   }
14  }
15  if next < now {
16   startm(nil, false)
17  }
18 }
19}
20
21

如果發現超過 10ms 的時間沒有進行 netpoll 網絡輪詢,則主動調用 netpoll 方法觸發輪詢。

同時如果存在不可搶佔的處理器 P,則調用 startm 方法來運行那些應該運行,但沒有在運行的計時器。

運行計時器

runtimer 方法主要承擔計時器的具體運行,同時也會針對計時器的不同狀態(含刪除、修改、等待等)都進行了對應的處理,也相當於是個大的集中處理中樞了。例如在 timerDeleted 狀態下的計時器將會進行刪除。

其遵循下述規則處理

  1. timerNoStatus    -> 恐慌:計時器未初始化

  2. timerWaiting     -> timerWaiting

  3. timerWaiting     -> timerRunning -> timerNoStatus

  4. timerWaiting     -> timerRunning -> timerWaiting

  5. timerModifying   -> 等待狀態改變,纔可以進行下一步

  6. timerModifiedXX  -> timerMoving -> timerWaiting

  7. timerDeleted     -> timerRemoving -> timerRemoved

  8. timerRunning     -> 恐慌:併發調用

  9. timerRemoved     -> 恐慌:計時器堆不一致

  10. timerRemoving   -> 恐慌:計時器堆不一致

  11. timerMoving     -> 恐慌:計時器堆不一致

我們再根據時間狀態機,去針對性的看看源碼是如何實現的:

 1func runtimer(pp *p, now int64) int64 {
 2 for {
 3  t := pp.timers[0]
 4  switch s := atomic.Load(&t.status); s {
 5  case timerWaiting:
 6   if t.when > now {
 7    return t.when
 8   }
 9
10   runOneTimer(pp, t, now)
11   return 0
12
13  case timerDeleted:
14      ...
15  case timerModifiedEarlier, timerModifiedLater:
16      ...
17  case timerModifying:
18   osyield()
19  case timerNoStatus, timerRemoved:
20   badTimer()
21  case timerRunning, timerRemoving, timerMoving:
22   badTimer()
23  default:
24   badTimer()
25  }
26 }
27}
28
29

我們主要關注運行計時器,也就是 timerWaiting 狀態下的處理,其首先會對觸發時間(when)進行判定,若大於當前時間則直接返回(因爲所需觸發的時間未到)。否則將會調用 runOneTimer 方法去執行本次觸發:

 1func runOneTimer(pp *p, t *timer, now int64) {
 2 f := t.f
 3 arg := t.arg
 4 seq := t.seq
 5
 6 if t.period > 0 {
 7  delta := t.when - now
 8  t.when += t.period * (1 + -delta/t.period)
 9  siftdownTimer(pp.timers, 0)
10  if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
11   badTimer()
12  }
13  updateTimer0When(pp)
14 } else {
15  dodeltimer0(pp)
16 }
17
18 unlock(&pp.timersLock)
19 f(arg, seq)
20 lock(&pp.timersLock)
21}
22
23

在完成計時器的運行屬性更新後,上互斥鎖,調用計時器的回調方法 f,完成本次完整的觸發流程。

總結

Go 語言的 Timer 其實已經改過了好幾版,在 Go1.14 的正式大改版後。目前來看已經初步的到了一個新的階段。其設計的模式主要圍繞三塊:


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/gxX-q2EvgWZEWe-deRITSw