Go 每日一庫之 ants(源碼賞析)

簡介

繼上一篇 Go 每日一庫之 ants,這篇文章我們來一起看看ants的源碼。

通過上篇文章,我們知道ants池有兩種創建方式:

ants中這兩種池子使用不同的結構來表示:ants.Poolants.PoolWithFunc。我們先來介紹PoolPoolWithFunc結構也是類似的,介紹完Pool之後,我們再簡單比較一下它們。

Pool結構定義在文件pool.go中:

// src/github.com/panjf2000/ants/pool.go
type Pool struct {
  capacity int32
  running int32
  workers workerArray
  state int32
  lock sync.Locker
  cond *sync.Cond
  workerCache sync.Pool
  blockingNum int
  options *Options
}

各個字段含義如下:

這裏明確一個概念,ants中爲每個任務都是由 worker 對象來處理的,每個 worker 對象會對應創建一個 goroutine 來處理任務。ants中使用goWorker表示 worker:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool *Pool
  task chan func()
  recycleTime time.Time
}

後文詳細介紹這一塊內容,現在我們只需要知道Pool.workers字段就是存放goWorker對象的容器。

Pool創建

創建Pool對象需調用ants.NewPool(size, options)函數。省略了一些處理選項的代碼,最終代碼如下:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  // ...
  p := &Pool{
    capacity: int32(size),
    lock:     internal.NewSpinLock(),
    options:  opts,
  }
  p.workerCache.New = func() interface{} {
    return &goWorker{
      pool: p,
      task: make(chan func(), workerChanCap),
    }
  }
  if p.options.PreAlloc {
    if size == -1 {
      return nil, ErrInvalidPreAllocSize
    }
    p.workers = newWorkerArray(loopQueueType, size)
  } else {
    p.workers = newWorkerArray(stackType, 0)
  }

  p.cond = sync.NewCond(p.lock)

  go p.purgePeriodically()
  return p, nil
}

代碼不難理解:

Pool.workers字段爲workerArray類型,這實際上是一個接口,表示一個 worker 容器:

type workerArray interface {
  len() int
  isEmpty() bool
  insert(worker *goWorker) error
  detach() *goWorker
  retrieveExpiry(duration time.Duration) []*goWorker
  reset()
}

每個方法從名字上很好理解含義:

workerArrayants中有兩種實現,即workerStackloopQueue

workerStack

我們先來介紹一下workerStack,它位於文件worker_stack.go中:

// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {
  items  []*goWorker
  expiry []*goWorker
  size   int
}

func newWorkerStack(size int) *workerStack {
  return &workerStack{
    items: make([]*goWorker, 0, size),
    size:  size,
  }
}

goroutine 完成任務之後,Pool池會將相應的 worker 放回workerStack,調用workerStack.insert()直接appenditems中即可:

func (wq *workerStack) insert(worker *goWorker) error {
  wq.items = append(wq.items, worker)
  return nil
}

新任務到來時,會調用workerStack.detach()從容器中取出一個空閒的 worker:

func (wq *workerStack) detach() *goWorker {
  l := wq.len()
  if l == 0 {
    return nil
  }

  w := wq.items[l-1]
  wq.items[l-1] = nil // avoid memory leaks
  wq.items = wq.items[:l-1]

  return w
}

這裏總是返回最後一個 worker,每次insert()也是append到最後,符合棧後進先出的特點,故稱爲workerStack

這裏有一個細節,由於切片的底層結構是數組,只要有引用數組的指針,數組中的元素就不會釋放。這裏取出切片最後一個元素後,將對應數組元素的指針設置爲nil,主動釋放這個引用。

上面說過新建Pool對象時會創建一個 goroutine 定期檢查和清理過期的 worker。通過調用workerArray.retrieveExpiry()獲取過期的 worker 列表。workerStack實現如下:

func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
  n := wq.len()
  if n == 0 {
    return nil
  }

  expiryTime := time.Now().Add(-duration)
  index := wq.binarySearch(0, n-1, expiryTime)

  wq.expiry = wq.expiry[:0]
  if index != -1 {
    wq.expiry = append(wq.expiry, wq.items[:index+1]...)
    m := copy(wq.items, wq.items[index+1:])
    for i := m; i < n; i++ {
      wq.items[i] = nil
    }
    wq.items = wq.items[:m]
  }
  return wq.expiry
}

實現使用二分查找法找到已過期的最近一個 worker。由於過期時間是按照 goroutine 執行任務後的空閒時間計算的,而workerStack.insert()入隊順序決定了,它們的過期時間是從早到晚的。所以可以使用二分查找:

