解析分佈式應用框架 Ray 架構源碼
摘要:Ray 的定位是分佈式應用框架,主要目標是使能分佈式應用的開發和運行。
Ray 是 UC Berkeley 大學 RISE lab(前 AMP lab)2017 年 12 月 開源的新一代分佈式應用框架(剛發佈的時候定位是高性能分佈式計算框架,20 年中修改定位爲分佈式應用框架),通過一套引擎解決複雜場景問題,通過動態計算及狀態共享提高效率,實現研發、運行時、容災一體化。
業務目標
Ray 的定位是分佈式應用框架,主要目標是使能分佈式應用的開發和運行。
業務場景
具體的粗粒度使用場景包括
-
彈性負載,比如 ServerlessComputing
-
機器學習訓練,Ray Tune,RLlib, RaySGD 提供的訓練能力
-
在線服務, 例如 Ray Server 提供在線學習的案例
-
數據處理, 例如 Modin,Dask-On-Ray, MARS-on-Ray
-
臨時計算(例如,並行化 Python 應用程序,將不同的分佈式框架粘合在一起)
Ray 的 API 讓開發者可以輕鬆的在單個分佈式應用中組合多個 libraries,例如,Ray 的 tasks 和 Actors 可能會 call into 或 called from 在 Ray 上運行的分佈式訓練(e.g. torch.distributed)或者在線服務負載; 在這種場景下, Ray 是作爲一個 “分佈式膠水” 系統,因爲它提供通用 API 接口並且性能足以支撐許多不同工作負載類型。
系統設計目標
-
Ray 架構設計的核心原則是 API 的簡單性和通用性
-
Ray 的系統的核心目標是性能(低開銷和水平可伸縮性)和可靠性。爲了達成核心目標,設計過程中需要犧牲一些其他理想的目標,例如簡化的系統架構。例如,Ray 使用了分佈式參考計數和分佈式內存之類的組件,這些組件增加了體系結構的複雜性,但是對於性能和可靠性而言卻是必需的。
-
爲了提高性能,Ray 建立在 gRPC 之上,並且在許多情況下可以達到或超過 gRPC 的原始性能。與單獨使用 gRPC 相比,Ray 使應用程序更容易利用並行和分佈式執行以及分佈式內存共享(通過共享內存對象存儲)。
-
爲了提高可靠性,Ray 的內部協議旨在確保發生故障時的正確性,同時又減少了常見情況的開銷。 Ray 實施了分佈式參考計數協議以確保內存安全,並提供了各種從故障中恢復的選項。
-
由於 Ray 使用抽象資源而不是機器來表示計算能力,因此 Ray 應用程序可以無縫的從便攜機環境擴展到羣集,而無需更改任何代碼。 Ray 通過分佈式溢出調度程序和對象管理器實現了無縫擴展,而開銷卻很低。
相關係統上下文
-
集羣管理系統:Ray 可以在 Kubernetes 或 SLURM 之類的集羣管理系統之上運行,以提供更輕量的 task 和 Actor 而不是容器和服務。
-
並行框架:與 Python 並行化框架(例如 multiprocessing 或 Celery)相比,Ray 提供了更通用,更高性能的 API。Ray 系統還明確支持內存共享。
-
數據處理框架: 與 Spark,Flink,MARS 或 Dask 等數據處理框架相比,Ray 提供了一個 low-level 且較簡化的 API。這使 API 更加靈活,更適合作爲 “分佈式膠水” 框架。另一方面,Ray 對數據模式,關係表或流數據流沒有內在的支持。僅通過庫(例如 Modin,Dask-on-Ray,MARS-on-Ray)提供此類功能。
-
Actor 框架:與諸如 Erlang 和 Akka 之類的專用 actor 框架不同,Ray 與現有的編程語言集成,從而支持跨語言操作和語言本機庫的使用。 Ray 系統還透明地管理無狀態計算的並行性,並明確支持參與者之間的內存共享。
-
HPC 系統:HPC 系統都支持 MPI 消息傳遞接口,MPI 是比 task 和 actor 更底層的接口。這可以使應用程序具有更大的靈活性,但是開發的複雜度加大了很多。這些系統和庫中的許多(例如 NCCL,MPI)也提供了優化的集體通信原語(例如 allreduce)。 Ray 應用程序可以通過初始化各組 Ray Actor 之間的通信組來利用此類原語(例如,就像 RaySGD 的 torch distributed)。
系統設計
邏輯架構:
領域模型
-
Task:在與調用者不同的進程上執行的單個函數調用。任務可以是無狀態的(@ ray.remote 函數)或有狀態的(@ ray.remote 類的方法 - 請參見下面的 Actor)。任務與調用者異步執行:.remote()調用立即返回一個 ObjectRef,可用於檢索返回值。
-
Object:應用程序值。這可以由任務返回,也可以通過 ray.put 創建。對象是不可變的:創建後就無法修改。工人可以使用 ObjectRef 引用對象。
-
Actor:有狀態的工作進程(@ ray.remote 類的實例)。 Actor 任務必須使用句柄或對 Actor 的特定實例的 Python 引用來提交。
-
Driver:程序根目錄。這是運行 ray.init()的代碼。
-
Job:源自同一驅動程序的(遞歸)任務,對象和參與者的集合
集羣設計
如上圖所示,Ray 集羣包括一組同類的 worker 節點和一個集中的全局控制存儲(GCS)實例。
部分系統元數據由 GCS 管理,GCS 是基於可插拔數據存儲的服務,這些元數據也由 worker 本地緩存,例如 Actor 的地址。 GCS 管理的元數據訪問頻率較低,但可能被羣集中的大多數或所有 worker 使用,例如,羣集的當前節點成員身份。這是爲了確保 GCS 性能對於應用程序性能影響不大。
Ownership
-
大部分系統元數據是根據去中心化理念(ownership)進行管理的:每個工作進程都管理和擁有它提交的任務以及這些任務返回的 “ObjectRef”。Owner 負責確保任務的執行並促進將 ObjectRef 解析爲其基礎值。類似地,worker 擁有通過“ ray.put” 調用創建的任何對象。
-
OwnerShip 的設計具有以下優點(與 Ray 版本 < 0.8 中使用的更集中的設計相比):
-
低任務延遲(〜1 RTT,<200us)。經常訪問的系統元數據對於必須對其進行更新的過程而言是本地的。
-
高吞吐量(每個客戶端約 10k 任務 / 秒;線性擴展到集羣中數百萬個任務 / 秒),因爲系統元數據通過嵌套的遠程函數調用自然分佈在多個 worker 進程中。
-
簡化的架構。owner 集中了安全垃圾收集對象和系統元數據所需的邏輯。
-
提高了可靠性。可以根據應用程序結構將工作程序故障彼此隔離,例如,一個遠程調用的故障不會影響另一個。
- OwnerShip 附帶的一些權衡取捨是:
-
要解析 “ObjectRef”,對象的 owner 必須是可及的。這意味着對象必須與其 owner 綁定。有關對象恢復和持久性的更多信息,請參見 object 故障和 object 溢出。
-
目前無法轉讓 ownership。
核心組件
- Ray 實例由一個或多個工作節點組成,每個工作節點由以下物理進程組成:
- 一個或多個工作進程,負責任務的提交和執行。工作進程要麼是無狀態的(可以執行任何 @ray.remote 函數),要麼是 Actor(只能根據其 @ray.remote 類執行方法)。每個 worker 進程都與特定的作業關聯。初始工作線程的默認數量等於計算機上的 CPU 數量。每個 worker 存儲 ownership 表和小對象:
a. Ownership 表。工作線程具有引用的對象的系統元數據,例如,用於存儲引用計數。
b. in-process store,用於存儲小對象。
2.Raylet。raylet 在同一羣集上的所有作業之間共享。raylet 有兩個主線程:
a. 調度器。負責資源管理和滿足存儲在分佈式對象存儲中的任務參數。羣集中的單個調度程序包括 Ray 分佈式調度程序。
b. 共享內存對象存儲(也稱爲 Plasma Object Store)。負責存儲和傳輸大型對象。集羣中的單個對象存儲包括 Ray 分佈式對象存儲。
每個工作進程和 raylet 都被分配了一個唯一的 20 字節標識符以及一個 IP 地址和端口。相同的地址和端口可以被後續組件重用(例如,如果以前的工作進程死亡),但唯一 ID 永遠不會被重用(即,它們在進程死亡時被標記爲墓碑)。工作進程與其本地 raylet 進程共享命運。
- 其中一個工作節點被指定爲 Head 節點。除了上述進程外,Head 節點還託管:
- 全局控制存儲 (GCS)。GCS 是一個鍵值服務器,包含系統級元數據,如對象和參與者的位置。GCS 目前還不支持高可用,後續版本中 GCS 可以在任何和多個節點上運行,而不是指定的頭節點上運行。
2.Driver 進程 (es)。Driver 是一個特殊的工作進程,它執行頂級應用程序(例如,Python 中的__main__)。它可以提交任務,但不能執行任何任務本身。Driver 進程可以在任何節點上運行。
交互設計
應用的 Driver 可以通過以下方式之一連接到 Ray:
-
調用 `ray.init()’,沒有參數。這將啓動一個嵌入式單節點 Ray 實例,應用可以立即使用該實例。
-
通過指定 ray.init(地址 =)連接到現有的 Ray 集羣。在後端,Driver 將以指定的地址連接到 GCS,並查找羣集其他組件的地址,例如其本地 raylet 地址。Driver 必須與 Ray 羣集的現有節點之一合部。這是因爲 Ray 的共享內存功能,所以合部是必要的前提。
-
使用 Ray 客戶端 `ray.util.connect()'從遠程計算機(例如筆記本電腦)連接。默認情況下,每個 Ray 羣集都會在可以接收遠程客戶端連接的頭節點上啓動一個 Ray ClientServer,用來接收遠程 client 連接。但是由於網絡延遲,直接從客戶端運行的某些操作可能會更慢。
Runtime
所有 Ray 核心組件都是用 C++ 實現的。Ray 通過一個名爲 “coreworker” 的通用嵌入式 C++ 庫支持 Python 和 Java。此庫實現 ownership 表、進程內存儲,並管理與其他工作器和 Raylet 的 gRPC 通信。由於庫是用 C++ 實現的,所有語言運行時都共享 Ray 工作協議的通用高性能實現。
Task 的 lifetime
Owner 負責確保提交的 Task 的執行,並促進將返回的 ObjectRef 解析爲其基礎值。如下圖,提交 Task 的進程被視爲結果的 Owner,並負責從 raylet 獲取資源以執行 Task,Driver 擁有 A 的結果,Worker 1 擁有 B 的結果。
-
提交 Task 時,Owner 會等待所有依賴項就緒,即作爲參數傳遞給 Task 的 ObjectRefs(請參見 Object 的 lifetime)變得可用。依賴項不需要是本地的;Owner 一旦認爲依賴項在羣集中的任何地方可用,就會立即就緒。當依賴關係就緒時,Owner 從分佈式調度程序請求資源以執行任務,一旦資源可用,調度程序就會授予請求,並使用分配給 owner 的 worker 的地址進行響應。
-
Owner 將 task spec 通過 gRPC 發送給租用的 worker 來調度任務。執行任務後,worker 必須存儲返回值。如果返回值較小,則工作線程將值直接 inline 返回給 Owner,Owner 將其複製到其進程中對象存儲區。如果返回值很大,則 worker 將對象存儲在其本地共享內存存儲中,並向所有者返回分佈式內存中的 ref。讓 owner 可以引用對象,不必將對象提取到其本地節點。
-
當 Task 以 ObjectRef 作爲其參數提交時,必須在 worker 開始執行之前解析對象值。如果該值較小,則它將直接從所有者的進程中對象存儲複製到任務說明中,在任務說明中,執行 worker 線程可以引用它。如果該值較大,則必須從分佈式內存中提取對象,以便 worker 在其本地共享內存存儲中具有副本。scheduler 通過查找對象的位置並從其他節點請求副本來協調此對象傳輸。
-
容錯:任務可能會以錯誤結束。Ray 區分了兩種類型的任務錯誤:
-
應用程序級。這是工作進程處於活動狀態,但任務以錯誤結束的任何場景。例如,在 Python 中拋出 IndexError 的任務。
-
系統級。這是工作進程意外死亡的任何場景。例如,隔離故障的進程,或者如果工作程序的本地 raylet 死亡。
-
由於應用程序級錯誤而失敗的任務永遠不會重試。異常被捕獲並存儲爲任務的返回值。由於系統級錯誤而失敗的任務可以自動重試到指定的嘗試次數。
-
代碼參考:
1.src/ray/core_worker/core_worker.cc
2.src/ray/common/task/task_spec.h
3.src/ray/core_worker/transport/direct_task_transport.cc
4.src/ray/core_worker/transport / 依賴關係_解析器. cc
5.src/ray/core_worker/task_manager.cc
6.src/ray/protobuf/common.proto
Object 的 lifetime
下圖 Ray 中的分佈式內存管理。worker 可以創建和獲取對象。owner 負責確定對象何時安全釋放。
-
對象的 owner 就是通過提交創建 task 或調用 ray.put 創建初始 ObjectRef 的 worker。owner 管理對象的生存期。Ray 保證,如果 owner 是活的,對象最終可能會被解析爲其值(或者在 worker 失敗的情況下引發錯誤)。如果 owner 已死亡,則獲取對象值的嘗試永遠不會 hang,但可能會引發異常,即使對象仍有物理副本。
-
每個 worker 存儲其擁有的對象的引用計數。有關如何跟蹤引用的詳細信息,請參閱引用計數。Reference 僅在下面兩種操作期間計算:
-
將 ObjectRef 或包含 ObjectRef 的對象作爲參數傳遞給 Task。
-
從 Task 中返回 ObjectRef 或包含 ObjectRef 的對象。
-
對象可以存儲在 owner 的進程內內存存儲中,也可以存儲在分佈式對象存儲中。此決定旨在減少每個對象的內存佔用空間和解析時間。
-
當沒有故障時,owner 保證,只要對象仍在作用域中(非零引用計數),對象的至少一個副本最終將可用。
-
有兩種方法可以將 ObjectRef 解析爲其值:
-
在 ObjectRef 上調用 ray.get。
-
將 ObjectRef 作爲參數傳遞給任務。執行工作程序將解析 ObjectRefs,並將任務參數替換爲解析的值。
-
當對象較小時,可以通過直接從 owner 的進程內存儲中檢索它來解析。大對象存儲在分佈式對象存儲中,必須使用分佈式協議解析。
-
當沒有故障時,解析將保證最終成功(但可能會引發應用程序級異常,例如 workersegfault)。如果存在故障,解析可能會引發系統級異常,但永遠不會掛起。如果對象存儲在分佈式內存中,並且對象的所有副本都因 raylet 故障而丟失,則該對象可能會失敗。Ray 還提供了一個選項,可以通過重建自動恢復此類丟失的對象。如果對象的所有者進程死亡,對象也可能失敗。
-
代碼參考:
1.src/ray/core_worker/store_Provider/memory_store/memory_store.cc
2.src/ray/core_worker/store_Provider/plasma_store_provider.cc
3.src/ray/core_worker/reference_count.cc
4.src/ray/object_manager/object_manager.cc
Actor 的 lifetime
Actor 的 lifetimes 和 metadata (如 IP 和端口) 是由 GCS service 管理的. 每一個 Actor 的 Client 都會在本地緩存 metadata,使用 metadata 通過 gRPC 將 task 發送給 Actor.
如上圖,與 Task 提交不同,Task 提交完全分散並由 Task Owner 管理,Actor lifetime 由 GCS 服務集中管理。
-
在 Python 中創建 Actor 時,worker 首先同步向 GCS 註冊 Actor。這確保了在創建 Actor 之前, 如果創建 worker 失敗的情況下的正確性。一旦 GCS 響應,Actor 創建過程的其餘部分將是異步的。Worker 進程在創建一個稱爲 Actor 創建 Task 的特殊 Task 隊列。這與普通的非 Actor 任務類似,只是其指定的資源是在 actor 進程的生存期內獲取的。創建者異步解析 actor 創建 task 的依賴關係,然後將其發送到要調度的 GCS 服務。同時,創建 actor 的 Python 調用立即返回一個 “actor 句柄”,即使 actor 創建任務尚未調度,也可以使用該句柄。
-
Actor 的任務執行與普通 Task 類似:它們返回 futures,通過 gRPC 直接提交給 actor 進程,在解析所有 ObjectRef 依賴關係之前,不會運行。和普通 Task 主要有兩個區別:
-
執行 Actor 任務不需要從調度器獲取資源。這是因爲在計劃其創建任務時,參與者已在其生命週期內獲得資源。
-
對於 Actor 的每個調用者,任務的執行順序與提交順序相同。
-
當 Actor 的創建者退出時,或者羣集中的作用域中沒有更多掛起的任務或句柄時,將被清理。不過對於 detached Actor 來說不是這樣的,因爲 detached actor 被設計爲可以通過名稱引用的長 Actor,必須使用 ray.kill(no_restart=True) 顯式清理。
-
Ray 還支持 async actor,這些 Actor 可以使用 asyncio event loop 併發運行任務。從調用者的角度來看,向這些 actor 提交任務與向常規 actor 提交任務相同。唯一的區別是,當 task 在 actor 上運行時,它將發佈到在後臺線程或線程池中運行的異步事件循環中,而不是直接在主線程上運行。
-
代碼參考:
1.Core worker 源碼: src/ray/core_worker/core_worker.h. 此代碼是任務調度、Actor 任務調度、進程內存儲和內存管理中涉及的各種協議的主幹。
2.Python: python/ray/includes/libcoreworker.pxd
3.Java: src/ray/core_worker/lib/java
4.src/ray/core_worker/core_worker.cc
5.src/ray/core_worker/transport/direct_actor_transport.cc
6.src/ray/gcs/gcs_server/gcs_actor_manager.cc
7.src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
8.src/ray/protobuf/core_worker.proto
本文分享自華爲雲社區《分佈式應用框架 Ray 架構源碼解析》,原文作者:Leo Xiao 。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://juejin.cn/post/6939762041913606152?utm_source=gold_browser_extension