手寫 p-limit,40 行代碼實現併發控制

前端代碼經常要處理各種異步邏輯。

有的是串行的:

const promise1 = new Promise(function(resolve) {
    // 異步邏輯 1...
    resolve();
});
const promise2 = new Promise(function(resolve) {
    // 異步邏輯 2...
    resolve();
});

promise1.then(() => promise2);
await promise1;
await promise2;

有的是並行的:

await Promise.all([promise1, promise2]);
await Promise.race([promise1, promise2]);

並行的異步邏輯有時還要做併發控制。

併發控制是常見的需求,也是面試常考的面試題。

一般我們會用 p-limit 來做:

import pLimit from 'p-limit';

const limit = pLimit(2);

const input = [
    limit(() => fetchSomething('foo')),
    limit(() => fetchSomething('bar')),
    limit(() => doSomething())
];

const result = await Promise.all(input);
console.log(result);

比如上面這段邏輯,就是幾個異步邏輯並行執行,並且最大併發是 2。

那如何實現這樣的併發控制呢?

我們自己來寫一個:

首先,要傳入併發數量,返回一個添加併發任務的函數,我們把它叫做 generator:

const pLimit = (concurrency) ={
    const generator = (fn, ...args) =>
      new Promise((resolve) ={
        //...
      });
      
    return generator;
}

這裏添加的併發任務要進行排隊,所以我們準備一個 queue,並記錄當前在進行中的異步任務。

const queue = [];
let activeCount = 0;

const generator = (fn, ...args) =>
  new Promise((resolve) ={
    enqueue(fn, resolve, ...args);
  });

添加的異步任務就入隊,也就是 enqueue。

enqueue 做的事情就是把一個異步任務添加到 queue 中,並且只要沒達到併發上限就再執行一批任務:

const enqueue = (fn, resolve, ...args) ={
  queue.push(run.bind(null, fn, resolve, ...args));

  if (activeCount < concurrency && queue.length > 0) {
    queue.shift()();
  }
};

具體運行的邏輯是這樣的:

 const run = async (fn, resolve, ...args) ={
  activeCount++;

  const result = (async () => fn(...args))();

  resolve(result);

  try {
    await result;
  } catch {}

  next();
};

計數,運行這個函數,改變最後返回的那個 promise 的狀態,然後執行完之後進行下一步處理:

下一步處理自然就是把活躍任務數量減一,然後再跑一個任務:

const next = () ={
  activeCount--;

  if (queue.length > 0) {
    queue.shift()();
  }
};

這樣就保證了併發的數量限制。

現在的全部代碼如下,只有 40 行代碼:

const pLimit = (concurrency) ={  
    const queue = [];
    let activeCount = 0;
  
    const next = () ={
      activeCount--;
  
      if (queue.length > 0) {
        queue.shift()();
      }
    };
  
    const run = async (fn, resolve, ...args) ={
      activeCount++;
  
      const result = (async () => fn(...args))();

      resolve(result);
  
      try {
        await result;
      } catch {}

      next();
    };
  
    const enqueue = (fn, resolve, ...args) ={
      queue.push(run.bind(null, fn, resolve, ...args));
  
      if (activeCount < concurrency && queue.length > 0) {
          queue.shift()();
      }
    };
  
    const generator = (fn, ...args) =>
      new Promise((resolve) ={
        enqueue(fn, resolve, ...args);
      });
  
    return generator;
};

這就已經實現了併發控制。

不信我們跑跑看:

準備這樣一段測試代碼:

const limit = pLimit(2);
  
function asyncFun(value, delay) {
    return new Promise((resolve) ={
        console.log('start ' + value);
        setTimeout(() => resolve(value), delay);
    });
}

(async function () {
    const arr = [
        limit(() => asyncFun('aaa', 2000)),
        limit(() => asyncFun('bbb', 3000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000))
    ];
  
    const result = await Promise.all(arr);
    console.log(result);
})();

沒啥好說的,就是 setTimeout + promise,設置不同的 delay 時間。

併發數量爲 2。

我們試下:

先併發執行前兩個任務,2s 的時候一個任務執行完,又執行了一個任務,然後再過一秒,都執行完了,有同時執行了兩個任務。

經過測試,我們已經實現了併發控制!