func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
  var mid int
  for l <= r {
    mid = (l + r) / 2
    if expiryTime.Before(wq.items[mid].recycleTime) {
      r = mid - 1
    } else {
      l = mid + 1
    }
  }
  return r
}

二分查找的是最近過期的 worker,即將過期的 worker 的前一個。它和在它之前的 worker 已經全部過期了。

如果找到索引index,將items從開頭到index(包括)的所有 worker 複製到expiry字段中。然後將index之後的所有未過期 worker 複製到切片頭部,這裏使用了copy函數。copy返回實際複製的數量,即未過期的 worker 數量m。然後將切片itemsm開始所有的元素置爲nil,避免內存泄漏,因爲它們已經被複制到頭部了。最後裁剪items切片,返回過期 worker 切片。

loopQueue

loopQueue實現基於循環隊列,結構定義在文件worker_loop_queue中:

type loopQueue struct {
  items  []*goWorker
  expiry []*goWorker
  head   int
  tail   int
  size   int
  isFull bool
}

func newWorkerLoopQueue(size int) *loopQueue {
  return &loopQueue{
    items: make([]*goWorker, size),
    size:  size,
  }
}

由於是循環隊列,這裏先創建好了一個長度爲size的切片。循環隊列有一個隊列頭指針head,指向第一個有元素的位置,一個隊列尾指針tail,指向下一個可以存放元素的位置。所以一開始狀態如下:

tail處添加元素,添加後tail指針後移。在head處取出元素,取出後head指針也後移。進行一段時間操作後,隊列狀態如下:

headtail指針到隊列尾了,需要回繞。所以可能出現這種情況:

tail指針趕上head指針了,說明隊列就滿了:

head指針趕上tail指針了,隊列再次爲空:

根據示意圖,我們再來看loopQueue的操作方法就很簡單了。

func (wq *loopQueue) insert(worker *goWorker) error {
  if wq.size == 0 {
    return errQueueIsReleased
  }

  if wq.isFull {
    return errQueueIsFull
  }
  wq.items[wq.tail] = worker
  wq.tail++

  if wq.tail == wq.size {
    wq.tail = 0
  }
  if wq.tail == wq.head {
    wq.isFull = true
  }

  return nil
}
func (wq *loopQueue) detach() *goWorker {
  if wq.isEmpty() {
    return nil
  }

  w := wq.items[wq.head]
  wq.items[wq.head] = nil
  wq.head++
  if wq.head == wq.size {
    wq.head = 0
  }
  wq.isFull = false

  return w
}

再看Pool創建

if p.options.PreAlloc {
  if size == -1 {
    return nil, ErrInvalidPreAllocSize
  }
  p.workers = newWorkerArray(loopQueueType, size)
} else {
  p.workers = newWorkerArray(stackType, 0)
}
type arrayType int

const (
  stackType arrayType = 1 << iota
  loopQueueType
)

func newWorkerArray(aType arrayType, size int) workerArray {
  switch aType {
  case stackType:
    return newWorkerStack(size)
  case loopQueueType:
    return newWorkerLoopQueue(size)
  default:
    return newWorkerStack(size)
  }
}

即如果設置了預分配選項,就採用loopQueue結構。否則就採用stack的結構。

worker 結構

介紹完Pool的創建和結構,我們來看看 worker 的結構。在ants中 worker 用結構體goWorker表示,定義在文件worker.go中。它的結構非常簡單:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool *Pool
  task chan func()
  recycleTime time.Time
}

具體字段含義很明顯:

goWorker創建時會調用run()方法,run()方法中啓動一個新 goroutine 處理任務。run()主體流程非常簡單:

func (w *goWorker) run() {
  go func() {
    for f := range w.task {
      if f == nil {
        return
      }
      f()
      if ok := w.pool.revertWorker(w); !ok {
        return
      }
    }
  }()
}

這個方法啓動一個新的 goroutine,然後不停地從task通道中接收任務,然後執行任務,任務執行完成之後調用池對象的revertWorker()方法將該goWorker對象放回池中,以便下次取出處理新的任務。revertWorker()方法後面會詳細分析。

這裏注意,實際上for f := range w.task這個循環直到通道task關閉或取出爲nil的任務纔會終止。所以這個 goroutine 一直在運行,這正是ants高性能的關鍵所在。每個goWorker只會啓動一次 goroutine, 後續重複利用這個 goroutine。goroutine 每次只執行一個任務就會被放回池中。

還有一個細節,如果放回操作失敗,則會調用return,這會讓 goroutine 運行結束,防止 goroutine 泄漏

