如何設計一套分佈式任務調度系統?

什麼是分佈式任務調度器?爲什麼需要分佈式任務調度系統?如何設計一套分佈式任務調度系統?這篇文章,我們來詳細分析。

1. 什麼是分佈式任務調度系統?

分佈式調度系統是一種軟件系統,用於在多個計算節點上協調和管理的執行,這類系統的主要目標是提高任務調度的效率、可靠性和可擴展性。分佈式調度系統通常用於處理需要在多個服務器或計算節點上並行執行的複雜計算任務。

2. 如何設計分佈式任務調度系統?

2.1 需求分析

在深入設計之前,讓我們列出功能和非功能需求。

2.1.1 功能需求

  1. 用戶可以提交一次性或週期性進行執行。

  2. 用戶可以取消已提交的任務。

  3. 系統應將任務分佈到多個工作節點進行執行。

  4. 系統應提供任務狀態監控(排隊中、運行中、已完成、失敗)。

  5. 系統應防止同一任務被多次併發執行。

2.1.2 非功能需求

  1. 可擴展性:系統應能夠調度和執行數百萬個任務。

  2. 高可用性:系統應具有容錯能力,且無單點故障。如果工作節點失敗,系統應將任務重新調度到其他可用節點。

  3. 延遲:任務應以最小的延遲進行調度和執行。

  4. 一致性:任務結果應一致,確保任務只執行一次(或最小化重複)。

2.2 High-level 設計

在高層次上,我們的分佈式任務調度器將包含以下組件:

2.2.1 任務提交服務

任務提交服務是客戶端與系統交互的入口。它提供用戶或服務通過 API 提交、更新或取消任務的接口。

該層暴露一個 RESTful API,接受任務詳細信息,如:

它將任務元數據(例如,execution_time、frequency、status = pending)保存在任務存儲(數據庫)中,並返回一個唯一的任務 ID 給客戶端。

2.2.2 任務存儲

任務存儲負責持久化任務信息並維護系統中所有任務和工作節點的當前狀態。

任務存儲包含以下數據庫表:

任務表

該表存儲任務的元數據,包括任務 ID、用戶 ID、頻率、負載、執行時間、重試次數和狀態(待處理、運行中、已完成、失敗)。

任務執行表

任務在失敗時可以多次執行。

該表跟蹤每個任務的執行嘗試,存儲信息如執行 ID、開始時間、結束時間、工作節點 ID、狀態和錯誤信息。

如果任務失敗並重試,每次嘗試都將在此記錄。

任務調度表

調度表存儲每個任務的調度詳情,包括 next_run_time。

對於一次性任務,next_run_time 與任務的執行時間相同,last_run_time 保持爲空。

對於週期性任務,next_run_time 在每次執行後更新,以反映下次計劃的運行時間。

工作節點表

工作節點表存儲每個工作節點的信息,包括其 IP 地址、狀態、最後心跳、容量和當前負載。

2.2.3 調度服務

調度服務負責根據任務調度表中的 next_run_time 選擇待執行的任務。

它定期查詢表中計劃在當前分鐘運行的任務:

SELECT * FROM JobSchedulesTable WHERE next_run_time = 1726110000;

一旦取回到期任務,它們將被推送到分佈式任務隊列中供工作節點執行。

同時,任務表中的狀態更新爲 SCHEDULED。

2.2.4 分佈式任務隊列

分佈式任務隊列(例如,Kafka、RabbitMQ)作爲調度服務和執行服務之間的緩衝區,確保任務高效地分佈到可用的工作節點。

它持有任務,並允許執行服務拉取任務並分配給工作節點。

2.2.5 執行服務

執行服務負責在工作節點上運行任務並在任務存儲中更新結果。

它由一個協調器和一組工作節點組成。

協調器

協調器(或編排器)節點負責:

分配任務:將任務從隊列分發到可用的工作節點。

管理工作節點:跟蹤活躍工作節點的狀態、健康狀況、容量和工作負載。

處理工作節點故障:檢測工作節點故障並將其任務重新分配給其他健康節點。

負載均衡:確保工作負載根據可用資源和容量均勻分佈在工作節點上。

工作節點

工作節點負責執行任務並將結果(例如,已完成、失敗、輸出)更新到任務存儲中。

當工作節點被分配一個任務時,它會在任務執行表中創建一個新條目,任務狀態設爲運行中並開始執行。

執行完成後,工作節點在任務表和任務執行表中更新任務的最終狀態(例如,已完成或失敗)以及任何輸出。