回顧一下我們實現的過程,其實就是一個隊列來保存任務,開始的時候一次性執行最大併發數的任務,然後每執行完一個啓動一個新的。

還是比較簡單的。

上面的 40 行代碼是最簡化的版本,其實還有一些可以完善的地方,我們繼續完善一下。

首先,我們要把併發數暴露出去,還要讓開發者可以手動清理任務隊列。

我們這樣寫:

Object.defineProperties(generator, {
  activeCount: {
    get: () => activeCount
  },
  pendingCount: {
    get: () => queue.length
  },
  clearQueue: {
    value: () ={
      queue.length = 0;
    }
  }
});

用 Object.defineProperties 只定義 get 函數,這樣 activeCount、pendingCount 就是隻能讀不能改的。

同時還提供了一個清空任務隊列的函數。

然後傳入的參數也加個校驗邏輯:

if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
  throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}

不是整數或者小於 0 就報錯,當然,Infinity 也是可以的。

最後,其實還有一個特別需要完善的點,就是這裏:

const enqueue = (fn, resolve, ...args) ={
  queue.push(run.bind(null, fn, resolve, ...args));

  if (activeCount < concurrency && queue.length > 0) {
    queue.shift()();
  }
};

應該改成這樣:

const enqueue = (fn, resolve, ...args) ={
  queue.push(run.bind(null, fn, resolve, ...args));

  (async () ={
    await Promise.resolve();

    if (activeCount < concurrency && queue.length > 0) {
      queue.shift()();
    }
  })();
};

因爲 activeCount-- 的邏輯是在執行完任務之後才執行的,萬一任務還沒執行完,這時候 activeCount 就是不準的。

所以爲了保證併發數量能控制準確,要等全部的微任務執行完再拿 activeCount。

怎麼在全部的微任務執行完再執行邏輯呢?

加一個新的微任務不就行了?

所以有這樣的 await Promise.resolve(); 的邏輯。

這樣,就是一個完善的併發控制邏輯了,p-limit 也是這麼實現的。

感興趣的同學可以自己試一下:

const pLimit = (concurrency) ={
    if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
    }
  
    const queue = [];
    let activeCount = 0;
  
    const next = () ={
      activeCount--;
  
      if (queue.length > 0) {
        queue.shift()();
      }
    };
  
    const run = async (fn, resolve, ...args) ={
      activeCount++;
  
      const result = (async () => fn(...args))();

      resolve(result);
  
      try {
        await result;
      } catch {}

      next();
    };
  
    const enqueue = (fn, resolve, ...args) ={
      queue.push(run.bind(null, fn, resolve, ...args));
  
      (async () ={
        await Promise.resolve();
  
        if (activeCount < concurrency && queue.length > 0) {
          queue.shift()();
        }
      })();
    };
  
    const generator = (fn, ...args) =>
      new Promise((resolve) ={
        enqueue(fn, resolve, ...args);
      });
  
    Object.defineProperties(generator, {
      activeCount: {
        get: () => activeCount
      },
      pendingCount: {
        get: () => queue.length
      },
      clearQueue: {
        value: () ={
          queue.length = 0;
        }
      }
    });
  
    return generator;
  };
  
const limit = pLimit(2);
  
function asyncFun(value, delay) {
    return new Promise((resolve) ={
        console.log('start ' + value);
        setTimeout(() => resolve(value), delay);
    });
}

(async function () {
    const arr = [
        limit(() => asyncFun('aaa', 2000)),
        limit(() => asyncFun('bbb', 3000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000)),
        limit(() => asyncFun('ccc', 1000))
    ];
  
    const result = await Promise.all(arr);
    console.log(result);
})();

總結

js 代碼經常要處理異步邏輯的串行、並行,還可能要做併發控制,這也是面試常考的點。

實現併發控制的核心就是通過一個隊列保存所有的任務,然後最開始批量執行一批任務到最大併發數,然後每執行完一個任務就再執行一個新的。

其中要注意的是爲了保證獲取的任務數量是準確的,要在所有微任務執行完之後再獲取數量。

實現併發控制只要 40 多行代碼,其實這就是 p-limit 的源碼了,大家感興趣也可以自己實現一下。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/TbtChYU9OL64MTcQhrA63w