一個強大可靠的 Golang 池化庫 ants

ants 是 golang 語言的一個強大的、可靠的池化解決方案庫。

ants 是一個高性能的 goroutine 池,實現了對大規模 goroutine 的調度管理、goroutine 複用,允許使用者在開發併發程序的時候限制 goroutine 數量,複用資源,達到更高效執行任務的效果。

它可以自動調度海量的 goroutine,複用 goroutine,以及定期清理過期的 goroutine,可以節省資源,提高資源利用率。它還提供了大量的接口,用來調整 goroutine 數量,Pool 大小,以及控制 Pool 等。它通過資源複用,可以節省內存使用量,在大規模批量併發場景下比原生 goroutine 併發具有更高性能,它是非阻塞機制,並可優雅處理 panic,防止程序崩潰。

在使用 go 寫高併發程序的時候會啓動大量的 goroutine,這會消耗大量的系統資源,通過使用 ants,可以實例化一個 goroutine 池,複用 goroutine,節省資源,提升性能。

下面是使用 ants 的一個示例:

package main
import (
  "fmt"
  "sync"
  "sync/atomic"
  "time"
  "github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
  n := i.(int32)
  atomic.AddInt32(&sum, n)
  fmt.Printf("run with %d\n", n)
}
func demoFunc() {
  time.Sleep(10 * time.Millisecond)
  fmt.Println("Hello World!")
}
func main() {
  defer ants.Release()
  runTimes := 1000
  // Use the common pool.
  var wg sync.WaitGroup
  syncCalculateSum := func() {
    demoFunc()
    wg.Done()
  }
  for i := 0; i < runTimes; i++ {
    wg.Add(1)
    _ = ants.Submit(syncCalculateSum)
  }
  wg.Wait()
  fmt.Printf("running goroutines: %d\n", ants.Running())
  fmt.Printf("finish all tasks.\n")
  // Use the pool with a function,
  // set 10 to the capacity of goroutine pool and 1 second for expired duration.
  p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
    myFunc(i)
    wg.Done()
  })
  defer p.Release()
  // Submit tasks one by one.
  for i := 0; i < runTimes; i++ {
    wg.Add(1)
    _ = p.Invoke(int32(i))
  }
  wg.Wait()
  fmt.Printf("running goroutines: %d\n", p.Running())
  fmt.Printf("finish all tasks, result is %d\n", sum)
  if sum != 499500 {
    panic("the final result is wrong!!!")
  }
  // Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited.
  // If you use -1 as the pool size parameter, the size will be unlimited.
  // There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
  mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin)
  defer mp.ReleaseTimeout(5 * time.Second)
  for i := 0; i < runTimes; i++ {
    wg.Add(1)
    _ = mp.Submit(syncCalculateSum)
  }
  wg.Wait()
  fmt.Printf("running goroutines: %d\n", mp.Running())
  fmt.Printf("finish all tasks.\n")
  // Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
  mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
    myFunc(i)
    wg.Done()
  }, ants.LeastTasks)
  defer mpf.ReleaseTimeout(5 * time.Second)
  for i := 0; i < runTimes; i++ {
    wg.Add(1)
    _ = mpf.Invoke(int32(i))
  }
  wg.Wait()
  fmt.Printf("running goroutines: %d\n", mpf.Running())
  fmt.Printf("finish all tasks, result is %d\n", sum)
  if sum != 499500*2 {
    panic("the final result is wrong!!!")
  }
}

我也在項目中使用過 ants,有這樣一個場景,當消息提交過來後,需要將消息提交到第三方推送平臺。第三方平臺通過 HTTP 請求進行交互,如果不使用 goroutine,可以感覺到明顯的延遲,特別是消息大量湧進來之後。所以將執行提交的任務,通過 goroutine 進行異步化。基本滿足應用的性能需求。運行一段時間後,發現這種方式 goroutine 不可控,在經過優化後,使用 ants 將消息提交和消息處理進行分離。在達到需求的同時,減少了內存佔用。善用 ants,可以大大提高資源利用率。

更多內容,請參考 Github:

https://github.com/panjf2000/ants

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-XRzHm3Y8TRFTYl3UiI9pA