如果工作節點在執行期間失敗,協調器會將任務重新排隊到分佈式任務隊列中,允許其他工作節點拾取並完成任務。

2.3 Low-level 設計

2.3.1 系統 API 設計

以下是系統中一些重要的 API。

  1. 提交任務(POST /jobs)

  2. 獲取任務狀態(GET /jobs/{job_id})

  3. 取消任務(DELETE /jobs/{job_id})

  4. 列出待處理任務(GET /jobs?status=pending&user_id=u003)

  5. 獲取某個工作節點上正在運行的任務(GET /job/executions?worker_id=w001)

2.3.2 深入分析關鍵組件

SQL vs NoSQL

爲了選擇適合我們需求的數據庫,讓我們考慮一些可能影響選擇的因素:

我們需要每天存儲數百萬個任務。

讀寫查詢大致相同。

數據是具有固定模式的結構化數據。

我們不需要 ACID 事務或複雜的連接。

SQL 和 NoSQL 數據庫都可以滿足這些需求,但考慮到工作負載的規模和性質,像 DynamoDB 或 Cassandra 這樣的 NoSQL 數據庫可能更適合,特別是在處理每天數百萬個任務並支持高吞吐量的寫入和讀取時。

擴展調度服務

調度服務每分鐘定期檢查任務調度表中的待處理任務並將它們推送到任務隊列中進行執行。

例如,以下查詢檢索在當前分鐘內到期執行的所有任務:

SELECT * FROM JobSchedulesTable WHERE next_run_time = 1726110000;

優化從 JobSchedulesTable 讀取:

由於我們使用 next_run_time 列查詢 JobSchedulesTable,最好在 next_run_time 列上分區表,以高效檢索計劃在特定分鐘內運行的所有任務。

如果任何分鐘內的任務數量較少,一個節點就足夠了。

然而,在高峯期,如在一分鐘內需要處理 50,000 個任務時,依賴一個節點可能會導致執行延遲。

節點可能會過載並減慢速度,造成性能瓶頸。

此外,只有一個節點會引入單點故障。

如果該節點由於崩潰或其他問題而不可用,則在節點恢復之前不會調度或執行任何任務,導致系統停機。

爲了解決這個問題,我們需要一個分佈式架構,其中多個工作節點並行處理任務調度任務,由一箇中央節點協調。

但是,我們如何確保任務不會被多個工作節點同時處理呢?

解決方案是將任務劃分爲段。每個工作節點只處理 JobSchedulesTable 中分配給它的特定子集任務,專注於分配的段。

這是通過添加一個名爲 segment 的額外列來實現的。

segment 列邏輯上將任務分組(例如,segment=1,segment=2,等等),確保沒有兩個工作節點同時處理同一個任務。

協調器節點通過分配不同的段給工作節點來管理工作負載的分佈。

它還通過心跳或健康檢查監控工作節點的健康狀況。

在工作節點故障、新工作節點的添加或流量激增的情況下,協調器通過調整段分配動態重新平衡工作負載。

每個工作節點使用 next_run_time 和其分配的段查詢 JobSchedulesTable,以檢索它負責處理的任務。

以下是工作節點可能執行的查詢示例:

SELECT * FROM JobSchedulesTable WHERE next_run_time = 1726110000 AND segment in (1,2);

處理任務失敗

當任務在執行期間失敗時,工作節點會增加任務表中的 retry_count。

如果 retry_count 仍低於 max_retries 閾值,工作節點會從頭開始重試任務。

一旦 retry_count 達到 max_retries 限制,任務將被標記爲失敗,不會再次執行,狀態更新爲失敗。

注意:任務失敗後,工作節點不應立即重試任務,特別是如果失敗是由瞬態問題(例如,網絡故障)引起的。

相反,系統在延遲後重試任務,並且每次重試的延遲會呈指數增加(例如,1 分鐘、5 分鐘、10 分鐘)。

處理執行服務中工作節點的故障

工作節點負責執行由執行服務中的協調器分配給它們的任務。

當工作節點失敗時,系統必須檢測到故障,將未完成的任務重新分配給健康節點,並確保任務不會丟失或重複。

有幾種檢測故障的方法:

心跳機制:每個工作節點定期向協調器發送心跳信號(每幾秒一次)。協調器跟蹤這些心跳信號,如果在預定義時間段內(例如,連續 3 次心跳信號未收到),將工作節點標記爲 “不健康”。

