​淺談協程

作者:kylinkzhang,騰訊 CSIG 後臺開發工程師

什麼是協程

我們可以簡單的認爲:協程就是用戶態的線程,但是上下文切換的時機是靠調用方(寫代碼的開發人員)自身去控制的。

同時,協程和用戶態線程非常接近,用戶態線程之間的切換不需要陷入內核,但部分操作系統中用戶態線程的切換需要內核態線程的輔助。

下面是一個簡單的例子:

void A() {
   cout << 1 << " ";
   cout << 2 << " ";
   cout << 3 << " ";
}

void B() {
   cout << "x" << " ";
   cout << "y" << " ";
   cout << "z" << " ";
}

int main(void) {
  A();
  B();
}

在單線程中,上述函數的輸出爲:

1 2 3 x y z

如果我們用 libco 庫將上面程序改造一下:

void A() {
   cout << 1 << " ";
   cout << 2 << " ";
   co_yield_ct();  // 切出到主協程
   cout << 3 << " ";
}

void B() {
   cout << "x" << " ";
   co_yield_ct();  // 切出到主協程
   cout << "y" << " ";
   cout << "z" << " ";
}

int main(void) {
  ...  // 主協程
  co_resume(A);  // 啓動協程 A
  co_resume(B);  // 啓動協程 B
  co_resume(A);  // 從協程 A 切出處繼續執行
  co_resume(B);  // 從協程 B 切出處繼續執行
}

同樣在單線程中,改造後的程序輸出如下:

1 2 x 3 y z

可以看出,切出操作是由 co_yield_ct() 函數實現的,而協程的啓動和恢復是由 co_resume 實現的。函數 A()B() 並不是一個執行完才執行另一個,而是產生了 “交叉執行 “ 的效果,這就是通過協程實現的!

線程挺好的,我們爲什麼需要協程呢?

因爲有些時候我們在執行一些操作(尤其是 IO 操作)時,不希望去做“創建一個新的線程”這種重量級的操作來異步處理。而是希望:在當前線程執行中,暫時切換到其他任務中執行,同時在 IO 真正準備好了之後,再切換回來繼續執行!

相比於多開一個線程來操作,使用協程的好處:

更多協程的好處:

  • https://www.zhihu.com/question/20511233

同時,下面是一些協程的特點:

不理解這些協程特點也不要緊,下文都會講到。

補充:線程上下文

下圖中展示了線程在運行過程 CPU 需要的一些信息(CPU Context,CPU 上下文),比如通用寄存器、棧信息(EBP/ESP)等,進程 / 線程切換時需要保存與恢復這些信息。而進程 / 內核態線程切換的時候需要與 OS 內核進行交互,保存 / 讀取 CPU 上下文信息。

線程時間消耗分析

內核態(Kernel)的一些數據是共享的,讀寫時需要同步機制,所以操作一旦陷入內核態就會消耗更多的時間。進程需要與操作系統中所有其他進程進行資源爭搶,且操作系統中資源的鎖是全局的;線程之間的數據一般在進程內共享,所以線程間資源共享相比如進程而言要輕一些。

雖然很多操作系統(比如 Linux)進程與線程區別不是非常明顯,但線程還是比進程要輕。

線程的切換(Context Switch)相比於其他操作而言並不是非常耗時,如下圖所示(2018 年):

參考這篇 Linux 線程相關文章,Linux 2.6 之後 Linux 多線程的性能提高了很多,大部分場景下線程切換耗時在 2us 左右;

下面是 Linux 下線程切換耗時統計(2013 年):

正常情況下線程可用的 CPU 時間片都在數十毫秒級別,而線程切換佔總耗時的千分之幾以內,協程的使用可以將這個損耗進一步降低(主要是去除了其他操作,比如 futex 等)。

線程內存消耗分析

不是所有編程語言或者系統都支持一次創建很多線程。

例如,在 x32 系統中即使使用了虛內存空間,因爲進程能訪問的虛內存空間大概是 3GB,所以單進程最多創建 300 多條線程(假設系統爲每條線程分配 10M 棧空間),太多線程甚至還伴隨着由於線程切換而觸發缺頁中斷的風險。

如果我們創建很多線程(比如 x64 系統下創建 1 萬個線程),不考慮優先級且假設 CPU 有 10 個核心,那麼每個線程每秒有 1ms 的時間片,整個業務的耗時大概是:

如果大量線程之間存在資源競爭,那麼系統行爲將難以預測。所以在有限的資源下創建大量線程是極其不合理的,服務線程的個數和 CPU 核心數應該在一個合理的比例內。

操作系統線程調度可參考:

  • https://help.perforce.com/sourcepro/current/HTML/index.html#page/SourcePro_Core/threadsug-ThreadPackage.22.118.html

在默認情況下,Linux 系統給每條線程分配的棧空間最大是 6~8MB,這個大小是上限,也是虛內存空間,並不是每條線程真實的棧使用情況。

線程真實棧內存使用會隨着線程執行而變化,如果線程只使用了少量局部變量,那麼真實線程棧可能只有幾十個字節的大小;系統在維護線程時需要分配額外的空間,所以線程數的增加還是會提高內存資源的消耗。

通過上面的分析我們可以知道:

如果業務處理時間遠小於 IO 耗時,線程切換非常頻繁,那麼使用協程是不錯的選擇;

並且,協程的優勢並不僅僅是減少線程之間切換,從編程的角度來看,協程的引入簡化了異步編程;

同時,協程爲一些異步編程提供了無鎖的解決方案,即:

協程可以用同步編程的方式實現異步編程才能實現的功能。

如何保存上下文

很多地方把協程稱爲 Subroutine;Subroutine 是什麼?就是函數!

上古時期的計算機科學家們早就給出了概念:Coroutine 就是可以中斷並恢復執行的 Subroutine。

因此從這個角度來看協程擁有調用棧並不是一個奇怪的事情。

再來思考,Coroutine 與 Subroutine 相比有什麼區別?區別僅有一個就是:

Coroutine 可以中斷並恢復,對應的操作就是 yield/resume

這樣看來 Subroutine 不過是 Coroutine 的一個子集罷了,也就是說把協程當做一個特殊的函數調用:

  • 可以中斷並恢復

既然可以把 Coroutine 當做一個特殊的函數調用,那麼如何像切換函數一樣去切換 Coroutine 呢?難點在於:

通常的做法是:在協程內部存儲自身的上下文,並在需要切換的時候把上下文切換;我們知道上下文其實本質上就是寄存器,所以保存上下文實際上就是把寄存器的值保存下來。

相對應的,有下面幾種方法:

關於setjmp.h:https://zh.m.wikipedia.org/zh-hans/Setjmp.h

需要注意的是:

setjmp/longjmp 一般不能作爲協程實現的底層機制,因爲 setjmp/longjmp 對棧信息的支持有限。

關於 ucontext.h:https://en.wikipedia.org/wiki/Setcontext

下面分別來看 setjmpucontext 方法,至於使用匯編的方法,會在本文講解有棧協程是講述。

使用 setjmp/longjmp

下面代碼模擬了單線程併發執行兩個 while(true){...} 函數:

源代碼:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/setjmp_demo

setjmp_demo/setjmp_demo.cc

#include <cstdlib>
#include <cstdio>
#include <setjmp.h>

int max_iteration = 9;
int iter;

jmp_buf Main;
jmp_buf PointPing;
jmp_buf PointPong;

void Ping() {
    if (setjmp(PointPing) == 0) longjmp(Main, 1); // 可以理解爲重置,reset the world
    while (1) {
        printf("%3d : Ping-", iter);
        if (setjmp(PointPing) == 0) longjmp(PointPong, 1);
    }
}

void Pong() {
    if (setjmp(PointPong) == 0) longjmp(Main, 1);
    while (1) {
        printf("Pong\n");
        iter++;
        if (iter > max_iteration) exit(0);
        if (setjmp(PointPong) == 0) longjmp(PointPing, 1);
    }
}

int main(int argc, char* argv[]) {
    iter = 1;
    if (setjmp(Main) == 0) Ping();
    if (setjmp(Main) == 0) Pong();
    longjmp(PointPing, 1);
}

首先,我們定義了三個保存調用棧的節點:

