前端開發中大併發量如何控制併發數
寫在前面
最近在進行移動端 h5 開發,首頁需要加載的資源很多,一個 lottie 動效需要請求 70 多張圖片,但是遇到安卓 webview 限制請求併發數,導致部分圖片請求失敗破圖。當然圖片資源可以做閒時加載和預加載,可以減輕播放動效時資源未加載的問題。
同樣的,業務開發也會遇到需要異步請求幾十個接口,如果同時併發請求瀏覽器會進行限制請求數,也會給後端造成請求壓力。
場景說明
現在有個場景:
請你實現一個併發請求函數 concurrencyRequest(urls, maxNum),要求如下:
-
要求最大併發數 maxNum
-
每當有一個請求返回,就留下一個空位,可以增加新的請求
-
所有請求完成後,結果按照 urls 裏面的順序依次打出(發送請求的函數可以直接使用 fetch 即可)
初始實現:
const preloadManger = (urls, maxCount = 5) => {
let count = 0; // 計數 -- 用於控制併發數
const createTask = () => {
if (count < maxCount) {
const url = urls.pop(); // 從請求數組中取值
if (url) {
// 無論請求是否成功,都要執行taskFinish
loader(url).finally(taskFinish);
// 添加下一個請求
count++;
createTask();
}
}
};
const taskFinish = () => {
count--;
createTask();
};
createTask();
};
// 進行異步請求
const loader = async (url) => {
const res = await fetch(url).then(res=>res.json());
console.log("res",res);
return res
}
const urls = [];
for (let i = 1; i <= 20; i++) {
urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
preloadManger(urls, 5)
請求狀態:
可以看到上面的請求是每五個一組進行請求,當一個請求無論返回成功或是失敗,都會從請求數組中再取一個請求進行補充。
設計思路
那麼,我們可以考慮使用隊列去請求大量接口。
思路如下:
假定最大併發數是 maxNum=5,圖中對接口進行了定義編號,當請求隊列池中有一個請求返回後,就向池子中新增一個接口進行請求,依次直到最後一個請求執行完畢。
當然,要保證程序的健壯性,需要考慮一些邊界情況,如下:
-
當初始請求數組 urls 的長度爲 0 時,此時請求結果數組 results 是個空數組
-
最大併發數 maxNums>urls 的長度時,請求數爲 urls 的長度
-
需要定義計數器 count 去判斷是否全部請求完畢
-
無論請求成功與否,都應該將結果存在結果數組 results 中
-
結果數組 results 和 urls 數組的順序保持一致,方便存取
代碼實現
在前面的初始實現的代碼中,雖然都能滿足基本需求,但是並沒有考慮一些邊界條件,對此需要根據上面設計思路重新實現得到:
// 併發請求函數
const concurrencyRequest = (urls, maxNum) => {
return new Promise((resolve) => {
if (urls.length === 0) {
resolve([]);
return;
}
const results = [];
let index = 0; // 下一個請求的下標
let count = 0; // 當前請求完成的數量
// 發送請求
async function request() {
if (index === urls.length) return;
const i = index; // 保存序號,使result和urls相對應
const url = urls[index];
index++;
console.log(url);
try {
const resp = await fetch(url);
// resp 加入到results
results[i] = resp;
} catch (err) {
// err 加入到results
results[i] = err;
} finally {
count++;
// 判斷是否所有的請求都已完成
if (count === urls.length) {
console.log('完成了');
resolve(results);
}
request();
}
}
// maxNum和urls.length取最小進行調用
const times = Math.min(maxNum, urls.length);
for(let i = 0; i < times; i++) {
request();
}
})
}
測試代碼:
const urls = [];
for (let i = 1; i <= 20; i++) {
urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
concurrencyRequest(urls, 5).then(res => {
console.log(res);
})
請求結果:
上面代碼基本實現了前端併發請求的需求,也基本滿足需求,在生產中其實有很多已經封裝好的庫可以直接使用。比如:p-limit【https://github.com/sindresorhus/p-limit】
閱讀 p-limit 源碼
import Queue from 'yocto-queue';
import {AsyncResource} from '#async_hooks';
export default function pLimit(concurrency) {
// 判斷這個參數是否是一個大於0的整數,如果不是就拋出一個錯誤
if (
!((Number.isInteger(concurrency)
|| concurrency === Number.POSITIVE_INFINITY)
&& concurrency > 0)
) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
// 創建隊列 -- 用於存取請求
const queue = new Queue();
// 計數
let activeCount = 0;
// 用來處理併發數的函數
const next = () => {
activeCount--;
if (queue.size > 0) {
// queue.dequeue()可以理解爲[].shift(),取出隊列中的第一個任務,由於確定裏面是一個函數,所以直接執行就可以了;
queue.dequeue()();
}
};
// run函數就是用來執行異步併發任務
const run = async (function_, resolve, arguments_) => {
// activeCount加1,表示當前併發數加1
activeCount++;
// 執行傳入的異步函數,將結果賦值給result,注意:現在的result是一個處在pending狀態的Promise
const result = (async () => function_(...arguments_))();
// resolve函數就是enqueue函數中返回的Promise的resolve函數
resolve(result);
// 等待result的狀態發生改變,這裏使用了try...catch,因爲result可能會出現異常,所以需要捕獲異常;
try {
await result;
} catch {}
next();
};
// 將run函數添加到請求隊列中
const enqueue = (function_, resolve, arguments_) => {
queue.enqueue(
// 將run函數綁定到AsyncResource上,不需要立即執行,對此添加了一個bind方法
AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
);
// 立即執行一個異步函數,等待下一個微任務(注意:因爲activeCount是異步更新的,所以需要等待下一個微任務執行才能獲取新的值)
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
// 判斷activeCount是否小於concurrency,並且隊列中有任務,如果滿足條件就會將隊列中的任務取出來執行
if (activeCount < concurrency && queue.size > 0) {
// 注意:queue.dequeue()()執行的是run函數
queue.dequeue()();
}
})();
};
// 接收一個函數fn和參數args,然後返回一個Promise,執行出隊操作
const generator = (function_, ...arguments_) => new Promise(resolve => {
enqueue(function_, resolve, arguments_);
});
// 向外暴露當前的併發數和隊列中的任務數,並且手動清空隊列
Object.defineProperties(generator, {
// 當前併發數
activeCount: {
get: () => activeCount,
},
// 隊列中的任務數
pendingCount: {
get: () => queue.size,
},
// 清空隊列
clearQueue: {
value() {
queue.clear();
},
},
});
return generator;
}
整個庫只有短短 71 行代碼,在代碼中導入了 yocto-queue 庫,它是一個微型的隊列數據結構。
手寫源碼
在進行手撕源碼時,可以藉助數組進行簡易的實現:
class PLimit {
constructor(concurrency) {
this.concurrency = concurrency;
this.activeCount = 0;
this.queue = [];
return (fn, ...args) => {
return new Promise(resolve => {
this.enqueue(fn, resolve, args);
});
}
}
enqueue(fn, resolve, args) {
this.queue.push(this.run.bind(this, fn, resolve, args));
(async () => {
await Promise.resolve();
if (this.activeCount < this.concurrency && this.queue.length > 0) {
this.queue.shift()();
}
})();
}
async run(fn, resolve, args) {
this.activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
this.next();
}
next() {
this.activeCount--;
if (this.queue.length > 0) {
this.queue.shift()();
}
}
}
小結
在這篇文章中,簡要介紹了爲什麼要進行併發請求,闡述了使用請求池隊列實現併發請求的設計思路,簡要實現代碼。
此外,還閱讀分析了 p-limit 的源碼,並使用數組進行簡要的源碼編寫,以實現要求。
參考文章
-
【源碼共讀】大併發量如何控制併發數 https://juejin.cn/post/7179220832575717435?searchId=20240430092814392DC2208C545E691A26
-
前端實現併發控制網絡請求 https://mp.weixin.qq.com/s/9uq2SqkcMSSWjks0x7RQJg
-
關於前端:如何實現併發請求數量控制?https://juejin.cn/post/7163522138698153997
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ZSHTpMo5Rj0RZtYrkXPpuA