Golang 定時器實現原理剖析
時間對於我們來說都不陌生,每時每刻都能感受到它的存在。時間又是一個抽象的概念,看不見摸不着。在我們編寫程序的時候,對時間的使用非常頻繁。本文講述 Go 中時間相關函數的使用和實現原理,時間相關的源碼在 src 下的 time 包和 runtime 包,下面的分析基於的 Go 的 1.14 版本。
常用的 API
獲取當前的時間戳
time.Now().Unix() 返回自從 1970 年 1 月 1 日以來到現在的秒數。它不依賴地理位置時區。
func main() {
fmt.Println(time.Now().Unix())
}
當前時間字符串表示
time.Now().Format("2006-01-02 15:04:05") 將當前的時間按照 Format 中的格式以字符串的形式顯示。下面實例會將當前的時間以 YYYY-MM-DD hh:mm:ss 格式顯示。
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
date 轉時間戳
time.Date 將一個日期時間轉成 time.Time 類型,通過 Time 提供的 Unix()函數可以知道當前的時間戳
currentTime := time.Date(2021, 6, 6, 10, 18, 11, 0, time.Local)
fmt.Println(currentTime.Unix())
// 1622945891
time.Parse
time.Parse 將時間格式的字符串轉成 tim.Time
currentTime, _ = time.Parse("2006-01-02 15:04:05", "2021-06-06 10:24:40")
fmt.Println(currentTime.Unix())
// 1622975080
前面已經初步簡要介紹了幾個 time 常見的 API. 下面介紹定時器的內容,定時器與時間息息相關。在 Go 中,定時器並不是通過 sigalarm signal 實現的,而是通過堆(四叉堆)實現的。
定時器 API
下面介紹 time 包提供定時器相關的 API, 具體列舉如下:
t:=time.Tick(time.Second)
<-t
t=time.After(time.Second)
<-t
timer:=time.NewTimer(time.Second)
<-timer.C
ticker:=time.NewTicker(time.Second)
<-ticker.C
timer=time.AfterFunc(time.Second,func() {})
實現原理
先來看看 Timer 的結構,它包含一個 Time 類型的 Chan 一個 runtimeTimer 類型,C 是長度爲 1 的通道。r 是 timer 的真正實現結構體,在 runtime 包中也有一個 timer 定義,與這裏的 runtimeTimer 結構是一模一樣的。timer 的真正實現邏輯是在 runtime 包處理的。runtimeTimer 結構包含一個定時器觸發的時間點 when, 執行 P 的地址 pp, timer 會被掛在 P 的結構中的 timers 中,pp 執行 P 可以很方便的從定時器找到他所掛的 P. f 時間觸發之後要執行的邏輯功能,arg 是傳遞給執行函數 f 的參數。period 是下一次被觸發的時間,即兩次觸發之間的間隔時間。
// Timer Timer結構體,包含一個Time類型的chan和一個runtimeTimer結構體
// chan用來在時間達到後,發送一個通知給調用方
type Timer struct {
C <-chan Time
r runtimeTimer
}
// runTimer 是timer的真正實現結構體,runtime包中的timer定義與這裏是一致的
// timer真正實現的相關操作是在runtime中完成的
type runtimeTimer struct {
// 指向P(GMP模型中的處理器)的地址
pp uintptr
// 定時器在什麼時候觸發
when int64
// 下一次觸發的時間,即兩次觸發之間的間隔時間
period int64
// 時間觸發後執行的函數處理
f func(interface{}, uintptr) // NOTE: must not be closure
// 傳遞給處理函數f的參數
arg interface{}
// seq看源碼中並沒有使用
seq uintptr
// 當timer被修改時,timer位於修改狀態下,下一次觸發時間
nextwhen int64
// 定時timer的狀態
status uint32
}
下面看 timer 是如何創建的,time 包提供了構造方法 NewTimer 創建一個 * timer 類型的對象,time 類型的通道會初始爲 1,填充觸發時間點 when 和執行函數 f 及參數。然後執行 startTimer 啓動定時器,startTime 真正實現在 runtime 包中的 time.go 中
// 創建一個定時器timer結構,可以在d duration後從timer的通道中讀取通知信息
func NewTimer(d Duration) *Timer {
// 定義一個有緩衝的Time類型的Chan,緩衝區的大小爲1
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
// when是觸發時間,當前時間+d
when: when(d),
// 觸發時執行的操作,這裏是向c中發送當前時間
f: sendTime,
// 當f被執行的時候,傳遞給f的參數,這裏是Chan c
arg: c,
},
}
// 啓動定時器,startTimer真正實現在runtime包中的time.go中
startTimer(&t.r)
return t
}
// when 計算一個過期時間值,例如3秒後過期,將返回當前時間+3s後的時間值
// 如果傳入的時間值<=0
func when(d Duration) int64 {
if d <= 0 {
return runtimeNano()
}
// t是一個精確到納秒的時間
t := runtimeNano() + int64(d)
if t < 0 {
t = 1<<63 - 1 // math.MaxInt64
}
return t
}
time 包中的 startTimer 會鏈接到下面的處理函數,startTimer 真正處理是在 addtimer 函數中,繼續看 addtimer 具體做了什麼。
-
會設置 t 的狀態爲 timerWaiting
-
獲取當前運行的 P, 對 P 中的定時器執行 cleantimers
-
將 t 加入到 P 中
-
喚醒 netPoller 中休眠的線程
// time包中的startTimer會鏈接到這裏,真正處理的是addtimer函數
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
// addtimer 將一個timer添加到P中,該函數只在新建一個timer的時候被調用
//
func addtimer(t *timer) {
// when不能爲負數,否則可能會導致增量計算時溢出,導致runtime中的timer永遠不會過期
if t.when < 0 {
t.when = maxWhen
}
// 剛添加的timer,走到這裏還未初始化狀態,如果狀態不是timerNoStatus說明出問題了
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
// 設置timer的狀態爲timerWaiting
t.status = timerWaiting
when := t.when
// pp是當前調度G的P
pp := getg().m.p.ptr()
lock(&pp.timersLock)
// 清理掉pp中timers隊列中頭部處於timerDeleted、timerModifiedEarlier、
// timerModifiedLater狀態的timer
cleantimers(pp)
// 將定時器t添加到P中的定時器切片中保存起來,t和P進行了雙向綁定
// 即t.pp指向了P,P對象中的timers維護了所有的timers
doaddtimer(pp, t)
unlock(&pp.timersLock)
// 喚醒netPoller中休眠的線程
wakeNetPoller(when)
}
接下進一步分析上面 addtimer 處理邏輯中的幾個重要調用函數。cleantimers 會清理 P 中維護的定時器隊列中頭部處於刪除狀態、修改爲更早、更晚觸發的定時器。
// cleantimers 清理P中維護的定時器隊列中頭部處於刪除狀態、修改爲更早、更晚觸發的定時器
func cleantimers(pp *p) {
for {
if len(pp.timers) == 0 {
return
}
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("cleantimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
// 將t的狀態從timerDeleted修改爲timerRemoving
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
// 將定時器t從pp的隊列中刪除
dodeltimer0(pp)
// 將t的狀態從timerRemoving修改爲timerRemoved
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
// 將pp中的需要刪除的定時器數量減少一個
atomic.Xadd(&pp.deletedTimers, -1)
case timerModifiedEarlier, timerModifiedLater:
// 將t的狀態從timerModifiedEarlier/timerModifiedLater修改爲timerMoving狀態
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
// Now we can change the when field.
// 因爲時間已經調整,將調整後的時間nextwhen給when,成爲下次觸發時間
t.when = t.nextwhen
// Move t to the right position.
// 通過先刪除後添加的方式將t放到正確的位置
dodeltimer0(pp)
doaddtimer(pp, t)
// 記錄向前調整的定時器數量減少1個
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
// 將t的狀態從timeMoving修改爲timerWaiting
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
// Head of timers does not need adjustment.
// 其他狀態的定時器不需要調整,直接返回
return
}
}
}
doaddtimer 將定時器 t 加入到 P 的 tiemrs 切片中,然後對 P 的 timers 進行調整,要調整成四叉堆的結構,調節的順序是按 t.when 即觸發時間,處於堆頂的元素是最早要觸發的定時器。
// doaddtimer將定時器t加入到P的堆中
func doaddtimer(pp *p, t *timer) {
// Timers rely on the network poller, so make sure the poller
// has started.
// timer依賴與網絡輪詢器的調度,所以添加之前要確保輪詢器poller已經開始
if netpollInited == 0 {
// 初始化輪詢器
netpollGenericInit()
}
// 還沒加入到pp中,這時的t是還沒綁定到P的,它的pp是默認值0
if t.pp != 0 {
throw("doaddtimer: P already set in timer")
}
// 將定時器t綁定到P上
t.pp.set(pp)
i := len(pp.timers)
// 將當前的定時器t加入到P中timer中
pp.timers = append(pp.timers, t)
// 將剛加入的定時器t調整到四叉堆的合適位置
siftupTimer(pp.timers, i)
// 如果t是最早觸發的定時器,將觸發的時間保存到pp的time0When中
if t == pp.timers[0] {
atomic.Store64(&pp.timer0When, uint64(t.when))
}
// 將P中記錄定時器數量的變量numTimers加1
atomic.Xadd(&pp.numTimers, 1)
}
wakeNetPoller 喚醒正在 netpoll 休眠的線程,喚醒的條件有兩個:
-
when 的值小於 pollUntil 時間
-
pollUntil 爲 0 timer 爲什麼需要喚醒 netpoll,跟 netpoll 有什麼關係呢?在 1.14 版中將 timer 和 netpoll 統一了起來,不管是網絡輪休還是定時器都是某個條件滿足了,對於網絡來說是有數據包了,對定時器來說是時間到了。netpoll 中的 poll_runtime_pollSetDeadline 函數用來設置 pollDesc 結構體的 deadline 相關字段,可以爲每個 fd 設置讀超時以及寫超時。其原理就是爲 pollDesc 結構體添加一個 timer,timer 的 f 函數設置爲 netpolldeadlineimpl 函數。netpolldeadlineimpl 函數會運行 pollDesc 結構體的協程,即使 pollDesc 中的事件沒有被 epoll 觸發,因爲 deadline 到了。
// wakeNetPoller 喚醒正在netpoll休眠的線程, 喚醒的條件是when的值小於
// pollUntil的時間,或者pollUntil爲0
func wakeNetPoller(when int64) {
if atomic.Load64(&sched.lastpoll) == 0 {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > when {
netpollBreak()
}
}
}
netpollBreak 進行喚醒操作,實現方法是往管道 netpollBreakWr 寫數據,這樣 netpoll 自然會被喚醒。
// netpollBreak 進行喚醒操作,實現方式是,netpollBreakWr是一個管道,用
// write給netpollBreakWr寫數據,這樣netpoll自然會被喚醒
func netpollBreak() {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 || n == -_EAGAIN {
break
}
if n == -_EINTR {
continue
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
下面看 Stop 定時器,執行了哪些操作. time 包中 Stop 操作,核心調的是 stopTimer 函數,該函數在 runtime 包中實現。
// Stop 停止定時器接口,阻止定時器被觸發。如果定時器已經過期或者被停止掉,次函數將返回
// false,否則返回true, 調用Stop並不會關閉定時器中的Chan,以防止通道不能成功的讀取
// Stop不能併發的執行, 對於通過AfterFunc創建的timer, 如果定時器已經到期並且f在它自己的goroutine中運行
// 調用Stop將返回false. 如果調用者想知道f是否執行完畢,需要自己實現與f進行協調
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}
stopTimer 處理邏輯調用的是 deltimer 函數,根據 t 的狀態會做不同的處理,如果定時器處於 timerWaiting/timerModifiedLater/timerModifiedEarlier 狀態,先將其狀態修改爲 timerModifying 狀態,最後修改爲 timerDeleted 狀態, 如果定時器處於其他狀態,待狀態改變或直接返回。deltimer 函數並不是直接刪除定時器,而是將其狀態標記爲刪除狀態,是爲了防止併發衝突。真正執行刪除的邏輯是在各個 P 上完成的。
// 停止掉定時器
func stopTimer(t *timer) bool {
return deltimer(t)
}
// deltimer 刪除定時器,將定時器t從P上刪除。注意這裏的刪除並不是真正從P移除了,而是
// 將t的狀態標記爲刪除,是爲了防止併發衝突。然後每個P真正刪除邏輯會將自己上面的標記爲
// 刪除狀態的定時器刪除
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// 對處於等待狀態,延遲觸發的定時器將其狀態標記爲timerModifying
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
// t的pp字段指向了P的位置,可以方便的從t知道它被掛在哪個P下
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
// 對P進行操作,將記錄刪除定時器的數量減1
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
// 因爲定時器還未被觸發,返回true
return true
} else {
releasem(mp)
}
case timerModifiedEarlier:
// 定時器處於提前觸發狀態,將其狀態修改爲timerModifying
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr()
// 定時器的狀態將被修改爲timerDeleted,所以維護調整狀態的定時器數量-1
atomic.Xadd(&tpp.adjustTimers, -1)
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
// 對P進行操作,將記錄刪除定時器的數量減1
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
// 因爲定時器還未被觸發,返回true
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
// 定時器已經處於移除狀態了,返回false處理,這種情況在重複調用t.Stop會發生
return false
case timerRunning, timerMoving:
// 定時器已經被觸發或正在移動狀態,讓出當前的處理
osyield()
case timerNoStatus:
// 定時器還未添加或已經運行完了,返回false
return false
case timerModifying:
// 定時器處於修改狀態,併發的調用deltimer和modtimer會可能會走到這裏,讓出當前處理
osyield()
default:
badTimer()
}
}
}
定時器 Reset 操作,用於將一個已有定時器的觸發時間進行修改,可以往前修改也可以往後修改。Reset 應該在定時器停止之後或過期之後並且通道數據取走之後被調用,如果程序已經從 t.C 中取走了值,定時器已經到期,可以直接執行 t.Reset 操作,如果程序還未從 t.C 中取走值,在調用 t.Reset 之前必須先停止掉定時器,並將通道的數據取乾淨。
// Reset應該在定時器停止之後或過期之後並且通道數據取走之後被調用
// 如果程序已經從t.C中取走了值,定時器已經到期,可以直接執行t.Reset操作
// 如果程序還未從t.C中取走值,在調用t.Reset之前必須先停止掉定時器,並將通道的數據取乾淨
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
active := stopTimer(&t.r)
resetTimer(&t.r, w)
return active
}
resettimer 完成 Reset 的真正處理邏輯,如果一個定時器處於不活動狀態,調用 resettimer 將變成激活狀態。如果一個定時器已經被使用或者可能被使用,應該調用 resettimer 而不是 addtimer.
// resettimer 重置一個定時器的觸發時間,如果一個定時器處於不活動狀態,調用resettimer
// 將變成活動狀態。如果一個定時器已經被使用或者可能被使用,應該調用resettimer而不是addtimer
func resettimer(t *timer, when int64) {
modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
// modtimer 修改已存在定時器的觸發時間
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
if when < 0 {
when = maxWhen
}
status := uint32(timerNoStatus)
wasRemoved := false
var mp *m
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
// 將定時器修改爲正在修改狀態
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
mp = acquirem()
// 將定時器修改爲正在修改狀態
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
break loop
}
releasem(mp)
case timerDeleted:
// 將定時器修改爲正在修改狀態
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
// 定時器t正在運行、正在被移除、正在移動狀態,讓出當前處理
osyield()
case timerModifying:
// 爲啥不跟上面的合併處理呢? 處於修改狀態的定時器,也讓出當前處理
osyield()
default:
badTimer()
}
}
// 更新t的period、執行函數、參數、seq
t.period = period
t.f = f
t.arg = arg
t.seq = seq
// 處於timerNoStatus, timerRemoved下的定時器已進不在P中,需要重新加入
if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
// 將t加入到pp中
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
// 喚醒net poller進行處理
wakeNetPoller(when)
} else {
// when不能直接給t.when, 此時的t還在某個P中,如果直接修改將導致無序,
// 所以這裏將when給t.nextwhen,讓各自的P在處理的時候將nextwhen的值
// 賦值給when
t.nextwhen = when
// newStatus記錄本次時間調整對對應的狀態,是提前還是延遲
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
// 更新adjust的值,如果是將定時器從timerModifiedEarlier移除,
// adjust-1 ,如果是將定時器調整到timerModifiedEarlier,
// adjust+1
adjust := int32(0)
if status == timerModifiedEarlier {
adjust--
}
if newStatus == timerModifiedEarlier {
adjust++
}
if adjust != 0 {
atomic.Xadd(&t.pp.ptr().adjustTimers, adjust)
}
// 將定時器的狀態設置爲新的狀態
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
}
timer 是如何被觸發運行的?有兩種方式會觸發 timer 運行.
-
第一種是在調度循環中直接檢查是否有滿足定時器直接觸發
-
第二種是 Go 的後臺監控中會定時檢查是否有定時器需要觸發。在調度循環中觸發定時器的函數有 2 個函數,schedule 和 findrunnable。下面從這兩個函數中抽取出於定時器處理有關的邏輯。schedule 中會調用 checkTimers, checkTimers 會對需要進行調整的 timer 進行調整,如果沒有需要執行的定時器,直接返回,如果下一個要執行的定時器 timer 沒有到期並且需要刪除的定時器佔整個定時器的比例小於 1/4 也會直接返回。之後調用 adjusttimers 進行定時器的調整,調整成四叉堆的結構,在調用 runtimer 查找堆中是否存在需要執行的 timer。最後根據當前 goroutine 的 P 和傳入的 P 相同,並且需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼調用 clearDeletedTimers 清理需要刪除的 timer.
func schedule() {
...
checkTimers(pp, 0)
var gp *g
var inheritTime bool
...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
...
}
// checkTimers檢查P中的定時器觸發時間是否已滿足
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// If there are no timers to adjust, and the first timer on
// the heap is not yet ready to run, then there is nothing to do.
// 檢查是否有處於調整時間過程中的定時器,如果沒有會進入if邏輯。檢查最早被觸發的定時
// 器的運行時間,即pp.time0When,如果當前時間還沒達到最早定時器的觸發時間。檢查
// 待刪除的定時器數量是否小於等待所有定時器的數量的1/4, 如果是直接返回。如果不是
// 執行清理操作,將deleted狀態的定時器刪除
if atomic.Load(&pp.adjustTimers) == 0 {
next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
}
lock(&pp.timersLock)
// 調整定時器
adjusttimers(pp)
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
// 運行定時器
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
// If this is the local P, and there are a lot of deleted timers,
// clear them out. We only do this for the local P to reduce
// lock contention on timersLock.
// P中待刪除的定時器數大於所有定時器數量的1/4,執行刪除操作
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
findrunnable 主要是竊取可運行的 G,在竊取前先會調用 checkTimers 檢查 P 中可執行的 timer,如果 netpoll 中有等待的 waiter,那麼會調用 netpoll 嘗試無阻塞的從 netpoller 獲取 Glist,如果獲取不到可執行的 G,那麼就會開始執行竊取。竊取的時候會調用 checkTimers 隨機從其他的 P 中獲取 timer,竊取完畢後也沒有可執行的 timer,那麼會繼續往下,休眠前再次檢查 netpoll 網絡,調用 netpoll 函數進行阻塞調用。
func findrunnable() (gp *g, inheritTime bool) {
...
// 從其他P中偷G到當前的處理P中
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
// 在循環的最後一輪,如果其他P運行隊列中沒有G,將從其他隊列的runnext中獲取
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 從其他的P的運行隊列中獲取一般的G到當前的隊列
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
...
// 如果運行隊列中沒有G,那麼從timers中獲取可執行的timer
if i > 2 && shouldStealTimers(p2) {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
...
}
監控線程 sysmon 會通過 timeSleepUntil 遍歷所有的 P 的 timer 列表,找到下一個需要執行的 timer, 如果超過 10ms 沒有 poll,則 poll 一下網絡, 如果有 timer 到期,這個時候直接啓動新的 M 處理 timer.
func sysmon() {
...
// poll network if not polled for more than 10ms
// 獲取上次poll 輪休的時間
lastpoll := int64(atomic.Load64(&sched.lastpoll))
// 如果上次輪詢 network距離現在已經超過了10ms,則輪詢一下網絡
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
// 非阻塞,返回G列表
list := netpoll(0)
// 如果G列表非空,將獲取到的G列表插入到空閒的P中或全局列表中
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
// 如果有定時器到期,啓動新的M處理定時器
if next < now {
startm(nil, false)
}
...
}
總結
上述分析的是 1.14 版的定時器實現原理,在 1.14 版本之前,定時器的實現方法與上面不太一樣。在之前的版本,維護了一個桶結構,桶的大小爲 64,申請的每個定時器會分配到這 64 個桶中的某個桶上。然後對每個桶進行調度處理裏面定時器任務。在 1.14 版本將 timer 列表直接掛到了 P 上面,這不僅減少了上下文切換帶來的性能損耗,也減少了在鎖之間的爭搶問題。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/_k--ShmXUQKmEF2TJQd2yg