這裏f == nil爲 true 時return,也是一個細節點,我們後面講池關閉的時候會詳細介紹。

下面我們看看run()方法的異常處理:

defer func() {
  w.pool.workerCache.Put(w)
  if p := recover(); p != nil {
    if ph := w.pool.options.PanicHandler; ph != nil {
      ph(p)
    } else {
      w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
      var buf [4096]byte
      n := runtime.Stack(buf[:]false)
      w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
    }
  }
  w.pool.cond.Signal()
}()

簡單來說,就是在defer中通過recover()函數捕獲任務執行過程中拋出的panic。這時任務執行失敗,goroutine 也結束了。但是goWorker對象還是可以重複利用,所以defer函數一開始調用w.pool.workerCache.Put(w)goWorker對象放回sync.Pool池中。

接着就是處理panic,如果選項中指定了panic處理器,直接調用這個處理器。否則,ants調用選項中設置的Logger記錄一些日誌,如堆棧,panic信息等。

最後需要調用w.pool.cond.Signal()通知現在有空閒的goWorker了。因爲我們實際運行的goWorker數量由於panic少了一個,而池中可能有其他任務在等待處理。

提交任務

接下來,通過提交任務就可以串起整個流程。由上一篇文章我們知道,可以調用池對象的Submit()方法提交任務:

func (p *Pool) Submit(task func()) error {
  if p.IsClosed() {
    return ErrPoolClosed
  }
  var w *goWorker
  if w = p.retrieveWorker(); w == nil {
    return ErrPoolOverload
  }
  w.task <- task
  return nil
}

首先判斷池是否已關閉,然後調用retrieveWorker()方法獲取一個空閒的 worker,然後將任務task發送到 worker 的任務通道。下面是retrieveWorker()實現:

func (p *Pool) retrieveWorker() (w *goWorker) {
  p.lock.Lock()

  w = p.workers.detach()
  if w != nil {
    p.lock.Unlock()
  } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
    p.lock.Unlock()
    spawnWorker()
  } else {
    if p.options.Nonblocking {
      p.lock.Unlock()
      return
    }
  Reentry:
    if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
      p.lock.Unlock()
      return
    }
    p.blockingNum++
    p.cond.Wait()
    p.blockingNum--
    var nw int
    if nw = p.Running(); nw == 0 {
      p.lock.Unlock()
      if !p.IsClosed() {
        spawnWorker()
      }
      return
    }
    if w = p.workers.detach(); w == nil {
      if nw < capacity {
        p.lock.Unlock()
        spawnWorker()
        return
      }
      goto Reentry
    }

    p.lock.Unlock()
  }
  return
}

這個方法稍微有點複雜,我們一點點來看。首先調用p.workers.detach()獲取goWorker對象。p.workersloopQueue或者workerStack對象,它們都實現了detach()方法,前面已經介紹過了。

如果返回了一個goWorker對象,說明有空閒 goroutine,直接返回。

否則,池容量還沒用完(即容量大於正在工作的goWorker數量),則調用spawnWorker()新建一個goWorker,執行其run()方法:

spawnWorker := func() {
  w = p.workerCache.Get().(*goWorker)
  w.run()
}

否則,池容量已用完。如果設置了非阻塞選項,則直接返回。否則,如果設置了最大阻塞隊列長度上限,且當前阻塞等待的任務數量已經達到這個上限,直接返回。否則,阻塞等待數量 +1,調用p.cond.Wait()等待。

然後goWorker.run()完成一個任務後,調用池的revertWorker()方法放回goWorker

func (p *Pool) revertWorker(worker *goWorker) bool {
  if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
    return false
  }
  worker.recycleTime = time.Now()
  p.lock.Lock()

  if p.IsClosed() {
    p.lock.Unlock()
    return false
  }

  err := p.workers.insert(worker)
  if err != nil {
    p.lock.Unlock()
    return false
  }

  p.cond.Signal()
  p.lock.Unlock()
  return true
}

這裏設置了goWorkerrecycleTime字段,用於判定過期。然後將goWorker放回池。workersinsert()方法前面也已經分析過了。

接着調用p.cond.Signal()喚醒之前retrieveWorker()方法中的等待。retrieveWorker()方法繼續執行,阻塞等待數量 -1,這裏判斷當前goWorker的數量(也即 goroutine 數量)。如果數量等於 0,很有可能池子剛剛執行了Release()關閉,這時需要判斷池是否處於關閉狀態,如果是則直接返回。否則,調用spawnWorker()創建一個新的goWorker並執行其run()方法。

