從無棧協程到 C-- 異步框架

作者:fangshen,騰訊 IEG 遊戲客戶端開發工程師

導語

本文我們將嘗試對整個 C++ 的協程做深入淺出的剝析, 方便大家的理解. 再結合上層的封裝, 最終給出一個 C++ 異步框架實際業務使用的一種形態, 方便大家更好的在實際項目中應用無棧協程。

1. 淺談協程

在開始展開協程前, 我們先來看一下一些非 C++ 語言中的協程實現.

1.1 其他語言中的協程實現

很多語言裏面, 協程是作爲 "一類公民" 直接加入到語言特性中的, 比如:

1.1.1 Dart1.9 示例代碼
Future<int> getPage(t) async {
 var c = new http.Client();
 try {
  var r = await c.get('http://xxx');
  print(r);
  return r.length();
 } finally {
  await c.close();
 }
}
1.1.2 Python 示例代碼
async def abinary(n):
  if n <= 0:
    return 1
  l = await abinary(n-1)
  r = await abinary(n-1)
  return l + 1 + r
1.1.3 C# 示例代碼
aysnc Task<string> WaitAsync()
{
 await Task.Delay(10000);
 return "Finished";
}
1.1.4 小結

衆多語言都實現了自己的協程機制, 通過上面的例子, 我們也能看到, 相關的機制使函數的執行特殊化了, 變成了可以多次中斷和重入的結構. 那麼如果 C++ 要支持這種機制, 會是一個什麼情況呢? 接下來我們將先從最基本的原理逐步展開相關的探討.

1.2 從操作系統的調度說起

我們接觸的主流的操作系統, 如 Windows, 或者 Linux, 或者 MacOS, 都是搶佔式多任務的操作系統, 所以大家對搶佔式多任務的操作系統會比較熟悉. 相關的概念就是 進程 -> 線程 這些, 基本上各種語言通過操作系統提供的 Api, 都能直接獲取操作系統提供的這些能力了.   其實操作系統按任務的調度方式來區分, 有以下兩種模式:

  1. 協作式多任務操作系統

  2. 搶佔式多任務操作系統

搶佔式多任務操作系統我們剛剛說過了, 而協程本身的特性, 跟協作式多任務操作系統所提供的機制基本一致, 對於每個 Task, 我們可以多次的中斷和繼續執行, 說到這裏, 熟悉 Dos 開發的同學肯定就會想到 "INT 21H" 了, 這個其實就是我們早期利用相關機制來實現多任務協同目的的一種方式了, 我們也可以看成這是協程最早的雛形.

聊到中斷, 其中比較重要的就是執行環境的保存和恢復了, 而上下文的保存能力可以是操作系統直接提供的, 也可以是程序機制自身所提供的了, 綜上所述, 我們大致可以將 c++ 中的協程的實現方案的迭代看成如下情況:

1.3 協程的執行簡介

瞭解了協程在 C++ 中的部分歷史, 我們來簡單瞭解一下協程的執行機制, 這裏我們直接以 C++20 爲例, 先來看一下概覽圖:

關於協程的執行, 我們主要關注以下這些地方:

1.3.1 中斷點和重入點的定義

有棧協程和無棧協程定義中斷點和重入點的方式和機制略有差異, 執行到中斷點和重入點的時候大家使用的保存和恢復機制不太一樣, 但以 Host App 的視角來看, 整體的執行過程其實是比較一致的.

這裏我們是以 C++20 的無棧協程來舉例的, 通過圖中的關鍵字co_await, 我們定義了 point1 和 point2 兩個成對的中斷點和重入點.

我們來看一下協程執行到中斷點和重入點的時候具體發生的事情: 中斷點: 協程中斷執行的時候, 我們需要對當前的執行狀態:

重入點: 重入點是由中斷點帶出來的概念, 既然函數的執行能夠被中斷 (suspend), 那我們肯定也需要提供機制相關的機制恢復協程的執行了, 在複雜執行的時候, 我們需要對協程保存的執行狀態進行恢復:

整個協程的執行區別於普通函數的單次執行返回結果, 一般都會有多次的中斷與重入, 直到協程執行完成或者被外界強行中止.

而有棧協程和無棧協程的實現, 差異最大的地方就是如下兩點了:

  1. 怎麼保存和恢復當前的執行位置

  2. 怎麼保存和恢復當前協程引用到的內存 (變量等) 本篇主要側重無棧協程, 無棧協程相關的機制後續會具體展開. 對有棧協程相關機制感興趣的可以翻閱 libco 或 boost.context 相關的內容進行了解.

1.4 小議無棧協程的出現

其實之前介紹 C++ 協程歷史的時候, 我們有一個問題沒有展開, 爲啥有了像 libco, 與 boost.context 這樣的高性能有棧協程實現機制後, 標準委員會還會繼續尋求無棧協程的解決方案, 並最終將其作爲 C++ 協程的實現機制呢, 這裏分析主要的原因是爲了解決有棧協程天然存在的限制:

而無棧協程解決這些問題的方式也非常直接, 既然棧會導致問題, 那麼我們就直接去除對棧的依賴, 通過其他方式來解決數據存儲訪問的問題.

目前主要的方案是如下兩種:

1.4.1 Duff Device Hack 實現

我們後面介紹的 C++17 的實現就是基於這種方案, 因爲僅僅是框架級的實現, 我們能夠使用的實現方式會受到限制, 方案本身存在如棧變量的使用有嚴格的限制等問題, 但對於一些特殊的場合, 如基於寄存器實現的 lua vm, 這種方式會比較契合.

1.4.2 C++20 的 Coroutine

通過後面的分析, 我們其實會發現這與 Duff Device Hack 實現是一脈相承的, 只是通過 compiler 的配合, 像棧變量的自動處理等機制, 保證了用戶可以低心智負擔的使用它. 但同時, 相對其他語言的實現, 因爲相關特性的設計是 "面向庫作者的實現", 實際使用基本都需要二次封裝, 也就帶來了社區很多負面的聲音.

1.5 小結

前面我們對 C++ 中協程的歷史做了簡單的鋪墊, 接下來我們將對 C++17 中基於 Duff Device Hack 的無棧協程實現, 以及 C++20 中的無棧協程做更深入的介紹.

2. C++17 Stackless Coroutine 實現

  在異步操作比較多的情況下, 我們就考慮用協程來取代原來的 Callback 設計. 但當時的 GCC 用的是 8.3 版本, 並不支持 coroutine20, 所以我們最終採用的是一個基於 C++17 的無棧協程實現方案, 也就是使用前面介紹的 Duff Device Hack 方式實現的無棧協程. 我們先來看下當時的項目背景.

2.1 項目的背景介紹

   當時的情況也比較簡單, R 工作室內有多個項目處於預研的狀態, 所以大家打算協同共建一個工作室內的後臺 C++ Framework, 用於工作室內幾個預研項目中. 其中比較重要的一部分就是協程了, 當時引入協程的方式和目的都比較直接, 首先是使用 Duff Device Hack 的機制來實現整個無棧協程. 另外就是整個核心目標是希望通過引入協程和相關的調度器來幫助簡化多節點的異步編程支持. 整個框架包含的幾大部分如下圖所示, Coroutine 機制以及相關的 Scheduler 封裝是在 app_service 中作爲 C++ 微服務的基礎設施存在的.

實際使用下來, 協程和調度器主要帶來了以下這些優點:

2.2 爲何從 C++17 說起

我們爲什麼先從 C++17 的無棧協程開始介紹, 這是因爲 C++17 的實現與 20 的實現一脈相承. 如果我們分析 C++ 20 通過 Compiler 加工後的代碼, 就會發現這點. 相比於 C++20 協程大量的細節隱藏在 Compiler 的處理中 (當然我們後面也會介紹怎麼查看 Compiler 處理的這部分邏輯), C++17 的方案, 整個組織都在我們自己的代碼層面, 用於理解無棧協程的整體實現顯然是更合適的. 另外, 相關的調度器的實現, 與 C++17 和 C++20 都是兼容的, 像我們項目當時的實現, 是可以很好的做到 C++20 與 C++17 的協程混用的, 也樣也方便在過渡階段, 項目可以更平滑的從 C++17 向 C++20 遷移. 另外, 對於一些不支持 C++20 的受限使用場景, C++17 依然具有它的實用性.