健康檢查:除了心跳信號,協調器還可以對每個工作節點進行定期健康檢查。健康檢查可能包括 CPU、內存、磁盤空間和網絡連接,以確保節點不過載。

一旦檢測到工作節點故障,系統需要恢復並確保分配給故障工作節點的任務仍然被執行。

有兩種主要場景需要處理:

待處理任務(未開始) 對於分配給工作節點但尚未開始的任務,系統需要將這些任務重新分配給其他健康的工作節點。

協調器應將它們重新排隊到任務隊列中,讓另一工作節點拾取。

進行中的任務 在工作節點故障時正在執行的任務需要小心處理,以防止部分執行或數據丟失。

一種方法是使用任務檢查點,工作節點定期將長時間運行任務的進度保存到持久存儲(如數據庫)。如果工作節點失敗,另一工作節點可以從最後一個檢查點重新開始任務。

如果任務部分執行但未完成,協調器應將任務標記爲 “失敗” 並重新排隊到任務隊列中,讓另一個工作節點重試。

解決單點故障

我們在調度服務和執行服務中使用了協調器節點。

爲了防止協調器成爲單點故障,部署多個協調器節點並使用領導選舉機制。

這確保了一個節點是活動領導者,而其他節點處於待命狀態。如果領導者失敗,將選舉新的領導者,系統繼續運行不中斷。

領導選舉:使用像 Raft 或 Paxos 這樣的共識算法從協調器池中選舉領導者。像 Zookeeper 或 etcd 這樣的工具通常用於管理分佈式領導選舉。

故障切換:如果領導協調器失敗,其他協調器檢測到故障並選舉新的領導者。新領導者立即接管職責,確保任務調度、工作節點管理和健康監測的連續性。

數據同步:所有協調器應訪問相同的共享狀態(例如,任務調度數據和工作節點健康信息)。這可以存儲在分佈式數據庫中(例如,Cassandra、DynamoDB)。這樣可以確保當新的領導者接管時,它有最新的數據可用。

速率限制

1. 任務提交級別的速率限制

如果一次性提交給調度系統的任務過多,系統可能會過載,導致性能下降、超時或甚至調度服務失敗。

在客戶端級別實現速率限制,以確保沒有單個客戶端可以壓垮系統。

例如,限制每個客戶端每分鐘最多提交 1,000 個任務。

2. 任務隊列級別的速率限制

即使控制了任務提交速率,如果任務隊列(例如,Kafka、RabbitMQ)被過多任務淹沒,系統可能會過載,導致工作節點速度變慢或消息積壓。

限制任務推送到分佈式任務隊列的速率。這可以通過實現隊列級別的節流來實現,每秒或每分鐘只允許一定數量的任務進入隊列。

3. 工作節點級別的速率限制

如果系統允許工作節點同時執行過多任務,可能會導致基礎設施(例如,CPU、內存、數據庫)過載,導致性能下降或崩潰。

因此,在工作節點級別實現速率限制,以防止任何單個工作節點一次執行過多任務。在工作節點上設置最大併發限制,以控制每個工作節點可以同時執行的任務數量。

3. 分佈式任務的方案

上面的內容我們詳細分析了分佈式任務的設計和實現細節,但是,在實際工作中,我們一般都會採用一些三方的方案來實現分佈式任務。國內常見的分佈式任務的方案有:Quartz Cluster,XXL-Job,Elastic-Job 等

是的,你提到的 Quartz Cluster、XXL-Job 和 Elastic-Job 都是常見的分佈式任務調度方案。它們各自有不同的特點和適用場景。以下是對這些方案的簡要介紹:

Quartz Cluster

Quartz 是一個功能強大的開源任務調度框架,支持創建複雜的定時任務。Quartz Cluster 是其集羣版本,用於實現高可用性和負載均衡。

特點

適用場景

XXL-Job

XXL-Job 是一個分佈式任務調度平臺,旨在提供一個簡單、高效、可擴展的任務調度方案。

特點

適用場景

Elastic-Job

Elastic-Job 是噹噹網開源的分佈式調度解決方案,分爲 Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個版本。

特點

適用場景

綜合來說:

國外一些常見的分佈式任務調度系統包括:

4. 總結

本文,我們從需求到架構再到實現細節,詳細地介紹瞭如何設計一個可擴展、高可用的分佈式任務調度系統。在實際工作中,我們一般都會採用一些三方的方案來實現分佈式任務,但是理解分佈式任務調度系統的設計可以幫助我們更好的理解和使用三方工具。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/p5-QVzbDC1yV_-UnJIXeZQ