手把手教你設計一個任務調度器
我們從業務現狀出發,考慮需要覆蓋的場景,設計調度器需要提供的角色。然後確定了調度器的關鍵接口,同時給出了簡單實現。同時,也提到一些個人設計和實現時候的一些思考,比如多叉樹和有向無環圖。另外,考慮到 Rxjs 比較適合 task 的組織,借鑑 Rxjs 的 API 並將其應用到實踐中。希望能給出一些啓發,歡迎一起討論~
現狀
以騰訊文檔業務中的 預加載
流程舉例,在進行預加載的過程中,有三個主要的流程,分別爲
-
數據預加載
-
離線數據同步
-
預渲染
在代碼實現過程中,這三大邏輯竟然寫在一個方法裏面,代碼已經有上百行。三者的依賴圖大致如下:
但是隨着代碼的膨脹,業務邏輯的複雜度指數增加。且維護的同學也不可能只有一個,慢慢地,相互之間就形成這樣的情況:
我們項目中的任務調度
我們發現, 預加載
整個流程中,很多業務邏輯是可以複用的。
假設有兩個業務場景,一個是斷網重連,一個是列表頁滑動。在斷網重連場景下,需要執行的流程爲:
在列表頁滑動場景下,需要執行的流程爲:
不同業務場景下,需要執行的操作有很多相同的地方,比如以上兩個場景,都有 “檢測文檔狀態”、“檢測文檔狀態” 和“預渲染”三個步驟。所以我們將每個場景需要執行的操作拆分爲可以複用的細粒度的任務,這樣可以減少重複代碼。
爲了更好地維護管理,減少耦合,我們打算引入一個任務調度器,以便能夠做到:
-
將業務流程中的各個邏輯合理拆分,使其粒度更細,便於複用。
-
拆分後的子任務,之間的依賴順序可以自由配置,靈活組配,適應更多的業務場景。
引入 task
我們首先將業務邏輯進行拆解,將業務邏輯的最小拆分單元,描述爲 task,task 是調度器能處理的最小單元。
如何描述 task 之間的執行順序
將業務邏輯拆分爲一個個細粒度的 task 之後,如何控制 task 之間的執行順序,如何使多個 task 靈活地並行和串行,是設計時遇到的一個難題。
我們想到的可能的場景有如下幾個。
首先,最簡單的串行任務。如圖,Task1、Task2 以及 Task3 串行執行,後面的 task 需要前面的返回值:
其次,就是並行任務。如圖,Task1 執行完之後,需要執行後續三個任務,這三個任務是並行執行的:
然後,還需要覆蓋到有條件的分支流程。如圖,Task1 執行完成在滿足一定條件後,才能依次執行 Task2 和 Task3 。若不滿足,則什麼都不執行:
也有其他的條件分支流程,比如 Task1 執行完成在滿足一定條件後,才能執行 Task2 。若不滿足,則需要並行執行 Task3 和 Task4。如圖:
最後,還有最複雜的情況,就是串行和並行交織在一起,再加上條件控制比較複雜的情況。舉例如下:
設計思路
如何根據以上需要滿足的場景,去設計我們調度器的架構呢?
一個任務完成後,可能接下來需要執行一個或者三個後續的流程。所以第一時間想到了多叉樹。且是有向的多叉樹。
有向多叉樹
最初將這種結構抽象爲 有向多叉樹
,可以滿足很多場景。比如:
但是 有向多叉樹
不能表示這種:
有向無環圖設計
我們發現 樹
這種數據結構可能並不能滿足需求,我們就想到圖。又因爲這種圖,好像有一種流向,貌似可以進行拓撲排序。所以我們嘗試用 有向無環圖
這種數據結構。
這樣一來,我們的數據結構抽象爲 DAG (有向無環圖),問題也就變成了,如何構建 DAG ,並操作 DAG 。
其他角色
爲了更好地實現任務調度器,除了引入 task 之外, 我們還引入 Job 和 Scheduler 的概念。
task
我們將業務邏輯的最小拆分單元,描述爲 task。task 是調度器能處理的最小單元,Scheduler 管理 Job,不直接管理 task ,task 由其所在的 Job 管理。
Job
Job 用來管理一組互相關聯的 task,成爲一個有意義的作業,即一個 Job 由一個或多個 task 組成。
BasicJob 是 Job 基類,業務的 Job 需要繼承並實現其抽象方法。
Scheduler
Scheduler 用來管理調度向其添加的 Job, 可以恢復和暫停自身的執行。
整體結構
上面介紹瞭如何描述 task 之間的執行順序,具體的業務邏輯由 task 來執行,那如何管理 task 呢?介紹角色的時候提到,Job 用來管理一組互相關聯的 task,成爲一個有意義的 作業。且用 Scheduler 來管理調度向其添加的 Job 。
整體的架構如下:
實現分析
接口定義
task 的接口
我們由底層向高層逐漸來分析。上文提到,task 是最基本的運行單位,是最細粒度的拆分單元。那我們該如何規範 task 呢?
我們希望業務方可以將自己的業務邏輯抽象封裝爲一個 task ,而封裝好的 task 能夠被我們調度器所識別,並能被調度器正確處理。所以 task 需要實現我們自定義的接口,接口 ITask 規範 task 應該具備怎麼樣的能力,應該怎麼樣被實現。
下面看下:
interface ITask {
/**
* 執行 task 的具體邏輯
*
* @return {*} {Promise<ITaskResult<Result>>} task 的返回結果
*/
run(previousTaskResult: ITaskResult): Promise<ITaskResult>;
/**
* 當 task 內部執行失敗時,應做的後續處理。
* 比如回滾 task 的執行
* 比如僅僅上報日誌
* onError 是可選的,可以不實現
*/
onError?(error: Error): void;
}
ITask 提供一個 run 方法,用來執行具體的業務邏輯。它接受上一個任務的執行結果,並按照規範返回自己的處理結果。
當 task 內部執行失敗時,需要做一些注入回滾或者上報之類的異常處理邏輯。所以提供 onError 方法。
job 的接口
Job 用來管理 task,需要具備兩大能力——
-
Job 需要能添加要處理的 task
-
Job 需要要能指定 task 的執行順序
JobScheduler 的接口
JobScheduler 用來管理 Job, 由於我們賦予 task 靈活的組織方式,可以並行,可以串行,同時可以指定條件分支。用來適應各種業務場景。
我們發現 Job 之間不需要太複雜的組織方式,簡單地串行即可。所以 JobScheduler 相對簡單,能管理 Job 進行串行執行即可。另外, JobScheduler 需要具備暫停當前調度器執行的能力,相應地,也需要具備恢復執行的能力。
於是,我們簡單定義 IJobScheduler 爲:
interface IJobScheduler {
/**
* 添加 job 以便調度
* 如果當前沒有 job 正在被執行,且該 JobSchedule 沒有被 pause,則立即執行
* 否則,只是放在隊列,以便後續執行
*/
add(job: BasicJob): void;
/**
* 暫停 JobScheduler 的調度行爲,當前正在執行的 Job 不受影響
* 增加 key 是爲了如果有多個原因要 pause ,那麼需要所有的原因都
* 可以 resume 的時候,才能真正 resume
*
* @param {string} [key] 標識當前暫停原因的 key
*/
pause(key?: string): string;
/**
* 恢復 JobSchedule 的執行
*
* @param {string} key pause 返回的 key
* @return {*} {boolean} resume 是否成功
*/
resume(key: string): boolean;
}
實現
接口 ITask 由業務方去執行即可,相對簡單。調度器管理的 ITask 接口,不關心具體的實現。
BasicJob
Job 管理 task,我們需要實現其兩大能力——
-
Job 需要能添加要處理的 task
-
Job 需要要能指定 task 的執行順序
這裏可以用構建和操作 有向無環圖
的方式實現。最初設想也是將問題轉化爲
-
將 task 的執行依賴抽象爲
有向無環圖
的構建 -
Job 的執行則是對
有向無環圖
進行遍歷。
後面參考了 Rxjs 的 API 和 使用方式,我們決定使用 Rxjs 來實現 Job 。
變換式編程思考
爲什麼使用 Rxjs ?
所有程序其實都是對數據的一種變換——將輸入轉換爲輸出。然而,當我們在構思設計時,很少考慮創建變換過程。相反,我們關心的是類和模塊、數據結構和算法、語言和框架。
我們認爲,從這個角度關注代碼往往忽略了要點——我們需要重新將程序視爲輸入到輸出的一個變換。當這樣做的時候,許多以前操心的細節就消失了。結構變得更清晰,錯誤處理更加一致,耦合下降了很多。
如果你不能將正在做的事情描述爲一個流程,那表示你不知道自己在做什麼。
————《程序員修煉之道》
而 Rxjs 做這件事就特別合適。其中主要用到了 Rxjs 中的 API 有:
-
pipe
-
concatMap
-
forkJoin
-
iif
我們可以將 task 的執行流程抽象爲一個數據管道,首先需要創建一個數據流的起始點:
private source: Observable<TaskResultType>;
// defaultData 可以是初始化的數據,也可以是外界傳入的配置信息
private initSource(defaultData?: unknown): void {
/** @type {ITaskResult} task 添加前的起始值 */
const startPoint: ITaskResult = {
status: TASK_STATUS.success,
data: defaultData,
};
// 創建 Observable
this.source = of(startPoint);
}
這裏的 source
是 Rxjs 中提供的 Observable
類型對象,即可以被觀察的,相當於一個生產者。
另外這裏也用到了 Rxjs 中提供的 of
操作符,簡單說就是將數據轉化爲 Observable
類型對象。那如何添加並組織 task 呢?這裏提供了三個方法:
/**
* 添加多個 task ,串行執行
*/
public serialNext(tasks: ITask[]): void {}
/**
* 添加多個 task ,並行執行
*/
public parallellNext(tasks: ITask[]): void {}
/**
* 條件操作。
* 第一個參數是一個 條件函數,返回 true false,以此來決定走哪個子 Job
*/
public iif(condition: (previousResult: unknown) => boolean, trueSource: BasicJob, falseSource?: BasicJob):void {}
JobScheduler
先分析下我們的構造函數:
public constructor(queue: IQueue = new FIFOQueue()) {}
這個 FIFOQueue
爲啥不直接使用數組在 JobScheduler
實現呢?
其實, FIFOQueue
的內部實現就是數組,代碼不過十行左右。這樣做,是爲了將 queue
和 job-schedule
解耦開,而 queue
是可以依賴注入的,可以自定義實現的。簡單說,就是不希望 queue
的實現細節在 job-schedule
中, job-schedule
只存在 queue
的接口操作。
再看 JobScheduler
的 pause()
和 resume()
方法,設計的時候考慮到,這裏不能是簡單的,調用 pause()
的時候暫停執行,調用 resume()
就恢復執行這麼簡單。
設想一種業務場景,如果有多個原因要暫停調度器的執行,那麼可能有一個業務模塊內的多處都執行了 pause()
。那麼當其中一一處需要恢復執行的時候,即某一個暫停執行的原因不再成立的時候,這時調用了 resume()
的時候,調度器是否應該立即響應,去恢復執行呢?
要不要恢復執行,就看此時是否真的可以恢復調度器的執行。如果業務方只在調度器可以恢復執行的時候,才真正恢復。也就是所以暫停執行的原因都不再成立的時候,才執行 resume()
。那麼這樣的話,調度器就把恢復和暫停的工作交給業務方來確保。
這樣的弊端是:
-
增加了業務方的處理負擔,需要關注調度器內部的執行狀態。
-
當需要調用
resume()
的時候,需要考慮到其他業務是否受當前執行狀態的影響。 -
debug 的時候也難以追蹤,很難調試。
-
維護成本很高,需要知道當前模塊下所有使用調度器的業務。
於是,我們將這一工作收斂到調度器內部。業務只關心自己應該暫停還是恢復,不需要關心其他業務會不會受影響。那麼當調用 resume()
的時候,可能並不會真正地開始恢復執行。
具體實現如下:
/**
* 暫停 JobScheduler 的調度行爲,當前正在執行的 Job 不受影響
* 增加 key 是爲了如果有多個原因要 pause ,那麼需要所有的原因都
* 可以 resume 的時候,才能真正 resume
*
* @param {string} [key] 標識當前暫停原因的 key
*/
public pause(key?: string): string {
// 如果不傳 key ,則生成一個隨機字符串作爲 key
const keyNotNull = key ?? this.generatePauseKey();
this.pausedKeys.add(keyNotNull);
return keyNotNull;
}
/**
* 恢復 JobSchedule 的執行
*
* @param {string} key pause 返回的 key
* @return {*} {boolean} resume 是否成功
*/
public resume(key: string): boolean {
// 如果 key 存在於 set 中,返回true,併成功移除。否則,返回 false
const isContainKey = this.pausedKeys.delete(key);
// 如果移除後 pausedKeys 爲空,則可以 resume
if (this.pausedKeys.size === 0) {
this.executeNextJob();
}
return isContainKey;
}
其他特性
除了上面介紹的,我們的調度器還支持
-
指定任務的優先級
-
任務被處理的各個時機回調
-
超時處理
-
異常處理
-
...
總結
我們從業務現狀出發,考慮需要覆蓋的場景,設計調度器需要提供的角色。然後確定了調度器的關鍵接口,同時給出了簡單實現。同時,也提到一些個人設計和實現時候的一些思考,比如多叉樹和有向無環圖。
另外,考慮到 Rxjs 比較適合 task 的組織,借鑑 Rxjs 的 API 並將其應用到實踐中。希望能給出一些啓發,歡迎一起討論~
參考文章
-
實現一個異步併發調度器
-
Rxjs 官網
-
learnrxjs
-
bullmq
關於 AlloyTeam
AlloyTeam 是國內影響力最大的前端團隊之一,核心成員來自前 WebQQ 前端團隊。
AlloyTeam 負責過 WebQQ、QQ 羣、興趣部落、騰訊文檔等大型 Web 項目,積累了許多豐富寶貴的 Web 開發經驗。
這裏技術氛圍好,領導 nice、錢景好,無論你是身經百戰的資深工程師,還是即將從學校步入社會的新人,只要你熱愛挑戰,希望前端技術和我們飛速提高,這裏將是最適合你的地方。
加入我們,請將簡歷發送至 alloyteam@qq.com,或直接在公衆號留言~
期待您的回覆😁
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ETP23e72MRARnZhmFTnK6g