2.3 實現概述

  我們先來看一下整個機制的概覽圖:

從上圖中我們能夠了解到, 整個基於 Duff Device Hack 的無棧協程實現的方式. 首先我們通過 CoPromise 對象來保存用作協程的 std::function 對象, 另外我們也會記錄協程當前的執行狀態, 其次, 我們還會在 CoPromise 中內置一個 std::tuple<> 用於保存我們需要在協程掛起和恢復時保存的狀態值.

另外, 整個核心的執行機制是依賴於幾個核心宏所組成的 switch case 狀態機來驅動的. 結合上特殊的 LINE 宏, 我們可以在每個 co_await() 對象調用的時候, 設置 CoPromise 對象當前的執行狀態爲 LINE**, 而下次跳轉的時候, 通過 switch(state) 就能正確跳轉到上次執行中斷的地方繼續往下執行了. 當然, 我們會看到我們的case **LINE**其實被穿插到了do{ } while(0)中間, 這個其實就利用到了 duff device 特性, 允許你通過 case 快速的跳轉到 for 循環或者 while 循環的內部, C 語言一個很特殊的特性. 利用這點, 首先我們可以完成 **co_awiat() 宏的封裝, 其次, 我們也能在邏輯代碼的 for 循環以及 while 循環中, 正確的應用 co_await(), 所以說 Duff Device 特性對於整個機制來說, 還是比較關鍵的.

如上例中所述的 Test Code 代碼, co_begin() 和 co_end() 展開後構成了 switch() {} 的開始和結束部分, 而中間我們加入的__co_await() 宏, 則會展開成用於完成中斷點和重入點的 case 邏輯, 整體的封裝還是很巧妙的.

2.4 執行流程概述

整體的執行流程通過上面的分析我們也能比較簡單的整理出來:

  1. 宏展開形成一個跨越協程函數首尾的大的 swith case 狀態機

  2. 協程執行時構建新的 CoPromise 對象, 正確的處理輸入參數, 輸入參數會被存儲在 CoPromise 對象的 std::tuple<> 上, 並且每次重入時作爲函數的入口參數以引用的方式轉入函數內部

  3. 每次 Resume() 時根據當前 CoPromise 記錄的 state, 跳轉到正確的 case label 繼續往下執行.

  4. 執行到下一個掛起點返回控制權到調度器

  5. 重複 3,4 直到執行結束.

從整體機制上, 我們也能簡單看到 C++17 對應實現的一些限制:

2.5 另外一個示例代碼

mScheduler.CreateTask([](int& c, LocalStruct& locals) -> logic::CoTaskForScheduler {
  rco_begin();
  {
    locals.local_i = 1024;
    auto* task = rco_self_task();
    printf("step1 %d\n", locals.local_i);
  }
  rco_yield_next_frame();
  {
    c = 0;
    while(c < 5) {
      printf("in while loop c = %d\n", c);
      rco_yield_sleep(1000);
      c++;
    }
    rco_yield_next_frame();
  }
  rco_end();
}, 3, LocalStruct{});

從上例可以看出, 雖然存在上一節中我們提到的一些限制, 依照設定的規則進行編碼實現, 整體使用還是比較簡單易懂的. 上面的 rco_yield_next_frame() 和 rco_yield_sleep() 是利用 Scheduler 的調度能力封裝出來的掛起到下一幀繼續恢復執行和休眠這兩個異步操作語義.

2.6 繞開棧變量限制的方法

提到棧變量的限制, 肯定有同學會想到, 是否有方法繞開棧變量的限制, 用一種更靈活的方式處理協程中臨時值的存取, 使其在跨越中斷點和重入點的情況依然有效?

答案是肯定的. 因爲我們有明確的與協程關聯的狀態存儲對象 CoPromise, 所以如果框架中有實現反射或者適應任意類型值存取的類型擦除機制, 我們當然能夠很簡單的對原有的實現進行擴展.

在 rstudio 的框架實現中, 我們通過在 CoPromise 對象上多存儲一個額外的std::map<std::string, reflection::Value>的成員, 再配合適當的包裝, 就很容易實現如下示例代碼所展示的功能了:

rco_begin();
{
  rco_set_value("id", 35567);
}
rco_yield_next_frame();
{
  {
    int64_t& val = rco_ref_value("id", int64_t);
    val = 5;
  }
  locals.local_i = rco_to_value("id", int);
}
rco_end();

通過額外擴展的rco_set_value(), rco_ref_value(), rco_to_value(), 我們即完成了一個比較簡單易用的通過 name 對各類型值進行存取的實現, 當然, 實際操作的其實都是在 CoPromise 上存儲的std::map<std::string, reflection::Value>成員.   這塊是反射的一個簡單應用, 關於類型擦除的細節, 與本篇關聯不大, 這裏不進行具體的展開了.

2.7 一個內部項目中後臺切場景的代碼示例

本章的結尾我們以一個具體的業務實例作爲參考, 方便大家瞭解相關的實現在具體業務中的大致工作情形.

一個原來參與的項目的後臺服務器是多節點的設計, 對於切場景來說, 需要訪問多個節點來完成相關的操作, 大致的切場景時序圖如下所示:

刪減細節代碼之後的主要異步代碼如下圖所示:

rco_begin();
{
    locals.clientReq = req;
    locals.session = CServerUtil::GetSessionObj(sessionId);
 // ...
    SSTbusppInstanceKey emptyInstKey;
    emptyInstKey.Init();
    if (locals.session->GetTargetGameSvrID() != emptyInstKey) {
        // ...
        rco_await(locals.gameSceneService->CheckChangeScene(locals.playerId, locals.checkChangeSceneReq));
        // ...
        // 保存大世界信息
        // ...
        rco_await(locals.gameSceneService->ExitMainland(locals.playerId, locals.exitMainlandReq));
        // ...
    }
    auto gameMgrClient = GServer->GetRpcClient(TbusppInstanceKey{TBUSPP_SERVER_GAMEMGRSVR, ""});
    locals.gameMgrService = rstudio::rpc_proxy::GameMgrService_Proxy::Create(gameMgrClient, GServer->GetRpcScheduler());
 // ...
    LOG_DEBUG(locals.playerId, "[CHANGE SCENE] ready to Queryline group");
}
rco_await(locals.gameMgrService->QueryMainland2(locals.playerId, locals.querySpaceReq));
{
    // ...
    rco_await(locals.gameSceneService->ChangeMainland(locals.playerId, locals.localInstanceKey, locals.changeMainlandReq));
    // ...
}
// ...
LOG_DEBUG(locals.playerId, "[CHANGE SCENE] send change mainland_conf");
rco_emit_finish_event(rstudio::logic::CoRpcFinishEvent(rstudio::reflection::Value(locals.clientRes)));

rco_return;
rco_end();

通過 rco_await() 發起的多個異步 Rpc 調用, 我們很好的完成了上述時序圖對應的邏輯功能實現.

Rpc 相關的協程化封裝在 C++20 中會有個相關的示例, 此處就不重複展開 C++17 的實現了.

3. C++20 Coroutine 機制簡介

瞭解了 C++17 的 Stackless Coroutine 實現機制後, 我們接着來看一下 C++20 Coroutine 的實現. 首先我們先來通過核心對象概覽圖來簡單瞭解一下 C++20 Coroutine:

如圖所示, C++ Coroutine20 的核心對象有如下這些:

  1. Function Body: 通常普通函數添加 co_await 等協程關鍵字處理返回值就可以作爲一個協程函數.

  2. coroutine_handle<>: 對協程的生命週期進行控制.

  3. promise_type: 異常處理, 結果接收, 同時也可以對協程部分行爲進行配置, 如協程初始化時的狀態, 結束時的狀態等.

  4. Awaitable 對象: 業務側的中斷重入點定義和數據傳輸定製點, 結合 co_await 關鍵字, 我們就能借助 compiler 實現正確的中斷, 重入語義了.