如果當前goWorker數量不爲 0,則調用p.workers.detach()取出一個空閒的goWorker返回。這個操作有可能失敗,因爲可能同時有多個 goroutine 在等待,喚醒的時候只有部分 goroutine 能獲取到goWorker。如果失敗了,其容量還未用完,直接創建新的goWorker,反之重新執行阻塞等待邏輯。

這裏有很多加鎖和解鎖的邏輯,再加上和信號量混在一起很難看明白。其實只需要知道一點就很簡單了,那就是p.cond.Wait()內部會將當前 goroutine 掛起,然後解開它持有的鎖,即會調用p.lock.Unlock()。這也是爲什麼revertWorker()p.lock.Lock()加鎖能成功的原因。然後p.cond.Signal()p.cond.Broadcast()會喚醒因爲p.cond.Wait()而掛起的 goroutine,但是需要Signal()/Broadcast()所在 goroutine 調用解鎖方法。

最後,放上整體流程圖:

清理過期goWorker

NewPool()函數中會啓動一個 goroutine 定期清理過期的goWorker

func (p *Pool) purgePeriodically() {
  heartbeat := time.NewTicker(p.options.ExpiryDuration)
  defer heartbeat.Stop()

  for range heartbeat.C {
    if p.IsClosed() {
      break
    }

    p.lock.Lock()
    expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
    p.lock.Unlock()

    for i := range expiredWorkers {
      expiredWorkers[i].task <- nil
      expiredWorkers[i] = nil
    }

    if p.Running() == 0 {
      p.cond.Broadcast()
    }
  }
}

如果池子已關閉,直接退出 goroutine。由選項ExpiryDuration來設置清理的間隔,如果沒有設置該選項,採用默認值 1s:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  if expiry := opts.ExpiryDuration; expiry < 0 {
    return nil, ErrInvalidPoolExpiry
  } else if expiry == 0 {
    opts.ExpiryDuration = DefaultCleanIntervalTime
  }
}

// src/github.com/panjf2000/ants/pool.go
const (
  DefaultCleanIntervalTime = time.Second
)

然後就是每個清理週期,調用p.workers.retrieveExpiry()方法,取出過期的goWorker因爲由這些goWorker啓動的 goroutine 還阻塞在通道task上,所以要向該通道發送一個nil值,而goWorker.run()方法中接收到一個值爲nil的任務會return,結束 goroutine,避免了 goroutine 泄漏

如果所有goWorker都被清理掉了,可能這時還有 goroutine 阻塞在retrieveWorker()方法中的p.cond.Wait()上,所以這裏需要調用p.cond.Broadcast()喚醒這些 goroutine。

容量動態修改

在運行過程中,可以動態修改池的容量。調用p.Tune(size int)方法:

func (p *Pool) Tune(size int) {
  if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
    return
  }
  atomic.StoreInt32(&p.capacity, int32(size))
}

這裏只是簡單設置了一下新的容量,不影響當前正在執行的goWorker,而且如果設置了預分配選項,容量不能再次設置。

下次執行revertWorker()的時候就會以新的容量判斷是否能放回,下次執行retrieveWorker()的時候也會以新容量判斷是否能創建新goWorker

關閉和重新啓動Pool

使用完成之後,需要關閉Pool,避免 goroutine 泄漏。調用池對象的Release()方法關閉:

func (p *Pool) Release() {
  atomic.StoreInt32(&p.state, CLOSED)
  p.lock.Lock()
  p.workers.reset()
  p.lock.Unlock()
  p.cond.Broadcast()
}

調用p.workers.reset()結束loopQueuewokerStack中的 goroutine,做一些清理工作,同時爲了防止有 goroutine 阻塞在p.cond.Wait()上,執行一次p.cond.Broadcast()

workerStackloopQueuereset()基本相同,即發送niltask通道從而結束 goroutine,然後重置各個字段:

// loopQueue 版本
func (wq *loopQueue) reset() {
  if wq.isEmpty() {
    return
  }

Releasing:
  if w := wq.detach(); w != nil {
    w.task <- nil
    goto Releasing
  }
  wq.items = wq.items[:0]
  wq.size = 0
  wq.head = 0
  wq.tail = 0
}

// stack 版本
func (wq *workerStack) reset() {
  for i := 0; i < wq.len(); i++ {
    wq.items[i].task <- nil
    wq.items[i] = nil
  }
  wq.items = wq.items[:0]
}

池關閉後還可以調用Reboot()重啓:

func (p *Pool) Reboot() {
  if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
    go p.purgePeriodically()
  }
}

