JavaScript 中如何實現併發控制?

一、併發控制簡介

在日常開發過程中,你可能會遇到併發控制的場景,比如控制請求併發數。那麼在 JavaScript 中如何實現併發控制呢?在回答這個問題之前,我們來簡單介紹一下併發控制。

假設有 6 個待辦任務要執行,而我們希望限制同時執行的任務個數,即最多隻有 2 個任務能同時執行。當 正在執行任務列表 中的任何 1 個任務完成後,程序會自動從 待辦任務列表 中獲取新的待辦任務並把該任務添加到 正在執行任務列表 中。爲了讓大家能夠更直觀地理解上述的過程,阿寶哥特意畫了以下 3 張圖:

1.1 階段一

1.2 階段二

1.3 階段三

好的,介紹完併發控制之後,阿寶哥將以 Github 上 async-pool 這個庫來介紹一下異步任務併發控制的具體實現。

https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。

二、併發控制的實現

async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現,在分析其具體實現之前,我們來看一下它如何使用。

2.1 asyncPool 的使用

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);

在以上代碼中,我們使用 async-pool 這個庫提供的 asyncPool 函數來實現異步任務的併發控制。asyncPool 函數的簽名如下所示:

function asyncPool(poolLimit, array, iteratorFn){ ... }

該函數接收 3 個參數:

對於以上示例來說,在使用了 asyncPool 函數之後,對應的執行過程如下所示:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

通過觀察以上的註釋信息,我們可以大致地瞭解 asyncPool 函數內部的控制流程。下面我們先來分析 asyncPool 函數的 ES7 實現。

2.2 asyncPool ES7 實現

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存儲所有的異步任務
  const executing = []; // 存儲正在執行的異步任務
  for (const item of array) {
    // 調用iteratorFn函數創建異步任務
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的異步任務

    // 當poolLimit值小於或等於總任務個數時,進行併發控制
    if (poolLimit <= array.length) {
      // 當任務完成後,從正在執行的任務數組中移除已完成的任務
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在執行的異步任務
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待較快的任務執行完成
      }
    }
  }
  return Promise.all(ret);
}

在以上代碼中,充分利用了 Promise.allPromise.race 函數特點,再結合 ES7 中提供的 async await 特性,最終實現了併發控制的功能。利用 await Promise.race(executing); 這行語句,我們會等待 正在執行任務列表 中較快的任務執行完成之後,纔會繼續執行下一次循環。

asyncPool ES7 實現相對比較簡單,接下來我們來看一下不使用 async await 特性要如何實現同樣的功能。

2.3 asyncPool ES6 實現

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // 存儲所有的異步任務
  const executing = []; // 存儲正在執行的異步任務
  const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // 獲取新的任務項
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // 當poolLimit值小於或等於總任務個數時,進行併發控制
    if (poolLimit <= array.length) {
      // 當任務完成後,從正在執行的任務數組中移除已完成的任務
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing); 
      }
    }
 
    // 正在執行任務列表 中較快的任務執行完成之後,纔會從array數組中獲取新的待辦任務
    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

在 ES6 的實現版本中,通過內部封裝的 enqueue 函數來實現核心的控制邏輯。當 Promise.race(executing) 返回的 Promise 對象變成已完成狀態時,纔會調用 enqueue 函數,從 array 數組中獲取新的待辦任務。

三、阿寶哥有話說

asyncPool 這個庫的 ES7 和 ES6 的具體實現中,我們都使用到了 Promise.allPromise.race 函數。其中手寫 Promise.all 是一道常見的面試題。剛好趁着這個機會,阿寶哥跟大家一起來手寫簡易版的 Promise.allPromise.race 函數。

3.1 手寫 Promise.all

Promise.all(iterable) 方法會返回一個 promise 對象,當輸入的所有 promise 對象的狀態都變成 resolved 時,返回的 promise 對象就會以數組的形式,返回每個 promise 對象 resolve 後的結果。當輸入的任何一個 promise 對象狀態變成 rejected 時,則返回的 promise 對象會 reject 對應的錯誤信息。

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // 計數器,用於判斷所有任務是否執行完成
      let result = []; // 結果數組
      for (let i = 0; i < iterators.length; i++) {
        // 考慮到iterators[i]可能是普通對象,則統一包裝爲Promise對象
        Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // 按順序保存對應的結果
            // 當所有任務都執行完成後,再統一返回結果
            if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // 任何一個Promise對象執行失敗,則調用reject()方法
            return;
          }
        );
      }
    }
  });
};

需要注意的是對於 Promise.all 的標準實現來說,它的參數是一個可迭代對象,比如 Array、String 或 Set 等。

3.2 手寫 Promise.race

Promise.race(iterable) 方法會返回一個 promise 對象,一旦迭代器中的某個 promise 對象 resolvedrejected,返回的 promise 對象就會 resolve 或 reject 相應的值。

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};

本文阿寶哥帶大家詳細分析了 async-pool 異步任務併發控制的具體實現,同時爲了讓大家能夠更好地理解 async-pool 的核心代碼。最後阿寶哥還帶大家一起手寫簡易版的 Promise.allPromise.race 函數。其實除了 Promise.all 函數之外,還存在另一個函數 —— Promise.allSettled,該函數用於解決 Promise.all 存在的問題,感興趣的小夥伴可以自行研究一下。

四、參考資源

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/yWOPoef9ixuSBWApZQhjIg