從圖上也能看到, 對比其它語言較精簡的 Coroutine 實現, C++20 這套實現, 還是偏複雜的, 這也是我們常調侃的 "庫作者向" 實現, 雖然整體使用很靈活, 也能跟泛型很好的搭配, 但我們還是需要在框架層做大量的包裝, 同時業務一般需要一個地方對應用中所有的協程做管理, 方便監控應用的整體運行情況等, 這也使得 C++ 這套特性沒法很簡單的直接在業務側進行使用, 後續我們講到 Coroutine Scheduler 的時候會進一步展開相關的內容.

此處我們只需要對 Coroutine 的核心對象的構成和作用有個簡單的認知, 接下來我們會結合相關的示例代碼來深入瞭解 C++20 Coroutine 的整體運作機制, 瞭解更多細節.

4. 結合代碼理解 Coroutine

4.1 一個簡單的示例 - 並不簡單

#include <iostream>
#include <coroutine>

using namespace std;

struct resumable_thing
{
  struct promise_type
  {
    resumable_thing get_return_object()
    {
      return resumable_thing(coroutine_handle<promise_type>::from_promise(*this));
    }
    auto initial_suspend() { return suspend_never{}; }
    auto final_suspend() noexcept { return suspend_never{}; }
    void return_void() {}

    void unhandled_exception() {}
  };
  coroutine_handle<promise_type> _coroutine = nullptr;
  resumable_thing() = default;
  resumable_thing(resumable_thing const&) = delete;
  resumable_thing& operator=(resumable_thing const&) = delete;
  resumable_thing(resumable_thing&& other)
    : _coroutine(other._coroutine) {
      other._coroutine = nullptr;
    }
  resumable_thing& operator = (resumable_thing&& other) {
    if (&other != this) {
      _coroutine = other._coroutine;
      other._coroutine = nullptr;
    }
  }
  explicit resumable_thing(coroutine_handle<promise_type> coroutine) : _coroutine(coroutine)
  {
  }
  ~resumable_thing()
  {
    if (_coroutine) { _coroutine.destroy(); }
  }
  void resume() { _coroutine.resume(); }
};

resumable_thing counter() {
  cout << "counter: called\n";
  for (unsigned i = 1; ; i++)
  {
    co_await std::suspend_always{};
    cout << "counter:: resumed (#" << i << ")\n";
  }
}

int main()
{
  cout << "main:    calling counter\n";
  resumable_thing the_counter = counter();
  cout << "main:    resuming counter\n";
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  cout << "main:    done\n";
  return 0;
}

從上面的代碼我們也能看出, 雖然協程函數 counter()的定義是簡單的, 使用也是簡單的, 但其實包含promise_type定義的resumable_thing的定義並不簡單, 相比其他語言, C++ 的使用明顯複雜很多.

相關代碼的輸出如下:

main:    calling counter
counter: called
main:    resuming counter
counter: resumed (#1)
counter: resumed (#2)
counter: resumed (#3)
counter: resumed (#4)
counter: resumed (#5)
main:    done

4.2 Coroutine20 的實現猜想

前面我們說過, C++17 下對應的實現機制大致如下:

那麼對於 C++20 來說, 它的整體運作機制又是什麼樣子的呢? 顯然, 我們從示例代碼和前面簡單介紹的核心對象, 並不能推導出它的運作機制, 編譯器幫我們做了很多額外的處理, 這也導致我們沒有辦法直接從代碼理解它實際的執行情況.

這其實也是 C++20 Coroutine 使用的一大難點, 除了前文提到的, 特性通過 Awaitable 定製點開放給你的地方, 整體的運作機制, 我們是很難直接得出的. 另外, 在一些多線程協程混用的複雜情況下, 整體運作機制對於我們實現正確的框架, 正確的分析解決碰到的問題至關重要. 那麼我們現在的問題就變成了, 怎麼去補全出包含編譯器處理的整體代碼?

4.3 藉助 "cppinsights"

因爲 C++ 各種複雜的 compiler 處理機制, 已經有相關的 compiler 預處理分析的工具被開發出來了, 我們這裏用的是一個叫 cppinsights 的工具, 這是一個基於 web 的工具, 所以我們打開網頁即可使用它, 網址是 cppinsights.io 工具的截圖如下:

cppinsights本身是基於 clang 的, 提供了多種 clang compiler 預處理信息的查看, 比如我們現在需要用到的 coroutine transformation:

對於前面的示例代碼, 我們通過cppinsights處理後生成的代碼如下:

/*************************************************************************************
 * NOTE: The coroutine transformation you've enabled is a hand coded transformation! *
 *       Most of it is _not_ present in the AST. What you see is an approximation.   *
 *************************************************************************************/
#include <iostream>
#include <coroutine>

using namespace std;

struct resumable_thing
{
  struct promise_type
  {
    inline resumable_thing get_return_object()
    {
      return resumable_thing(resumable_thing(std::coroutine_handle<promise_type>::from_promise(*this)));
    }

    inline std::suspend_never initial_suspend()
    {
      return std::suspend_never{};
    }

    inline std::suspend_never final_suspend() noexcept
    {
      return std::suspend_never{};
    }

    inline void return_void()
    {
    }

    inline void unhandled_exception()
    {
    }

    // inline constexpr promise_type() noexcept = default;
  };

  std::coroutine_handle<promise_type> _coroutine;
  inline constexpr resumable_thing() /* noexcept */ = default;
  // inline resumable_thing(const resumable_thing &) = delete;
  // inline resumable_thing & operator=(const resumable_thing &) = delete;
  inline resumable_thing(resumable_thing && other)
  : _coroutine{std::coroutine_handle<promise_type>(other._coroutine)}
  {
    other._coroutine.operator=(nullptr);
  }

  inline resumable_thing & operator=(resumable_thing && other)
  {
    if(&other != this) {
      this->_coroutine.operator=(other._coroutine);
      other._coroutine.operator=(nullptr);
    }

  }

  inline explicit resumable_thing(std::coroutine_handle<promise_type> coroutine)
  : _coroutine{std::coroutine_handle<promise_type>(coroutine)}
  {
  }

  inline ~resumable_thing() noexcept
  {
    if(static_cast<bool>(this->_coroutine.operator bool())) {
      this->_coroutine.destroy();
    }

  }

  inline void resume()
  {
    this->_coroutine.resume();
  }

};



struct __counterFrame
{
  void (*resume_fn)(__counterFrame *);
  void (*destroy_fn)(__counterFrame *);
  std::__coroutine_traits_impl<resumable_thing>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  unsigned int i;
  std::suspend_never __suspend_44_17;
  std::suspend_always __suspend_48_14;
  std::suspend_never __suspend_44_17_1;
};

resumable_thing counter()
{
  /* Allocate the frame including the promise */
  __counterFrame * __f = reinterpret_cast<__counterFrame *>(operator new(__builtin_coro_size()));
  __f->__suspend_index = 0;
  __f->__initial_await_suspend_called = false;

  /* Construct the promise. */
  new (&__f->__promise)std::__coroutine_traits_impl<resumable_thing>::promise_type{};

  resumable_thing __coro_gro = __f->__promise.get_return_object() /* NRVO variable */;

  /* Forward declare the resume and destroy function. */
  void __counterResume(__counterFrame * __f);
  void __counterDestroy(__counterFrame * __f);

  /* Assign the resume and destroy function pointers. */
  __f->resume_fn = &__counterResume;
  __f->destroy_fn = &__counterDestroy;

  /* Call the made up function with the coroutine body for initial suspend.
     This function will be called subsequently by coroutine_handle<>::resume()
     which calls __builtin_coro_resume(__handle_) */
  __counterResume(__f);


  return __coro_gro;
}

/* This function invoked by coroutine_handle<>::resume() */
void __counterResume(__counterFrame * __f)
{
  try
  {
    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_counter_1;
      case 2: goto __resume_counter_2;
    }

    /* co_await insights.cpp:44 */
    __f->__suspend_44_17 = __f->__promise.initial_suspend();
    if(!__f->__suspend_44_17.await_ready()) {
      __f->__suspend_44_17.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
      __f->__suspend_index = 1;
      __f->__initial_await_suspend_called = true;
      return;
    }

    __resume_counter_1:
    __f->__suspend_44_17.await_resume();
    std::operator<<(std::cout, "counter: called\n");
    for( __f->i = 1; ; __f->i++) {

      /* co_await insights.cpp:48 */
      __f->__suspend_48_14 = std::suspend_always{};
      if(!__f->__suspend_48_14.await_ready()) {
        __f->__suspend_48_14.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
        __f->__suspend_index = 2;
        return;
      }

      __resume_counter_2:
      __f->__suspend_48_14.await_resume();
      std::operator<<(std::operator<<(std::cout, "counter:: resumed (#").operator<<(__f->i)")\n");
    }

    goto __final_suspend;
  } catch(...) {
    if(!__f->__initial_await_suspend_called) {
      throw ;
    }

    __f->__promise.unhandled_exception();
  }

  __final_suspend:

  /* co_await insights.cpp:44 */
  __f->__suspend_44_17_1 = __f->__promise.final_suspend();
  if(!__f->__suspend_44_17_1.await_ready()) {
    __f->__suspend_44_17_1.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
  }

  ;
}

/* This function invoked by coroutine_handle<>::destroy() */
void __counterDestroy(__counterFrame * __f)
{
  /* destroy all variables with dtors */
  __f->~__counterFrame();
  /* Deallocating the coroutine frame */
  operator delete(__builtin_coro_free(static_cast<void *>(__f)));
}



int main()
{
  std::operator<<(std::cout, "main:    calling counter\n");
  resumable_thing the_counter = counter();
  std::operator<<(std::cout, "main:    resuming counter\n");
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  std::operator<<(std::cout, "main:    done\n");
  return 0;
}

cppinsights 本身也跟 Compiler Explorer 做了拉通, 做代碼深度分析的時候, 更多的結合這些開源工具, 很多時候還是非常有幫助的.

那麼有了 compiler 預處理後的代碼, 再來分析 C++20 Coroutine 的機制, 就變得簡單了.

4.4 Coroutine20 基本結構 - Compiler 視角

對於 compiler 預處理後的代碼, 我們直接結合結構圖來分析:

我們會發現, couter 被編譯器處理後基本就只是一個空殼函數了, 原來的實現邏輯被整體搬入了一個編譯器幫我們定義的函數__coutnerResume()中, 然後出現了一個編譯器幫我們定義的對象__couterFrame, 通過分析代碼很容易知道, __counterFrame結構主要完成幾部分的事情:

  1. virtual table 部分, 正確的告知你協程使用的 resume 函數以及 destroy 函數

  2. 自動處理的棧變量, 如下圖中所示的 i

  3. 各種使用到的 awaitable object, 這是因爲 awaitable object 本身也是有狀態的, 需要正確記錄

  4. 當前執行到的位置, 這個是通過整形的__suspend_index來記錄的.

當我們觀察__counterResume()的實現, 有趣的事情來了, 我們發現, 其實 C++20 也是使用一個大的 switch-case 來作爲協程執行的全局狀態機, 只不過每個 case lablel 後面, 接的是 goto, 而不是像我們在 C++17 下面那樣, 直接嵌入的業務代碼.

整體 C++20 的實現思路, 基本上與 17 的實現思路是一脈相承的, 只不過得益於 compiler 的支持, 很多事情我們都由主動處理 -> 自動處理.

4.5 Compiler 視角重新分析示例代碼

4.5.1 couter() - Function Body

我們知道, couter()會被編譯器改寫, 最終其實是變成了三個函數:

  1. 單純負責生命週期以及生成正確的__counterFrame對象的counter(), 只是一個協程入口函數.

  2. 負責真正執行邏輯的 __counterResume()函數, 它的輸入參數就是__counterFrame對象.

  3. 負責刪除 **counterFrame 對象的 **counterDestroy()函數.

通過一拆三, 編譯器很好的解決了協程的入口, 協程的中斷重入, 和協程以及相關對象的銷燬的問題.

4.5.2 coroutine_handle<>

部分coroutine_handle<>的定義代碼如下

template <> struct coroutine_handle<void>{
  constexpr coroutine_handle() noexcept;
  constexpr coroutine_handle(nullptr_t) noexcept;
  coroutine_handle& operator=(nullptr_t) noexcept;
  constexpr void* address() const noexcept;
  constexpr static coroutine_handle from_address(void* addr);
  constexpr explicit operator bool() const noexcept;
  bool done() const;
  void operator()();
  void resume();
  void destroy();
private:
  void* ptr;// exposition only
};

我們結合前面展開的代碼, 已經很好理解coroutine_handle<>爲何會有協程生命週期控制的能力了, 因爲它關聯了xxxFrame對象, 而通過前面的分析, xxxFrame的是虛表記錄了協程 resume 和 destroy 的函數, 所以這個地方的 ptr, 其實就是一個xxxFrame對象, 正確的關聯了xxxFrame對象, 透過它, 我們自然能夠擁有resume(), destroy()等一系列的能力了, 這裏並沒有任何魔法的存在.

template <typename Promise>
struct coroutine_handle
: coroutine_handle<void>
{
  Promise& promise() const noexcept;
  static coroutine_handle from_promise(Promise&) noexcept;
};

另外通過繼承的方式, coroutine_handle<>完成了與 Promise 對象關聯和轉換的功能.

4.5.3 promise_type

同樣, 我們結合預處理後的代碼:

/* This function invoked by coroutine_handle<>::resume() */
void __counterResume(__counterFrame * __f)
{
  try
  {
    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_counter_1;
      case 2: goto __resume_counter_2;
    }
    /* initial suspend handle here~~ */
    __f->__suspend_44_17 = __f->__promise.initial_suspend();
__resume_counter_1:
    /* do somthing for yield~~ */
__resume_counter_2:
    /* do somthing for resume~~ */
    goto __final_suspend;
  } catch(...) {
    if(!__f->__initial_await_suspend_called) {
      throw ;
    }
    __f->__promise.unhandled_exception();
  }
__final_suspend:
  /* final suspend here~~ */
  __f->__suspend_44_17_1 = __f->__promise.final_suspend();
}

通過__counterResume()的邏輯實現, promise 爲何可以對協程的初始化和結束行爲進行控制, 也很一目瞭然了, 因爲__counterFrame對象中關聯了我們定義的promise_type類型, 所以我們也能很直接的通過__counterFrame訪問到promise_type類型, 一方面充當配置項的角色, 如控制initial_suspend, final_suspend. 另外, promise_type也作爲一個 Wrapper, 對如co_yield等進行轉義執行, 以及異常的轉發處理, 也是非常好理解的機制.

4.5.4 Awaitable 對象

常見的 awaitable 對象如我們示例中看到的, 系統預定義的:

__resume_counter_1:
    __f->__suspend_44_17.await_resume();
    std::operator<<(std::cout, "counter: called\n");
    for( __f->i = 1; ; __f->i++) {

      /* co_await insights.cpp:48 */
      __f->__suspend_48_14 = std::suspend_always{};
      if(!__f->__suspend_48_14.await_ready()) {
        __f->__suspend_48_14.await_suspend(coroutine_handle);
        __f->__suspend_index = 2;
        return;
      }
__resume_counter_2:
      __f->__suspend_48_14.await_resume();
      std::cout << "counter:: resumed (#" << __f->i << ")\n";
    }

對於每一次的co_await, 編譯器處理後的代碼, 都會形成一個中斷點 和一個重入點, 其實對應的是兩個狀態, 剛開始執行的時候, 進入的是中斷點的邏輯, 也就是我們看到的__resume_counter_1對應 label 的代碼, 而重入點則是__resume_counter_2對應 label 的代碼, 結合此處展開的實例代碼, 我們也能很好的理解 awaitable 三個子函數的具體作用了:

4.6 小結 - C++20 協程的特點總結

我們總結 C++20 協程的特點:

5. Coroutine Scheduler

5.1 Sheduler 實現的動機

前面我們也提到了, 要做到 "庫作者向特性" => 面向業務的異步框架, 我們還需要一些額外的工作, 這就是我們馬上要介紹的 Coroutine Scheduler - 協程調度器.

5.2 Scheduler 核心機制

如上圖所示, Scheduler 主要提供對 SchedTask 的管理, 以及兩個基礎機制(對比 17 版的三個) 方便協程相關業務機制的實現:

  1. Awaitable 機制: 前面也介紹了利用 c++20 的 co_await 關鍵字和 awaitable 對象, 我們可以很好的定義掛起點, 以及交換協程和外部系統的數據。

  2. Return Callback 機制: 部分協程執行完後需要向外界反饋執行結果 (如協程模式執行的 Rpc Service).

5.3 Scheduler 核心對象

5.3.1 ISchedTask & SchedTaskCpp20
using CoReturnFunction = std::function<void(const CoReturnObject*)>;

class ISchedTask
{
    friend class Scheduler;
  public:
    ISchedTask() = delete;
    ISchedTask(const SchedTaskCpp17&) = delete;
    ISchedTask(uint64_t taskId, Scheduler* manager);
    virtual ~ISchedTask();
    uint64_t GetId() const;
    virtual int Run() = 0;
    virtual bool IsDone() const = 0;
    virtual CO_TASK_STATE GetCoState() const = 0;
    void BindSleepHandle(uint64_t handle);
    AwaitMode GetAwaitMode() const;
    int GetAwaitTimeout() const;
    template<typename AwaitEventType>
    auto BindResumeObject(AwaitEventType&& awaitEvent)->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value>;
    template<typename AwaitEventType>
    auto GetResumeObjectAsType()->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value, AwaitEventType*>;
    bool HasResumeObject() const noexcept;
    void ClearResumeObject();
    bool IsLastInvokeSuc() const noexcept;
    bool IsLastInvokeTimeOut() const noexcept;
    bool IsLastInvokeFailed() const noexcept;
    void AddChildTask(uint64_t tid);
    void AddWaitNofityTask(uint64_t tid);
    const auto& GetChildTaskArray() const;
    const auto& GetWaitNotifyArray() const;
    void Terminate();
    Scheduler* GetManager() const;
    static ISchedTask* CurrentTask();
    void DoYield(AwaitMode mode, int awaitTimeMs = 0);
    void SetReturnFunction(CoReturnFunction&& func);
    void DoReturn(const CoReturnObject& obj);
    void DoReturn();
  protected:
    uint64_t     mTaskId;
    Scheduler*      mManager;
    std::vector<uint64_t>  mChildArray;
    std::vector<uint64_t>  mWaitNotifyArray;
    //value used to return from coroutine
    AwaitMode     mAwaitMode = AwaitMode::AwaitDoNothing;
    int       mAwaitTimeout = 0;
    //value used to send to coroutine(now as a AwaitEvent)
    reflection::UserObject  mResumeObject;
    uint64_t     mSleepHandle = 0;
    bool      mIsTerminate = false;
    CoReturnFunction   mCoReturnFunc;
};

class SchedTaskCpp20: public ISchedTask
{
  public:
    SchedTaskCpp20(uint64_t taskId, CoTaskFunction&& taskFunc, Scheduler* manager);
    ~SchedTaskCpp20();
    int Run() override;
    bool IsDone() const override;
    CO_TASK_STATE GetCoState() const override;
    void BindSelfToCoTask();
    const CoResumingTaskCpp20& GetResumingTask() const;
  protected:
    CoResumingTaskCpp20   mCoResumingTask;
    CoTaskFunction    mTaskFuncion;
};

C++20 的 SchedTaskCpp20 主要完成對協程對象的封裝, CoTaskFunction 用於存儲相關的函數對象, 而 CoResumingTaskCpp20 則如同前面示例中的 resumable_thing 對象,內部有需要的 promise_type 實現, 我們對協程的訪問也是通過它來完成的。

此處需要注意的是我們保存了協程對象外, 還額外保存了相關的函數對象, 這是因爲如果協程本身是一個 lambda, compiler 並不會幫我們正確維護 lambda 的生命週期以及 lambda 所捕獲的函數, 尚未清楚是實現缺陷還是功能就是如此, 所以此處需要一個額外存在的 std::function<> 對象, 來保證對應 lambda 的生命週期是正確的。

對比 17 的實現, 我們的 SchedTask 對象中主要保留了:reflection::UserObject mResumeObject: 主要用於異步等待的執行, 當一個異步等待成功執行的時候, 向協程傳遞值。

原來利用事件去處理最終返回值的機制也替換成了 Return 回調的方式,相對來說更簡單直接, 利用 lambda 本身也能很方便的保存需要最終回傳的臨時值了。

5.3.2 Scheduler

Scheduler 的代碼比較多, 主要就是 SchedTask 的管理器, 另外也完成對前面提到的三種機制的支持, 文章重點分析一下三種機制的實現代碼.

5.3.3 Yield 處理
void Scheduler::Update()
{
    RSTUDIO_PROFILER_METHOD_INFO(sUpdate, "Scheduler::Update()", rstudio::ProfilerGroupType::kLogicJob);
    RSTUDIO_PROFILER_AUTO_SCOPE(sUpdate);

    //Handle need kill task first
    while(!mNeedKillArray.empty())
    {
        auto tid = mNeedKillArray.front();
        mNeedKillArray.pop();
        auto* tmpTask = GetTaskById(tid);
        if (tmpTask != nullptr)
        {
            DestroyTask(tmpTask);
        }
    }

    //Keep a temp queue for not excute next frame task right now
    decltype(mFrameStartTasks) tmpFrameTasks;
    mFrameStartTasks.swap(tmpFrameTasks);

    while (!tmpFrameTasks.empty())
    {
        auto task_id = tmpFrameTasks.front();
        tmpFrameTasks.pop();
        auto* task = GetTaskById(task_id);
        LOG_CHECK_ERROR(task);
        if (task)
        {
            AddToImmRun(task);
        }
    }
}

void Scheduler::AddToImmRun(ISchedTask* schedTask)
{
    LOG_PROCESS_ERROR(schedTask);
    schedTask->Run();

    if (schedTask->IsDone())
    {
        DestroyTask(schedTask);
        return;
    }

    {
        auto awaitMode = schedTask->GetAwaitMode();
        auto awaitTimeoutMs = schedTask->GetAwaitTimeout();
        switch (schedTask->GetAwaitMode())
        {
            case rstudio::logic::AwaitMode::AwaitNever:
                AddToImmRun(schedTask);
                break;
            case rstudio::logic::AwaitMode::AwaitNextframe:
                AddToNextFrameRun(schedTask);
                break;
            case rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout:
            case rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:
                {
                    HandleTaskAwaitForNotify(schedTask, awaitMode, awaitTimeoutMs);
                }
                break;
            case rstudio::logic::AwaitMode::AwaitDoNothing:
                break;
            default:
                RSTUDIO_ERROR(CanNotRunToHereError());
                break;
        }
    }
    Exit0:
    return;
}

上面是 Scheduler 的 Update() 以及 Update 用到的核心函數 AddToImmRun() 的實現代碼, 在每個 task->Run() 後, 到達下一個掛起點, 返回外部代碼的時候, 外部代碼會根據 Task 當前的 AwaitMode 對協程後續行爲進行控制, 主要是以下幾種模式:

  1. rstudio::logic::AwaitMode::AwaitNever: 立即將協程加入回 mReadyTask 隊列, 對應協程會被馬上喚醒執行

  2. rstudio::logic::AwaitMode::AwaitNextframe: 將協程加入到下一幀執行的隊列, 協程將會在下一幀被喚醒執行

  3. rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout: 等待外界通知後再喚醒執行 (無超時模式), 注意該模式下如果一直沒收到通知, 相關協程會一直在隊列中存在.

  4. rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout: 同 3, 差別是存在一個超時時間, 超時時間到了也會喚醒協程, 業務方可以通過 ResumeObject 判斷協程是被超時喚醒的.

  5. rstudio::logic::AwaitMode::AwaitDoNothing: 特殊的 AwaitHandle 實現會使用該模式, 比如刪除 Task 的實現, 都要刪除 Task 了, 我們肯定不需要再將 Task 加入任何可喚醒隊列了.

5.3.4 Resume 處理

Resume 機制主要是通過喚醒在 Await 隊列中的協程的時候向關聯的 Task 對象傳遞 ResumeObject 實現的:

//Not a real event notify here, just do need things
template <typename E>
auto ResumeTaskByAwaitObject(E&& awaitObj)
 -> std::enable_if_t<std::is_base_of<ResumeObject, E>::value>
{
    auto tid = awaitObj.taskId;
    if (IsTaskInAwaitSet(tid))
    {
        //Only in await set task can be resume
        auto* task = GetTaskById(tid);
        if (RSTUDIO_LIKELY(task != nullptr))
        {
            task->BindResumeObject(std::forward<E>(awaitObj));
            AddToImmRun(task);
        }

        OnTaskAwaitNotifyFinish(tid);
    }
}

然後再通過 rco_get_resume_object() 宏在協程代碼中獲取對應的 ResumeObject. 宏的聲明代碼如下:

#define rco_get_resume_object(ResumeObjectType)      rco_self_task()->GetResumeObjectAsType<ResumeObjectType>()

本身就是一個簡單的傳值取值的過程. 注意傳遞 ResumeObject 後, 我們也會馬上將協程加入到 mReadTasks 隊列中以方便在接下來的 Update 中喚醒它.

5.3.5 一個 Awaitable 實現的範例

我們以 Rpc 的協程化 Caller 實現爲例, 看看一個 awaitable 對象應該如何構造:

class RSTUDIO_APP_SERVICE_API RpcRequest
{
  public:
    RpcRequest() = delete;
    ////RpcRequest(const RpcRequest&) = delete;
    ~RpcRequest() = default;

    RpcRequest(const logic::GameServiceCallerPtr& proxy,
               const std::string_view funcName,
               reflection::Args&& arg, int timeoutMs) :
    mProxy(proxy)
        , mFuncName(funcName)
        , mArgs(std::forward<reflection::Args>(arg))
        , mTimeoutMs(timeoutMs)
    {}
    bool await_ready()
    {
  return false;
 }
    void await_suspend(coroutine_handle<>) const noexcept
    {
        auto* task = rco_self_task();
        auto context = std::make_shared<ServiceContext>();
        context->TaskId = task->GetId();
        context->Timeout = mTimeoutMs;
        auto args = mArgs;
        mProxy->DoDynamicCall(mFuncName, std::move(args), context);
        task->DoYield(AwaitMode::AwaitForNotifyNoTimeout);
    }
    ::rstudio::logic::RpcResumeObject* await_resume() const noexcept
    {
        return rco_get_resume_object(logic::RpcResumeObject);
    }
  private:
    logic::GameServiceCallerPtr     mProxy;
    std::string         mFuncName;
    reflection::Args       mArgs;
    int           mTimeoutMs;
};

重點是前面說到的 await_ready(), await_suspend(), await_resume() 函數的實現。

5.3.6 ReturnCallback 機制

有一些場合, 可能需要協程執行完成後向業務系統發起通知並傳遞返回值, 比如 Rpc Service 的協程支持實現等, 這個特性其實比較類似 go 的 defer, 只是這裏的實現更簡單, 只支持單一函數的指定而不是隊列. 我們直接以 RpcService 的協程支持爲例來看一下這一塊的具體使用.

首先是業務側, 在創建完協程後, 需要給協程綁定後續協程執行完成後做進一步操作需要的數據:

task->SetReturnFunction(
    [this, server, entity, cmdHead, routerAddr,
     reqHead, context](const CoReturnObject* obj) {
    const auto* returnObj = dynamic_cast<const CoRpcReturnObject*>(obj);
    if (RSTUDIO_LIKELY(returnObj))
    {
        DoRpcResponse(server, entity.get(), routerAddr, &cmdHead,
                      reqHead, const_cast<ServiceContext&>(context),
                      returnObj->rpcResultType,
                      returnObj->totalRet, returnObj->retValue);
    }
});

這裏將 Connection id 等信息通過 lambda 的 capture 功能直接綁定到 SchedTask 的返回函數,然後業務代碼會利用 co_return 本身的功能向 promise_type 傳遞返回值:

CoTaskInfo HeartBeatService::DoHeartBeat(
    logic::Scheduler& scheduler, int testVal)
{
    return scheduler.CreateTask20(
        [testVal]() -> logic::CoResumingTaskCpp20 {

            co_await logic::cotasks::Sleep(1000);

            printf("service yield call finish!\n");

            co_return CoRpcReturnObject(reflection::Value(testVal + 1));
        }
    );
}

最終我們利用 promise_type 的 return_value() 來完成對設置的回調的調用:

void CoResumingTaskCpp20::promise_type::return_value(const CoReturnObject& obj)
{
    auto* task = rco_self_task();
    task->DoReturn(obj);
}

注意這個地方 task 上存儲的 ExtraFinishObject 會作爲 event 的一部分直接傳遞給業務系統, 並在發起事件後調用刪除協程任務的方法.

對比原版 17 的 Finish Event 實現, 通過 Return Callback 的方式來對一些特殊的返回進行處理, 這種機制是更容易使用的。

5.4 示例代碼

//C++ 20 coroutine
auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
mScheduler.CreateTask20([clientProxy]()
                        -> rstudio::logic::CoResumingTaskCpp20 {
    auto* task = rco_self_task();

    printf("step1: task is %llu\n", task->GetId());
    co_await rstudio::logic::cotasks::NextFrame{};

    printf("step2 after yield!\n");
    int c = 0;
    while (c < 5) {
        printf("in while loop c=%d\n", c);
        co_await rstudio::logic::cotasks::Sleep(1000);
        c++;
    }
    for (c = 0; c < 5; c++) {
        printf("in for loop c=%d\n", c);
        co_await rstudio::logic::cotasks::NextFrame{};
    }

    printf("step3 %d\n", c);
    auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,
                                    []()-> logic::CoResumingTaskCpp20 {
        printf("from child coroutine!\n");
        co_await rstudio::logic::cotasks::Sleep(2000);
        printf("after child coroutine sleep\n");
    });
    printf("new task create in coroutine: %llu\n", newTaskId);
    printf("Begin wait for task!\n");
    co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };
    printf("After wait for task!\n");

    rstudio::logic::cotasks::RpcRequest
        rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};
    auto* rpcret = co_await rpcReq;
    if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {
        assert(rpcret->totalRet == 1);
        auto retval = rpcret->retValue.to<int>();
        assert(retval == 4);
        printf("rpc coroutine run suc, val = %d!\n", retval);
    }
    else {
        printf("rpc coroutine run failed! result = %d \n"(int)rpcret->rpcResultType);
    }
    co_await rstudio::logic::cotasks::Sleep(5000);
    printf("step4, after 5s sleep\n");
    co_return rstudio::logic::CoNil;
} );

  執行結果:

