可能是最簡單的 JS 併發控制方案

近期刷到了一些關於前端併發控制的文章,感覺有些人對併發控制有些誤解,想起自己很多年前寫一個掃描腳本的時候,由於數據量過大,爲了讓腳本能夠運行的更快些寫了一個併發控制的庫: node-job-runner,現在回頭看來着實有些簡陋。想想不如乘機寫(水)一篇。

想要直接看方案抄作業的可直接跳到下方 僞線程方案 查看。

併發控制

咱們先從什麼是併發控制嘮起。併發控制其實在後端算是一個比較困難的問題,因爲後端的併發控制大部分情況下指的是高併發場景下多個用戶或程序同時訪問或修改數據時對操作進行管理。這就涉及到資源鎖定、操作排序、隔離等等。

不過在前端所聊的併發控制就很簡單了,一般指的是前端發起請求或其他一些異步操作時爲了控制併發的請求或操作數量而做的控制。

使用場景

常見的使用併發控制的場景如下:

  1. 瀏覽器請求併發量控制:每個瀏覽器都存在併發請求上限(http 2.0 之前),所以如果一次發送的請求過多,將會阻塞後續的其他請求。比如有一個表格需要一次性加載幾萬條數據,需要每次 100 條從 API 拉取,這就需要創建幾百個請求任務,如果一次性將這些請求全部打滿,後續萬一有了更高優先級的請求則一定會需要排隊等待前面的幾百個請求完成纔可以。

  2. nodejs 腳本掃描併發量控制:如果需要使用 nodejs 去做一些掃描任務,通常需要併發大量請求來提高執行速度,然而如果併發過高則會出現請求直接報錯或者是描述符超限的情況。

所以如果異步任務併發較高且容易導致問題的場景就需要做好併發控制。

實現方案

計數器方案

要實現併發控制,最簡單常見的方案便是通過計數器來控制併發數量,我們以 Promise 實現來演示:

const axios = require('axios');

const concurrency = (urls, limit) ={
    const result = [];
    let count = 0;
    const len = urls.length;
    return new Promise(resolve ={
        const next = () ={
            if (count === len) return resolve(result);

            let current = count++;
            axios
                .get(urls[current])
                .then(res ={
                    result[current] = { result: res.data };
                })
                .catch(err ={
                    result[current] = { error: err };
                })
                .finally(() ={
                    next();
                });
        };
        while (count < limit) {
            next();
        }
    });
};

const urls = new Array(10).fill('https://www.baidu.com');
concurrency(urls, 3).then(console.log);

事例中通過 count 來控制併發數量,第一個 whilecount 小於 limit 時,就會繼續執行 next 方法,從而在一開始將併發數量提升到 limit 個,然後後續每次 next 執行完成後都會繼續調用下一個 next 來填補空餘,從而將併發數量一直維持在 limit 個。每次請求的執行結果都通過索引將其保存到 result 中的對應位置,當最後一個請求完成後 count === len 就會觸發 resolve,從而返回結果。

但其實,上述的代碼存在一些漏洞:

  1. 如果 urls 的長度 小於等於 limit,那麼就會導致第一個 while 語句時 count === len 從而提前 resolve,解決倒也簡單,直接增加一個完成的計數器,然後把完成的條件判斷放到每個任務完成時判定即可。

  2. 如果 urls 的長度爲 0,將永遠無法觸發 resolve,這個問題也很好解決,提前判斷一下 urls 的長度即可。

所以最終改造後的代碼如下:

const axios = require('axios');

const concurrency = (urls, limit) ={
    const result = [];
    let count = 0,
        completed = 0;
    const len = urls.length;
    if (len === 0) return Promise.resolve([]);
    return new Promise(resolve ={
        const next = () ={
            if (count === len) return;

            let current = count++;
            axios
                .get(urls[current])
                .then(res ={
                    result[current] = { result: res.data };
                })
                .catch(err ={
                    result[current] = { error: err };
                })
                .finally(() ={
                    if (++completed === len) {
                        resolve(result);
                    } else {
                        next();
                    }
                });
        };
        while (count < limit) {
            next();
        }
    });
};

const urls = new Array(10).fill('https://www.baidu.com');
concurrency(urls, 3).then(console.log);

僞線程方案

再來看看我平時用到的另一種更簡單的方案,我不太清楚怎麼命名,姑且稱之爲僞線程方案。主要思路就是模擬線程的設計,開幾個線程,每個線程去任務棧中取任務,當所有線程都完成時則代表所有任務完成即可返回結果。

const axios = require('axios');

const concurrency = async (urls, limit) ={
    const result = [];
    const len = urls.length;
    const next = async () ={
        while (urls.length) {
            const index = len - urls.length,
                url = urls.pop();
            try {
                result[index] = { result: (await axios.get(url)).data };
            } catch (error) {
                result[index] = { error };
            }
        }
    };

    await Promise.all(new Array(limit).fill(null).map(() => next()));

    return result;
};

const urls = new Array(10).fill('https://www.baidu.com');
concurrency([...urls], 3).then(console.log);

乍看之下可能會覺得代碼量差不多,實則不然,這種方案的可讀性和代碼量其實都優於上一個方案。首先通過 new Array 快速創建出一個長度爲 limit 的線程池,然後在每個線程中會去 urls 中一個個拿取帶處理的數據,通過 while/await 直到將數據全部處理完成。也不用擔心 urls 長度小於 limit 等邊界情況。

如果不需要返回結果,那麼這種方案就會顯得更爲簡潔。

const axios = require('axios');

const concurrency = async (urls, limit) ={
    const next = async () ={
        while (urls.length) {
            await axios.get(urls.pop());
        }
    };

    await Promise.all(new Array(limit).fill(null).map(() => next()));
};

const urls = new Array(10).fill('https://www.baidu.com');
concurrency([...urls], 3).then(console.log);

可以看出,核心代碼不過數行,而且可讀性也很高。個人覺得時目前最簡單的併發控制方案。

總結

其實可以看出,前端的併發控制其實非常簡單,做好限制並想辦法依次處理未處理的數據即可。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/blsKTeNXDo5cvFx5AWn1xw