用 go 語言實現一個有界協程池
寫在文章開頭
本篇文章算是對go語言
系列的一個收尾,通過 go 語言實現一個實現一個簡單的有界協程池。
Hi,我是 sharkChili ,是個不斷在硬核技術上作死的 java coder ,是 CSDN 的博客專家 ,也是開源項目 Java Guide 的維護者之一,熟悉 Java 也會一點 Go ,偶爾也會在 C 源碼 邊緣徘徊。寫過很多有意思的技術博客,也還在研究並輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公衆號:寫代碼的 SharkChili 。
詳解 go 語言協程池的實現
整體交互流程設計
我們希望創建一個協程池,該協程池大小由用戶決定,主協程不斷生產任務並投遞到channel
中,協程池收到任務後,如果發現沒有對應處理的協程worker
則創建一個協程並處理傳入的任務,反之這些任務就會有序得等待協程有序調度執行:
定義 worker
基於上圖我們給出worker
的接口定義,按照我們的實現每一個任務都是一個worker
,協程池的協程可以從channel
中得到對應的Worker
並執行其Task
方法:
type Worker interface {
Task()
}
聲明協程池
基於worker
我們封裝一個worker
池,也就是本文提到的協程池,可以看到該Pool
有一個worker
的通道用於存放主協程投遞進來的任務,而wg
則用於控制協程的生命週期,這一點我們會在後續的工作代碼中詳盡說明:
type Pool struct {
//記錄主協程投遞的任務
work chan Worker
//控制工作協程的生命週期
wg sync.WaitGroup
}
創建協程池
有了協程池的定義之後,我們就可以編寫協程池的,可以看到我們可以通過入參決定channel
和協程的大小,通過傳入maxGoroutines
設置wg
的大小,當協程都沒有任務執行時,纔會調用wg
的Done
方法,確保所有任務執行完成後,主協程才能退出:
func New(maxGoroutines int) *Pool {
//創建指定協程數的channel
p := Pool{
work: make(chan Worker, maxGoroutines),
}
//基於協程數創建倒計時門閂
p.wg.Add(maxGoroutines)
//創建maxGoroutines個協程獲取channel的任務執行
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.DoTask()
}
//任務執行完成且channel關閉之後,按下倒計時門閂
p.wg.Done()
}()
}
//返回pool的指針
return &p
}
投遞任務
當我們需要投遞任務時,就可以將自實現的worker
投遞到channle
中:
func (p *Pool) Run(w Worker) {
//將任務w投遞到channel中
p.work <- w
}
關閉協程池
最後我們給出關於協程池關閉的實現,其邏輯比較簡單:
-
關閉
channel
不再接受新任務。 -
調用
waitGroup
的Wait
方法等待所有協程執行完再返回。
func (p *Pool) ShutDown() {
close(p.work)
p.wg.Wait()
}
測試代碼
最後我們給出本文的測試代碼,使用示例比較簡單:
-
定義一個姓名切片,作爲測試數據。
-
創建一個名爲
namePrinter
的結構體,內部包含name
屬性,該結構體會繼承Worker
實現打印姓名的Task
方法。 -
創建一個
channel
和協程大小都爲 2 的Pool
。 -
通過多協程循環遍歷
name
切片並將其封裝成 namePrinter 投遞到 chanel 中。 -
協程池的協程消費這些打印姓名的任務。
-
調用
shutDown
方法等待協程池內部協程工作完成後退出主協程。
// 創建一個測試用的姓名切片
var names = []string{
"user.go-1",
"user.go-2",
"user.go-3",
"user.go-4",
"user.go-5",
}
// 實現worker接口 打印姓名
type namePrinter struct {
name string
}
func (n *namePrinter) Task() {
fmt.Println(n.name)
time.Sleep(time.Second)
}
func main() {
//創建還有兩個協程的pool
p := work.New(2)
//創建main協程的倒計時門閂
var wg sync.WaitGroup
wg.Add(100 * len(names))
//多協程投遞任務到pool
for i := 0; i < 100; i++ {
for _, name := range names {
np := namePrinter{
name: name,
}
go func() {
p.Run(&np)
wg.Done()
}()
}
}
//等待任務投遞完成
wg.Wait()
fmt.Println("執行結束,關閉pool")
p.ShutDown()
}
小結
自此,本文基於 go 語言的併發技術實現了一個簡單的協程池,希望對你有所幫助。而 go 語言系列也到此告一段落。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/DSGsJVDy43d5Gs3FpnXxZA