step1: task is 1
step2 after yield!
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc, val = 4!
step4, after 5s sleep

對比 17 的實現, 主要的好處是:

  1. 代碼更精簡了

  2. Stack 變量可以被 Compiler 自動處理, 正常使用了。

  3. co_await 可以直接返回值, 並有強制的類型約束了。

  4. 一個協程函數就是一個返回值爲 logic::CoResumingTaskCpp20 類型的 lambda, 可以充分利用 lambda 本身的特性還實現正確的邏輯了。

6. Scheduler 的使用

6.1 示例代碼

//C++ 20 coroutine
auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
mScheduler.CreateTask20([clientProxy]()
                        -> rstudio::logic::CoResumingTaskCpp20 {
    auto* task = rco_self_task();

    printf("step1: task is %llu\n", task->GetId());
    co_await rstudio::logic::cotasks::NextFrame{};

    printf("step2 after yield!\n");
    int c = 0;
    while (c < 5) {
        printf("in while loop c=%d\n", c);
        co_await rstudio::logic::cotasks::Sleep(1000);
        c++;
    }
    for (c = 0; c < 5; c++) {
        printf("in for loop c=%d\n", c);
        co_await rstudio::logic::cotasks::NextFrame{};
    }

    printf("step3 %d\n", c);
    auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,
                                    []()-> logic::CoResumingTaskCpp20 {
        printf("from child coroutine!\n");
        co_await rstudio::logic::cotasks::Sleep(2000);
        printf("after child coroutine sleep\n");
    });
    printf("new task create in coroutine: %llu\n", newTaskId);
    printf("Begin wait for task!\n");
    co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };
    printf("After wait for task!\n");

    rstudio::logic::cotasks::RpcRequest
        rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};
    auto* rpcret = co_await rpcReq;
    if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {
        assert(rpcret->totalRet == 1);
        auto retval = rpcret->retValue.to<int>();
        assert(retval == 4);
        printf("rpc coroutine run suc, val = %d!\n", retval);
    }
    else {
        printf("rpc coroutine run failed! result = %d \n"(int)rpcret->rpcResultType);
    }
    co_await rstudio::logic::cotasks::Sleep(5000);
    printf("step4, after 5s sleep\n");
    co_return rstudio::logic::CoNil;
} );

