Ray2-0 架構 - 中文翻譯版 -Part1-
原文地址:https://docs.ray.io/en/latest/ray-contribute/whitepaper.html
綜述
Ray 是一個爲了給分佈式提供通用的 API 發明出來的分佈式計算框架,希望通過簡單但通用的抽象編程方式,讓系統自動完成所有的工作。Ray 的設計者基於這個理念讓 Ray 可以跟 Python 緊密相連,能夠通過很少的代碼就能處理業務,而其它的並行、分佈式內存管理等問題都不用擔心,Ray 會根據這些資源的情況自動調度和縮放。
其次 Ray 希望能夠對應用程序的一些系統級行爲進行控制,如系統的環境變量、參數、故障處理等等。
Ray 使 用了包括分佈式引用計數和分佈式內存等組件,這些組件增加了體系結構的複雜性,但對性能和可靠性來說是必需的。
Ray 構建在 gRPC 的基礎上,並且在許多情況下與 gRPC 直接調用的性能一致。與單獨使用 gRPC 相比,Ray 使應用程序更容易利用並行和分佈式執行、分佈式內存共享(通過共享內存對象存儲)和動態創建輕量級服務(如通過 gRPC 調用 Actor)。
爲了提高可靠性,Ray 的內部協議旨在確保故障期間的正確性只會增加非常低的性能開銷。Ray 實現了分佈式引用計數協議,以確保內存安全幫助從故障中恢復。
由於 Ray 用戶考慮用資源而不是機器來表示他們的計算資源,因此 Ray 應用程序可以簡單地從筆記本電腦擴展到集羣,而無需任何代碼更改。Ray 的分佈式調度器和對象管理器旨在實現這種無縫擴展,且開銷低。
核心相關係統
Cluster Orchestrators
Ray 提供了更簡單運行在 Kubernetes 的方法,可以使用 KubeRay Operator ,它提供了一種在 k8s 中管理 Ray 集羣的解決方案,每個 Ray 集羣由一個 head 節點和一羣 woker 節點組成。你可以通過 KubeRay 來根據所需調整集羣大小,同時也支持 GPU 之類的異構計算,也支持有多個版本的 Ray 的集羣。
Parallelization Frameworks
與 multiprocessing 或者 Celery 之類的框架相比,Ray 提供了更通用且性能更高的 API。同時支持內存共享。
Data Processing Frameworks
與 Spark、Flink 等框架相比,Ray 提供的 API 更加底層和靈活,更適合作爲 “distributed glue” 框架。另一方面 Ray 沒有限定一定是數據處理的模式,而是通過功能庫的方式提供不同的處理模式。
Actor Frameworks
不像 Erlang 和 Akka 之類的框架,Ray 支持跨語言的操作和使用那個語言原生的庫,能夠透明地管理無狀態的並行計算,顯式地支持 Actor 之間共享內存。
HPC Systems
許多 HPC 系統公開了更低級的消息傳遞接口,雖然很靈活,但開發人員需要付出更多時間和成本。Ray 上的應用程序可以通過初始化 Ray 的 Actor 組之間的通信羣來利用這些優化後的 communication primitives。(類似 allreduce)
2.0 帶來的新特性
-
原本的 Global Control Store 改名叫 Global Control Service,簡稱 GCS,有着全新的設計更加簡單和可靠。
-
分佈式調度器(包括調度策略和置放羣組)能讓你更方便地擴展功能。
-
在可靠性和容錯性方面進行改進,包括從故障節點中恢復 object reconstruction 和 GCS 的容錯機制。
-
增加了像 KubeRay 等方便集羣管理的一些工具。
架構設計概述
主要概念解釋
Task
一個用於遠程調用的函數,在不同的調用方的進程上執行,也可以是在不同的機器上執行。Task 可以是無狀態的也可以是有狀態的(如 Actor)。
Object
應用所需的值,這些是任務返回的或者通過 ray.put
創建的值,這些對象是一旦創建就不可以修改的。可以通過 ObjectRef
引用。以下也可能被稱爲對象。
Actor
有狀態的工作進程,Actor 的任務必須使用特定的方式提交給指定的實例,可以在執行過程中修改 Actor 的內部狀態。你可以理解爲是一個常駐進程,或者是有狀態的 Task。
Driver
程序的 root 或者是主程序,一般指放在ray.init
的代碼的應用。
Job
來自同個 Driver 的 Task 和 Actor 的集合,Driver 和 Job 是 1:1 映射關係。是一個邏輯上的概念,其含義爲運行一次用戶側代碼所所涉及到的所有生成的 Task 以及產生的狀態的集合。
設計
協議概覽 (大部分通過 gRPC):
a. 任務執行,對象引用計數。
b. 本地資源管理。
c. 遠程 / 分佈式資源管理。
d. 分佈式對象傳輸。
e. 大型對象的存儲和檢索。檢索是通過 ray.get
或在任務執行過程中進行。或者用對象的值替換一個任務的 ObjectID 參數時。
f. 調度器從遠程節點獲取對象,以滿足本地排隊任務的依賴滿足。
組件
Ray 集羣是由一個或者多個 worker 節點組成,每個 worker 節點由以下物理進程組成:
-
一個或多個的 worker 進程,負責任務的提交和執行,worker 進程要麼是無狀態的,要麼是一個 actor。初始工作線程由機器的 CPU 數量決定。每個工作節點會存儲:
-
一個 ownership 表,worker 引用的對象的系統元數據,例如引用技術和對象位置。
-
進程內存儲,存放一些小對象。
-
raylet,用於管理每個節點上的共享資源,與工作進程不同的是,raylet 是在所有 worker 中共享的:
-
Scheduler,負責資源管理、任務放置和完成將 Task 的參數存儲在分佈式的 Object Store 中。
-
Object Store,一個共享內存存儲,也被稱爲 Plasma Object Store。負責存儲、轉移和溢出(spilling,如果 Object Store 滿了會移動到外部存儲)大型對象。集羣中各個 Object Store 共同構建了 Ray 的分佈式對象存儲。
每一個工作進程和 raylet 都被分配了一個唯一的 28-byte 的標識符和一個 ip 地址、端口。
同樣的地址和端口在工作進程死亡後重新恢復時可以重複使用,但是唯一 ID 不會。工作進程和 raylet 是 fate-share 的,一個出故障另外一個就無法使用了。
其中有個節點會被指定爲 Head 節點,除了有上述進程外還會託管 GCS 和 Driver。在新版本的 GCS 是一個管理集羣的元數據的服務器,比如 actor 的位置、worker 存儲的 key-value 對等。GCS 還管理少量的集羣幾筆的操作包括調度預佔用組和 actor 以及確定集羣中哪些是成員。一般來說 GCS 中保存的數據很少被調用,但是可以被集羣中幾乎所有的 worker 節點使用。GCS 容錯機制是在 v2 版本中加入的,它可以運行在任何節點或者多個節點,之前只能在指定的節點。
Dirver 是一個用於指定的用於運行最上級的代碼的應用的節點,它能提交任務但是並不能在自己上面執行。雖然 Driver 可以在任何節點上運行,但默認情況下只在 Head 節點運行。
Head 節點還包含了其它類似集羣級別服務的自動縮放、任務提交等等。
Ownership
大多數的系統是通過一種叫做 Ownership 的分散控制的方式管理的,這個方式是指每一個 ObjectRef
都是由所在的 worker 進程管理的,該 worker 或者也被叫做 owner 需要確保 Task 的執行、創建 value。
一般有兩種方式去創建 ObjectRef
,在下面兩個例子中,owner 都是實際運行的 worker 的進程。
-
x_ref = f.remote()
-
x_ref = ray.put()
換句話來說 owner 是生成和初始化 ObjectRef
的 worker,如果 ObjectRef
由 Task 返回,那麼這個值是由遠程 worker 創建的而不是拿到返回值的 worker。
在 2.0 版本中,這個方式帶來了更好的性能和更簡單的結構、提升了可靠性,每個 application 是相對獨立的,一個遠程調用故障了並不會影響另一個。
但 ownership 還是存在一些問題,像如果要解析 ObjectRef
,就必須能夠訪問對象的 owner,這意味着 object 和 owner 是 fate-share(一個掛掉,另一個一起掛掉)。其次是目前無法轉移所有權。
內存模型
Ray 通過以下方式使用內存:
-
Ray 的 worker 在執行任務或者運行 Actor 時會使用堆內存,由於 Ray 的 Task 和 Actor 一般是並行運行,開發人員應該關注每個 Task 的堆內存的情況。如果內存壓力過大,Ray 會自動釋放掉消耗內存大的進程。
-
當一個 worker 調用
ray.put()
或者從一個 Task 返回時,它會將提供的值複製到 Ray 的共享內存對象存儲中。然後 Ray 會讓這些對象在整個集羣中可訪問,在發生故障時嘗試恢復它們,如果對象存儲超過其配置的容量,則將它們轉移其它存儲設備,並在所有ObjectRef
超出範圍時將它們垃圾回收。對於可以被 zero-copy 的反序列化的值,會在取出時將指向共享緩衝區的指針給 worker,其它則是被反序列化到接收的 worker 的堆內存中。 -
如果 Object 足夠小(默認 100 kb),Ray 將直接把值存儲在 owner 的內存中,而不是在 Raylet 裏的共享對象存儲。任何其它使用這個對象的 worker 都會把值直接複製到自己的內存裏。同樣 Ray 也會對他們進行垃圾回收。
-
Ray 的元數據也會使用堆內存,大部分元數據都很小,可能就幾 kb。例如:
-
GCS 的所有 Actor、所有節點、所有的預佔用組集羣。
-
Raylet 的本地排隊的 Task、這些任務的對象參數、對象。
-
Worker 的提交了等待處理的任務或者可能需要重新通過 lineage reconstruction 執行的。擁有的對象等等。
語言運行時
所有 Ray 核心組件都在 C++ 中實現。Ray 通過一個稱爲 “core worker” 的 C++ 庫支持 Python、Java 和 C++ 前端。該庫實現了所有權表、進程內存儲,並管理與其他工作程序和 raylet 的 gRPC 通信。
Task 的生命週期
所有者需要能夠指定被提交來的任務並且將 ObjectRef
解析成一個普通的值。Driver 去 Raylet 中請求需要的值,將值和 Task A 都交給 Worker 1 運行,Driver 擁有 Task A 結果的所有權,而 Worker 1 有 TaskB 的所有權。
所有者可以將普通的 Python 對象作爲任務參數傳遞,如果參數傳遞的值很小,會直接將這個值從所有者的內存中複製到 Task 中,讓執行者可以直接引用。如果傳遞的參數很大,所有者會先通過 ray.put 放入共享對象存儲,然後將 ObjectRef
作爲參數傳遞。
Ray 會在每一次自動進行上面的流程,如果你喜歡兩個 Task 共用一個請顯式調用 put。
所有者也可以直接將其它的 ObjectRef
作爲任務參數傳遞,如果 ObjectRef
對應的值很小,會直接放到 Task 的 specification 中,否則傳遞 ObjectRef
。任務執行時會將 ObjectRef
解析成具體的值。
一旦所有任務依賴項就緒,所有者就從分佈式調度器請求資源來執行任務。分佈式調度器嘗試獲取資源,並通過分佈式內存將 Task 的 specification 中的任何 ObjectRef
參數獲取到本地節點。一旦資源和參數都可用,調度程序就會批准請求。
所有者通過 gRPC 將 Task 的 specification 發送給 worker 來調度。執行 Task 後 worker 必須存儲返回的值。如果返回的值很小會直接返回給所有者,如果很大會存到共享內存存儲將 ObjectRef
返回,允許所有者引用返回值而不需要先拿到本地節點。
當 Ray 的 Task 第一次被調用時,它會被存儲到 GCS 中,稍後會由被租用的 worker 獲取出函數的定義進行運行。
Task 可能在運行過程中可能會出現應用級錯誤(worker 進程仍然是活躍的狀態)或者是系統級錯誤(worker 進程已經死亡或者故障)中斷拋出。
默認情況下,由於應用程序級別錯誤而失敗的任務不會自動重試。異常被捕獲並存儲爲任務的返回值。在 2.0 中,用戶可以傳遞應用程序級異常的白名單,Ray 可以自動重試。由於系統級錯誤而失敗的任務會自動重試,你可以指定最多重試次數。
Object 的生命週期
對象是一個不可變的值,可以從 Ray 集羣中的任何位置存儲和引用。對象的所有者是通過提交創建任務或調用 ray.put 創建初始化 ObjectRef
的 worker。所有者負責管理對象的生存期。Ray 保證如果所有者活着,對象最終可能會被解析爲其值(或者在工作程序失敗的情況下拋出錯誤)。如果所有者已死亡,嘗試獲取對象的值將引發異常,即使仍然存在對象的物理副本。
每個工作程序存儲其擁有的對象的引用計數。僅在以下操作期間計算引用:
-
向任務傳遞
ObjectRef
或包含ObjectRef
作爲參數的對象。 -
從任務中返回
ObjectRef
或包含ObjectRef
的對象。
對象可以存儲在所有者的進程內存存儲或分佈式對象存儲中。進程內內存存儲是在所有者的堆上分配的,不強制限制存儲量。因爲 Ray 只存儲很小的對象。過多的小對象存儲在內存中可能會引起內存不足的問題而導致進程被結束。存儲在分佈式對象存儲的對象首先會存儲在共享內存存儲中,共享內存存儲默認是機器內存的 30%,在達到上限後轉移到本地磁盤上。
你可以通過 ray.get
將 ObjectRef
轉爲實際的值或者是將 ObjectRef
作爲參數傳遞,具體的執行者會自動解析。
如果出現系統級的故障,對象存儲在分佈式內存存儲中,並且該對象的所有副本都因 raylet 故障而丟失,則該對象就丟失了。Ray 會嘗試通過重建的方式去恢復這個對象,如果所有者進程也死亡了,則無法重建。
Actor 的生命週期
當在 Python 中創建 Actor 時,將構建一個特殊任務,稱爲 Actor 創建任務,該任務運行 Actor 的 Python 構造函數。創建的 worker 等待創建任務的所有依賴項就緒,類似於普通任務。一旦完成, worker 將向 GCS 異步註冊參與者。GCS 通過調度 Acotr 創建任務來創建 Actor。這與普通任務的調度類似,只是在 Actor 進程的生命週期內獲取其指定的資源。
同時,創建 actor 的 Python 調用立即返回一個 “actor handle” ,即使尚未安排 actor 創建任務,也可以使用該句柄。在參與者創建任務完成之前,不會調度將 actor handle 相關的任務。有關詳細信息,請參見 Actor 創作。
Actor 的執行與正常任務執行類似,主要有兩個區別:
-
默認情況下 Actor 已經不需要從調度器中獲取資源了,當被創建時就已經獲取了。
-
對於 Actor 的每個調用方,任務的執行順序與提交順序相同。
當 Actor 的創建者退出時且集羣中沒有其它還沒結束的 actor handle 時會自動被清理。當然你也可以顯式清理。
在某些情況下,可能不需要順序執行。爲了支持這樣的場景,Ray 還提供了一個選項,通過它可以使用事件循環併發運行任務,或者使用線程並行運行多個任務。從調用者的角度來看,向這些參與者提交任務與向常規參與者提交任務相同。唯一的區別是,當任務在參與者上運行時,它被髮布到後臺線程或線程池,而不是直接在主線程上運行。Ray API(如任務提交和 Ray.get)是線程安全的,但用戶需要負責其它部分的線程安全。
故障模型
系統模型
Ray 集羣中的任意一個非 Head 的節點的丟失是不影響集羣的,Head 節點託管了 GCS,但在 2.0 中,允許 GCS 重啓到其它節點來減少對集羣的干擾。
所有節點都被分配了一個唯一的標識符,並通過心跳相互通信。GCS 負責決定集羣的成員資格,即哪些節點當前處於活動狀態。GCS 將刪除任何超時的節點 ID,這意味着必須使用不同的節點 ID 在該節點上啓動新的 raylet,以便重新使用物理資源。如果一條仍然活躍的的 Ray 收到它已經超時,它就會退出。節點的故障檢測當前不處理網絡分區問題:如果從 GCS 分區了工作節點,它將超時並標記爲不可用。
每個 raylet 向 GCS 報告任何本地的 worker 進程的死亡情況。GCS 會廣播這些故障事件,並使用它們讓位置在故障節點的已註冊的 Actor 死亡。所有 worker 進程的命運都與其節點上的 raylet 共享。
raylets 負責防止 worker 工作進程失敗後集羣資源和系統狀態發生泄漏問題。對於失敗的 worker 進程(本地或遠程),每個 raylet 負責:
-
通過殺死任何故障的 worker 進程來釋放任務執行所需的集羣資源,如 CPU。故障的 worker 發出的任何未完成的資源請求也將被取消。
-
釋放該 worker 持有的在分佈式對象內存存儲的對象。
應用程序模型
系統故障模型意味着 Ray 裏面的任務和對象將與所有者共享命運。例如,如果在這種情況下運行 “a” 的 worker 失敗,那麼將收集在其子樹(圖中灰色的 “b” 和 “z”)中創建的所有對象和任務。如果 “b” 是在 “a” 的子樹中創建的 actor(參見 Actor 死亡),則同樣適用。
-
如果嘗試獲取這個發生了錯誤的對象的值,任何活動進程會收到應用級異常。如上圖, Driver 會在 get 結果時收到異常。
-
你可以通過讓不同的 Task 放到不同的子樹(調用嵌套的函數),故障可以彼此隔離。
-
應用程序與 Driver 是命運共享的,如果 Driver 掛了整個執行過程都會故障。
如果希望避免命運共享(fate-share),可以將它變爲獨立的 Actor 就不會受到 Driver 證明週期的影響,變爲 Actor 後只能通過顯式調用方法來銷燬它。
1.3 開始 Ray 可以通過將對象放到其它存儲空間來實現持久化,2.0 開始 Ray 默認會爲普通 Task 啓用對象重建。
作者介紹:大家好!我是 Andy.Qin,一個想創造哆啦 A 夢的 Maker,連續創業者。我最熟悉的領域是分佈式應用開發、高併發架構設計,其次是機器學習、自然語言處理和理解方向。我對開源社區和開源項目的建設也有極大的熱忱,期望能與大家多多交流討論!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WTBk8hmymL9kue7eKZf5SA