手寫 p-limit,40 行代碼實現併發控制
前端代碼經常要處理各種異步邏輯。
有的是串行的:
const promise1 = new Promise(function(resolve) {
// 異步邏輯 1...
resolve();
});
const promise2 = new Promise(function(resolve) {
// 異步邏輯 2...
resolve();
});
promise1.then(() => promise2);
await promise1;
await promise2;
有的是並行的:
await Promise.all([promise1, promise2]);
await Promise.race([promise1, promise2]);
並行的異步邏輯有時還要做併發控制。
併發控制是常見的需求,也是面試常考的面試題。
一般我們會用 p-limit 來做:
import pLimit from 'p-limit';
const limit = pLimit(2);
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
const result = await Promise.all(input);
console.log(result);
比如上面這段邏輯,就是幾個異步邏輯並行執行,並且最大併發是 2。
那如何實現這樣的併發控制呢?
我們自己來寫一個:
首先,要傳入併發數量,返回一個添加併發任務的函數,我們把它叫做 generator:
const pLimit = (concurrency) => {
const generator = (fn, ...args) =>
new Promise((resolve) => {
//...
});
return generator;
}
這裏添加的併發任務要進行排隊,所以我們準備一個 queue,並記錄當前在進行中的異步任務。
const queue = [];
let activeCount = 0;
const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});
添加的異步任務就入隊,也就是 enqueue。
enqueue 做的事情就是把一個異步任務添加到 queue 中,並且只要沒達到併發上限就再執行一批任務:
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};
具體運行的邏輯是這樣的:
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
計數,運行這個函數,改變最後返回的那個 promise 的狀態,然後執行完之後進行下一步處理:
下一步處理自然就是把活躍任務數量減一,然後再跑一個任務:
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
這樣就保證了併發的數量限制。
現在的全部代碼如下,只有 40 行代碼:
const pLimit = (concurrency) => {
const queue = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};
const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});
return generator;
};
這就已經實現了併發控制。
不信我們跑跑看:
準備這樣一段測試代碼:
const limit = pLimit(2);
function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}
(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];
const result = await Promise.all(arr);
console.log(result);
})();
沒啥好說的,就是 setTimeout + promise,設置不同的 delay 時間。
併發數量爲 2。
我們試下:
先併發執行前兩個任務,2s 的時候一個任務執行完,又執行了一個任務,然後再過一秒,都執行完了,有同時執行了兩個任務。
經過測試,我們已經實現了併發控制!
回顧一下我們實現的過程,其實就是一個隊列來保存任務,開始的時候一次性執行最大併發數的任務,然後每執行完一個啓動一個新的。
還是比較簡單的。
上面的 40 行代碼是最簡化的版本,其實還有一些可以完善的地方,我們繼續完善一下。
首先,我們要把併發數暴露出去,還要讓開發者可以手動清理任務隊列。
我們這樣寫:
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});
用 Object.defineProperties 只定義 get 函數,這樣 activeCount、pendingCount 就是隻能讀不能改的。
同時還提供了一個清空任務隊列的函數。
然後傳入的參數也加個校驗邏輯:
if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
不是整數或者小於 0 就報錯,當然,Infinity 也是可以的。
最後,其實還有一個特別需要完善的點,就是這裏:
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};
應該改成這樣:
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};
因爲 activeCount-- 的邏輯是在執行完任務之後才執行的,萬一任務還沒執行完,這時候 activeCount 就是不準的。
所以爲了保證併發數量能控制準確,要等全部的微任務執行完再拿 activeCount。
怎麼在全部的微任務執行完再執行邏輯呢?
加一個新的微任務不就行了?
所以有這樣的 await Promise.resolve(); 的邏輯。
這樣,就是一個完善的併發控制邏輯了,p-limit 也是這麼實現的。
感興趣的同學可以自己試一下:
const pLimit = (concurrency) => {
if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};
const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});
return generator;
};
const limit = pLimit(2);
function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}
(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];
const result = await Promise.all(arr);
console.log(result);
})();
總結
js 代碼經常要處理異步邏輯的串行、並行,還可能要做併發控制,這也是面試常考的點。
實現併發控制的核心就是通過一個隊列保存所有的任務,然後最開始批量執行一批任務到最大併發數,然後每執行完一個任務就再執行一個新的。
其中要注意的是爲了保證獲取的任務數量是準確的,要在所有微任務執行完之後再獲取數量。
實現併發控制只要 40 多行代碼,其實這就是 p-limit 的源碼了,大家感興趣也可以自己實現一下。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/TbtChYU9OL64MTcQhrA63w