6.2 小議 C++20 Coroutine 對比 C++17 Coroutine 帶來的改進

通過前面的介紹, 我們很容易得出以下幾個 C++20 Coroutine 的優勢:

  1. 原生關鍵字 co_await, co_return 的支持, 業務側使用代碼更加精簡, 也進一步統一了大家對無棧協程的標準理解.

  2. Stack 變量可以被 compiler 自動處理, 這點對比 C++17 需要自行組織狀態變量來說是非常節約心智負責的.

  3. co_await可以直接返回對應類型的值, 這樣協程本身就有了強制的類型約束, 整體業務的表達也會因爲不需要從類型擦除的對象獲取需要的類型, 變得更順暢.

7. 一個有意思的實例

我們思考一個問題, 如果部分使用 OOP 進行設計的系統, 使用協程的思路重構, 會是什麼樣子的?

剛好筆者原來的某個項目是使用 Python 作爲腳本, 當時嘗試使用 Python 的 Coroutine 實現了一版技能系統, 今天我們來嘗試使用 C++20 Coroutine 重新實現它, 這樣也能夠對比一下, 在有協程調度器存在的情況下, 業務側對協程的使用感受, 與其他語言如 Python 中的差異.

7.1 一個 Python 實現的技能示例

我們以一個原來在 python 中利用包裝的協程調度器實現的技能系統爲例, 先來看看相關的實現效果和核心代碼。

