如何在 go 中實現一個 worker-pool?
之前寫過一篇文章,它有個響亮的名字:Handling 1 Million Requests per Minute with Go
使用 Go 每分鐘處理百萬請求
這是國外的一個作者寫的,我做了一篇說明,起的也是這個標題。
沒想到閱讀量是我最好的一篇,果然文章都是靠標題出彩的…..
今天偶然看到另一篇文章 (原文在文末 [1])。兩篇文章原理相似: 有一批工作任務 (job),通過工作池 (worker-pool) 的方式,達到多worker
併發處理job
的效果。
他們還是有很多不同的點,實現上差別也是蠻大的。
首先上一篇文章我放了一張圖片,大概就是上篇整體的工作流。
-
每個
worker
處理完任務就好,不關心結果, 不對結果做進一步處理。 -
只要請求不停止,程序就不會停止,沒有控制機制,除非宕機。
這篇文章不同點在於:
首先數據會從generate
(生產數據)-> 併發處理數據 -> 處理結果聚合。
圖大概是這樣的,
然後它可以通過context.context
達到控制工作池停止工作的效果。
最後通過代碼,你會發現它不是傳統意義上的worker-pool
,後面會說明。
下圖能清晰表達整體流程了。
順便說一句,這篇文章實現的代碼比 使用 Go 每分鐘處理百萬請求 的代碼簡單多了。
首先看job
。
這個可以簡單過一下。最終每個job
處理完都會包裝成Result
返回。
下面這段就是核心代碼了。
整個WorkerPool
結構很簡單。jobs
是一個緩衝channel
。每一個任務都會放入jobs
中等待處理woker
處理。
results
也是一個通道類型,它的作用是保存每個job
處理後產生的結果Result
。
首先通過New
初始化一個worker-pool
工作池, 然後執行Run
開始運行。
初始化的時候傳入worker
數,對應每個g
運行work(ctx,&wg,wp.jobs,wp.results)
, 組成了worker-pool
。
同時通過sync.WaitGroup
, 我們可以等待所有worker
工作結束,也就意味着work-pool
結束工作,當然可能是因爲任務處理結束,也可能是被停止了。
每個job
數據源是如何來的?
對應每個worker
的工作,
每個 worker 都嘗試從同一個jobs
獲取數據,這是一個典型的fan-out
模式。當對應的g
獲取到job
進行處理後,會把處理結果發送到同一個results channel
中, 這又是一個fan-in
模式。
當然我們通過context.Context
可以對每個worker
做停止運行控制。
最後是處理結果集合,
那麼整體的測試代碼就是:
看了代碼之後,我們知道,這並不是一個傳統意義的worker-pool
。它並不像上篇這篇文章一樣,初始化一個真正的worker-pool
,一旦接收到job
, 就嘗試從池中獲取一個worker
,把對應的job
交給這個work
進行處理,等work
處理完畢,重新進行到工作池中,等待下一次被利用。
附錄
[1]https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0#fe56
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/0-cbZVAWgiTWIEcd4bInQA