用 go 語言實現一個有界協程池

寫在文章開頭

本篇文章算是對go語言系列的一個收尾,通過 go 語言實現一個實現一個簡單的有界協程池。

Hi,我是 sharkChili ,是個不斷在硬核技術上作死的 java coder ,是 CSDN 的博客專家 ,也是開源項目 Java Guide 的維護者之一,熟悉 Java 也會一點 Go ,偶爾也會在 C 源碼 邊緣徘徊。寫過很多有意思的技術博客,也還在研究並輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公衆號:寫代碼的 SharkChili

詳解 go 語言協程池的實現

整體交互流程設計

我們希望創建一個協程池,該協程池大小由用戶決定,主協程不斷生產任務並投遞到channel中,協程池收到任務後,如果發現沒有對應處理的協程worker則創建一個協程並處理傳入的任務,反之這些任務就會有序得等待協程有序調度執行:

定義 worker

基於上圖我們給出worker的接口定義,按照我們的實現每一個任務都是一個worker,協程池的協程可以從channel中得到對應的Worker並執行其Task方法:

type Worker interface {
 Task()
}

聲明協程池

基於worker我們封裝一個worker池,也就是本文提到的協程池,可以看到該Pool有一個worker的通道用於存放主協程投遞進來的任務,而wg則用於控制協程的生命週期,這一點我們會在後續的工作代碼中詳盡說明:

type Pool struct {
 //記錄主協程投遞的任務
 work chan Worker
 //控制工作協程的生命週期
 wg   sync.WaitGroup
}

創建協程池

有了協程池的定義之後,我們就可以編寫協程池的,可以看到我們可以通過入參決定channel和協程的大小,通過傳入maxGoroutines 設置wg的大小,當協程都沒有任務執行時,纔會調用wgDone方法,確保所有任務執行完成後,主協程才能退出:

func New(maxGoroutines int) *Pool {
 //創建指定協程數的channel
 p := Pool{
  work: make(chan Worker, maxGoroutines),
 }

 //基於協程數創建倒計時門閂
 p.wg.Add(maxGoroutines)

 //創建maxGoroutines個協程獲取channel的任務執行
 for i := 0; i < maxGoroutines; i++ {

  go func() {
   for w := range p.work {
    w.DoTask()

   }
   //任務執行完成且channel關閉之後,按下倒計時門閂
   p.wg.Done()
  }()
 }
 
 //返回pool的指針
 return &p
}

投遞任務

當我們需要投遞任務時,就可以將自實現的worker投遞到channle中:

func (p *Pool) Run(w Worker) {
 //將任務w投遞到channel中
 p.work <- w
}

關閉協程池

最後我們給出關於協程池關閉的實現,其邏輯比較簡單:

  1. 關閉channel不再接受新任務。

  2. 調用waitGroupWait方法等待所有協程執行完再返回。

func (p *Pool) ShutDown() {
 close(p.work)
 p.wg.Wait()
}

測試代碼

最後我們給出本文的測試代碼,使用示例比較簡單:

  1. 定義一個姓名切片,作爲測試數據。

  2. 創建一個名爲namePrinter 的結構體,內部包含name屬性,該結構體會繼承Worker實現打印姓名的Task方法。

  3. 創建一個channel和協程大小都爲 2 的Pool

  4. 通過多協程循環遍歷name切片並將其封裝成 namePrinter 投遞到 chanel 中。

  5. 協程池的協程消費這些打印姓名的任務。

  6. 調用shutDown方法等待協程池內部協程工作完成後退出主協程。

// 創建一個測試用的姓名切片
var names = []string{
 "user.go-1",
 "user.go-2",
 "user.go-3",
 "user.go-4",
 "user.go-5",
}

// 實現worker接口 打印姓名
type namePrinter struct {
 name string
}

func (n *namePrinter) Task() {
 fmt.Println(n.name)
 time.Sleep(time.Second)
}

func main() {

 //創建還有兩個協程的pool
 p := work.New(2)

 //創建main協程的倒計時門閂
 var wg sync.WaitGroup
 wg.Add(100 * len(names))

 //多協程投遞任務到pool
 for i := 0; i < 100; i++ {

  for _, name := range names {
   np := namePrinter{
    name: name,
   }
   go func() {
    p.Run(&np)
    wg.Done()
   }()

  }
 }

 //等待任務投遞完成
 wg.Wait()

 fmt.Println("執行結束,關閉pool")
 p.ShutDown()

}

小結

自此,本文基於 go 語言的併發技術實現了一個簡單的協程池,希望對你有所幫助。而 go 語言系列也到此告一段落。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DSGsJVDy43d5Gs3FpnXxZA