難以駕馭的 Go timer,一文帶你參透計時器的奧祕
在實際的應用工程中,我們常常會需要多久後,或定時去做某個事情。甚至在分析標準庫 context 的父子級傳播時,都能見到等待多久後自動觸發取消事件的蹤影。
而在 Go 語言中,能夠完成這類運行的功能訴求就是標準庫 time,在具體的功能範疇上我們稱其爲 “計時器 “,是一個非常具有價值的一個模塊。在這篇文章中我們將對其做進一步的分析和研討。
什麼是 timer
可以控制時間,確保應用程序中的某段代碼在某個時刻運行。在 Go 語言中可以單次執行,也可以循環執行。
最常見的方式就是引用標準庫 time
去做一些事情,普通開發者經常使用到的標準庫代碼是:
1time.Now().Unix()
2
3
上述代碼可用於獲取當前時間的 Unix
時間戳,而在內部的具體實現上提供了 Time
、Timer
以及 Ticker
的各類配套方法。
timer 基本特性
Timer
演示代碼:
1func main() {
2 timer := time.NewTimer(2 * time.Second)
3 <-timer.C
4 fmt.Println("我的腦子真的進煎魚了!")
5}
6
7
輸出結果:
1// 等待兩秒...
2我的腦子真的進煎魚了!
3
4
我們可以通過 time.NewTimer
方法定時在 2 秒進行程序的執行。而其還有個變種的用法,在做 channel 的源碼剖析時有發現
1func main() {
2 v := make(chan struct{})
3 timer := time.AfterFunc(2*time.Second, func() {
4 fmt.Println("我想在這個點喫煎魚!")
5 v <- struct{}{}
6 })
7 defer timer.Stop()
8 <-v
9}
10
11
在等待 2 秒後,會立即調用 time.AfterFunc
所對應的匿名方法。在時間上我們也可以指定對應的具體時間,達到異步的定時執行等訴求。
Ticker
演示代碼:
1func main() {
2 ticker := time.NewTicker(time.Second)
3 defer ticker.Stop()
4 done := make(chan bool)
5 go func() {
6 time.Sleep(10 * time.Second)
7 done <- true
8 }()
9 for {
10 select {
11 case <-done:
12 fmt.Println("Done!")
13 return
14 case t := <-ticker.C:
15 fmt.Println("炸煎魚: ", t.Unix())
16 }
17 }
18}
19
20
輸出結果:
1// 每隔一秒輸出一次
2炸煎魚: 1611666168
3炸煎魚: 1611666169
4炸煎魚: 1611666170
5炸煎魚: 1611666171
6...
7
8
我們通過 time.NewTicker
方法設定每 1 秒執行一次方法,因此在 for-select
中,我們會每 1 秒就可以自動 “炸一條煎魚”,真是快樂極了。
而由於我們在 goroutine 中通過 sleep
方法的設定了 done
變量的輸入,因此在 10 秒後就會結束炸煎魚的循環輸出,最終退出。
最小堆:四叉堆
在 Go 語言中,內置計時器的數據結構都會涉及到最小四叉堆,如下圖所示:
整體來講就是父節點一定比其子節點小,子節點之間沒有任何關係和大小的要求。
數據結構
在 Go 語言中每個計時器運行時的基本單元是 runtime.timer
:
1type timer struct {
2 pp puintptr
3
4 when int64
5 period int64
6 f func(interface{}, uintptr)
7 arg interface{}
8 seq uintptr
9 nextwhen int64
10 status uint32
11}
12
13
-
pp:計時器所在的處理器 P 的指針地址。
-
when:計時器被喚醒的時間。
-
period:計時器再次被喚醒的時間(when+period)。
-
f:回調函數,每次在計時器被喚醒時都會調用。
-
arg:回調函數的參數,每次在計時器被喚醒時會將該參數項傳入回調函數
f
中。 -
seq:回調函數的參數,該參數僅在
netpoll
的應用場景下使用。 -
nextwhen:當計時器狀態爲 timerModifiedXX 時,將會使用
nextwhen
的值設置到where
字段上。 -
status:計時器的當前狀態值,計時器本身包含大量的枚舉標識,這塊會在後面介紹。
但這類基本單元都不會是對用戶端暴露的結構體,在對外上我們直觀見的最多的是 time.NewTimer
所創建的 Timer
結構體:
1type Timer struct {
2 C <-chan Time
3 r runtimeTimer
4}
5
6
-
C:用於接收
Timer
所觸發的事件,當計時器的消息事件(例如:到期)發生時,該 channel 會接收到通知。 -
r:與
runtime.timer
作用類似,內在屬性保持一致。
同時在計時器運行模式上自 Go1.14 起發生了變更,runtime.timer
改爲將每個 timer
均存儲在對應的處理器 P 中
1type p struct {
2 ...
3 timersLock mutex
4 timers []*timer
5 ...
6}
7
8
在處理器 P 上,timers
字段就是一個以最小四叉堆形式存儲的媒介。在時序上,需要立刻執行,或說需要越早執行的,就越排在堆的越上面:
實現原理
在瞭解了計時器的基本特性和數據結構後,我們進一步展開,一層層剖析其原理,看看其是何物。在 Go 語言中,計時器在運行時涉及十種狀態處理,分別涉及增、刪、改以及重置等操作。
計時器所包含的狀態如下:
這時候可能就會有小夥伴疑惑,各種啓動、刪除、停止、啓動是指代的是什麼意思?爲什麼會涉及到 P 的管理?
創建計時器
接下來我們依然是從 NewTimer
和 NewTicker
方法開始入手:
1func NewTimer(d Duration) *Timer {
2 c := make(chan Time, 1)
3 t := &Timer{
4 C: c,
5 r: runtimeTimer{
6 when: when(d),
7 f: sendTime,
8 arg: c,
9 },
10 }
11 startTimer(&t.r)
12 return t
13}
14
15
在該方法中,其主要包含如下動作:
-
創建
Timer
對象,主要是C
和r
屬性,含義與前面所表述的一致。 -
調用
startTimer
方法,啓動計時器。
NewTicker
方法與 NewTimer
類似,主要是增加了 period
字段:
1func NewTicker(d Duration) *Ticker {
2 c := make(chan Time, 1)
3 t := &Ticker{
4 C: c,
5 r: runtimeTimer{
6 when: when(d),
7 period: int64(d),
8 f: sendTime,
9 arg: c,
10 },
11 }
12 startTimer(&t.r)
13 return t
14}
15
16
在 Ticker
結構體中,period
字段用於表示計時器再次被喚醒的時間,可以便於做輪詢觸發。
啓動計時器
在前面調用 NewTimer
、NewTicker
方法時,會將新創建的新計時器 timer
加入到創建 timer 的 P 的最小堆中:
1func addtimer(t *timer) {
2 if t.when < 0 {
3 t.when = maxWhen
4 }
5 if t.status != timerNoStatus {
6 throw("addtimer called with initialized timer")
7 }
8 t.status = timerWaiting
9
10 when := t.when
11
12 pp := getg().m.p.ptr()
13 lock(&pp.timersLock)
14 cleantimers(pp)
15 doaddtimer(pp, t)
16 unlock(&pp.timersLock)
17
18 wakeNetPoller(when)
19}
20
21
-
檢查是否滿足基本條件:新增計時器的邊界處理,
timerNoStatus
狀態判斷排除。 -
調用
cleantimers
方法:清理處理器 P 中的計時器隊列,可以加快創建和刪除計時器的程序的速度。 -
調用
doaddtimer
方法:將當前所新創建的timer
新增到當前處理器 P 的堆中。 -
調用
wakeNetPoller
方法:喚醒網絡輪詢器中休眠的線程,檢查計時器被喚醒的時間(when)是否在當前輪詢預期運行的時間(pollerPollUntil)內,若是喚醒。
停止計時器
在計時器的運轉中,一般會調用 timer.Stop()
方法來停止 / 終止 / 刪除計時器。雖然說法多樣。但大家的真實目的是一樣的,就是讓這個 timer
從輪詢器中消失,也就是從處理器 P 的堆中移除 timer
:
1func deltimer(t *timer) bool {
2 for {
3 switch s := atomic.Load(&t.status); s {
4 case timerWaiting, timerModifiedLater:
5 // timerWaiting/timerModifiedLater -> timerDeleted
6 ...
7 case timerModifiedEarlier:
8 // timerModifiedEarlier -> timerModifying -> timerDeleted
9 ...
10 case timerDeleted, timerRemoving, timerRemoved:
11 // timerDeleted/timerRemoving/timerRemoved
12 return false
13 case timerRunning, timerMoving:
14 // timerRunning/timerMoving
15 osyield()
16 case timerNoStatus:
17 return false
18 case timerModifying:
19 osyield()
20 default:
21 badTimer()
22 }
23 }
24}
25
26
但移除也不是直接一個 delete
就完事的,其在真正的刪除方法 deltimer
中遵循了基本的規則處理:
-
timerWaiting/timerModifiedLater -> timerDeleted。
-
timerModifiedEarlier -> timerModifying -> timerDeleted。
-
timerDeleted/timerRemoving/timerRemoved -> 無需變更,已經滿足條件。
-
timerRunning/timerMoving/timerModifying -> 正在執行、移動中,無法停止,等待下一次狀態檢查再處理。
-
timerNoStatus -> 無法停止,不滿足條件。
上述五個基本流轉邏輯就覆蓋了 runtimer.deltimer 方法了,若有進一步需求的可通過傳送門詳細閱讀。
修改 / 重置計時器
在應用程序的調度中,有時候因爲邏輯產生了變更,我們需要重置計時器。這時候一般會調用 timer.Reset()
方法來重新設置 Duration
值。
其表面對應的是 resetTimer
方法,但實際與修改計時器的 modtimer
方法是共用的:
1func resettimer(t *timer, when int64) bool {
2 return modtimer(t, when, t.period, t.f, t.arg, t.seq)
3}
4
5
因此在這節中我們可以將重置和修改計時器放在一起分析。修改計時器,本質上是需要變更現有計時器,而在 Go 語言的計時器中是需要遵循基本規則,因此 modtimer
遵循下述規則處理:
-
timerWaiting -> timerModifying -> timerModifiedXX
-
timerModifiedXX -> timerModifying -> timerModifiedYY
-
timerNoStatus -> timerModifying -> timerWaiting
-
timerRemoved -> timerModifying -> timerWaiting
-
timerDeleted -> timerModifying -> timerModifiedXX
-
timerRunning -> 等待狀態改變,纔可以進行下一步
-
timerMoving -> 等待狀態改變,纔可以進行下一步
-
timerRemoving -> 等待狀態改變,纔可以進行下一步
-
timerModifying -> 等待狀態改變,纔可以進行下一步
1func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
2 ...
3 if wasRemoved {
4 t.when = when
5 pp := getg().m.p.ptr()
6 lock(&pp.timersLock)
7 doaddtimer(pp, t)
8 unlock(&pp.timersLock)
9
10 releasem(mp)
11 wakeNetPoller(when)
12 } else {
13 t.nextwhen = when
14 newStatus := uint32(timerModifiedLater)
15 if when < t.when {
16 newStatus = timerModifiedEarlier
17 }
18 ...
19 releasem(mp)
20
21 if newStatus == timerModifiedEarlier {
22 wakeNetPoller(when)
23 }
24 }
25
26 return pending
27}
28
29
在完成了計時器的狀態處理後,會分爲兩種情況處理:
-
待修改的計時器已經被刪除:由於既有的計時器已經沒有了,因此會調用
doaddtimer
方法創建一個新的計時器,並將原本的timer
屬性賦值過去,再調用wakeNetPoller
方法在預定時間喚醒網絡輪詢。 -
正常邏輯處理:如果修改後的計時器的觸發時間小於原本的觸發時間,則修改該計時器的狀態爲
timerModifiedEarlier
,並且調用wakeNetPoller
方法在預定時間喚醒網絡輪詢。
觸發計時器
在前面有提到 Go1.14 後,Go Timer 都已經歸屬到各個處理器 P 中去了,因此計時器的觸發分爲了兩個部分:
-
通過調度器在調度時進行計時器的觸發。
-
通過系統監控檢查並觸發計時器(到期未執行)。
調度器觸發
調度器的觸發一共分兩種情況,一種是在調度循環的時候調用 checkTimers
方法進行計時器的觸發:
1func schedule() {
2 _g_ := getg()
3
4top:
5 pp := _g_.m.p.ptr()
6 pp.preempt = false
7
8 // 處理調度時的計時器觸發
9 checkTimers(pp, 0)
10 ...
11
12 execute(gp, inheritTime)
13}
14
15
另外一種是當前處理器 P 沒有可執行的 Timer,且沒有可執行的 G。那麼按照調度模型,就會去竊取其他計時器和 G:
1func findrunnable() (gp *g, inheritTime bool) {
2 _g_ := getg()
3
4top:
5 _p_ := _g_.m.p.ptr()
6 ...
7 now, pollUntil, _ := checkTimers(_p_, 0)
8 ...
9}
10
11
調度系統在計時器處不深究,我們進一步剖析具體觸發計時器的 checkTimers
方法:
1func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
2 if atomic.Load(&pp.adjustTimers) == 0 {
3 next := int64(atomic.Load64(&pp.timer0When))
4 if next == 0 {
5 return now, 0, false
6 }
7 if now == 0 {
8 now = nanotime()
9 }
10 if now < next {
11 if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
12 return now, next, false
13 }
14 }
15 }
16
17 lock(&pp.timersLock)
18
19 adjusttimers(pp)
20 ...
21}
22
23
-
起始先通過
pp.adjustTimers
檢查當前處理器 P 中是否有需要處理的計時器。 -
若無需執行的計時器,則直接返回。
-
若有,則判斷下一個計時器待刪除的計時器和處理器 P 上的計時器數量,若前者小於後者 1/4 則直接返回。
-
確定需要處理計時器後,通過調用
adjusttimers
方法重新根據時間將timers
切片中timer
的先後順序重新排列(相當於 resort)。
1func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
2 ...
3 rnow = now
4 if len(pp.timers) > 0 {
5 if rnow == 0 {
6 rnow = nanotime()
7 }
8 for len(pp.timers) > 0 {
9 if tw := runtimer(pp, rnow); tw != 0 {
10 if tw > 0 {
11 pollUntil = tw
12 }
13 break
14 }
15 ran = true
16 }
17 }
18 ...
19}
20
21
在前面調整了 timers
切片中的最小堆的排序後,將會調用 runtimer
方法去真正運行所需要執行的 timer
,完成觸計時器的發。
1func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
2 ...
3 if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
4 clearDeletedTimers(pp)
5 }
6
7 unlock(&pp.timersLock)
8
9 return rnow, pollUntil, ran
10}
11
12
在最後掃尾階段,如果當前 G 的處理器與調用 checkTimers
方法所傳入的處理器一致,並且處理器中 timerDeleted
狀態的計時器數量是處理器 P 堆中的計時器的 1/4 以上,則調用 clearDeletedTimers
方法對已爲刪除狀態的的計時器進行清理。
系統監控觸發
即使是通過每次調度器調度和竊取的時候觸發,但畢竟是具有一定的隨機和不確定性。
因此係統監控觸發依然是一個兜底保障,在 Go 語言中 runtime.sysmon
方法承擔了這一個責任,存在觸發計時器的邏輯:
1func sysmon() {
2 ...
3 for {
4 ...
5 next, _ := timeSleepUntil()
6
7 if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
8 lock(&sched.lock)
9 if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
10 if next > now {
11 ...
12 next, _ = timeSleepUntil()
13 lock(&sched.lock)
14 atomic.Store(&sched.sysmonwait, 0)
15 noteclear(&sched.sysmonnote)
16 }
17 idle = 0
18 delay = 20
19 }
20 unlock(&sched.lock)
21 }
22 ...
23 }
24}
25
26
在每次進行系統監控時,都會在流程上調用 timeSleepUntil
方法去獲取下一個計時器應觸發的時間,以及保存該計時器已打開的計時器堆的 P。
在獲取完畢後會馬上檢查當前是否存在 GC,若是正在 STW 則獲取調度互斥鎖。若發現下一個計時器的觸發時間已經過去,則重新調用 timeSleepUntil
獲取下一個計時器的時間和相應 P 的地址。
1func sysmon() {
2 ...
3 for {
4 ...
5 lock(&sched.sysmonlock)
6 {
7 now1 := nanotime()
8 if now1-now > 50*1000 /* 50µs */ {
9 next, _ = timeSleepUntil()
10 }
11 now = now1
12 }
13 ...
14 }
15}
16
17
檢查 sched.sysmonlock
所花費的時間是否超過 50µs。若是,則有可能前面所獲取的下一個計時器觸發時間已過期,因此重新調用 timeSleepUntil
方法再次獲取。
1func sysmon() {
2 ...
3 for {
4 ...
5 lastpoll := int64(atomic.Load64(&sched.lastpoll))
6 if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
7 atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
8 list := netpoll(0) // non-blocking - returns list of goroutines
9 if !list.empty() {
10 incidlelocked(-1)
11 injectglist(&list)
12 incidlelocked(1)
13 }
14 }
15 if next < now {
16 startm(nil, false)
17 }
18 }
19}
20
21
如果發現超過 10ms 的時間沒有進行 netpoll
網絡輪詢,則主動調用 netpoll
方法觸發輪詢。
同時如果存在不可搶佔的處理器 P,則調用 startm
方法來運行那些應該運行,但沒有在運行的計時器。
運行計時器
runtimer
方法主要承擔計時器的具體運行,同時也會針對計時器的不同狀態(含刪除、修改、等待等)都進行了對應的處理,也相當於是個大的集中處理中樞了。例如在 timerDeleted
狀態下的計時器將會進行刪除。
其遵循下述規則處理
-
timerNoStatus -> 恐慌:計時器未初始化
-
timerWaiting -> timerWaiting
-
timerWaiting -> timerRunning -> timerNoStatus
-
timerWaiting -> timerRunning -> timerWaiting
-
timerModifying -> 等待狀態改變,纔可以進行下一步
-
timerModifiedXX -> timerMoving -> timerWaiting
-
timerDeleted -> timerRemoving -> timerRemoved
-
timerRunning -> 恐慌:併發調用
-
timerRemoved -> 恐慌:計時器堆不一致
-
timerRemoving -> 恐慌:計時器堆不一致
-
timerMoving -> 恐慌:計時器堆不一致
我們再根據時間狀態機,去針對性的看看源碼是如何實現的:
1func runtimer(pp *p, now int64) int64 {
2 for {
3 t := pp.timers[0]
4 switch s := atomic.Load(&t.status); s {
5 case timerWaiting:
6 if t.when > now {
7 return t.when
8 }
9
10 runOneTimer(pp, t, now)
11 return 0
12
13 case timerDeleted:
14 ...
15 case timerModifiedEarlier, timerModifiedLater:
16 ...
17 case timerModifying:
18 osyield()
19 case timerNoStatus, timerRemoved:
20 badTimer()
21 case timerRunning, timerRemoving, timerMoving:
22 badTimer()
23 default:
24 badTimer()
25 }
26 }
27}
28
29
我們主要關注運行計時器,也就是 timerWaiting
狀態下的處理,其首先會對觸發時間(when)進行判定,若大於當前時間則直接返回(因爲所需觸發的時間未到)。否則將會調用 runOneTimer
方法去執行本次觸發:
1func runOneTimer(pp *p, t *timer, now int64) {
2 f := t.f
3 arg := t.arg
4 seq := t.seq
5
6 if t.period > 0 {
7 delta := t.when - now
8 t.when += t.period * (1 + -delta/t.period)
9 siftdownTimer(pp.timers, 0)
10 if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
11 badTimer()
12 }
13 updateTimer0When(pp)
14 } else {
15 dodeltimer0(pp)
16 }
17
18 unlock(&pp.timersLock)
19 f(arg, seq)
20 lock(&pp.timersLock)
21}
22
23
-
如果
period
大於 0,說明當前是 ticker,需要再次觸發,因此還需要調整計時器的狀態。 -
重新計算下一次的觸發時間,並且更新其在最小堆的位置。
-
調用
atomic.Cas
方法該計時器的狀態從timerRunning
原子修改爲timerWaiting
狀態。 -
調用
updateTimer0When
方法設置處理器 P 的timer0When
字段。 -
如果
period
等於 0,說明當前是 timer,只需要單次觸發就可以了。
在完成計時器的運行屬性更新後,上互斥鎖,調用計時器的回調方法 f
,完成本次完整的觸發流程。
總結
Go 語言的 Timer 其實已經改過了好幾版,在 Go1.14 的正式大改版後。目前來看已經初步的到了一個新的階段。其設計的模式主要圍繞三塊:
-
在各個處理器 P 中,Timer 以最小四叉堆的存儲方式在 timers 中。
-
在調度器的每輪調度中都會對計時器進行觸發和檢查。
-
在系統監聽上
netpoll
會定時進行計時器的觸發和檢查。 -
在計時器的處理中,十個狀態的流轉和對應處理非常重要。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/gxX-q2EvgWZEWe-deRITSw