Go 每日一庫之 ants(源碼賞析)
簡介
繼上一篇 Go 每日一庫之 ants,這篇文章我們來一起看看ants
的源碼。
通過上篇文章,我們知道ants
池有兩種創建方式:
-
p, _ := ants.NewPool(cap)
:這種方式創建的池子對象需要調用p.Submit(task)
提交任務,任務是一個無參數無返回值的函數; -
p, _ := ants.NewPoolWithFunc(cap, func(interface{}))
:這種方式創建的池子對象需要指定池函數,並且使用p.Invoke(arg)
調用池函數。arg
就是傳給池函數func(interface{})
的參數。
在ants
中這兩種池子使用不同的結構來表示:ants.Pool
和ants.PoolWithFunc
。我們先來介紹Pool
。PoolWithFunc
結構也是類似的,介紹完Pool
之後,我們再簡單比較一下它們。
Pool
結構定義在文件pool.go
中:
// src/github.com/panjf2000/ants/pool.go
type Pool struct {
capacity int32
running int32
workers workerArray
state int32
lock sync.Locker
cond *sync.Cond
workerCache sync.Pool
blockingNum int
options *Options
}
各個字段含義如下:
-
capacity
:池容量,表示ants
最多能創建的 goroutine 數量。如果爲負數,表示容量無限制; -
running
:已經創建的 worker goroutine 的數量; -
workers
:存放一組 worker 對象,workerArray
只是一個接口,表示一個 worker 容器,後面詳述; -
state
:記錄池子當前的狀態,是否已關閉(CLOSED
); -
lock
:鎖。ants
自己實現了一個自旋鎖。用於同步併發操作; -
cond
:條件變量。處理任務等待和喚醒; -
workerCache
:使用sync.Pool
對象池管理和創建worker
對象,提升性能; -
blockingNum
:阻塞等待的任務數量; -
options
:選項。上一篇文章已經詳細介紹過了。
這裏明確一個概念,ants
中爲每個任務都是由 worker 對象來處理的,每個 worker 對象會對應創建一個 goroutine 來處理任務。ants
中使用goWorker
表示 worker:
// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
後文詳細介紹這一塊內容,現在我們只需要知道Pool.workers
字段就是存放goWorker
對象的容器。
Pool
創建
創建Pool
對象需調用ants.NewPool(size, options)
函數。省略了一些處理選項的代碼,最終代碼如下:
// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
// ...
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
go p.purgePeriodically()
return p, nil
}
代碼不難理解:
-
創建
Pool
對象,設置容量,創建一個自旋鎖來初始化lock
字段,設置選項; -
設置
workerCache
這個sync.Pool
對象的New
方法,在調用sync.Pool
對象的Get()
方法時,如果它沒有緩存的 worker 對象了,則調用這個方法創建一個; -
根據是否設置了預分配選項,創建不同類型的 workers;
-
使用
p.lock
鎖創建一個條件變量; -
最後啓動一個 goroutine 用於定期清理過期的 worker。
Pool.workers
字段爲workerArray
類型,這實際上是一個接口,表示一個 worker 容器:
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()
}
每個方法從名字上很好理解含義:
-
len() int
:worker 數量; -
isEmpty() bool
:worker 數量是否爲 0; -
insert(worker *goWorker) error
:goroutine 任務執行結束後,將相應的 worker 放回workerArray
中; -
detach() *goWorker
:從workerArray
中取出一個 worker; -
retrieveExpiry(duration time.Duration) []*goWorker
:取出所有的過期 worker; -
reset()
:重置容器。
workerArray
在ants
中有兩種實現,即workerStack
和loopQueue
。
workerStack
我們先來介紹一下workerStack
,它位於文件worker_stack.go
中:
// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {
items []*goWorker
expiry []*goWorker
size int
}
func newWorkerStack(size int) *workerStack {
return &workerStack{
items: make([]*goWorker, 0, size),
size: size,
}
}
-
items
:空閒的worker
; -
expiry
:過期的worker
。
goroutine 完成任務之後,Pool
池會將相應的 worker 放回workerStack
,調用workerStack.insert()
直接append
到items
中即可:
func (wq *workerStack) insert(worker *goWorker) error {
wq.items = append(wq.items, worker)
return nil
}
新任務到來時,會調用workerStack.detach()
從容器中取出一個空閒的 worker:
func (wq *workerStack) detach() *goWorker {
l := wq.len()
if l == 0 {
return nil
}
w := wq.items[l-1]
wq.items[l-1] = nil // avoid memory leaks
wq.items = wq.items[:l-1]
return w
}
這裏總是返回最後一個 worker,每次insert()
也是append
到最後,符合棧後進先出的特點,故稱爲workerStack
。
這裏有一個細節,由於切片的底層結構是數組,只要有引用數組的指針,數組中的元素就不會釋放。這裏取出切片最後一個元素後,將對應數組元素的指針設置爲nil
,主動釋放這個引用。
上面說過新建Pool
對象時會創建一個 goroutine 定期檢查和清理過期的 worker。通過調用workerArray.retrieveExpiry()
獲取過期的 worker 列表。workerStack
實現如下:
func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
n := wq.len()
if n == 0 {
return nil
}
expiryTime := time.Now().Add(-duration)
index := wq.binarySearch(0, n-1, expiryTime)
wq.expiry = wq.expiry[:0]
if index != -1 {
wq.expiry = append(wq.expiry, wq.items[:index+1]...)
m := copy(wq.items, wq.items[index+1:])
for i := m; i < n; i++ {
wq.items[i] = nil
}
wq.items = wq.items[:m]
}
return wq.expiry
}
實現使用二分查找法找到已過期的最近一個 worker。由於過期時間是按照 goroutine 執行任務後的空閒時間計算的,而workerStack.insert()
入隊順序決定了,它們的過期時間是從早到晚的。所以可以使用二分查找:
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
var mid int
for l <= r {
mid = (l + r) / 2
if expiryTime.Before(wq.items[mid].recycleTime) {
r = mid - 1
} else {
l = mid + 1
}
}
return r
}
二分查找的是最近過期的 worker,即將過期的 worker 的前一個。它和在它之前的 worker 已經全部過期了。
如果找到索引index
,將items
從開頭到index
(包括)的所有 worker 複製到expiry
字段中。然後將index
之後的所有未過期 worker 複製到切片頭部,這裏使用了copy
函數。copy
返回實際複製的數量,即未過期的 worker 數量m
。然後將切片items
從m
開始所有的元素置爲nil
,避免內存泄漏,因爲它們已經被複制到頭部了。最後裁剪items
切片,返回過期 worker 切片。
loopQueue
loopQueue
實現基於循環隊列,結構定義在文件worker_loop_queue
中:
type loopQueue struct {
items []*goWorker
expiry []*goWorker
head int
tail int
size int
isFull bool
}
func newWorkerLoopQueue(size int) *loopQueue {
return &loopQueue{
items: make([]*goWorker, size),
size: size,
}
}
由於是循環隊列,這裏先創建好了一個長度爲size
的切片。循環隊列有一個隊列頭指針head
,指向第一個有元素的位置,一個隊列尾指針tail
,指向下一個可以存放元素的位置。所以一開始狀態如下:
在tail
處添加元素,添加後tail
指針後移。在head
處取出元素,取出後head
指針也後移。進行一段時間操作後,隊列狀態如下:
head
或tail
指針到隊列尾了,需要回繞。所以可能出現這種情況:
當tail
指針趕上head
指針了,說明隊列就滿了:
當head
指針趕上tail
指針了,隊列再次爲空:
根據示意圖,我們再來看loopQueue
的操作方法就很簡單了。
func (wq *loopQueue) insert(worker *goWorker) error {
if wq.size == 0 {
return errQueueIsReleased
}
if wq.isFull {
return errQueueIsFull
}
wq.items[wq.tail] = worker
wq.tail++
if wq.tail == wq.size {
wq.tail = 0
}
if wq.tail == wq.head {
wq.isFull = true
}
return nil
}
func (wq *loopQueue) detach() *goWorker {
if wq.isEmpty() {
return nil
}
w := wq.items[wq.head]
wq.items[wq.head] = nil
wq.head++
if wq.head == wq.size {
wq.head = 0
}
wq.isFull = false
return w
}
再看Pool
創建
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
type arrayType int
const (
stackType arrayType = 1 << iota
loopQueueType
)
func newWorkerArray(aType arrayType, size int) workerArray {
switch aType {
case stackType:
return newWorkerStack(size)
case loopQueueType:
return newWorkerLoopQueue(size)
default:
return newWorkerStack(size)
}
}
即如果設置了預分配選項,就採用loopQueue
結構。否則就採用stack
的結構。
worker 結構
介紹完Pool
的創建和結構,我們來看看 worker 的結構。在ants
中 worker 用結構體goWorker
表示,定義在文件worker.go
中。它的結構非常簡單:
// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
具體字段含義很明顯:
-
pool
:持有 goroutine 池的引用; -
task
:任務通道,通過這個通道將類型爲func ()
的函數作爲任務發送給goWorker
; -
recyleTime
:這個字段記錄goWorker
什麼時候被放回池中(即什麼時候開始空閒)。其完成任務後,在將其放回 goroutine 池的時候設置。
goWorker
創建時會調用run()
方法,run()
方法中啓動一個新 goroutine 處理任務。run()
主體流程非常簡單:
func (w *goWorker) run() {
go func() {
for f := range w.task {
if f == nil {
return
}
f()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
這個方法啓動一個新的 goroutine,然後不停地從task
通道中接收任務,然後執行任務,任務執行完成之後調用池對象的revertWorker()
方法將該goWorker
對象放回池中,以便下次取出處理新的任務。revertWorker()
方法後面會詳細分析。
這裏注意,實際上for f := range w.task
這個循環直到通道task
關閉或取出爲nil
的任務纔會終止。所以這個 goroutine 一直在運行,這正是ants
高性能的關鍵所在。每個goWorker
只會啓動一次 goroutine, 後續重複利用這個 goroutine。goroutine 每次只執行一個任務就會被放回池中。
還有一個細節,如果放回操作失敗,則會調用return
,這會讓 goroutine 運行結束,防止 goroutine 泄漏。
這裏f == nil
爲 true 時return
,也是一個細節點,我們後面講池關閉的時候會詳細介紹。
下面我們看看run()
方法的異常處理:
defer func() {
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte
n := runtime.Stack(buf[:], false)
w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
}
}
w.pool.cond.Signal()
}()
簡單來說,就是在defer
中通過recover()
函數捕獲任務執行過程中拋出的panic
。這時任務執行失敗,goroutine 也結束了。但是goWorker
對象還是可以重複利用,所以defer
函數一開始調用w.pool.workerCache.Put(w)
將goWorker
對象放回sync.Pool
池中。
接着就是處理panic
,如果選項中指定了panic
處理器,直接調用這個處理器。否則,ants
調用選項中設置的Logger
記錄一些日誌,如堆棧,panic
信息等。
最後需要調用w.pool.cond.Signal()
通知現在有空閒的goWorker
了。因爲我們實際運行的goWorker
數量由於panic
少了一個,而池中可能有其他任務在等待處理。
提交任務
接下來,通過提交任務就可以串起整個流程。由上一篇文章我們知道,可以調用池對象的Submit()
方法提交任務:
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
w.task <- task
return nil
}
首先判斷池是否已關閉,然後調用retrieveWorker()
方法獲取一個空閒的 worker,然後將任務task
發送到 worker 的任務通道。下面是retrieveWorker()
實現:
func (p *Pool) retrieveWorker() (w *goWorker) {
p.lock.Lock()
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
spawnWorker()
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto Reentry
}
p.lock.Unlock()
}
return
}
這個方法稍微有點複雜,我們一點點來看。首先調用p.workers.detach()
獲取goWorker
對象。p.workers
是loopQueue
或者workerStack
對象,它們都實現了detach()
方法,前面已經介紹過了。
如果返回了一個goWorker
對象,說明有空閒 goroutine,直接返回。
否則,池容量還沒用完(即容量大於正在工作的goWorker
數量),則調用spawnWorker()
新建一個goWorker
,執行其run()
方法:
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
否則,池容量已用完。如果設置了非阻塞選項,則直接返回。否則,如果設置了最大阻塞隊列長度上限,且當前阻塞等待的任務數量已經達到這個上限,直接返回。否則,阻塞等待數量 +1,調用p.cond.Wait()
等待。
然後goWorker.run()
完成一個任務後,調用池的revertWorker()
方法放回goWorker
:
func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
return false
}
worker.recycleTime = time.Now()
p.lock.Lock()
if p.IsClosed() {
p.lock.Unlock()
return false
}
err := p.workers.insert(worker)
if err != nil {
p.lock.Unlock()
return false
}
p.cond.Signal()
p.lock.Unlock()
return true
}
這裏設置了goWorker
的recycleTime
字段,用於判定過期。然後將goWorker
放回池。workers
的insert()
方法前面也已經分析過了。
接着調用p.cond.Signal()
喚醒之前retrieveWorker()
方法中的等待。retrieveWorker()
方法繼續執行,阻塞等待數量 -1,這裏判斷當前goWorker
的數量(也即 goroutine 數量)。如果數量等於 0,很有可能池子剛剛執行了Release()
關閉,這時需要判斷池是否處於關閉狀態,如果是則直接返回。否則,調用spawnWorker()
創建一個新的goWorker
並執行其run()
方法。
如果當前goWorker
數量不爲 0,則調用p.workers.detach()
取出一個空閒的goWorker
返回。這個操作有可能失敗,因爲可能同時有多個 goroutine 在等待,喚醒的時候只有部分 goroutine 能獲取到goWorker
。如果失敗了,其容量還未用完,直接創建新的goWorker
,反之重新執行阻塞等待邏輯。
這裏有很多加鎖和解鎖的邏輯,再加上和信號量混在一起很難看明白。其實只需要知道一點就很簡單了,那就是p.cond.Wait()
內部會將當前 goroutine 掛起,然後解開它持有的鎖,即會調用p.lock.Unlock()
。這也是爲什麼revertWorker()
中p.lock.Lock()
加鎖能成功的原因。然後p.cond.Signal()
或p.cond.Broadcast()
會喚醒因爲p.cond.Wait()
而掛起的 goroutine,但是需要Signal()/Broadcast()
所在 goroutine 調用解鎖方法。
最後,放上整體流程圖:
清理過期goWorker
在NewPool()
函數中會啓動一個 goroutine 定期清理過期的goWorker
:
func (p *Pool) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {
if p.IsClosed() {
break
}
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}
if p.Running() == 0 {
p.cond.Broadcast()
}
}
}
如果池子已關閉,直接退出 goroutine。由選項ExpiryDuration
來設置清理的間隔,如果沒有設置該選項,採用默認值 1s:
// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}
// src/github.com/panjf2000/ants/pool.go
const (
DefaultCleanIntervalTime = time.Second
)
然後就是每個清理週期,調用p.workers.retrieveExpiry()
方法,取出過期的goWorker
。因爲由這些goWorker
啓動的 goroutine 還阻塞在通道task
上,所以要向該通道發送一個nil
值,而goWorker.run()
方法中接收到一個值爲nil
的任務會return
,結束 goroutine,避免了 goroutine 泄漏。
如果所有goWorker
都被清理掉了,可能這時還有 goroutine 阻塞在retrieveWorker()
方法中的p.cond.Wait()
上,所以這裏需要調用p.cond.Broadcast()
喚醒這些 goroutine。
容量動態修改
在運行過程中,可以動態修改池的容量。調用p.Tune(size int)
方法:
func (p *Pool) Tune(size int) {
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
}
這裏只是簡單設置了一下新的容量,不影響當前正在執行的goWorker
,而且如果設置了預分配選項,容量不能再次設置。
下次執行revertWorker()
的時候就會以新的容量判斷是否能放回,下次執行retrieveWorker()
的時候也會以新容量判斷是否能創建新goWorker
。
關閉和重新啓動Pool
使用完成之後,需要關閉Pool
,避免 goroutine 泄漏。調用池對象的Release()
方法關閉:
func (p *Pool) Release() {
atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
p.cond.Broadcast()
}
調用p.workers.reset()
結束loopQueue
或wokerStack
中的 goroutine,做一些清理工作,同時爲了防止有 goroutine 阻塞在p.cond.Wait()
上,執行一次p.cond.Broadcast()
。
workerStack
與loopQueue
的reset()
基本相同,即發送nil
到task
通道從而結束 goroutine,然後重置各個字段:
// loopQueue 版本
func (wq *loopQueue) reset() {
if wq.isEmpty() {
return
}
Releasing:
if w := wq.detach(); w != nil {
w.task <- nil
goto Releasing
}
wq.items = wq.items[:0]
wq.size = 0
wq.head = 0
wq.tail = 0
}
// stack 版本
func (wq *workerStack) reset() {
for i := 0; i < wq.len(); i++ {
wq.items[i].task <- nil
wq.items[i] = nil
}
wq.items = wq.items[:0]
}
池關閉後還可以調用Reboot()
重啓:
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.purgePeriodically()
}
}
由於p.purgePeriodically()
在p.Release()
之後檢測到池關閉就直接退出了,這裏需要重新開啓一個 goroutine 定期清理。
PoolWithFunc
和WorkWithFunc
上一篇文章中我們還介紹了另一種方式創建Pool
,即NewPoolWithFunc()
,指定一個函數。後面提交任務時調用p.Invoke()
提供參數就可以執行該函數了。這種方式創建的 Pool 和 Woker 結構如下:
type PoolWithFunc struct {
workers []*goWorkerWithFunc
poolFunc func(interface{})
}
type goWorkerWithFunc struct {
pool *PoolWithFunc
args chan interface{}
recycleTime time.Time
}
與前面介紹的Pool
和goWorker
大體相似,只是PoolWithFunc
保存了傳入的函數對象,使用數組保存 worker。goWorkerWithFunc
以interface{}
爲args
通道的數據類型,其實也好理解,因爲已經有函數了,只需要傳入數據作爲參數就可以運行了:
func (w *goWorkerWithFunc) run() {
go func() {
for args := range w.args {
if args == nil {
return
}
w.pool.poolFunc(args)
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
從通道接收函數參數,執行池中保存的函數對象。
其他細節
task
緩衝通道
還記得創建p.workerCache
這個sync.Pool
對象的代碼麼:
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
在sync.Pool
中沒有goWorker
對象時,調用New()
方法創建一個,注意到這裏創建的task
通道使用workerChanCap
作爲容量。這個變量定義在ants.go
文件中:
var (
// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}()
)
爲了方便對照,我把註釋也放上來了。ants
參考了著名的 Web 框架fasthttp
的實現。當GOMAXPROCS
爲 1 時(即操作系統線程數爲 1),向通道task
發送會掛起發送 goroutine,將執行流程轉向接收 goroutine,這能提升接收處理性能。如果GOMAXPROCS
大於 1,ants
使用帶緩衝的通道,爲了防止接收 goroutine 是 CPU 密集的,導致發送 goroutine 被阻塞。下面是fasthttp
中的相關代碼:
// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
// Use blocking workerChan if GOMAXPROCS=1.
// This immediately switches Serve to WorkerFunc, which results
// in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the Serve caller (Acceptor) may lag accepting
// new connections if WorkerFunc is CPU-bound.
return 1
}()
自旋鎖
ants
利用atomic.CompareAndSwapUint32()
這個原子操作實現了一個自旋鎖。與其他類型的鎖不同,自旋鎖在加鎖失敗之後不會立刻進入等待,而是會繼續嘗試。這對於很快就能獲得鎖的應用來說能極大提升性能,因爲能避免加鎖和解鎖導致的線程切換:
type spinLock uint32
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
backoff <<= 1
}
}
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}
另外這裏使用了指數退避,先等 1 個循環週期,通過runtime.Gosched()
告訴運行時切換其他 goroutine 運行。如果還是獲取不到鎖,就再等 2 個週期。如果還是不行,再等 4,8,16... 以此類推。這可以防止短時間內獲取不到鎖,導致 CPU 時間的浪費。
總結
ants
源碼短小精悍,沒有引用其他任何第三方庫。各種細節處理,各種性能優化的點都是值得我們細細品味的。強烈建議大家讀一讀源碼。閱讀優秀的源碼,能極大地提高自身的編碼素養。
大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
參考
-
ants GitHub:github.com/panjf2000/ants
-
Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/a84T6Hpbrhop7vQA01N1Bg