如何在 go 中實現一個 worker-pool?

之前寫過一篇文章,它有個響亮的名字:Handling 1 Million Requests per Minute with Go使用 Go 每分鐘處理百萬請求

這是國外的一個作者寫的,我做了一篇說明,起的也是這個標題。
沒想到閱讀量是我最好的一篇,果然文章都是靠標題出彩的…..

今天偶然看到另一篇文章 (原文在文末 [1])。兩篇文章原理相似: 有一批工作任務 (job),通過工作池 (worker-pool) 的方式,達到多worker併發處理job的效果。

他們還是有很多不同的點,實現上差別也是蠻大的。

首先上一篇文章我放了一張圖片,大概就是上篇整體的工作流。

這篇文章不同點在於:

首先數據會從generate(生產數據)-> 併發處理數據 -> 處理結果聚合。
圖大概是這樣的,

然後它可以通過context.context達到控制工作池停止工作的效果。

最後通過代碼,你會發現它不是傳統意義上的worker-pool,後面會說明。

下圖能清晰表達整體流程了。

順便說一句,這篇文章實現的代碼比 使用 Go 每分鐘處理百萬請求 的代碼簡單多了。

首先看job

這個可以簡單過一下。最終每個job處理完都會包裝成Result返回。

下面這段就是核心代碼了。

整個WorkerPool結構很簡單。jobs是一個緩衝channel。每一個任務都會放入jobs中等待處理woker處理。

results也是一個通道類型,它的作用是保存每個job處理後產生的結果Result

首先通過New初始化一個worker-pool工作池, 然後執行Run開始運行。

初始化的時候傳入worker數,對應每個g運行work(ctx,&wg,wp.jobs,wp.results), 組成了worker-pool

同時通過sync.WaitGroup, 我們可以等待所有worker工作結束,也就意味着work-pool結束工作,當然可能是因爲任務處理結束,也可能是被停止了。

每個job數據源是如何來的?

對應每個worker的工作,

每個 worker 都嘗試從同一個jobs獲取數據,這是一個典型的fan-out模式。當對應的g獲取到job進行處理後,會把處理結果發送到同一個results channel中, 這又是一個fan-in模式。

當然我們通過context.Context可以對每個worker做停止運行控制。

最後是處理結果集合,

那麼整體的測試代碼就是:

看了代碼之後,我們知道,這並不是一個傳統意義的worker-pool。它並不像上篇這篇文章一樣,初始化一個真正的worker-pool,一旦接收到job, 就嘗試從池中獲取一個worker,把對應的job交給這個work進行處理,等work處理完畢,重新進行到工作池中,等待下一次被利用。

附錄

[1]https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0#fe56

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0-cbZVAWgiTWIEcd4bInQA