並在 main 函數中首先創建(啓動)了兩個函數:Ping、Pong,在使用 longjmp(PointPing, 1); 之後,PointPing 不再是 0,從而啓動了 Ping 協程。此後,函數 Ping 和 函數 Pong 在 while (1) 中交替執行,而不再返回 main 函數中。

最後,當 iter > max_iteration 時,調用 exit(0) 退出。通過命令 g++ -std=c++11 setjmp_demo.cc -o setjmp_demo 編譯後執行 ./setjmp_demo ,輸出如下:

1 : Ping-Pong
2 : Ping-Pong
3 : Ping-Pong
4 : Ping-Pong
5 : Ping-Pong
6 : Ping-Pong
7 : Ping-Pong
8 : Ping-Pong
9 : Ping-Pong

雖然上面實現了比較簡單的函數切換,但是實際上我們無法通過 setjmp.h庫獲取到真正的上下文信息。如果想要真正獲取到上下文信息,可以使用 ucontext.h 庫。

使用 ucontext

下面關於 ucontext 的介紹源自:

  • http://pubs.opengroup.org/onlinepubs/7908799/xsh/ucontext.h.html

實際上,ucontext lib 已經不推薦使用了,但依舊是不錯的協程入門資料。其他底層協程庫實現可以查看:

協程庫的對比可以參考:

  • https://github.com/tboox/benchbox/wiki/switch

linux 系統一般都存在 ucontext 這個 C 語言庫,這個庫主要用於:操控當前線程下的 CPU 上下文。

setjmp/longjmp 不同,ucontext 直接提供了設置函數運行時棧的方式(makecontext),避免不同函數棧空間的重疊。

需要注意的是:

ucontext 只操作與當前線程相關的 CPU 上下文,所以下文中涉及 ucontext 的上下文均指當前線程的上下文;(一般 CPU 有多個核心,一個線程在某一時刻只能使用其中一個,所以 ucontext 只涉及一個與當前線程相關的 CPU 核心)

ucontext.h 頭文件中定義了 ucontext_t 這個結構體,這個結構體中至少包含以下成員:

ucontext_t *uc_link     // next context
sigset_t    uc_sigmask  // 阻塞信號阻塞
stack_t     uc_stack    // 當前上下文所使用的棧
mcontext_t  uc_mcontext // 實際保存 CPU 上下文的變量,這個變量與平臺&機器相關,最好不要訪問這個變量

可移植的程序最好不要讀取與修改 ucontext_t 中的 uc_mcontext,因爲不同平臺下 uc_mcontext 的實現是不同的。

同時,ucontext.h 頭文件中定義了四個函數,下面分別介紹:

int  getcontext(ucontext_t *); // 獲得當前 CPU 上下文
int  setcontext(const ucontext_t *);// 重置當前 CPU 上下文
void makecontext(ucontext_t *, (void *)(), int, ...); // 修改上下文信息,比如設置棧指針
int  swapcontext(ucontext_t *, const ucontext_t *);

下面分別來看。

getcontext
#include <ucontext.h>
int getcontext(uconte t_t *ucp);