python 的 stackless 協程實現不是我們關注的重點,參考的第一個鏈接是相關的實現思路,感興趣的可以打開相關鏈接詳細瞭解, 此處就不再展開細說了。

7.1.1 實現效果

以下是相關實現的示例效果, 主要是一個火球技能和實現和一個閃電鏈技能的實現:

7.1.2 技能主流程代碼

我們先來看一下技能的主流程代碼, 可以發現使用協程方式實現, 整個代碼更函數式, 區別於面向對象構造不同對象存儲中間態數據的設計。

# handle one skill instance create
def skill_instance_run_func(instance, user, skill_data, target, target_pos, finish_func):
    # set return callback here
 yield TaskSetExitCallback(finish_func)
    # ... some code ignore here
 from common.gametime import GameTime
 init_time = GameTime.now_time
 for skill_step in step_list:
  step_start_time = GameTime.now_time
        # ... some code ignore here
        ### 1. period task handle
  if skill_step.cast_type == CastSkillStep.CAST_TYPE_PERIOD:
   #... some code ignore here
        ### 2. missle skill
  elif skill_step.cast_type == CastSkillStep.CAST_TYPE_MISSLE_TO_TARGET:
   if len(skill_step.cast_action_group_list) > 0:
    action_group = skill_step.cast_action_group_list[0]
    for i in range(skill_step.cast_count):
                    # yield for sleep
     yield TaskSleep(skill_step.cast_period)
     ret_val = do_skill_spend(skill_data, user, instance)
     if not ret_val:
      return
                    # sub coroutine(missle_handle_func)
     task_id = yield TaskNew(missle_handle_func(
                        skill_data, instance, user, skill_step, action_group, target_id, target_pos))
     instance.add_child_task_id(task_id)
        ### 3. guide skill
  elif skill_step.cast_type == CastSkillStep.CAST_TYPE_GUIDE_TO_TARGET:
   #... some code ignore here
  now_time = GameTime.now_time
  step_pass_time = now_time - step_start_time
  need_sleep_time = skill_step.step_total_time - step_pass_time
  if need_sleep_time > 0:
   yield TaskSleep(need_sleep_time)
  instance.on_one_step_finish(skill_step)
 if skill_data.delay_end_time > 0:
  yield TaskSleep(skill_data.delay_end_time)
    # wait for child finish~~
 for task_id in instance.child_task_list:
  yield TaskWait(task_id)
 instance.task_id = 0

整體實現比較簡單, 整個技能是由多個 SkillStep 來配置的, 整體技能的流程就是 for 循環執行所有 SkillStep, 然後提供了多種 SkillStep 類型的處理, 主要是以下幾類:

最後所有 step 應用完畢會進入配置的休眠和等待子任務的階段。

7.1.3 子任務 - 導彈類技能相關代碼

對於上面介紹的導彈類技能(火球), 核心實現也比較簡單, 實現了一個飛行物按固定速度逼近目標的效果, 具體代碼如下, 利用 yield 我們可以實現在飛行物未達到目標點的時候每幀執行一次的效果:

### 1. handle for missle skill(etc: fire ball)
def missle_handle_func(skill_data, instance, user, skill_step, action_group, target_id, target_pos):
 effect = instance.create_effect(skill_step.missle_info.missle_fx_path)
 effect.set_scale(skill_step.missle_info.missle_scale)

 cur_target_pos, is_target_valid = skill_step.missle_info.get_target_position(
        user, target_id, target_pos)
 start_pos = skill_step.missle_info.get_start_position(user, target_id, target_pos)

 is_reach_target = False
 from common.gametime import GameTime
 init_time = GameTime.now_time
 while True:
  # ... some code ignore here
  fly_distance = skill_step.missle_info.fly_speed*GameTime.elapse_time
  if fly_distance < total_distance:
   start_pos += fly_direction*math3d.vector(fly_distance, fly_distance, fly_distance)
   effect.set_position(start_pos)
  else:
   is_reach_target = True
   break
        # do yield util next frame
  yield
 effect.destroy()
 if is_reach_target:
  target_list = skill_data.get_target_list(user.caster, target_id, target_pos)
  for target in target_list:
   action_group.do(user.caster, target)
7.1.4 子任務 - 引導類技能代碼

對於上面介紹的引導類技能(閃電鏈),依託框架本身的 guide effect 實現, 我們利用 yield TaskSleep() 就能很好的完成相關的功能了:

### 2. handle for guide skill(etc: lighting chain)
def guide_handle_func(skill_data, instance, user, skill_step, start_pos, target_id, target_pos):
 effect = instance.create_effect(skill_step.guide_info.guide_fx_path)
 effect.set_scale(skill_step.guide_info.guide_scale)

 effect.set_position(start_pos)

 effect.set_guide_end_pos(target_pos - start_pos)

    # yield for sleep
 yield TaskSleep(skill_step.guide_info.guide_time)
 effect.destroy()

7.2 對應的 C++ 實現

前面的 python 實現只是個引子, 拋開具體的畫面和細節, 我們來嘗試用我們構建的 C++20 版協程調度器來實現相似的代碼(拋開顯示相關的內容, 純粹過程模擬):