由於p.purgePeriodically()p.Release()之後檢測到池關閉就直接退出了,這裏需要重新開啓一個 goroutine 定期清理。

PoolWithFuncWorkWithFunc

上一篇文章中我們還介紹了另一種方式創建Pool,即NewPoolWithFunc(),指定一個函數。後面提交任務時調用p.Invoke()提供參數就可以執行該函數了。這種方式創建的 Pool 和 Woker 結構如下:

type PoolWithFunc struct {
  workers []*goWorkerWithFunc
  poolFunc func(interface{})
}

type goWorkerWithFunc struct {
  pool *PoolWithFunc
  args chan interface{}
  recycleTime time.Time
}

與前面介紹的PoolgoWorker大體相似,只是PoolWithFunc保存了傳入的函數對象,使用數組保存 worker。goWorkerWithFuncinterface{}args通道的數據類型,其實也好理解,因爲已經有函數了,只需要傳入數據作爲參數就可以運行了:

func (w *goWorkerWithFunc) run() {
  go func() {
    for args := range w.args {
      if args == nil {
        return
      }
      w.pool.poolFunc(args)
      if ok := w.pool.revertWorker(w); !ok {
        return
      }
    }
  }()
}

從通道接收函數參數,執行池中保存的函數對象。

其他細節

task緩衝通道

還記得創建p.workerCache這個sync.Pool對象的代碼麼:

p.workerCache.New = func() interface{} {
  return &goWorker{
    pool: p,
    task: make(chan func(), workerChanCap),
  }
}

sync.Pool中沒有goWorker對象時,調用New()方法創建一個,注意到這裏創建的task通道使用workerChanCap作爲容量。這個變量定義在ants.go文件中:

var (
  // workerChanCap determines whether the channel of a worker should be a buffered channel
  // to get the best performance. Inspired by fasthttp at
  // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
  workerChanCap = func() int {
    // Use blocking channel if GOMAXPROCS=1.
    // This switches context from sender to receiver immediately,
    // which results in higher performance (under go1.5 at least).
    if runtime.GOMAXPROCS(0) == 1 {
      return 0
    }

    // Use non-blocking workerChan if GOMAXPROCS>1,
    // since otherwise the sender might be dragged down if the receiver is CPU-bound.
    return 1
  }()
)

爲了方便對照,我把註釋也放上來了。ants參考了著名的 Web 框架fasthttp的實現。當GOMAXPROCS爲 1 時(即操作系統線程數爲 1),向通道task發送會掛起發送 goroutine,將執行流程轉向接收 goroutine,這能提升接收處理性能。如果GOMAXPROCS大於 1,ants使用帶緩衝的通道,爲了防止接收 goroutine 是 CPU 密集的,導致發送 goroutine 被阻塞。下面是fasthttp中的相關代碼:

// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
  // Use blocking workerChan if GOMAXPROCS=1.
  // This immediately switches Serve to WorkerFunc, which results
  // in higher performance (under go1.5 at least).
  if runtime.GOMAXPROCS(0) == 1 {
    return 0
  }

  // Use non-blocking workerChan if GOMAXPROCS>1,
  // since otherwise the Serve caller (Acceptor) may lag accepting
  // new connections if WorkerFunc is CPU-bound.
  return 1
}()

自旋鎖

ants利用atomic.CompareAndSwapUint32()這個原子操作實現了一個自旋鎖。與其他類型的鎖不同,自旋鎖在加鎖失敗之後不會立刻進入等待,而是會繼續嘗試。這對於很快就能獲得鎖的應用來說能極大提升性能,因爲能避免加鎖和解鎖導致的線程切換:

type spinLock uint32

func (sl *spinLock) Lock() {
  backoff := 1
  for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
    for i := 0; i < backoff; i++ {
      runtime.Gosched()
    }
    backoff <<= 1
  }
}

func (sl *spinLock) Unlock() {
  atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
  return new(spinLock)
}

另外這裏使用了指數退避,先等 1 個循環週期,通過runtime.Gosched()告訴運行時切換其他 goroutine 運行。如果還是獲取不到鎖,就再等 2 個週期。如果還是不行,再等 4,8,16... 以此類推。這可以防止短時間內獲取不到鎖,導致 CPU 時間的浪費。

總結

ants源碼短小精悍,沒有引用其他任何第三方庫。各種細節處理,各種性能優化的點都是值得我們細細品味的。強烈建議大家讀一讀源碼。閱讀優秀的源碼,能極大地提高自身的編碼素養。

大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. ants GitHub:github.com/panjf2000/ants

  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/a84T6Hpbrhop7vQA01N1Bg