getcontext` 函數使用當前 CPU 上下文初始化 ucp 所指向的結構體,初始化的內容包括:CPU 寄存器、信號 mask 和當前線程所使用的棧空間;

返回值getcontext 成功返回 0,失敗返回 -1。

setcontext
#include <ucontext.h>
int setcontext(ucontext_t *ucp);

getcontext 函數類似,setcontext 函數用於:設置 CPU 寄存器、信號 mask 和當前線程所使用的棧空間。

需要特別注意的是:

如果函數 setcontext 執行成功,那麼調用 setcontext 的函數將不會返回,因爲當前 CPU 的上下文已經交給其他函數或者過程了,當前函數完全放棄了 對 CPU 的 “所有權”。

getcontext 和 setcontext 的應用:

當信號處理函數需要執行的時候,當前線程的上下文需要保存起來,隨後進入信號處理階段;

makecontext
#include <ucontext.h>
void makecontext(ucontext_t *ucp, (void *func)(), int argc, ...);

makecontext 修改由 getcontext 創建的上下文 ucp

如果 ucp 指向的上下文由 swapcontextsetcontext 恢復,那麼當前線程將執行傳遞給 makecontext 的函數 func(...)

執行 makecontext 後需要爲新上下文分配一個棧空間,如果不創建,那麼新函數func執行時會使用舊上下文的棧,而這個棧可能已經不存在了。同時,argc 必須和 func 中整型參數的個數相等。

swapcontext
#include <ucontext.h>
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);

swapcontext 將當前上下文信息保存到 oucp 中並使用 ucp 重置 CPU 上下文。

返回值

如果 ucp 所指向的上下文沒有足夠的棧空間以執行餘下的過程,swapcontext 將返回 -1。

總結

相比於 setjml 略微簡單的功能,使用 ucontext 我們可以方便的獲取當前調用函數的上下文,進而實現協程。

協程的類別

協程的實現不只有一種,很多活躍的語言如 Python、Java、Golang 等都是支持協程的。

儘管這些協程可能名稱不同,甚至用法也不同,但它們都可以被劃分爲兩大類:

這裏所謂的有棧、無棧:

並不是說這個協程運行的時候有沒有棧,而是說協程之間是否存在調用棧(Callback Stack);

同時,根據協程之間是否有明顯的調用關係,我們又可以把協程分爲:

例如,協程 A 調用了協程 B:

  • 如果只有 B 完成之後才能調用 A,那麼此時 A/B 是非對稱協程;

  • 如果 A/B 被調用的概率相同,那麼此時 A/B 是對稱協程;

下面我們分別來看。

有棧協程

開源庫 libco 就是通過彙編語言實現的有棧協程庫,我們來看一看 libco 中對於 32 位機器的上下文切換操作是如何完成的:

通過分析代碼看到,無論是 co_yield_ct 還是 co_resume,在協程切出和恢復時,都調用了同一個函數co_swap,在這個函數中調用了 coctx_swap 來實現協程的切換,這一函數的原型是:

void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");

兩個參數都是 coctx_t * 指針類型,其中第一個參數表示要切出的協程,第二個參數表示切出後要進入的協程。

coctx_swap 函數便是用匯編實現的,我們這裏只關注 x86-64 相關的部分,其代碼如下:

coctx_swap:
   leaq 8(%rsp),%rax
   leaq 112(%rdi),%rsp
   pushq %rax
   pushq %rbx
   pushq %rcx
   pushq %rdx

   pushq -8(%rax) //ret func addr

   pushq %rsi
   pushq %rdi
   pushq %rbp
   pushq %r8
   pushq %r9
   pushq %r12
   pushq %r13
   pushq %r14
   pushq %r15

   movq %rsi, %rsp
   popq %r15
   popq %r14
   popq %r13
   popq %r12
   popq %r9
   popq %r8
   popq %rbp
   popq %rdi
   popq %rsi
   popq %rax //ret func addr
   popq %rdx
   popq %rcx
   popq %rbx
   popq %rsp
   pushq %rax

   xorl %eax, %eax
   ret

可以看出,coctx_swap 中並未像常規被調用函數一樣創立新的棧幀。

先看前兩條語句:

leaq 8(%rsp),%rax
leaq 112(%rdi),%rsp

leaq 用於把其第一個參數的值賦值給第二個寄存器參數,而第一條語句用來把 8(%rsp) 的本身的值存入到 %rax 中。

注意:

這裏使用的並不是 8(%rsp) 指向的值,而是把 8(%rsp) 表示的地址賦值給了 %rax,這一地址是父函數棧幀中除返回地址外棧幀頂的位置。

在第二條語句 leaq 112(%rdi), %rsp 中,%rdi 存放的是coctx_swap 第一個參數的值,這一參數是指向 coctx_t 類型的指針,表示當前要切出的協程,這一類型的定義如下:

struct coctx_t {
    void *regs[ 14 ]; 
    size_t ss_size;
    char *ss_sp;
};

因而 112(%rdi) 表示的就是第一個協程的 coctx_tregs[14] 數組的下一個 64 位地址,而接下來的語句:

pushq %rax   
pushq %rbx
pushq %rcx
pushq %rdx
pushq -8(%rax) //ret func addr
pushq %rsi
pushq %rdi
pushq %rbp
pushq %r8
pushq %r9
pushq %r12
pushq %r13
pushq %r14
pushq %r15

第一條語句 pushq %rax 用於把 %rax 的值放入到 regs[13] 中,resg[13] 用來存儲第一個協程的 %rsp 的值。這時 %rax 中的值是第一個協程 coctx_swap 父函數棧幀除返回地址外棧幀頂的地址。

由於 regs[] 中有單獨的元素存儲返回地址,棧中再保存返回地址是無意義的,因而把父棧幀中除返回地址外的棧幀頂作爲要保存的 %rsp 值是合理的;當協程恢復時,把保存的 regs[13] 的值賦值給 %rsp 即可恢復本協程 coctx_swap 父函數堆棧指針的位置。

第一條語句之後的語句就是用 pushq 把各 CPU 寄存器的值依次從 regs 尾部向前壓入。即通過調整 %rspregs[14] 當作堆棧,然後利用 pushq 把寄存器的值和返回地址存儲到 regs[14] 整個數組中。並且,regs[14] 數組中各元素與其要存儲的寄存器對應關係如下:

//-------------
// 64 bit
//low | regs[0]: r15 |
//    | regs[1]: r14 |
//    | regs[2]: r13 |
//    | regs[3]: r12 |
//    | regs[4]: r9  |
//    | regs[5]: r8  | 
//    | regs[6]: rbp |
//    | regs[7]: rdi |
//    | regs[8]: rsi |
//    | regs[9]: ret |  //ret func addr, 對應 rax
//    | regs[10]: rdx |
//    | regs[11]: rcx | 
//    | regs[12]: rbx |
//hig | regs[13]: rsp |

接下來的彙編語句:

movq %rsi, %rsp
popq %r15
popq %r14
popq %r13
popq %r12
popq %r9
popq %r8
popq %rbp
popq %rdi
popq %rsi
popq %rax //ret func addr
popq %rdx
popq %rcx
popq %rbx
popq %rsp

這裏用的方法還是通過改變 %rsp 的值,把某塊內存當作棧來使用。

第一句 movq %rsi, %rsp 就是讓 %rsp 指向 coctx_swap 第二個參數,這一參數表示要進入的協程。而第二個參數也是coctx_t 類型的指針,即執行完 movq 語句後,%rsp 指向了第二個參數 coctx_tregs[0],而之後的 pop 語句就是用 regs[0-13] 中的值填充 cpu 的寄存器,這裏需要注意的是 popq 會使得 %rsp 的值增加而不是減少,這一點保證了會從 regs[0]regs[13] 依次彈出到 cpu 寄存器中。

在執行完最後一句 popq %rsp 後,%rsp 已經指向了新協程要恢復的棧指針(即新協程之前調用 coctx_swap 時父函數的棧幀頂指針),由於每個協程都有一個自己的棧空間,可以認爲這一語句使得 %rsp 指向了要進入協程的棧空間。

coctx_swap 中最後三條語句如下:

pushq %rax
xorl %eax, %eax
ret

pushq %rax 用來把 %rax 的值壓入到新協程的棧中,這時 %rax 是要進入的目標協程的返回地址,即要恢復的執行點;然後用 xorl%rax 低 32 位清 0 以實現地址對齊;最後 ret 語句用來彈出棧的內容,並跳轉到彈出的內容表示的地址處,而彈出的內容正好是上面 pushq %rax 時壓入的 %rax 的值,即之前保存的此協程的返回地址。

即最後這三條語句實現了轉移到新協程返回地址處執行,從而完成了兩個協程的切換。

可以看出,這裏通過調整 %rsp 的值來恢復新協程的棧,並利用了 ret 語句來實現修改指令寄存器 %rip 的目的,通過修改 %rip 來實現程序運行邏輯跳轉。

注意:

%rip 的值不能直接修改,只能通過 callret 之類的指令來間接修改;

整體上看來,協程的切換其實就是:cpu 寄存器內容特別是 %rip%rsp 的寫入和恢復,因爲 cpu 的寄存器決定了程序從哪裏執行(%rip) 和使用哪個地址作爲堆棧 (%rsp)。

寄存器的寫入和恢復如下圖所示:

執行完上圖的流程,就將之前 cpu 寄存器的值保存到了協程 A 的 regs[14] 中,而將協程 B regs[14] 的內容寫入到了寄存器中,從而使執行邏輯跳轉到了 B 協程 regs[14] 中保存的返回地址處開始執行,即實現了協程的切換(從 A 協程切換到了 B 協程執行)。

詳細關於 libco 的實現細節:

  • https://www.zhihu.com/question/52193579

  • https://zhuanlan.zhihu.com/p/27409164

  • https://github.com/yyrdl/libco-code-study

無棧協程

無棧協程的本質就是一個狀態機(state machine),它可以理解爲在另一個角度去看問題,即:

首先,我們來看一個使用 libco 的協程的例子(當然 libco 是一個有棧協程):

void* test(void* para){
 co_enable_hook_sys();
 int i = 0;
 poll(0, 0, 0. 1000); // 協程切換執行權,1000ms後返回
 i++;
 poll(0, 0, 0. 1000); // 協程切換執行權,1000ms後返回
 i--;
 return 0;
}

int main(){
 stCoRoutine_t* routine;
 co_create(&routine, NULL, test, 0); // 創建一個協程
 co_resume(routine); 
 co_eventloop( co_get_epoll_ct(),0,0 );
 return 0;
}

這段代碼實際的意義就是:主協程跑一個協程去執行 test 函數,在 test 中我們需要兩次從協程中切換出去,這裏對應了兩個 poll 操作(hook 機制),hook 後的 poll 所做的事情就是把當前協程的 CPU 執行權切換到調用棧的上一層,並在超時或註冊的 fd 就緒時返回(當然樣例這裏就只是超時了)。

如果是無棧協程,實現相同邏輯的代碼是怎麼樣的呢?其實就是翻譯成類似於以下狀態機的代碼:

class test_coroutine {
    int i;
    int __state = 0;
    void MoveNext() {
        switch(__state) {
        case 0:
            return frist();
        case 1:
            return second();
        case 2:
         return third();
        }
    }
    void frist() {
        i = 0;
        __state = 1;
    }
    void second() {
        i++;
        _state = 2;
    }
    void third() {
     i--;
    }
};

我們可以看到:相比與有棧協程中的 test 函數,這裏把整個協程抽象成一個類,以原本需要執行切換的語句處爲界限,把函數劃分爲幾個部分,並在某一個部分執行完以後進行狀態轉移,在下一次調用此函數的時候就會執行下一部分。

這樣的話我們就完全沒有必要像有棧協程那樣顯式的執行上下文切換了,我們只需要一個簡易的調度器來調度這些函數即可。

在 Rust 中,async 也是一個語法糖,實際上編譯後就是實現了類似於上面的代碼結構,感興趣的可以去看《async book》。

從執行時棧的角度來看:

其實所有的協程共用的都是一個棧,即系統棧,也就也不必我們自行去給協程分配棧,因爲是函數調用,我們當然也不必去顯示的保存寄存器的值。而且相比有棧協程把局部變量放在新開的空間上,無棧協程直接使用系統棧使得 CPU cache 局部性更好,同時也使得無棧協程的中斷和函數返回幾乎沒有區別,這樣也可以凸顯出無棧協程的高效。

對稱協程與非對稱協程

前文中也簡單提到了對稱和非對稱協程,這裏也簡單聊一下吧。

其實對於 “對稱” 這個名詞,闡述的實際是:**協程之間的關係。**用大白話來說就是:對稱協程就是說協程之間人人平等,沒有誰調用誰一說,大家都是一樣的,而非對稱協程就是協程之間存在明顯的調用關係。

簡單來說就是這樣:

其實兩者的實現我覺得其實差異不大,非對稱協程其實就是擁有調用棧,而非對稱協程則是大家都平等,不需要調用棧,只需要一個數據結構存儲所有未執行完的協程即可。

至於哪種更優?這個需要分情況:

如果你使用協程的目的是爲了優化一些 IO 密集型應用,那麼協程切換出去的時候就是它等待事件到來的時候,此時你就算切換過去也沒有什麼意義,還不如等到事件到來的時候自動切換回去。

其實上面說的是有一些問題,因爲這個執行權的切換實際上是(調用者–被調用者)之間的切換,對稱就是它們之間都是平等的,就是假如 A 協程執行了 B,C 協程,那麼 B 協程可以切換回 A,也可以切換回 C;而非對稱只能是 B 切換回 A,A 切換回 C,C 再切換回 A,以此類推。

這樣看起來顯然非對稱協程相比之下更爲符合我們的認知,因爲對稱協程目前我不知道如何選擇一個合適的協程來獲得 CPU 執行權,正如上面所說,此協程可能正在等待事件;當然如果調度算法足夠優秀的話,對稱協程也是可取的。

關於協程的一些其他內容

N:1 & N:M 協程

我們知道,和線程綁定的協程只有在對應線程運行的時候纔有被執行的可能,如果對應線程中的某一個協程完全佔有了當前線程,那麼當前線程中的其他所有協程都不會被執行。

同時,協程的所有信息都保存在上下文(Contex)對象中,將不同上下文分發給不同的線程就可以實現協程的跨線程執行,如此,協程被阻塞的概率將減小。

因此,借用 BRPC 中對 N:M 協程的介紹,來解釋下什麼是 N:M 協程。

我們常說的協程通常指的是 N:1 線程庫,即所有的協程運行於一個系統線程中,計算能力和各類 eventloop 庫等價;

由於不跨線程,協程之間的切換不需要系統調用,可以非常快 (100ns-200ns),受 cache 一致性的影響也小;

但代價是協程無法高效地利用多核,代碼必須非阻塞,否則所有的協程都被卡住……

bthread 是一個 M:N 線程庫,一個bthread被卡住不會影響其他bthread

其中的關鍵技術有兩點:

前者讓 bthread 更快地被調度到更多的核心上,後者讓 bthread 和 pthread 可以相互等待和喚醒,這兩點協程都不需要;

更多 brpc 的線程見:

  • https://github.com/apache/incubator-brpc/blob/master/docs/cn/threading_overview.md

這麼看來 貌似 bthread 自己實現了 golang 的 goroutine?

表面看起來的卻如此:兩者都實現了 M:N 用戶態線程。但是事實上, golang 中的 goroutine 的實現要更爲複雜一些:

bthread 的設計比較接近 go 1.0 版本:OS 線程不會動態增加,在有大量的阻塞性 syscall 下,會有影響。

而 go 1.1 之後的設計就是動態增減 OS 線程,而且提供了 LockOSThread,可以讓 goroutine 和 OS 線程 1:1。

關於這個問題,見:https://www.zhihu.com/question/65549422

協程的組成

通過上面的描述,N:M 模式下的協程其實就是可用戶確定調度順序的用戶態線程,與系統級線程對照可以將協程框架分爲以下幾個模塊:

協程的調度

協程的調度與 OS 線程調度十分相似,如下圖協程調度示例所示:

協程相關工具

系統級線程有鎖(mutex)、條件變量(condition)等工具,協程也有對應的工具;比如:libgo 提供了協程之間使用的鎖 Co_mutex/Co_rwmutex

不同協程框架對工具的支持程度不同,實現方式也不盡相同;對此問題,本文不做深入介紹。

系統級線程和協程處於不同的系統層級,所以兩者的同步工具不完全通用,如果在協程中使用了線程的鎖(例如:std::mutex),則整個線程將會被阻塞,當前線程將不會再調度與執行其他協程。

最簡單的例子:

如果在一個協程中使用了 sleep,那麼這個線程下的所有協程全部都會被阻塞。** 在使用協程時,這種方法是非常低效的。**

協程 & 線程的對比

協程對 CPU/IO 的影響

協程的目的在於剔除線程的阻塞,儘可能提高 CPU 的利用率。很多服務在處理業務時需要請求第三方服務,向第三方服務發起 RPC 調用;RPC 調用的網絡耗時一般耗時在毫秒級別,RPC 服務的處理耗時也可能在毫秒級別,如果當前服務使用同步調用,即 RPC 返回後才進行後續邏輯,那麼一條線程每秒處理的業務數量是可以估算的。

假設每次業務處理花費在 RPC 調用上的耗時是 20ms,那麼一條線程一秒最多處理 50 次請求。

如果在等待 RPC 返回時當前線程沒有被系統調度轉換爲 Ready 狀態,那當前 CPU 核心就會空轉,浪費了 CPU 資源!通過增加線程數量提高系統吞吐量的效果非常有限,而且創建大量線程也會造成其他問題。

協程雖然不一定能減少一次業務請求的耗時,但一定可以提升系統的吞吐量:

C++20 標準中的協程

雖然 C++ 20 標準中引入了協程,但是 C++20 只引入了協程需要的底層支持,所以直接使用相對比較難,不過很多庫已經提供了封裝,比如:

需要說明的是:C++20 協程的性能還是非常高的,等 C++23 提供簡化後的 lib,我們就可以非常方便地使用協程了。

就目前而言,編譯協程相關代碼需要 g++10 或者更高版本(clang++12 對協程支持有限):

可以通過下面的命令安裝:

  • Mac:brew install gcc@10

  • Ubuntu:apt install gcc-10 / apt install g++-10

下面我寫了一個使用 C++20 標準中協程的例子:

cpp20_demo/cpp_20_demo.cc

#include <coroutine>
#include <iostream>

struct HelloCoroutine {

    struct HelloPromise {

        HelloCoroutine get_return_object() {
            return std::coroutine_handle<HelloPromise>::from_promise(*this);
        }

        std::suspend_never initial_suspend() { return {}; }

        std::suspend_always final_suspend() noexcept { return {}; }

        void unhandled_exception() {}
    };

    using promise_type = HelloPromise;

    HelloCoroutine(std::coroutine_handle<HelloPromise> h) : handle(h) {}

    std::coroutine_handle<HelloPromise> handle;
};

HelloCoroutine hello() {
    std::cout << "Hello " << std::endl;
    co_await std::suspend_always{};
    std::cout << "world!" << std::endl;
}

int main() {
    HelloCoroutine coro = hello();

    std::cout << "calling resume" << std::endl;
    coro.handle.resume();

    std::cout << "destroy" << std::endl;
    coro.handle.destroy();

    return 0;
}

編譯執行後輸出:

Hello 
calling resume
world!
destroy

由於篇幅有限,這裏不再詳述 C++20 標準中的協程使用了;

如果想更深入的學習,可以參考:

動手實現協程

上面文章的內容基本上已經把整個協程介紹的七七八八了。看了這麼多內容,你是不是心動想要自己動手寫一個協程庫了呢?

那麼,跟隨下面的內容,一起使用 C++ 實現協程吧。是的,有棧協程、無棧協程都會實現一遍。

基於彙編實現的有棧協程

首先我們來使用匯編來實現一個有棧協程,這裏參考的是微信開源的 libco;

源代碼:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/stack_co

協程環境

本例中實現的協程不支持跨線程,而是每個線程分配一個環境,來維護該線程下運行中的協程之間的層次關係,代碼如下:

stack_co/environment.h

#ifndef COROUTINE_ENVIRONMENT_H
#define COROUTINE_ENVIRONMENT_H

#include "coroutine.h"

#include <cstddef>
#include <cstring>
#include <functional>
#include <memory>

namespace stack_co {

    class Coroutine;

    class Environment {
        friend class Coroutine;

    public:
        // Thread-local instance
        static Environment &instance();

        // Factory method
        template<typename Entry, typename ...Args>
        std::shared_ptr<Coroutine> create_coroutine(Entry &&entry, Args &&...arguments);

        // No copy constructor
        Environment(const Environment &) = delete;

        // No Assignment Operator
        Environment &operator=(const Environment &) = delete;

        // Get current coroutine in the stack
        Coroutine *current();

    private:
        // No explicit constructor
        Environment();

        void push(std::shared_ptr<Coroutine> coroutine);

        void pop();

    private:
        // Coroutine calling stack
        std::array<std::shared_ptr<Coroutine>, 1024> _c_stack;

        // Top of the coroutine calling stack
        size_t _c_stack_top;

        // Main coroutine(root)
        std::shared_ptr<Coroutine> _main;
    };

    // A default factory method
    template<typename Entry, typename ...Args>
    inline std::shared_ptr<Coroutine> Environment::create_coroutine(Entry &&entry, Args &&...arguments) {
        return std::make_shared<Coroutine>(
                this, std::forward<Entry>(entry), std::forward<Args>(arguments)...);
    }

} // namespace stack_co

#endif //COROUTINE_ENVIRONMENT_H

上面的代碼定義了協程運行的環境(Environment)。

需要注意的是:

我們顯式的刪除了 Environment 的拷貝構造函數和賦值運算符,並且將構造函數聲明爲 private,僅提供工廠方法來創建 Environment 實例。

而 Environment 在實現時,使用的是 thread_local,從而保證了每個線程僅會存在單個實例。

對外暴露了 current 方法用於獲取當前環境下調用棧中的協程,而三個成員變量是用來保存或記錄當前調用協程的:

Environment 類對應的實現:

stack_co/environment.cc

#include "environment.h"

namespace stack_co {

    Environment &Environment::instance() {
        static thread_local Environment env;
        return env;
    }

    Coroutine *Environment::current() {
        return this->_c_stack[this->_c_stack_top - 1].get();
    }

    void Environment::push(std::shared_ptr<Coroutine> coroutine) {
        _c_stack[_c_stack_top++] = std::move(coroutine);
    }

    void Environment::pop() {
        _c_stack_top--;
    }

    Environment::Environment() : _c_stack_top(0) {
        _main = std::make_shared<Coroutine>(this, []() {});
        push(_main);
    }

} // namespace stack_co

實現內容比較簡單,主要是:

下面來看協程實例相關的定義。

協程狀態

協程相關的狀態在 status.h 頭文件中定義了:

stack_co/status.h

#ifndef COROUTINE_STATUS_H
#define COROUTINE_STATUS_H

namespace stack_co {

    // The status of the coroutine
    struct Status {
        using Bitmask = unsigned char;

        constexpr static Bitmask MAIN = 1 << 0;
        constexpr static Bitmask IDLE = 1 << 1;
        constexpr static Bitmask RUNNING = 1 << 2;
        constexpr static Bitmask EXIT = 1 << 3;

        Bitmask operator&(Bitmask mask) const { return flag & mask; }

        Bitmask operator|(Bitmask mask) const { return flag | mask; }

        Bitmask operator^(Bitmask mask) const { return flag ^ mask; }

        void operator&=(Bitmask mask) { flag &= mask; }

        void operator|=(Bitmask mask) { flag |= mask; }

        void operator^=(Bitmask mask) { flag ^= mask; }

        Bitmask flag;
    };
} // namespace stack_co

#endif //COROUTINE_STATUS_H

協程相關的狀態主要包括了下面幾類:

並重載了一些運算符。

協程實例

協程的實例主要是用於支持接口 resumeyield

代碼如下:

stack_co/coroutine.h

#ifndef COROUTINE_COROUTINE_H
#define COROUTINE_COROUTINE_H

#include "status.h"
#include "context.h"

#include <functional>
#include <memory>

namespace stack_co {

    class Environment;

    class Coroutine : public std::enable_shared_from_this<Coroutine> {
        friend class Environment;

        friend class Context;

    public:
        static Coroutine ¤t();

        // 測試當前控制流是否位於協程上下文
        static bool test();

        // 獲取當前運行時信息
        // 目前僅支持運行、退出、主協程的判斷
        Status runtime() const;

        bool exit() const;

        bool running() const;

        // 核心操作:resume和yield

        // usage: Coroutine::current().yield()
        static void yield();

        // Note1: 允許處於EXIT狀態的協程重入,從而再次resume
        //        如果不使用這種特性,則用exit() / running()判斷
        //
        // Note2: 返回值可以得知resume並執行後的運行時狀態
        //        但是這個值只適用於簡單的場合
        //        如果接下來其它協程的運行也影響了該協程的狀態
        //        那建議用runtime()獲取
        Status resume();

        Coroutine(const Coroutine &) = delete;

        Coroutine(Coroutine &&) = delete;

        Coroutine &operator=(const Coroutine &) = delete;

        Coroutine &operator=(Coroutine &&) = delete;

    public:

        // 構造Coroutine執行函數,entry爲函數入口,對應傳參爲arguments...
        // Note: 出於可重入的考慮,entry強制爲值語義
        template<typename Entry, typename ...Args>
        Coroutine(Environment *master, Entry entry, Args ...arguments)
                : _entry([=] { entry(/*std::move*/(arguments)...); }),
                  _master(master) {}

    private:
        static void call_when_finish(Coroutine *coroutine);

    private:
        Status _runtime{};

        Context _context{};

        std::function<void()> _entry;

        Environment *_master;
    };

} // namespace stack_co

#endif //COROUTINE_COROUTINE_H

在 Coroutine 中定義了:

對應的方法實現:

stack_co/coroutine.cc

#include "coroutine.h"
#include "environment.h"

namespace stack_co {

    Coroutine &Coroutine::current() {
        return *Environment::instance().current();
    }

    bool Coroutine::test() {
        return current()._context.test();
    }

    Status Coroutine::runtime() const {
        return _runtime;
    }

    bool Coroutine::exit() const {
        return _runtime & Status::EXIT;
    }

    bool Coroutine::running() const {
        return _runtime & Status::RUNNING;
    }

    Status Coroutine::resume() {
        if (!(_runtime & Status::RUNNING)) {
            _context.prepare(Coroutine::call_when_finish, this);
            _runtime |= Status::RUNNING;
            _runtime &= ~Status::EXIT;
        }
        auto previous = _master->current();
        _master->push(shared_from_this());
        _context.switch_from(&previous->_context);
        return _runtime;
    }

    void Coroutine::yield() {
        auto &coroutine = current();
        auto ¤tContext = coroutine._context;

        coroutine._master->pop();

        auto &previousContext = current()._context;
        previousContext.switch_from(¤tContext);
    }

    void Coroutine::call_when_finish(Coroutine *coroutine) {
        auto &routine = coroutine->_entry;
        auto &runtime = coroutine->_runtime;
        if (routine) routine();
        runtime ^= (Status::EXIT | Status::RUNNING);
        // coroutine->yield();
        yield();
    }

} // namespace stack_co

協程內部的各種操作主要是調用其內部的 Context 實現的,下面我們來看。

上下文實例

上下文信息 Context 用於維護協程 Coroutine 的函數調用信息。

需要注意的是:上下文需要確保內存佈局準確無誤才能使用,一個context的起始地址必須是regs[0],否則會影響後面的協程切換正確性。

代碼如下:

stack_co/context.h

#ifndef COROUTINE_CONTEXT_H
#define COROUTINE_CONTEXT_H

#include <cstddef>
#include <cstring>
#include <iterator>

namespace stack_co {

    class Coroutine;

    /**
     * The context of coroutine(in x86-64)
     *
     * low | _registers[0]: r15  |
     *     | _registers[1]: r14  |
     *     | _registers[2]: r13  |
     *     | _registers[3]: r12  |
     *     | _registers[4]: r9   |
     *     | _registers[5]: r8   |
     *     | _registers[6]: rbp  |
     *     | _registers[7]: rdi  |
     *     | _registers[8]: rsi  |
     *     | _registers[9]: ret  |
     *     | _registers[10]: rdx |
     *     | _registers[11]: rcx |
     *     | _registers[12]: rbx |
     * hig | _registers[13]: rsp |
     *
     */
    class Context final {
    public:
        using Callback = void (*)(Coroutine *);
        using Word = void *;

        constexpr static size_t STACK_SIZE = 1 << 17;
        constexpr static size_t RDI = 7;
        constexpr static size_t RSI = 8;
        constexpr static size_t RET = 9;
        constexpr static size_t RSP = 13;

    public:
        void prepare(Callback ret, Word rdi);

        void switch_from(Context *previous);

        bool test();

    private:
        Word get_stack_pointer();

        void fill_registers(Word sp, Callback ret, Word rdi, ...);

    private:
        /**
         * We must ensure that registers are at the top of the memory layout.
         *
         * So the Context must have no virtual method, and len at least 14!
         */
        Word _registers[14];

        char _stack[STACK_SIZE];
    };

} // namespace stack_co

#endif //COROUTINE_CONTEXT_H

對應的 C++ 文件:

stack_co/context.cc

#include "context.h"

extern "C" {
extern void switch_context(stack_co::Context *, stack_co::Context *) asm("switch_context");
}

namespace stack_co {

    void Context::switch_from(Context *previous) {
        switch_context(previous, this);
    }

    void Context::prepare(Context::Callback ret, Context::Word rdi) {
        Word sp = get_stack_pointer();
        fill_registers(sp, ret, rdi);
    }

    bool Context::test() {
        char current;
        ptrdiff_t diff = std::distance(std::begin(_stack), ¤t);
        return diff >= 0 && diff < STACK_SIZE;
    }

    Context::Word Context::get_stack_pointer() {
        auto sp = std::end(_stack) - sizeof(Word);
        sp = decltype(sp)(reinterpret_cast<size_t>(sp) & (~0xF));
        return sp;
    }

    void Context::fill_registers(Word sp, Callback ret, Word rdi, ...) {
        ::memset(_registers, 0, sizeof _registers);
        auto pRet = (Word *) sp;
        *pRet = (Word) ret;
        _registers[RSP] = sp;
        _registers[RET] = *pRet;
        _registers[RDI] = rdi;
    }

} // namespace stack_co

其中,C++ 中的實現使用了彙編:

stack_co/switch_context.S

.globl switch_context
.type  switch_context, @function
switch_context:
    movq %rsp, %rax
    movq %rax, 104(%rdi)
    movq %rbx, 96(%rdi)
    movq %rcx, 88(%rdi)
    movq %rdx, 80(%rdi)
    movq 0(%rax), %rax
    movq %rax, 72(%rdi)
    movq %rsi, 64(%rdi)
    movq %rdi, 56(%rdi)
    movq %rbp, 48(%rdi)
    movq %r8, 40(%rdi)
    movq %r9, 32(%rdi)
    movq %r12, 24(%rdi)
    movq %r13, 16(%rdi)
    movq %r14, 8(%rdi)
    movq %r15, (%rdi)

    movq 48(%rsi), %rbp
    movq 104(%rsi), %rsp
    movq (%rsi), %r15
    movq 8(%rsi), %r14
    movq 16(%rsi), %r13
    movq 24(%rsi), %r12
    movq 32(%rsi), %r9
    movq 40(%rsi), %r8
    movq 56(%rsi), %rdi
    movq 72(%rsi), %rax
    movq 80(%rsi), %rdx
    movq 88(%rsi), %rcx
    movq 96(%rsi), %rbx
    movq 64(%rsi), %rsi

    movq %rax, (%rsp)
    xorq %rax, %rax
    ret

上面的 Context 的核心功能 switch_context 主要就是通過彙編 stack_co/switch_context.S 實現的,主要核心就是一個switch過程。

這裏調用時rdi(previous)和rsi(next)分別指向Context實例的地址。首先是保存當前的寄存器上下文到 previous 的 _registers 中:

恢復過程則是從next的_registers 中恢復:

具體的彙編含義在前文中已經完完整整講述了,這裏不再贅述;

而其他的方法,如:prepareget_stack_pointerfill_registers實際上都是爲了獲取當前調用棧的上下文信息;

補充:彙編擴展名的差異

上文中的彙編文件命名爲:switch_context.S,在 Unix/Linux 系統中:

  • .a是靜態庫的常用擴展 (也就是用多個.o文件製作的檔案ar(1)),動態庫,即共享對象,使用.so

  • .s用於 asm 編譯器輸出 (gcc -S foo.c生成 asm 輸出,默認文件名爲foo.s);

  • .S用於手寫的 asm 源文件,並且.S適用於 GNU as語法中的 asm,無論是否使用任何 C 預處理器功能;

例如,在 glibc 的源代碼樹中使用.S的所有 ASM 源文件

通常情況下:

  • 具有 gcc 背景的人可能將他們的 MIPS asm 放入文件.S.s文件中;

  • 而具有更多 NASM/YASM 經驗 (或 Windows) 的人可能會選擇.asm 擴展名;

但是建議不要使用 .s 文件,因爲它很容易被覆蓋:gcc -S foo.c

參考文章:彙編文件:.a .s .asm 之間的差異

測試代碼

測試代碼如下:

stack_co/stack_co_test.cc

#include "coroutine.h"
#include "environment.h"
#include "utils.h"

#include <iostream>

namespace stack_co {

    namespace this_coroutine {

        inline void yield() {
            return ::stack_co::Coroutine::yield();
        }

    } // namespace this_coroutine

    inline bool test() {
        return Coroutine::test();
    }

    inline Environment &open() {
        return Environment::instance();
    }

} // namespace stack_co

void where() {
    std::cout << "running code in a "
              << (stack_co::test() ? "coroutine" : "thread")
              << std::endl;
}

void print1() {
    std::cout << 1 << std::endl;
    stack_co::this_coroutine::yield();
    std::cout << 2 << std::endl;
}

void print2(int i, stack_co::Coroutine *co1) {
    std::cout << i << std::endl;
    co1->resume();
    where();
    std::cout << "bye" << std::endl;
}

int main() {
    auto &env = stack_co::open();
    auto co1 = env.create_coroutine(print1);
    auto co2 = env.create_coroutine(print2, 3, co1.get());
    co1->resume();
    co2->resume();
    where();
    return 0;
}

上面的代碼首先在 main 函數中創建了一個 Environment,隨後加入了兩個函數:

auto co1 = env.create_coroutine(print1);
auto co2 = env.create_coroutine(print2, 3, co1.get());

隨後啓動兩個協程;

首先進入 print1,打印 1

然後, print1 釋放 CPU,切換至 print2 打印 3

然後,在 print2 函數中回覆協程 1,繼續進入 print1 中執行,並打印 2

然後,print1 函數退出,調用棧返回至 print2 中,調用 where 函數;

然後,在 print2 函數中打印 bye

最後,print2 函數返回,在 main 函數中調用 where

代碼執行後,輸出結果如下:

1
3
2
running code in a coroutine
bye
running code in a thread

可以看到,跟隨着代碼來看協程的調用棧的切換是很清晰的!

實際上微信開源的 libco 不僅提供了一套類 pthread 的協程通信機制,同時可以零改造地將三方庫的阻塞 IO 調用進行協程化;

感興趣的可以看:微信 libco 協程庫源碼分析

基於ucontext實現的無棧協程

上面的例子是使用匯編實現的有棧協程,相對應的,我們繼續使用 ucontext 庫來實現一個無棧協程;

源代碼:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/stackless_co

協程調用函數的定義

爲了簡單起見,我們這裏定義的協程可以調用的函數簽名爲:

stackless_co/utils.h

typedef void (*coroutine_func)(Schedule *, void *ud);

在調用時,可以傳入一個 arg 結構體,來使用;

例如:

struct args {
    int n;
};

作爲參數 *ud

同時,考慮到參數的通用性,這裏使用了 void* 作爲入參和返回值;

協程實例

協程的定義如下:

stackless_co/coroutine.h

#ifndef COROUTINE_COROUTINE_H
#define COROUTINE_COROUTINE_H

#include "utils.h"
#include "schedule.h"

#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <cstdint>

#if __APPLE__ && __MACH__
#include <sys/ucontext.h>
#else

#include <ucontext.h>

#endif

namespace stackless_co {

    class Schedule;

    class Coroutine {
    public:

        static Coroutine *new_co(Schedule *s, coroutine_func func, void *ud);

        void delete_co();

        inline coroutine_func get_func() {
            return func;
        }

        inline ucontext_t *get_ctx() {
            return &ctx;
        }

        inline int get_status() {
            return status;
        }

        inline ptrdiff_t get_size() {
            return this->size;
        }

        inline char *get_stack() {
            return this->stack;
        }

        inline ptrdiff_t get_cap() {
            return this->cap;
        }

        inline void *get_ud() {
            return this->ud;
        }

        inline void set_status(int status) {
            this->status = status;
        }

        inline void set_stack(char *stack) {
            this->stack = stack;
        }

        inline void set_cap(ptrdiff_t cap) {
            this->cap = cap;
        }

        inline void set_size(ptrdiff_t size) {
            this->size = size;
        }

    private:
        coroutine_func func;
        void *ud;
        ucontext_t ctx;
        ptrdiff_t cap;
        ptrdiff_t size;
        int status;
        char *stack;
    };

} // namespace stackless_co

#endif //COROUTINE_COROUTINE_H

協程 Coroutine 的定義比較簡單,主要用於存放一些協程的信息,並無特殊邏輯;

具體的幾個成員變量定義如下:

並且:

對應的實現:

stackless_co/coroutine.cc

#include "utils.h"
#include "coroutine.h"
#include "schedule.h"

namespace stackless_co {

    Coroutine *Coroutine::new_co(Schedule *s, coroutine_func func, void *ud) {
        auto *co = new(Coroutine);
        co->func = func;
        co->ud = ud;
        co->cap = 0;
        co->size = 0;
        co->status = Schedule::COROUTINE_READY;
        co->stack = nullptr;
        return co;
    }

    void Coroutine::delete_co() {
        free(this->stack);
        free(this);
    }

} // namespace stackless_co

方法實現非常簡單,這裏不再贅述了;

協程調度 Schedule

我們通過 Schedule 類來調度協程;

stackless_co/schedule.h

#ifndef COROUTINE_SCHEDULE_H
#define COROUTINE_SCHEDULE_H

#include "utils.h"
#include "coroutine.h"

#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <cstring>
#include <cstdint>

#if __APPLE__ && __MACH__
#include <sys/ucontext.h>
#else

#include <ucontext.h>

#endif

namespace stackless_co {

    class Coroutine;

    class Schedule {

    private:

        static void _save_stack(Coroutine *C, char *top);

    public:
        static Schedule *coroutine_open();

        static void main_func(uint32_t low32, uint32_t hi32);

        void coroutine_close();

        int coroutine_new(coroutine_func, void *ud);

        void coroutine_resume(int id);

        int coroutine_status(int id);

        int coroutine_running() const;

        void coroutine_yield();

    public:

        constexpr static int COROUTINE_DEAD = 0;

        constexpr static int COROUTINE_READY = 1;

        constexpr static int COROUTINE_RUNNING = 2;

        constexpr static int COROUTINE_SUSPEND = 3;

    private:
        constexpr static int STACK_SIZE = 1024 * 1024;

        constexpr static int DEFAULT_COROUTINE = 16;

    private:
        char stack[STACK_SIZE];
        ucontext_t main;
        int nco;
        int cap;
        int running;
        Coroutine **co;
    };

} // namespace stackless_co

#endif //COROUTINE_SCHEDULE_H

協程調度器 Schedule 負責管理用其創建的所有協程,其中有幾個成員變量非常重要:

此外:

對應實現如下:

stackless_co/schedule.cc

#include "utils.h"
#include "schedule.h"
#include "coroutine.h"

#include <cstdlib>
#include <cassert>
#include <cstring>
#include <cstdint>

#if __APPLE__ && __MACH__

#include <sys/ucontext.h>

#else

#include <ucontext.h>

#endif


namespace stackless_co {

    Schedule *Schedule::coroutine_open() {
        auto *s = new(Schedule);
        s->nco = 0;
        s->cap = DEFAULT_COROUTINE;
        s->running = -1;
        s->co = (Coroutine **) malloc(sizeof(Coroutine) * s->cap);
        memset(s->co, 0, sizeof(struct coroutine *) * s->cap);
        return s;
    }

    void Schedule::main_func(uint32_t low32, uint32_t hi32) {
        uintptr_t ptr = (uintptr_t) low32 | ((uintptr_t) hi32 << 32);
        auto *s = (Schedule *) ptr;
        int id = s->running;
        Coroutine *c = s->co[id];
        c->get_func()(s, c->get_ud());
        c->delete_co();
        s->co[id] = nullptr;
        --s->nco;
        s->running = -1;
    }

    void Schedule::coroutine_close() {
        int i;
        for (i = 0; i < this->cap; i++) {
            Coroutine *inner_co = this->co[i];
            if (inner_co) {
                inner_co->delete_co();
            }
        }
        free(this->co);
        this->co = nullptr;
        free(this);
    }

    int Schedule::coroutine_new(coroutine_func func, void *ud) {
        Coroutine *inner_co = Coroutine::new_co(this, func, ud);

        if (this->nco >= this->cap) {
            int id = this->cap;
            this->co = (Coroutine **) realloc(this->co, this->cap * 2 * sizeof(Coroutine));
            memset(this->co + this->cap, 0, sizeof(struct coroutine *) * this->cap);
            this->co[this->cap] = inner_co;
            this->cap *= 2;
            ++this->nco;
            return id;
        } else {
            int i;
            for (i = 0; i < this->cap; i++) {
                int id = (i + this->nco) % this->cap;
                if (this->co[id] == nullptr) {
                    this->co[id] = inner_co;
                    ++this->nco;
                    return id;
                }
            }
        }

        return 0;
    }

    void Schedule::coroutine_resume(int id) {
        assert(this->running == -1);
        assert(id >= 0 && id < this->cap);
        Coroutine *c = this->co[id];
        if (c == nullptr) return;

        int status = c->get_status();
        auto ptr = (uintptr_t) this;
        switch (status) {
            case COROUTINE_READY:

                getcontext(c->get_ctx());
                c->get_ctx()->uc_stack.ss_sp = this->stack;
                c->get_ctx()->uc_stack.ss_size = STACK_SIZE;
                c->get_ctx()->uc_link = &this->main;
                this->running = id;
                c->set_status(COROUTINE_RUNNING);

                makecontext(c->get_ctx(), (void (*)()) main_func, 2, (uint32_t) ptr, (uint32_t) (ptr >> 32));
                swapcontext(&this->main, c->get_ctx());
                break;
            case COROUTINE_SUSPEND:
                memcpy(this->stack + STACK_SIZE - c->get_size(), c->get_stack(), c->get_size());
                this->running = id;
                c->set_status(COROUTINE_RUNNING);
                swapcontext(&this->main, c->get_ctx());
                break;
            default:
                assert(0);
        }
    }

    int Schedule::coroutine_status(int id) {
        assert(id >= 0 && id < this->cap);
        if (this->co[id] == nullptr) {
            return COROUTINE_DEAD;
        }
        return this->co[id]->get_status();
    }

    int Schedule::coroutine_running() const {
        return this->running;
    }

    void Schedule::coroutine_yield() {
        int id = this->running;
        assert(id >= 0);
        Coroutine *c = this->co[id];
        assert((char *) &c > this->stack);
        _save_stack(c, this->stack + STACK_SIZE);
        c->set_status(COROUTINE_SUSPEND);
        this->running = -1;
        swapcontext(c->get_ctx()&this->main);
    }

    void Schedule::_save_stack(Coroutine *c, char *top) {
        char dummy = 0;
        assert(top - &dummy <= STACK_SIZE);
        if (c->get_cap() < top - &dummy) {
            free(c->get_stack());
            c->set_cap(top - &dummy);
            c->set_stack(static_cast<char *>(malloc(c->get_cap())));
        }

        c->set_size(top - &dummy);
        memcpy(c->get_stack()&dummy, c->get_size());
    }

} // namespace stackless_co

下面分別來看;

協程的創建: coroutine_new
int Schedule::coroutine_new(coroutine_func func, void *ud) {
  Coroutine *inner_co = Coroutine::new_co(this, func, ud);

  if (this->nco >= this->cap) {
    int id = this->cap;
    this->co = (Coroutine **) realloc(this->co, this->cap * 2 * sizeof(Coroutine));
    memset(this->co + this->cap, 0, sizeof(struct coroutine *) * this->cap);
    this->co[this->cap] = inner_co;
    this->cap *= 2;
    ++this->nco;
    return id;
  } else {
    int i;
    for (i = 0; i < this->cap; i++) {
      int id = (i + this->nco) % this->cap;
      if (this->co[id] == nullptr) {
        this->co[id] = inner_co;
        ++this->nco;
        return id;
      }
    }
  }

  return 0;
}

coroutine_new 負責創建並初始化一個新協程對象,同時將該協程對象放到協程調度器裏面。

這裏的實現有兩個非常值得學習的點:

這樣,一個協程對象就被創建好,此時該協程的狀態是 READY,但尚未正式執行。

我們需要調用  coroutine_resume 方法啓動協程,下面來看  coroutine_resume 方法。

coroutine_resume(READY -> RUNNING)

調用 coroutine_resume 方法會切入到指定協程中執行,此時,當前正在執行的協程的上下文會被保存起來,同時上下文替換成新的協程,並將該協程的狀態置爲 RUNNING

進入 coroutine_resume 函數的前置狀態有兩個 READYSUSPEND,這兩個狀態下 coroutine_resume 的處理方法也是有很大不同。我們先看下協程在 READY 狀態下進行 coroutine_resume 的流程:

這塊代碼比較短,但是非常重要,所以我就直接貼代碼了:

// 初始化 ucontext_t 結構體,將當前的上下文放到 C->ctx 裏面
getcontext(c->get_ctx());

// 將當前協程的運行時棧的棧頂設置爲 s->stack,每個協程都這麼設置,這就是所謂的共享棧。(注意,這裏是棧頂)
c->get_ctx()->uc_stack.ss_sp = this->stack;
c->get_ctx()->uc_stack.ss_size = STACK_SIZE;
c->get_ctx()->uc_link = &this->main;
this->running = id;
c->set_status(COROUTINE_RUNNING);

// 設置執行 c->ctx 函數, 並將 s 作爲參數傳進去
makecontext(c->get_ctx()(void (*)()) main_func, 2, (uint32_t) ptr, (uint32_t) (ptr >> 32));

// 將當前的上下文放入 s->main 中,並將 c->ctx 的上下文替換到當前上下文
swapcontext(&this->main, c->get_ctx());

這段函數非常的重要,有幾個不可忽視的點:

接下來是 makecontext,這個函數用來設置對應 ucontext 的執行函數;如上,將 c->ctx 的執行函數體設置爲了 mainfunc

makecontext 後面的兩個參數也非常有意思:

可以看出來其入參是把一個指針掰成了兩個 int 作爲參數傳給 mainfunc;而在 mainfunc 的實現可以看到,其又會把這兩個 int 拼成 Schedule*

爲什麼不直接傳 Schedule*,而要這麼做,通過先拆兩半,再在函數中拼起來呢?

這是因爲 makecontext 的函數指針的參數是 uint32_t 類型,在 64 位系統下,一個 uint32_t 沒法承載一個指針, 所以基於兼容性的考慮,才採用了這種做法;

接下來調用了 swapcontext 函數,這個函數比較簡單,但也非常核心:

其作用是將當前的上下文內容放入 s->main 中,並使用 c->ctx 的上下文替換到當前上下文(類似於前文彙編的作用)。

這樣的話,就會執行新的上下文對應的程序了;(在 coroutine 中, 也就是開始執行 mainfunc 這個函數,mainfunc 是對用戶提供的協程函數的封裝)。

協程的切出:coroutine_yield

調用 coroutine_yield 可以使當前正在運行的協程切換到主協程中運行;此時,該協程會進入 SUSPEND 狀態。

coroutine_yield 的具體實現依賴於兩個行爲:

這裏有個點極其關鍵,就是:如何保存當前協程的運行時棧,即如何獲取整個棧的內存空間;

**注:**我們都知道,調用棧的生長方向是從高地址往低地址;

因此,我們只要找到棧的棧頂和棧底的地址,就可以找到整個棧內存空間了。

因爲協程的運行時棧的內存空間是自己分配的(在 coroutine_resume 階段設置了 c->ctx.uc_stack.ss_sp = s.this->stack)。

根據以上理論,棧的生長方向是高地址到低地址,因此:

棧底的就是內存地址最大的位置,即 s->stack + STACK_SIZE 就是棧底位置。

那麼,如何找到棧頂的位置呢?是通過下面的方法做的:

void Schedule::_save_stack(Coroutine *c, char *top) {
  char dummy = 0;
  assert(top - &dummy <= STACK_SIZE);
  if (c->get_cap() < top - &dummy) {
    free(c->get_stack());
    c->set_cap(top - &dummy);
    c->set_stack(static_cast<char *>(malloc(c->get_cap())));
  }

  c->set_size(top - &dummy);
  memcpy(c->get_stack()&dummy, c->get_size());
}

這裏特意使用到了一個 dummy 變量,這個 dummy 的作用非常關鍵也非常巧妙。

因爲 dummy 變量是剛剛分配到棧上的,因此,此時就位於 棧的最頂部位置。

並且,此時整個內存佈局如下圖所示:

因此整個棧的大小就是從棧底到棧頂,s->stack + STACK_SIZE - &dummy

最後又調用了 memcpy 將當前運行時棧的內容,拷貝到了 c->stack 中保存了起來。

coroutine_resume(SUSPEND -> RUNNING)

當協程被 yield 之後會進入 SUSPEND 階段,對該協程調用 coroutine_resume 會再次切入該協程。

這部分的代碼如下:

memcpy(this->stack + STACK_SIZE - c->get_size(), c->get_stack(), c->get_size());
this->running = id;
c->set_status(COROUTINE_RUNNING);
swapcontext(&this->main, c->get_ctx());

這裏的實現有兩個重要的點:

補充:共享棧

共享棧這個詞在 libco 中提到的多,其實 coroutine 也是用的共享棧模型;

共享棧這個東西說起來很玄乎,實際原理不復雜:本質就是所有的協程在運行的時候都使用同一個棧空間。

有共享棧自然就有非共享棧,也就是每個協程的棧空間都是獨立的,固定大小:

因爲棧空間在運行時不能隨時擴容,否則如果有指針操作執行了棧內存,擴容後將導致指針失效。

因此,爲了防止棧內存不夠,每個協程都要預先開一個足夠的棧空間使用;當然很多協程在實際運行中也用不了這麼大的空間,就必然造成內存的浪費和開闢大內存造成的性能損耗。

共享棧則是提前開了一個足夠大的棧空間(如上面的實現 STACK_SIZE = 1024 * 1024;,即1M大小);所有的棧運行的時候,都使用這個棧空間。

並且設置每個協程的運行時棧:

c->ctx.uc_stack.ss_sp = s->stack;
c->ctx.uc_stack.ss_size = STACK_SIZE;

對協程調用 yield 的時候,該協程棧內容暫時保存起來,保存的時候需要用到多少內存就開多少,這樣就減少了內存的浪費(即_save_stack 函數的內容)。

resume 該協程的時候,協程之前保存的棧內容,會被重新拷貝到運行時棧中,這就是所謂的共享棧的原理。

測試代碼

具體的測試代碼如下:

stackless_co/stackless_co_test.cc

#include "schedule.h"

#include <cstdio>

struct args {
    int n;
};

void foo(stackless_co::Schedule *s, void *ud) {
    args *arg = static_cast<args *>(ud);
    int start = arg->n;
    for (int i = 0; i < 5; i++) {
        printf("coroutine %d : %d\n", s->coroutine_running(), start + i);
        s->coroutine_yield();
    }
}

void test(stackless_co::Schedule *s) {
    struct args arg1 = {0};
    struct args arg2 = {100};

    int co1 = s->coroutine_new(foo, &arg1);
    int co2 = s->coroutine_new(foo, &arg2);
    printf("main start\n");

    while (s->coroutine_status(co1) && s->coroutine_status(co2)) {
        s->coroutine_resume(co1);
        s->coroutine_resume(co2);
    }
    printf("main end\n");
}

int main() {
    auto *s = stackless_co::Schedule::coroutine_open();
    test(s);
    s->coroutine_close();

    return 0;
}

上面的代碼首先利用 coroutine_open 創建了協程調度器 s,用來統一管理全部的協程。同時,在 test 函數中,創建了兩個協程 co1 和 co2,不斷的反覆 yieldresume 協程,直至兩個協程執行完畢。

執行後輸出:

main start
coroutine 0 : 0
coroutine 1 : 100
coroutine 0 : 1
coroutine 1 : 101
coroutine 0 : 2
coroutine 1 : 102
coroutine 0 : 3
coroutine 1 : 103
coroutine 0 : 4
coroutine 1 : 104
main end

可以看到,我們實現的代碼也可以完成協程的功能。

總結

本文首先介紹了:

隨後溫習了線程上下文相關的知識,包括:

緊接着介紹了幾種協程的實現種類:

然後探討了關於協程的一些其他內容,包括:

隨後是實踐部分,包括:

附錄

源代碼:

參考文章:

參考項目:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/SyWjLg3lYx3pIJQfEtik8Q