//C++ 20 skill test coroutine
mScheduler.CreateTask20([instance]() -> rstudio::logic::CoResumingTaskCpp20 {
    rstudio::logic::ISchedTask* task = rco_self_task();
    task->SetReturnFunction([](const rstudio::logic::CoReturnObject*) {
        //ToDo: return handle code add here
    });

    for (auto& skill_step : step_list) {
        auto step_start_time = GGame->GetTimeManager().GetTimeHardwareMS();
        switch (skill_step.cast_type) {
            case CastSkillStep::CAST_TYPE_PERIOD: {
                    //... some code ignore here
                }
                break;
            case CastSkillStep::CAST_TYPE_MISSLE_TO_TARGET: {
                    if (skill_step.cast_action_group_list.size() > 0) {
                        auto& action_group = skill_step.cast_action_group_list[0];
                        for (int i = 0; i < skill_step.cast_count; i++) {
                            co_await rstudio::logic::cotasks::Sleep(skill_step.cast_period);
                            bool ret_val = do_skill_spend(skill_data, user, instance);
                            if (!ret_val) {
                                co_return rstudio::logic::CoNil;
                            }
                            auto task_id = co_await rstudio::logic::cotasks::CreateTask(true,
                             [&skill_step]()->rstudio::logic::CoResumingTaskCpp20 {
        auto cur_target_pos = skill_step.missle_info.get_target_position(
                                    user, target_id, target_pos);
                                auto start_pos = skill_step.missle_info.get_start_position(
                                    user, target_id, target_pos);
                                bool is_reach_target = false;
                                auto init_time = GGame->GetTimeManager().GetTimeHardwareMS();
                                auto last_time = init_time;
                                do {
                                    auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();
                                    auto elapse_time = now_time - last_time;
                                    last_time = now_time;
                                    if (now_time - init_time >= skill_step.missle_info.long_fly_time) {
                                        break;
                                    }

                                    auto cur_target_pos = skill_step.missle_info.get_target_position(
                                        user, target_id, target_pos);

                                    rstudio::math::Vector3 fly_direction = cur_target_pos - start_pos;
                                    auto total_distance = fly_direction.Normalise();
                                    auto fly_distance = skill_step.missle_info.fly_speed * elapse_time;
                                    if (fly_distance < total_distance) {
                                        start_pos += fly_direction * fly_distance;
                                    }
                                    else {
                                        is_reach_target = true;
                                        break;
                                    }

                                    co_await rstudio::logic::cotasks::NextFrame{};
                                } while (true);
                                if (is_reach_target) {
                                    //ToDo: add damage calculate here~~
                                }

                             });
                            instance.add_child_task_id(task_id);
                        }
                    }
                }
                break;
            case CastSkillStep::CAST_TYPE_GUIDE_TO_TARGET: {
                    //... some code ignore here
                }
                break;
            default:
                break;
        }

        auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();
        auto step_pass_time = now_time - step_start_time;
        auto need_sleep_time = skill_step.step_total_time - step_pass_time;
        if (need_sleep_time > 0) {
            co_await rstudio::logic::cotasks::Sleep(need_sleep_time);
        }

        instance.on_one_step_finish(skill_step);
    }

    if (skill_data.delay_end_time > 0) {
        co_await rstudio::logic::cotasks::Sleep(skill_data.delay_end_time);
    }

    for (auto tid :instance.child_task_list) {
        co_await rstudio::logic::cotasks::WaitTaskFinish(tid, 10000);
    }
});

我們可以看到, 依賴 C++20 的新特性和我們自己封裝的調度器, 我們已經可以很自然很順暢的用比較低的心智負擔來表達原來在 python 中實現的功能了, 這應該算是一個非常明顯的進步了。

7.3 小結

通過上面兩版實現的對比, 我們不難發現:

  1. 結合調度器, C++ Coroutine 的實現與腳本一樣具備簡潔性, 這得益於 Compiler 對 Stack 變量的自動處理, 以及規整的co_await等關鍵字支持, 從某種程度上, 我們可以認爲這種處理提供了一個簡單的類 GC 的能力, 我們可以更低心智負擔的開發相關代碼.

  2. 協程的使用同時也會帶來其他一些好處, 像避免多級 Callback 帶來的代碼分散邏輯混亂等問題, 這個在 C++17 協程使用的範例中已經提到過, 此處不再重複.

8. RoadMap

8.1 對 asio coroutine20 實現部分的思考

我們知道最新版的 asio 已經在嘗試使用 C++ Coroutine20 來簡化它大量存在的異步操作. 先拋開具體的細節以及代碼實現質量等問題, 我們來看一下個人認爲 asio 做得比較好的兩點:

8.1.1 低使用成本的經典 callback 兼容方案

asio::awaitable<void> watchdog(asio::io_context& ctx) {
  asio::steady_timer timer(ctx);
  timer.expires_after(1s);
  co_await timer.async_wait(asio::use_awaitable);
  co_return;
}

這個實現比較巧妙的地方在於, steady_timerasync_wait()接口, 原來接受的是一個 callback 函數, 這個地方, asio 通過引入 asio::use_awaitable 對象, 實現了 callback 語義到co_await 協程語義的轉換, 這對於我們兼容大量包含 callback 的歷史代碼, 是非常具有參考價值的.

asio coroutine 實現的剝析, 在筆者的另外一篇文章 asio 的 coroutine 實現分析中有詳細的展開, 感興趣的讀者可以自行翻閱.

8.1.2 利用操作符定義複合任務
  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e)
  {
    co_await (
        (
          transfer(client, server, client_to_server_deadline) ||
          watchdog(client_to_server_deadline)
        )
        &&
        (
          transfer(server, client, server_to_client_deadline) ||
          watchdog(server_to_client_deadline)
        )
      );
  }

協程的使用, 不可避免的會出現協程與子協程, 協程與協程之間的複合關係, Asio 通過重載|| 運算和&& 運算, 來嘗試表達多個異步任務的組合, 具體的作用如下:

||: 用來表達兩個同時開始的異步任務, 其中一個成功執行, 則返回這個執行的結果, 並取消另外一個異步任務的執行.&&: 用來表達兩個同時執行的異步任務, 兩個任務都成功後返回包含這兩個任務執行結果的std::tuple<>值, 其中任意一個任務失敗, 則直接返回錯誤.

通過這種機制, 我們一定程度擁有了對任務的複合關係進行表達的能力, 比如對一個原本不支持超時的異步任務, 我們可以非常簡單的||上一個超時異步任務, 來解決它的超時支持問題. 這種設計也是很值得參考的.

8.2 關於 executions

聊到異步, 不得不說起最近幾年頻繁調整提案, 直到最近提案才逐步成熟的 executions 了. 我們先來簡單瞭解一下 executions:

在底層設計上, executions 與 ranges 非常類同, 都是先解決本身的 DSL 表達的問題, 再來構建更上層的應用, 區別在於 ranges 主要是使用了 CPO 以及|運算符來做到這一點, 而 executions 因爲本身的複雜度基於 CPO 引入了更復雜的tag invoke機制, 來組織自己的 DSL, 因爲這種表達代碼層面有很高的複雜度, 也被社區廣泛的戲稱爲 "存在大量的代碼噪聲", 或者說開發了一種 "方言". 但不可否認, 通過引入底層的 DSL 支撐特性, executions 很好的實現了結構化併發.

目前我們可以參考學習的工程化實踐, 主要是 Meta 公司開發的 libunifex 庫, 在結構化併發這部分, libunfix 其實已經做得比較好了, 但其本身是存在一些缺陷的, 一方面, libunifex 的調度器實現相比 asio, 還存在很大的落差, 另外, 一些支持工程應用的算法也有很多的缺失, 需要更長週期的發展和穩定.

所以對此, 我們目前的策略是保持預研的狀態, 在實現上嘗試將 libunifex 的調度器更多的結合 asio 的調度器, 並實現一些我們工程化比較急需的算法, 逐步引入 executions 的結構化併發, 對異步進行更好的開發與管理. 但不可否認的是, 目前綜合來看, executions 的成熟度和易用性都遠遠比不上 C++ Coroutine20, 短時間來看, 還是基於 Coroutine 的異步框架更值得投入.

8.3 關於後續的迭代

協程部分的特性目前是作爲我們自研引擎框架能力的一部分提供的, 一方面我們會圍繞 Coroutine 以及 Scheduler 補齊更多相關的特性, 如前面說到的對複合的異步任務的支持等, 另外我們也會嘗試一些 Executions 相關的探索, 如異構併發支持等, 相信隨着標準的進一步發展, 越來越多的人對這塊的投入和嘗試, 整個 C++ 的異步會向着使用側更簡潔, 表達能力更強的方向進化.

9. Reference

  1. asio 官網

  2. libunifex 源碼庫

  3. c++ 異步從理論到實踐 - 總覽篇

  4. A Curious Course on Coroutines and Concurrency - David Beazley [1]

  5. Marvin's Blog【程式人生】- C++20 中的 Coroutine [2]

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QVXE7QbxEchl8ue4SoijiQ