C-- 分佈式編程來了

引言


隨着互聯網數據的爆發式增長,互聯網業務的發展速度已經明顯高於計算機硬件的發展速度。在此背景下,我們可以看到僅靠單機系統能解決的問題越來越少,而越來越多的領域和應用場景需要構建分佈式系統。C++ 作爲 native 的編程語言,由於其高性能、輕量級的特點廣泛應用於現代分佈式系統中,如 Tensorflow、Caffe、XGboost、Redis 等都選擇 C/C++ 作爲主要的編程語言。

相比單機應用,想要構建一個功能完善、高可用、可應用於生產環境的 C++ 分佈式系統並沒有那麼簡單。通常我們需要做以下考慮:

看起來,整個實現中需要考慮的問題非常多,完成所有這些事情並不容易。那麼有沒有一個系統,能夠幫你解決以上所有的分佈式問題,讓你能夠專注在系統本身的邏輯上呢?隨着 Ray 的到來,理想即將成爲現實。

Ray 是什麼

整體介紹

Ray(https://github.com/ray-project/ray)是一個簡單、通用的分佈式計算框架。項目最初由加州大學伯克利分校 RISELab 發起並開源。在過去的幾年中,Ray 項目發展迅速,在螞蟻集團、Intel、微軟、AWS、Uber 等公司被廣泛應用於構建各種 AI、大數據系統,並且已經形成了較爲完善的生態圈(Tune、Rlib、Serve、分佈式 Scikit-learn、XGboost on Ray 等)。相比現有的大數據計算系統(Spark、Flink 等),Ray 最本質的特點和不同點在於,Ray 的設計沒有基於某種特定的計算範式(例如 DataStream、DataSet),而是迴歸編程本身,通過抽象編程中最基本的 Function 和 Class 等概念,構建了一套簡單易用的分佈式編程 API。所以從系統層次的角度看,Ray 的 API 更加底層,更加靈活。Ray 不會限制你的應用場景,無論是批處理、流計算、圖計算,還是機器學習、科學計算等,只要你的系統具有分佈式特性,需要利用多機協同完成一個特定的任務,你就可以選擇 Ray 幫你完成分佈式系統的構建。

C++ API

Ray 在創建初期只支持 Python API,2018 年中旬螞蟻集團開源了 Java API。本文介紹的 C++ API 是 Ray 上用戶接口的第三種語言實現。有人會問,已經有了 Python 語言和 Java 語言的支持,爲什麼還要開發 C++ 版本?我們開發 C++ API 的主要原因是,在某些高性能場景,Java 和 Python 在系統調優之後仍然無法滿足業務需求。除此之外,Ray 底層內核和組件本身就是純 C++ 實現,使用 C++ API 會讓用戶層和內核層無縫銜接,整個系統無語言間調用開銷。

下面簡要介紹一下 Ray C++ API 中幾個核心概念和使用方式。

Task

Ray 中的 Task 對應單機編程中的 function。通過 Ray 的 Task API,我們可以很容易地把任意一個 C++ 函數放到分佈式集羣中異步執行,大幅提高執行效率。

假設我們有一個耗時的 heavy_compute 函數,如果在單機環境中串行執行 10000 次,整體耗時很長:

int heavy_compute(int value) {
  return value;   
}
std::vector<int> results;
for(int i = 0; i < 10000; i++) {
  results.push_back(heavy_compute(i));
}

利用 Ray 將單機的 heavy_compute 改造成分佈式的 heavy_compute:

// 聲明heavy_compute爲remote function
RAY_REMOTE(heavy_compute);
std::vector<ray::ObjectRef<int>> results;
for(int i = 0; i < 10000; i++) {
  // 利用ray::Task調用遠程函數,它們被Ray自動調度到集羣的節點上實現分佈式計算
  results.push_back(ray::Task(heavy_compute).Remote(i));
}
// 獲取分佈式計算的結果
for(auto result : ray::Get(results)) {
  std::cout<< *result << std::endl;   
}

Actor

普通的 Task 是一種無狀態的計算,如果想實現有狀態的計算,需要使用 Actor。

Actor 對應單機編程中的 Class。基於 Ray 強大的分佈式能力,我們可以將以下的單機 Counter 改造成部署在遠程節點的分佈式 Counter。

class Counter {
  int count;
public:
  Counter(int init) { count = init; }
  int Add(int x) { return x + 1; }
};
Counter *CreateCounter(int init) {
  return new Counter(init);   
}
RAY_REMOTE(CreateCounter, &Counter::Add);
// 創建一個Counter actor
ActorHandle<Counter> actor = ray::Actor(CreateCounter).Remote(0);
// 調用actor的遠程函數
auto result = actor.Task(&Counter::Add).Remote(1);
EXPECT_EQ(1, *(ray::Get(result)));

Object

在以上 Task 和 Actor 的例子中,我們注意到,最後都是使用 “ray::Get” 獲取計算結果。這裏繞不開的一個概念就是 Object。每次調用 “Remote” 方法返回的是一個 Object 引用(ObjectRef),每個 ObjectRef 指向一個集羣內唯一的遠程 Object。Ray 的 Object 類似異步編程中常見的 future 概念。在 Ray 中,Object 會存儲在底層的分佈式 Object Store 中(基於 shared memory)。當你調用 “ray::Get” 方法獲取一個 Object 時,會從遠程節點拉取數據到當前節點,經過反序列化返回到你的程序中。

除了存儲應用的中間計算結果,你也可以通過 “ray::Put” 創建一個 Object。除此之外,你也可以通過 “ray::Wait” 接口等待一組 Object 的結果。

// 把一個object Put到object store中
auto obj_ref1 = ray::Put(100);
// 從object store獲取數據
auto res1 = obj_ref1.Get();
//或者調用ray::Get(obj_ref1)
EXPECT_EQ(100, *res1);
// 等待多個object ready
auto obj_ref2 = ray::Put(200);
ray::Wait({obj_ref1, obj_ref2}, /*num_results=*/1, /*timeout_ms=*/1000);

用 Ray C++ 實現分佈式存儲系統

demo 說明

在這個簡單的 KV 存儲系統中,有一個 main server 和一個 backup server,僅 main server 提供服務,backup server 則只用於備份數據,不提供服務。同時要求系統具備自動故障恢復能力,即任意一個 server 重啓後數據不會丟失,能繼續提供服務。

:這只是一個 demo,不專注存儲本身的邏輯和優化,目的是使用盡可能簡單的代碼來展示如何用 Ray 來快速開發一個分佈式存儲系統。完整代碼請在文末點擊 “閱讀原文” 查看。

Server 實現

用 C++ API 實現一個分佈式存儲比較簡單,用戶可以先按照單機版的 KV store 的思路去寫 server,稍後再利用 Ray 的分佈式部署和調度能力將單機版的 KV store 變成一個分佈式的 KV store。

main server

class MainServer {
 public:
  MainServer();
  std::pair<bool, std::string> Get(const std::string &key);
  void Put(const std::string &key, const std::string &val);
 private:
  std::unordered_map<std::string, std::string> data_;
};
std::pair<bool, std::string> MainServer::Get(const std::string &key) {
  auto it = data_.find(key);
  if (it == data_.end()) {
    return std::pair<bool, std::string>{};
  }
  return {true, it->second};
}
void MainServer::Put(const std::string &key, const std::string &val) {
  // 先將數據同步到backup server
  ...
  // 再更新本地kv
  data_[key] = val;
}

可以看到分佈式 KV store 的讀數據 (MainServer::Get) 的實現和單機版的實現相比沒有任何差異,用戶不必關心分佈式的細節,關注業務邏輯本身即可。Put 時要注意先將數據同步寫到 backup server 中再寫本地,確保數據的一致性。

backup server

class BackupServer {
 public:
  BackupServer();
  // 當main server重啓時會調用GetAllData做數據恢復
  std::unordered_map<std::string, std::string> GetAllData() {
      return data_;
  }
  // 當main server寫數據時會調用SyncData將數據先同步到buckup server
  void SyncData(const std::string &key, const std::string &val) {
      data_[key] = val;
  }
 private:
  std::unordered_map<std::string, std::string> data_;
};

部署

集羣部署

部署應用之前需要首先部署一個 Ray 集羣。目前 Ray 已經支持在多個主流雲平臺進行一鍵部署,如 AWS、Azure、GCP、Aliyun 和 Kubernetes 環境等。如果你已經擁有了一個配置文件,可以在安裝 Ray 之後通過命令行進行一鍵部署:

ray up -y config.yaml

具體如何配置可以參考官方文檔。

另外一種選擇,如果你擁有正在運行的服務器,也可以通過在各服務器上執行 start 命令手動組建 Ray 集羣:

ray start --head
ray start --address=${HRAD_ADDRESS}

actor 部署

Ray 集羣部署之後,我們需要將前面創建的 MainServer 和 BackupServer 這兩個 actor 實例部署到集羣中,以提供分佈式存儲服務。用 Ray 創建 Actor 的 API 就可以很簡單的實現 actor 部署。

static MainServer *CreateMainServer() { return new MainServer(); }
static BackupServer *CreateBackupServer() { return new BackupServer(); }
// 聲明remote function
RAY_REMOTE(CreateMainServer, CreateBackupServer);
const std::string MAIN_SERVER_NAME = "main_actor";
const std::string BACKUP_SERVER_NAME = "backup_actor";
// 通過ray::Actor將actor的實例部署到Ray集羣中
void StartServer() {
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .Remote();
  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .Remote();
}

調度

設置資源

如果你對 actor 運行環境的硬件有特殊要求,還可以通過 API 設置 actor 所需要的資源,比如 CPU,內存等資源。

// 所需資源cpu:1, 內存1G
const std::unordered_map<std::string, double> RESOUECES{
    {"CPU", 1.0}, {"memory", 1024.0 * 1024.0 * 1024.0}};
// 通過ray::Actor將actor的實例部署到Ray集羣中並設置資源需求
void StartServer() {
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .SetResources(RESOUECES) //設置資源
      .Remote();
  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .SetResources(RESOUECES) //設置資源
      .Remote();
}

設置調度策略

我們希望將 main server 和 backup server 兩個 actor 調度到不同的節點上,以保證某一個節點掛掉不會同時影響兩個 server。可以利用 Ray 的 Placement Group 實現這種特殊的調度功能。

Placement Group 允許用戶從集羣中預置一部分資源供 Task 和 Actor 調度。

ray::PlacementGroup CreateSimplePlacementGroup(const std::string &name) {
  // 設置Placement Group的資源
  std::vector<std::unordered_map<std::string, double>> bundles{RESOUECES, RESOUECES};
  // 創建Placement Group並設置調度策略爲平鋪調度(SPREAD)
  ray::PlacementGroupCreationOptions options{
      false, name, bundles, ray::PlacementStrategy::SPREAD};
  return ray::CreatePlacementGroup(options);
}
auto placement_group = CreateSimplePlacementGroup("my_placement_group");
assert(placement_group.Wait(10));

上面的代碼創建了一個 Placement Group,調度策略爲平鋪 (SPREAD), 平鋪調度的含義是將 actor 以平鋪的方式調度到不同的節點上。更多的調度策略可以參考 Ray 的官方文檔 https://docs.ray.io/en/master/placement-group.html。

接下來就可以通過 Placement Group 將 actor 調度到不同的節點上了。

// 通過ray::Actor將actor的實例調度到Ray集羣不同的節點上
void StartServer() {
  // 調度main server
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .SetResources(RESOUECES)
      .SetPlacementGroup(placement_group, 0) //調度到某個節點上
      .Remote();
  // 調度backup server
  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .SetResources(RESOUECES)
      .SetPlacementGroup(placement_group, 1) //調度到某個節點上
      .Remote();
}

服務發現和組件通信

現在我們已經把 main server 和 backup server 兩個 actor 實例部署到 Ray 集羣中的兩個節點上了,接下來需要解決 main server 的服務發現問題和 client-server 的通信問題。

Ray 的 named actor 可以很方便的實現服務發現,我們在創建 actor 的時候設置了 actor 的名字,後續就可以通過 ray::GetActor(name) 來發現之前創建的 actor 了。

Ray Task 則可以解決 client-server 之間的通信問題,就像調用本地函數一樣實現遠程函數調用,用戶無需關心數據通信的細節(如傳輸協議、網絡通信等)。

class Client {
 public:
  Client() {
    // main server服務發現
    main_actor_ = ray::GetActor<MainServer>(MAIN_SERVER_NAME);
  }
  bool Put(const std::string &key, const std::string &val) {
    // 利用Ray Task調用遠端main server的Put函數
    (*main_actor_).Task(&MainServer::Put).Remote(key, val).Get();
    return true;
  }
  std::pair<bool, std::string> Get(const std::string &key) {
    // 利用Ray Task調用遠端main server的Get函數
    return *(*main_actor_).Task(&MainServer::Get).Remote(key).Get();
  }
 private:
  boost::optional<ray::ActorHandle<MainServer>> main_actor_;
};

故障恢復

進程故障恢復

Ray 提供了進程故障恢復的功能,比如 actor 進程掛掉之後 Ray 會自動將 actor 進程拉起來,並重新創建 actor 實例,只需要設置 actor 的最大重啓次數即可。

// 通過ray::Actor將actor的實例調度到Ray集羣不同的節點上並設置最大重啓次數
void StartServer() {
  ray::Actor(CreateMainServer)
      .SetName("main_actor")
      .SetResources(RESOUECES)
      .SetPlacementGroup(placement_group, 0)
      .SetMaxRestarts(1) //設置最大重啓次數讓Ray做自動故障恢復
      .Remote();
  ray::Actor(CreateBackupServer)
      .SetName("backup_actor")
      .SetResources(RESOUECES)
      .SetPlacementGroup(placement_group, 1)
      .SetMaxRestarts(1) //設置最大重啓次數讓Ray做自動故障恢復
      .Remote();
}

狀態恢復

雖然 Ray 會做進程故障恢復,重新創建 actor 實例,但是 actor 的運行狀態需要用戶去做狀態恢復處理,比如 main actor 掛掉之後重新拉起來,之前在 main actor 內存中的數據都會丟失,需要做數據恢復。

MainServer::MainServer() {
  // 如果當前實例是重啓的則做故障處理
  if (ray::WasCurrentActorRestarted()) {
    HandleFailover();
  }
}
void MainServer::HandleFailover() {
  backup_actor_ = *ray::GetActor<BackupServer>(BACKUP_SERVER_NAME);
  // 從backup server拉取所有數據
  data_ = *backup_actor_.Task(&BackupServer::GetAllData).Remote().Get();
  RAYLOG(INFO) << "MainServer get all data from BackupServer";
}

main server 的 failover 是在構造函數里面做的,Ray 提供了判斷 actor 實例是否重啓的 API。在 actor 的構造函數中如果發現是重啓的實例則做數據恢復的處理,具體方法就是從 backup server 拉取所有數據。backup server 的 failover 處理也是類似的, 不再贅述。

運維與監控

Ray 提供了一套簡單的運維和監控系統 (Dashboard) 讓我們可以實時查看系統的運行情況。

以上面的 KV store 爲例,我們可以看到 actor 列表、node 列表和運行日誌等信息,並且可以捕捉到一些異常的 events。

actor 列表

node 列表

運行日誌

異常 events 透出

如何快速開始

Ray 目前已經完成了 1.7.0 版本的 release,C++ API 作爲其中一個 highlight 功能正式集成到 wheel 包發佈,詳見發佈記錄 https://github.com/ray-project/ray/releases/tag/ray-1.7.0。

考慮到 Ray 內核的實現是多語言的,跑 C++ 應用需要同時具備 Python 環境和 C++ 環境,我們將 Ray 整體打包成 wheel 用 pip 進行管理。你可以通過以下方式快速獲取一個 Ray C++ 模版工程。

環境要求:Linux 系統或 macOS, Python 3.6-3.9 版本,C++17 環境,bazel 3.4 以上版本(可選,模版工程基於 bazel)。

pip install -U ray[cpp]
mkdir ray-template && ray cpp --generate-bazel-project-template-to ray-template
cd ray-template && sh run.sh

以上運行方式會在跑 example 的過程中在本地拉起 Ray 集羣,example 運行結束後自動關閉 Ray 集羣。如果你想讓應用連接已有的 Ray 集羣,可以按如下方式啓動:

ray start --head
RAY_ADDRESS=127.0.0.1:6379 sh run.sh

測試結束後可以通過 stop 命令關閉 ray 集羣,避免殘留進程:

ray stop

現在,你可以開始基於模版工程開發自己的 C++ 分佈式系統了!

總結

本文通過一個存儲系統的例子介紹瞭如何利用 Ray C++ API 構建分佈式系統,整個 demo 代碼不過 200 多行,卻同時解決了部署、調度、通信、故障恢復和運維監控等問題。我們可以發現,Ray 致力於解決分佈式系統的通用問題,不限制你的計算範式和應用場景,無論是針對新建應用還是已有的應用,都可以通過 Ray 快速升級爲強大的分佈式系統。

瞭解更多細節,請閱讀 Ray 官方文檔 https://docs.ray.io/en/master/index.html。

貢獻代碼,可以直接提 PR 到 https://github.com/ray-project/ray,或者給我們提 issue。

您也可以通過 slack 聯繫我們,加入 Ray 的 channel。

微信公衆號:搜索並關注 “Ray 中文社區”。

關於我們

我們是螞蟻計算智能技術部團隊,橫跨美國硅谷、中國北京、上海、杭州和成都。我們追求的工程師文化是開放、簡單、迭代、追求效率、用技術解決問題!熱情相邀加入我們!!

請聯繫我們的郵箱:antcomputing@antgroup.com

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eU4hL3OFv4fj-cOrbdywbw