淺談協程
作者:kylinkzhang,騰訊 CSIG 後臺開發工程師
什麼是協程
我們可以簡單的認爲:協程就是用戶態的線程,但是上下文切換的時機是靠調用方(寫代碼的開發人員)自身去控制的。
同時,協程和用戶態線程非常接近,用戶態線程之間的切換不需要陷入內核,但部分操作系統中用戶態線程的切換需要內核態線程的輔助。
下面是一個簡單的例子:
void A() {
cout << 1 << " ";
cout << 2 << " ";
cout << 3 << " ";
}
void B() {
cout << "x" << " ";
cout << "y" << " ";
cout << "z" << " ";
}
int main(void) {
A();
B();
}
在單線程中,上述函數的輸出爲:
1 2 3 x y z
如果我們用 libco 庫將上面程序改造一下:
void A() {
cout << 1 << " ";
cout << 2 << " ";
co_yield_ct(); // 切出到主協程
cout << 3 << " ";
}
void B() {
cout << "x" << " ";
co_yield_ct(); // 切出到主協程
cout << "y" << " ";
cout << "z" << " ";
}
int main(void) {
... // 主協程
co_resume(A); // 啓動協程 A
co_resume(B); // 啓動協程 B
co_resume(A); // 從協程 A 切出處繼續執行
co_resume(B); // 從協程 B 切出處繼續執行
}
同樣在單線程中,改造後的程序輸出如下:
1 2 x 3 y z
可以看出,切出操作是由 co_yield_ct()
函數實現的,而協程的啓動和恢復是由 co_resume
實現的。函數 A()
和 B()
並不是一個執行完才執行另一個,而是產生了 “交叉執行 “ 的效果,這就是通過協程實現的!
線程挺好的,我們爲什麼需要協程呢?
因爲有些時候我們在執行一些操作(尤其是 IO 操作)時,不希望去做“創建一個新的線程”
這種重量級的操作來異步處理。而是希望:在當前線程執行中,暫時切換到其他任務中執行,同時在 IO 真正準備好了之後,再切換回來繼續執行!
相比於多開一個線程來操作,使用協程的好處:
-
減少了線程的重複高頻創建;
-
儘量避免線程的阻塞;
-
提升代碼的可維護與可理解性(畢竟不需要考慮多線程那一套東西了);
更多協程的好處:
- https://www.zhihu.com/question/20511233
同時,下面是一些協程的特點:
-
協程可以主動讓出 CPU 時間片;(注意:不是當前線程讓出 CPU 時間片,而是線程內的某個協程讓出時間片供同線程內其他協程運行;)
-
協程可以恢復 CPU 上下文;當另一個協程繼續執行時,其需要恢復 CPU 上下文環境;
-
協程有個管理者,管理者可以選擇一個協程來運行,其他協程要麼阻塞,要麼 ready,或者 died;
-
運行中的協程將佔有當前線程的所有計算資源;
-
協程天生有棧屬性,而且是 lock free;
不理解這些協程特點也不要緊,下文都會講到。
補充:線程上下文
下圖中展示了線程在運行過程 CPU 需要的一些信息(CPU Context,CPU 上下文),比如通用寄存器、棧信息(EBP/ESP)等,進程 / 線程切換時需要保存與恢復這些信息。而進程 / 內核態線程切換的時候需要與 OS 內核進行交互,保存 / 讀取 CPU 上下文信息。
線程時間消耗分析
內核態(Kernel)的一些數據是共享的,讀寫時需要同步機制,所以操作一旦陷入內核態就會消耗更多的時間。進程需要與操作系統中所有其他進程進行資源爭搶,且操作系統中資源的鎖是全局的;線程之間的數據一般在進程內共享,所以線程間資源共享相比如進程而言要輕一些。
雖然很多操作系統(比如 Linux)進程與線程區別不是非常明顯,但線程還是比進程要輕。
線程的切換(Context Switch)相比於其他操作而言並不是非常耗時,如下圖所示(2018 年):
參考這篇 Linux 線程相關文章,Linux 2.6 之後 Linux 多線程的性能提高了很多,大部分場景下線程切換耗時在 2us 左右;
下面是 Linux 下線程切換耗時統計(2013 年):
正常情況下線程可用的 CPU 時間片都在數十毫秒級別,而線程切換佔總耗時的千分之幾以內,協程的使用可以將這個損耗進一步降低(主要是去除了其他操作,比如 futex 等)。
線程內存消耗分析
不是所有編程語言或者系統都支持一次創建很多線程。
例如,在 x32 系統中即使使用了虛內存空間,因爲進程能訪問的虛內存空間大概是 3GB,所以單進程最多創建 300 多條線程(假設系統爲每條線程分配 10M 棧空間),太多線程甚至還伴隨着由於線程切換而觸發缺頁中斷的風險。
如果我們創建很多線程(比如 x64 系統下創建 1 萬個線程),不考慮優先級且假設 CPU 有 10 個核心,那麼每個線程每秒有 1ms 的時間片,整個業務的耗時大概是:
-
(n-1) * 1 + n * 0.001(n−1)∗1+n∗0.001秒
; -
(n 是線程在處理業務的過程中被調度的次數)
;
如果大量線程之間存在資源競爭,那麼系統行爲將難以預測。所以在有限的資源下創建大量線程是極其不合理的,服務線程的個數和 CPU 核心數應該在一個合理的比例內。
操作系統線程調度可參考:
- https://help.perforce.com/sourcepro/current/HTML/index.html#page/SourcePro_Core/threadsug-ThreadPackage.22.118.html
在默認情況下,Linux 系統給每條線程分配的棧空間最大是 6~8MB,這個大小是上限,也是虛內存空間,並不是每條線程真實的棧使用情況。
線程真實棧內存使用會隨着線程執行而變化,如果線程只使用了少量局部變量,那麼真實線程棧可能只有幾十個字節的大小;系統在維護線程時需要分配額外的空間,所以線程數的增加還是會提高內存資源的消耗。
通過上面的分析我們可以知道:
如果業務處理時間遠小於 IO 耗時,線程切換非常頻繁,那麼使用協程是不錯的選擇;
並且,協程的優勢並不僅僅是減少線程之間切換,從編程的角度來看,協程的引入簡化了異步編程;
同時,協程爲一些異步編程提供了無鎖的解決方案,即:
協程可以用同步編程的方式實現異步編程才能實現的功能。
如何保存上下文
很多地方把協程稱爲 Subroutine;Subroutine 是什麼?就是函數!
上古時期的計算機科學家們早就給出了概念:Coroutine 就是可以中斷並恢復執行的 Subroutine。
因此從這個角度來看協程擁有調用棧並不是一個奇怪的事情。
再來思考,Coroutine 與 Subroutine 相比有什麼區別?區別僅有一個就是:
Coroutine 可以中斷並恢復,對應的操作就是 yield/resume
。
這樣看來 Subroutine 不過是 Coroutine 的一個子集罷了,也就是說把協程當做一個特殊的函數調用:
- 可以中斷並恢復
既然可以把 Coroutine 當做一個特殊的函數調用,那麼如何像切換函數一樣去切換 Coroutine 呢?難點在於:
- 除了像函數一樣切換出去,還要在某種條件滿足的時候切換回來。
通常的做法是:在協程內部存儲自身的上下文,並在需要切換的時候把上下文切換;我們知道上下文其實本質上就是寄存器,所以保存上下文實際上就是把寄存器的值保存下來。
相對應的,有下面幾種方法:
-
使用
setjmp/longjmp
; -
使用匯編保存寄存器中的值,libco 就使用了這種方法;
-
使用
ucontext.h
這個封裝好的庫也可以幫我們完成上下文的相關工作。
關於
setjmp.h
:https://zh.m.wikipedia.org/zh-hans/Setjmp.h需要注意的是:
setjmp/longjmp 一般不能作爲協程實現的底層機制,因爲 setjmp/longjmp 對棧信息的支持有限。
關於 ucontext.h
:https://en.wikipedia.org/wiki/Setcontext
下面分別來看 setjmp
和 ucontext
方法,至於使用匯編的方法,會在本文講解有棧協程是講述。
使用 setjmp/longjmp
下面代碼模擬了單線程併發執行兩個 while(true){...}
函數:
源代碼:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/setjmp_demo
setjmp_demo/setjmp_demo.cc
#include <cstdlib>
#include <cstdio>
#include <setjmp.h>
int max_iteration = 9;
int iter;
jmp_buf Main;
jmp_buf PointPing;
jmp_buf PointPong;
void Ping() {
if (setjmp(PointPing) == 0) longjmp(Main, 1); // 可以理解爲重置,reset the world
while (1) {
printf("%3d : Ping-", iter);
if (setjmp(PointPing) == 0) longjmp(PointPong, 1);
}
}
void Pong() {
if (setjmp(PointPong) == 0) longjmp(Main, 1);
while (1) {
printf("Pong\n");
iter++;
if (iter > max_iteration) exit(0);
if (setjmp(PointPong) == 0) longjmp(PointPing, 1);
}
}
int main(int argc, char* argv[]) {
iter = 1;
if (setjmp(Main) == 0) Ping();
if (setjmp(Main) == 0) Pong();
longjmp(PointPing, 1);
}
首先,我們定義了三個保存調用棧的節點:
-
jmp_buf Main;
-
jmp_buf PointPing;
-
jmp_buf PointPong;
並在 main 函數中首先創建(啓動)了兩個函數:Ping、Pong,在使用 longjmp(PointPing, 1);
之後,PointPing
不再是 0,從而啓動了 Ping 協程。此後,函數 Ping 和 函數 Pong 在 while (1)
中交替執行,而不再返回 main 函數中。
最後,當 iter > max_iteration
時,調用 exit(0)
退出。通過命令 g++ -std=c++11 setjmp_demo.cc -o setjmp_demo
編譯後執行 ./setjmp_demo
,輸出如下:
1 : Ping-Pong
2 : Ping-Pong
3 : Ping-Pong
4 : Ping-Pong
5 : Ping-Pong
6 : Ping-Pong
7 : Ping-Pong
8 : Ping-Pong
9 : Ping-Pong
雖然上面實現了比較簡單的函數切換,但是實際上我們無法通過 setjmp.h
庫獲取到真正的上下文信息。如果想要真正獲取到上下文信息,可以使用 ucontext.h
庫。
使用 ucontext
下面關於 ucontext 的介紹源自:
- http://pubs.opengroup.org/onlinepubs/7908799/xsh/ucontext.h.html
實際上,ucontext lib 已經不推薦使用了,但依舊是不錯的協程入門資料。其他底層協程庫實現可以查看:
協程庫的對比可以參考:
- https://github.com/tboox/benchbox/wiki/switch
linux 系統一般都存在 ucontext 這個 C 語言庫,這個庫主要用於:操控當前線程下的 CPU 上下文。
和 setjmp/longjmp
不同,ucontext
直接提供了設置函數運行時棧的方式(makecontext
),避免不同函數棧空間的重疊。
需要注意的是:
ucontext 只操作與當前線程相關的 CPU 上下文,所以下文中涉及 ucontext 的上下文均指當前線程的上下文;(一般 CPU 有多個核心,一個線程在某一時刻只能使用其中一個,所以 ucontext 只涉及一個與當前線程相關的 CPU 核心)
ucontext.h
頭文件中定義了 ucontext_t
這個結構體,這個結構體中至少包含以下成員:
ucontext_t *uc_link // next context
sigset_t uc_sigmask // 阻塞信號阻塞
stack_t uc_stack // 當前上下文所使用的棧
mcontext_t uc_mcontext // 實際保存 CPU 上下文的變量,這個變量與平臺&機器相關,最好不要訪問這個變量
可移植的程序最好不要讀取與修改
ucontext_t
中的uc_mcontext
,因爲不同平臺下uc_mcontext
的實現是不同的。
同時,ucontext.h
頭文件中定義了四個函數,下面分別介紹:
int getcontext(ucontext_t *); // 獲得當前 CPU 上下文
int setcontext(const ucontext_t *);// 重置當前 CPU 上下文
void makecontext(ucontext_t *, (void *)(), int, ...); // 修改上下文信息,比如設置棧指針
int swapcontext(ucontext_t *, const ucontext_t *);
下面分別來看。
getcontext
#include <ucontext.h>
int getcontext(uconte t_t *ucp);
getcontext` 函數使用當前 CPU 上下文初始化 ucp 所指向的結構體,初始化的內容包括:CPU 寄存器、信號 mask 和當前線程所使用的棧空間;
返回值:getcontext 成功返回 0,失敗返回 -1。
setcontext
#include <ucontext.h>
int setcontext(ucontext_t *ucp);
和 getcontext
函數類似,setcontext
函數用於:設置 CPU 寄存器、信號 mask 和當前線程所使用的棧空間。
需要特別注意的是:
如果函數 setcontext
執行成功,那麼調用 setcontext
的函數將不會返回,因爲當前 CPU 的上下文已經交給其他函數或者過程了,當前函數完全放棄了 對 CPU 的 “所有權”。
getcontext 和 setcontext 的應用:
當信號處理函數需要執行的時候,當前線程的上下文需要保存起來,隨後進入信號處理階段;
makecontext
#include <ucontext.h>
void makecontext(ucontext_t *ucp, (void *func)(), int argc, ...);
makecontext
修改由 getcontext
創建的上下文 ucp
;
如果 ucp
指向的上下文由 swapcontext
或 setcontext
恢復,那麼當前線程將執行傳遞給 makecontext
的函數 func(...)
。
執行 makecontext 後需要爲新上下文分配一個棧空間,如果不創建,那麼新函數func
執行時會使用舊上下文的棧,而這個棧可能已經不存在了。同時,argc 必須和 func 中整型參數的個數相等。
swapcontext
#include <ucontext.h>
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);
swapcontext
將當前上下文信息保存到 oucp
中並使用 ucp
重置 CPU 上下文。
返回值:
-
成功則返回 0;
-
失敗返回 -1 並置
errno
;
如果 ucp
所指向的上下文沒有足夠的棧空間以執行餘下的過程,swapcontext
將返回 -1。
總結
相比於 setjml
略微簡單的功能,使用 ucontext
我們可以方便的獲取當前調用函數的上下文,進而實現協程。
協程的類別
協程的實現不只有一種,很多活躍的語言如 Python、Java、Golang 等都是支持協程的。
儘管這些協程可能名稱不同,甚至用法也不同,但它們都可以被劃分爲兩大類:
-
有棧(stackful)協程,這類協程的實現類似於內核態線程的實現,不同協程間切換還是要切換對應的棧上下文,只是不用陷入內核而已;例如:goroutine、libco;
-
無棧(stackless)協程,無棧協程的上下文都會放到公共內存中,在協程切換時使用狀態機來切換,而不用切換對應的上下文(因爲都已經在堆中了),因此相比有棧協程要輕量許多;例如:C++20、Rust、JavaScript 中的協程。
這裏所謂的有棧、無棧:
並不是說這個協程運行的時候有沒有棧,而是說協程之間是否存在調用棧(Callback Stack);
同時,根據協程之間是否有明顯的調用關係,我們又可以把協程分爲:
-
非對稱協程:協程之間有明顯的調用關係;
-
對稱協程:協程之間無明顯的調用關係。
例如,協程 A 調用了協程 B:
如果只有 B 完成之後才能調用 A,那麼此時 A/B 是非對稱協程;
如果 A/B 被調用的概率相同,那麼此時 A/B 是對稱協程;
下面我們分別來看。
有棧協程
開源庫 libco 就是通過彙編語言實現的有棧協程庫,我們來看一看 libco 中對於 32 位機器的上下文切換操作是如何完成的:
通過分析代碼看到,無論是 co_yield_ct
還是 co_resume
,在協程切出和恢復時,都調用了同一個函數co_swap
,在這個函數中調用了 coctx_swap
來實現協程的切換,這一函數的原型是:
void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");
兩個參數都是 coctx_t *
指針類型,其中第一個參數表示要切出的協程,第二個參數表示切出後要進入的協程。
coctx_swap
函數便是用匯編實現的,我們這裏只關注 x86-64 相關的部分,其代碼如下:
coctx_swap:
leaq 8(%rsp),%rax
leaq 112(%rdi),%rsp
pushq %rax
pushq %rbx
pushq %rcx
pushq %rdx
pushq -8(%rax) //ret func addr
pushq %rsi
pushq %rdi
pushq %rbp
pushq %r8
pushq %r9
pushq %r12
pushq %r13
pushq %r14
pushq %r15
movq %rsi, %rsp
popq %r15
popq %r14
popq %r13
popq %r12
popq %r9
popq %r8
popq %rbp
popq %rdi
popq %rsi
popq %rax //ret func addr
popq %rdx
popq %rcx
popq %rbx
popq %rsp
pushq %rax
xorl %eax, %eax
ret
可以看出,coctx_swap
中並未像常規被調用函數一樣創立新的棧幀。
先看前兩條語句:
leaq 8(%rsp),%rax
leaq 112(%rdi),%rsp
leaq
用於把其第一個參數的值賦值給第二個寄存器參數,而第一條語句用來把 8(%rsp)
的本身的值存入到 %rax
中。
注意:
這裏使用的並不是
8(%rsp)
指向的值,而是把 8(%rsp) 表示的地址賦值給了 %rax,這一地址是父函數棧幀中除返回地址外棧幀頂的位置。
在第二條語句 leaq 112(%rdi), %rsp
中,%rdi
存放的是coctx_swap
第一個參數的值,這一參數是指向 coctx_t
類型的指針,表示當前要切出的協程,這一類型的定義如下:
struct coctx_t {
void *regs[ 14 ];
size_t ss_size;
char *ss_sp;
};
因而 112(%rdi)
表示的就是第一個協程的 coctx_t
中 regs[14]
數組的下一個 64 位地址,而接下來的語句:
pushq %rax
pushq %rbx
pushq %rcx
pushq %rdx
pushq -8(%rax) //ret func addr
pushq %rsi
pushq %rdi
pushq %rbp
pushq %r8
pushq %r9
pushq %r12
pushq %r13
pushq %r14
pushq %r15
第一條語句 pushq %rax
用於把 %rax
的值放入到 regs[13]
中,resg[13]
用來存儲第一個協程的 %rsp
的值。這時 %rax
中的值是第一個協程 coctx_swap
父函數棧幀除返回地址外棧幀頂的地址。
由於 regs[]
中有單獨的元素存儲返回地址,棧中再保存返回地址是無意義的,因而把父棧幀中除返回地址外的棧幀頂作爲要保存的 %rsp
值是合理的;當協程恢復時,把保存的 regs[13]
的值賦值給 %rsp
即可恢復本協程 coctx_swap
父函數堆棧指針的位置。
第一條語句之後的語句就是用 pushq
把各 CPU 寄存器的值依次從 regs
尾部向前壓入。即通過調整 %rsp
把 regs[14]
當作堆棧,然後利用 pushq
把寄存器的值和返回地址存儲到 regs[14]
整個數組中。並且,regs[14]
數組中各元素與其要存儲的寄存器對應關係如下:
//-------------
// 64 bit
//low | regs[0]: r15 |
// | regs[1]: r14 |
// | regs[2]: r13 |
// | regs[3]: r12 |
// | regs[4]: r9 |
// | regs[5]: r8 |
// | regs[6]: rbp |
// | regs[7]: rdi |
// | regs[8]: rsi |
// | regs[9]: ret | //ret func addr, 對應 rax
// | regs[10]: rdx |
// | regs[11]: rcx |
// | regs[12]: rbx |
//hig | regs[13]: rsp |
接下來的彙編語句:
movq %rsi, %rsp
popq %r15
popq %r14
popq %r13
popq %r12
popq %r9
popq %r8
popq %rbp
popq %rdi
popq %rsi
popq %rax //ret func addr
popq %rdx
popq %rcx
popq %rbx
popq %rsp
這裏用的方法還是通過改變 %rsp
的值,把某塊內存當作棧來使用。
第一句 movq %rsi, %rsp
就是讓 %rsp
指向 coctx_swap
第二個參數,這一參數表示要進入的協程。而第二個參數也是coctx_t
類型的指針,即執行完 movq
語句後,%rsp
指向了第二個參數 coctx_t
中 regs[0]
,而之後的 pop
語句就是用 regs[0-13]
中的值填充 cpu 的寄存器,這裏需要注意的是 popq
會使得 %rsp
的值增加而不是減少,這一點保證了會從 regs[0]
到 regs[13]
依次彈出到 cpu 寄存器中。
在執行完最後一句 popq %rsp
後,%rsp
已經指向了新協程要恢復的棧指針(即新協程之前調用 coctx_swap
時父函數的棧幀頂指針),由於每個協程都有一個自己的棧空間,可以認爲這一語句使得 %rsp
指向了要進入協程的棧空間。
coctx_swap
中最後三條語句如下:
pushq %rax
xorl %eax, %eax
ret
pushq %rax
用來把 %rax
的值壓入到新協程的棧中,這時 %rax
是要進入的目標協程的返回地址,即要恢復的執行點;然後用 xorl
把 %rax
低 32 位清 0 以實現地址對齊;最後 ret
語句用來彈出棧的內容,並跳轉到彈出的內容表示的地址處,而彈出的內容正好是上面 pushq %rax
時壓入的 %rax
的值,即之前保存的此協程的返回地址。
即最後這三條語句實現了轉移到新協程返回地址處執行,從而完成了兩個協程的切換。
可以看出,這裏通過調整 %rsp
的值來恢復新協程的棧,並利用了 ret
語句來實現修改指令寄存器 %rip
的目的,通過修改 %rip
來實現程序運行邏輯跳轉。
注意:
%rip
的值不能直接修改,只能通過call
或ret
之類的指令來間接修改;
整體上看來,協程的切換其實就是:cpu 寄存器內容特別是 %rip
和 %rsp
的寫入和恢復,因爲 cpu 的寄存器決定了程序從哪裏執行(%rip) 和使用哪個地址作爲堆棧 (%rsp)。
寄存器的寫入和恢復如下圖所示:
執行完上圖的流程,就將之前 cpu 寄存器的值保存到了協程 A 的 regs[14] 中,而將協程 B regs[14] 的內容寫入到了寄存器中,從而使執行邏輯跳轉到了 B 協程 regs[14] 中保存的返回地址處開始執行,即實現了協程的切換(從 A 協程切換到了 B 協程執行)。
詳細關於 libco 的實現細節:
https://www.zhihu.com/question/52193579
https://zhuanlan.zhihu.com/p/27409164
https://github.com/yyrdl/libco-code-study
無棧協程
無棧協程的本質就是一個狀態機(state machine),它可以理解爲在另一個角度去看問題,即:
- 同一協程協程的切換本質不過是指令指針寄存器的改變。
首先,我們來看一個使用 libco 的協程的例子(當然 libco 是一個有棧協程):
void* test(void* para){
co_enable_hook_sys();
int i = 0;
poll(0, 0, 0. 1000); // 協程切換執行權,1000ms後返回
i++;
poll(0, 0, 0. 1000); // 協程切換執行權,1000ms後返回
i--;
return 0;
}
int main(){
stCoRoutine_t* routine;
co_create(&routine, NULL, test, 0); // 創建一個協程
co_resume(routine);
co_eventloop( co_get_epoll_ct(),0,0 );
return 0;
}
這段代碼實際的意義就是:主協程跑一個協程去執行 test 函數,在 test 中我們需要兩次從協程中切換出去,這裏對應了兩個 poll 操作(hook 機制),hook 後的 poll 所做的事情就是把當前協程的 CPU 執行權切換到調用棧的上一層,並在超時或註冊的 fd 就緒時返回(當然樣例這裏就只是超時了)。
如果是無棧協程,實現相同邏輯的代碼是怎麼樣的呢?其實就是翻譯成類似於以下狀態機的代碼:
class test_coroutine {
int i;
int __state = 0;
void MoveNext() {
switch(__state) {
case 0:
return frist();
case 1:
return second();
case 2:
return third();
}
}
void frist() {
i = 0;
__state = 1;
}
void second() {
i++;
_state = 2;
}
void third() {
i--;
}
};
我們可以看到:相比與有棧協程中的 test 函數,這裏把整個協程抽象成一個類,以原本需要執行切換的語句處爲界限,把函數劃分爲幾個部分,並在某一個部分執行完以後進行狀態轉移,在下一次調用此函數的時候就會執行下一部分。
這樣的話我們就完全沒有必要像有棧協程那樣顯式的執行上下文切換了,我們只需要一個簡易的調度器來調度這些函數即可。
在 Rust 中,async 也是一個語法糖,實際上編譯後就是實現了類似於上面的代碼結構,感興趣的可以去看《async book》。
從執行時棧的角度來看:
其實所有的協程共用的都是一個棧,即系統棧,也就也不必我們自行去給協程分配棧,因爲是函數調用,我們當然也不必去顯示的保存寄存器的值。而且相比有棧協程把局部變量放在新開的空間上,無棧協程直接使用系統棧使得 CPU cache 局部性更好,同時也使得無棧協程的中斷和函數返回幾乎沒有區別,這樣也可以凸顯出無棧協程的高效。
對稱協程與非對稱協程
前文中也簡單提到了對稱和非對稱協程,這裏也簡單聊一下吧。
其實對於 “對稱” 這個名詞,闡述的實際是:**協程之間的關係。**用大白話來說就是:對稱協程就是說協程之間人人平等,沒有誰調用誰一說,大家都是一樣的,而非對稱協程就是協程之間存在明顯的調用關係。
簡單來說就是這樣:
-
對稱協程 Symmetric Coroutine:任何一個協程都是相互獨立且平等的,調度權可以在任意協程之間轉移;
-
非對稱協程 Asymmetric Coroutine:協程出讓調度權的目標只能是它的調用者,即協程之間存在調用和被調用關係;
其實兩者的實現我覺得其實差異不大,非對稱協程其實就是擁有調用棧,而非對稱協程則是大家都平等,不需要調用棧,只需要一個數據結構存儲所有未執行完的協程即可。
至於哪種更優?這個需要分情況:
如果你使用協程的目的是爲了優化一些 IO 密集型應用,那麼協程切換出去的時候就是它等待事件到來的時候,此時你就算切換過去也沒有什麼意義,還不如等到事件到來的時候自動切換回去。
其實上面說的是有一些問題,因爲這個執行權的切換實際上是(調用者–被調用者)之間的切換,對稱就是它們之間都是平等的,就是假如 A 協程執行了 B,C 協程,那麼 B 協程可以切換回 A,也可以切換回 C;而非對稱只能是 B 切換回 A,A 切換回 C,C 再切換回 A,以此類推。
這樣看起來顯然非對稱協程相比之下更爲符合我們的認知,因爲對稱協程目前我不知道如何選擇一個合適的協程來獲得 CPU 執行權,正如上面所說,此協程可能正在等待事件;當然如果調度算法足夠優秀的話,對稱協程也是可取的。
關於協程的一些其他內容
N:1 & N:M 協程
我們知道,和線程綁定的協程只有在對應線程運行的時候纔有被執行的可能,如果對應線程中的某一個協程完全佔有了當前線程,那麼當前線程中的其他所有協程都不會被執行。
同時,協程的所有信息都保存在上下文(Contex)對象中,將不同上下文分發給不同的線程就可以實現協程的跨線程執行,如此,協程被阻塞的概率將減小。
因此,借用 BRPC 中對 N:M
協程的介紹,來解釋下什麼是 N:M
協程。
我們常說的協程通常指的是
N:1
線程庫,即所有的協程運行於一個系統線程中,計算能力和各類 eventloop 庫等價;由於不跨線程,協程之間的切換不需要系統調用,可以非常快 (100ns-200ns),受 cache 一致性的影響也小;
但代價是協程無法高效地利用多核,代碼必須非阻塞,否則所有的協程都被卡住……
bthread
是一個 M:N
線程庫,一個bthread
被卡住不會影響其他bthread
。
其中的關鍵技術有兩點:
-
work stealing 調度;
-
butex;
前者讓 bthread 更快地被調度到更多的核心上,後者讓 bthread 和 pthread 可以相互等待和喚醒,這兩點協程都不需要;
更多 brpc 的線程見:
- https://github.com/apache/incubator-brpc/blob/master/docs/cn/threading_overview.md
這麼看來 貌似 bthread 自己實現了 golang 的 goroutine?
表面看起來的卻如此:兩者都實現了 M:N 用戶態線程。但是事實上, golang 中的 goroutine 的實現要更爲複雜一些:
bthread 的設計比較接近 go 1.0 版本:OS 線程不會動態增加,在有大量的阻塞性 syscall 下,會有影響。
而 go 1.1 之後的設計就是動態增減 OS 線程,而且提供了 LockOSThread,可以讓 goroutine 和 OS 線程 1:1。
關於這個問題,見:https://www.zhihu.com/question/65549422
協程的組成
通過上面的描述,N:M
模式下的協程其實就是可用戶確定調度順序的用戶態線程,與系統級線程對照可以將協程框架分爲以下幾個模塊:
-
協程上下文:對應操作系統中的 PCB/TCB(Process/Thread Control Block);
-
保存協程上下文的容器:對應操作系統中保存 PCB/TCB 的容器,一般是一個列表(在實際實現時,協程上下文容器可以使用一個也可以使用多個,比如:普通協程隊列、定時的協程優先隊列等);
-
協程的執行器:
-
協程的調度器,對應操作系統中的進程 / 線程調度器;
-
執行協程的 worker 線程,對應實際線程 / 進程所使用的 CPU 核心;
協程的調度
協程的調度與 OS 線程調度十分相似,如下圖協程調度示例所示:
協程相關工具
系統級線程有鎖(mutex)、條件變量(condition)等工具,協程也有對應的工具;比如:libgo 提供了協程之間使用的鎖 Co_mutex/Co_rwmutex
。
不同協程框架對工具的支持程度不同,實現方式也不盡相同;對此問題,本文不做深入介紹。
系統級線程和協程處於不同的系統層級,所以兩者的同步工具不完全通用,如果在協程中使用了線程的鎖(例如:std::mutex
),則整個線程將會被阻塞,當前線程將不會再調度與執行其他協程。
最簡單的例子:
如果在一個協程中使用了
sleep
,那麼這個線程下的所有協程全部都會被阻塞。** 在使用協程時,這種方法是非常低效的。**
協程 & 線程的對比
協程對 CPU/IO 的影響
協程的目的在於剔除線程的阻塞,儘可能提高 CPU 的利用率。很多服務在處理業務時需要請求第三方服務,向第三方服務發起 RPC 調用;RPC 調用的網絡耗時一般耗時在毫秒級別,RPC 服務的處理耗時也可能在毫秒級別,如果當前服務使用同步調用,即 RPC 返回後才進行後續邏輯,那麼一條線程每秒處理的業務數量是可以估算的。
假設每次業務處理花費在 RPC 調用上的耗時是 20ms,那麼一條線程一秒最多處理 50 次請求。
如果在等待 RPC 返回時當前線程沒有被系統調度轉換爲 Ready 狀態,那當前 CPU 核心就會空轉,浪費了 CPU 資源!通過增加線程數量提高系統吞吐量的效果非常有限,而且創建大量線程也會造成其他問題。
協程雖然不一定能減少一次業務請求的耗時,但一定可以提升系統的吞吐量:
-
當前業務只有一次第三方 RPC 的調用,那麼協程不會減少業務處理的耗時,但可以提升 QPS;
-
當前業務需要多個第三方 RPC 調用,同時創建多個協程可以讓多個 RPC 調用一起執行,則當前業務的 RPC 耗時由耗時最長的 RPC 調用決定。
C++20 標準中的協程
雖然 C++ 20 標準中引入了協程,但是 C++20 只引入了協程需要的底層支持,所以直接使用相對比較難,不過很多庫已經提供了封裝,比如:
需要說明的是:C++20 協程的性能還是非常高的,等 C++23 提供簡化後的 lib,我們就可以非常方便地使用協程了。
就目前而言,編譯協程相關代碼需要 g++10 或者更高版本(clang++12 對協程支持有限):
可以通過下面的命令安裝:
Mac:
brew install gcc@10
;Ubuntu:
apt install gcc-10
/apt install g++-10
;
下面我寫了一個使用 C++20 標準中協程的例子:
cpp20_demo/cpp_20_demo.cc
#include <coroutine>
#include <iostream>
struct HelloCoroutine {
struct HelloPromise {
HelloCoroutine get_return_object() {
return std::coroutine_handle<HelloPromise>::from_promise(*this);
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() {}
};
using promise_type = HelloPromise;
HelloCoroutine(std::coroutine_handle<HelloPromise> h) : handle(h) {}
std::coroutine_handle<HelloPromise> handle;
};
HelloCoroutine hello() {
std::cout << "Hello " << std::endl;
co_await std::suspend_always{};
std::cout << "world!" << std::endl;
}
int main() {
HelloCoroutine coro = hello();
std::cout << "calling resume" << std::endl;
coro.handle.resume();
std::cout << "destroy" << std::endl;
coro.handle.destroy();
return 0;
}
編譯執行後輸出:
Hello
calling resume
world!
destroy
由於篇幅有限,這裏不再詳述 C++20 標準中的協程使用了;
如果想更深入的學習,可以參考:
https://cloud.tencent.com/developer/article/1882417
https://en.cppreference.com/w/cpp/language/coroutines
https://www.scs.stanford.edu/~dm/blog/c++-coroutines.html
動手實現協程
上面文章的內容基本上已經把整個協程介紹的七七八八了。看了這麼多內容,你是不是心動想要自己動手寫一個協程庫了呢?
那麼,跟隨下面的內容,一起使用 C++ 實現協程吧。是的,有棧協程、無棧協程都會實現一遍。
基於彙編實現的有棧協程
首先我們來使用匯編來實現一個有棧協程,這裏參考的是微信開源的 libco;
源代碼:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/stack_co
協程環境
本例中實現的協程不支持跨線程,而是每個線程分配一個環境,來維護該線程下運行中的協程之間的層次關係,代碼如下:
stack_co/environment.h
#ifndef COROUTINE_ENVIRONMENT_H
#define COROUTINE_ENVIRONMENT_H
#include "coroutine.h"
#include <cstddef>
#include <cstring>
#include <functional>
#include <memory>
namespace stack_co {
class Coroutine;
class Environment {
friend class Coroutine;
public:
// Thread-local instance
static Environment &instance();
// Factory method
template<typename Entry, typename ...Args>
std::shared_ptr<Coroutine> create_coroutine(Entry &&entry, Args &&...arguments);
// No copy constructor
Environment(const Environment &) = delete;
// No Assignment Operator
Environment &operator=(const Environment &) = delete;
// Get current coroutine in the stack
Coroutine *current();
private:
// No explicit constructor
Environment();
void push(std::shared_ptr<Coroutine> coroutine);
void pop();
private:
// Coroutine calling stack
std::array<std::shared_ptr<Coroutine>, 1024> _c_stack;
// Top of the coroutine calling stack
size_t _c_stack_top;
// Main coroutine(root)
std::shared_ptr<Coroutine> _main;
};
// A default factory method
template<typename Entry, typename ...Args>
inline std::shared_ptr<Coroutine> Environment::create_coroutine(Entry &&entry, Args &&...arguments) {
return std::make_shared<Coroutine>(
this, std::forward<Entry>(entry), std::forward<Args>(arguments)...);
}
} // namespace stack_co
#endif //COROUTINE_ENVIRONMENT_H
上面的代碼定義了協程運行的環境(Environment)。
需要注意的是:
我們顯式的刪除了 Environment 的拷貝構造函數和賦值運算符,並且將構造函數聲明爲 private
,僅提供工廠方法來創建 Environment 實例。
而 Environment 在實現時,使用的是 thread_local
,從而保證了每個線程僅會存在單個實例。
對外暴露了 current
方法用於獲取當前環境下調用棧中的協程,而三個成員變量是用來保存或記錄當前調用協程的:
-
_c_stack
; -
_c_stack_top
; -
_main
;
Environment 類對應的實現:
stack_co/environment.cc
#include "environment.h"
namespace stack_co {
Environment &Environment::instance() {
static thread_local Environment env;
return env;
}
Coroutine *Environment::current() {
return this->_c_stack[this->_c_stack_top - 1].get();
}
void Environment::push(std::shared_ptr<Coroutine> coroutine) {
_c_stack[_c_stack_top++] = std::move(coroutine);
}
void Environment::pop() {
_c_stack_top--;
}
Environment::Environment() : _c_stack_top(0) {
_main = std::make_shared<Coroutine>(this, []() {});
push(_main);
}
} // namespace stack_co
實現內容比較簡單,主要是:
-
instance
:工廠方法; -
current
:獲取當前環境下棧頂的協程實例; -
push/pop
:協程壓棧 / 出棧;
下面來看協程實例相關的定義。
協程狀態
協程相關的狀態在 status.h
頭文件中定義了:
stack_co/status.h
#ifndef COROUTINE_STATUS_H
#define COROUTINE_STATUS_H
namespace stack_co {
// The status of the coroutine
struct Status {
using Bitmask = unsigned char;
constexpr static Bitmask MAIN = 1 << 0;
constexpr static Bitmask IDLE = 1 << 1;
constexpr static Bitmask RUNNING = 1 << 2;
constexpr static Bitmask EXIT = 1 << 3;
Bitmask operator&(Bitmask mask) const { return flag & mask; }
Bitmask operator|(Bitmask mask) const { return flag | mask; }
Bitmask operator^(Bitmask mask) const { return flag ^ mask; }
void operator&=(Bitmask mask) { flag &= mask; }
void operator|=(Bitmask mask) { flag |= mask; }
void operator^=(Bitmask mask) { flag ^= mask; }
Bitmask flag;
};
} // namespace stack_co
#endif //COROUTINE_STATUS_H
協程相關的狀態主要包括了下面幾類:
-
MAIN:僅作爲協程入口調用棧的標記;
-
IDLE:空閒狀態;
-
RUNNING:執行中;
-
EXIT:線程退出。
並重載了一些運算符。
協程實例
協程的實例主要是用於支持接口 resume
和 yield
;
代碼如下:
stack_co/coroutine.h
#ifndef COROUTINE_COROUTINE_H
#define COROUTINE_COROUTINE_H
#include "status.h"
#include "context.h"
#include <functional>
#include <memory>
namespace stack_co {
class Environment;
class Coroutine : public std::enable_shared_from_this<Coroutine> {
friend class Environment;
friend class Context;
public:
static Coroutine ¤t();
// 測試當前控制流是否位於協程上下文
static bool test();
// 獲取當前運行時信息
// 目前僅支持運行、退出、主協程的判斷
Status runtime() const;
bool exit() const;
bool running() const;
// 核心操作:resume和yield
// usage: Coroutine::current().yield()
static void yield();
// Note1: 允許處於EXIT狀態的協程重入,從而再次resume
// 如果不使用這種特性,則用exit() / running()判斷
//
// Note2: 返回值可以得知resume並執行後的運行時狀態
// 但是這個值只適用於簡單的場合
// 如果接下來其它協程的運行也影響了該協程的狀態
// 那建議用runtime()獲取
Status resume();
Coroutine(const Coroutine &) = delete;
Coroutine(Coroutine &&) = delete;
Coroutine &operator=(const Coroutine &) = delete;
Coroutine &operator=(Coroutine &&) = delete;
public:
// 構造Coroutine執行函數,entry爲函數入口,對應傳參爲arguments...
// Note: 出於可重入的考慮,entry強制爲值語義
template<typename Entry, typename ...Args>
Coroutine(Environment *master, Entry entry, Args ...arguments)
: _entry([=] { entry(/*std::move*/(arguments)...); }),
_master(master) {}
private:
static void call_when_finish(Coroutine *coroutine);
private:
Status _runtime{};
Context _context{};
std::function<void()> _entry;
Environment *_master;
};
} // namespace stack_co
#endif //COROUTINE_COROUTINE_H
在 Coroutine 中定義了:
-
_runtime
:當前線程的狀態; -
_context
:當前協程的上下文信息(核心); -
_entry
:協程函數入口; -
_master
:協程調用環境;
對應的方法實現:
stack_co/coroutine.cc
#include "coroutine.h"
#include "environment.h"
namespace stack_co {
Coroutine &Coroutine::current() {
return *Environment::instance().current();
}
bool Coroutine::test() {
return current()._context.test();
}
Status Coroutine::runtime() const {
return _runtime;
}
bool Coroutine::exit() const {
return _runtime & Status::EXIT;
}
bool Coroutine::running() const {
return _runtime & Status::RUNNING;
}
Status Coroutine::resume() {
if (!(_runtime & Status::RUNNING)) {
_context.prepare(Coroutine::call_when_finish, this);
_runtime |= Status::RUNNING;
_runtime &= ~Status::EXIT;
}
auto previous = _master->current();
_master->push(shared_from_this());
_context.switch_from(&previous->_context);
return _runtime;
}
void Coroutine::yield() {
auto &coroutine = current();
auto ¤tContext = coroutine._context;
coroutine._master->pop();
auto &previousContext = current()._context;
previousContext.switch_from(¤tContext);
}
void Coroutine::call_when_finish(Coroutine *coroutine) {
auto &routine = coroutine->_entry;
auto &runtime = coroutine->_runtime;
if (routine) routine();
runtime ^= (Status::EXIT | Status::RUNNING);
// coroutine->yield();
yield();
}
} // namespace stack_co
協程內部的各種操作主要是調用其內部的 Context 實現的,下面我們來看。
上下文實例
上下文信息 Context
用於維護協程 Coroutine 的函數調用信息。
需要注意的是:上下文需要確保內存佈局準確無誤才能使用,一個context
的起始地址必須是regs[0]
,否則會影響後面的協程切換正確性。
代碼如下:
stack_co/context.h
#ifndef COROUTINE_CONTEXT_H
#define COROUTINE_CONTEXT_H
#include <cstddef>
#include <cstring>
#include <iterator>
namespace stack_co {
class Coroutine;
/**
* The context of coroutine(in x86-64)
*
* low | _registers[0]: r15 |
* | _registers[1]: r14 |
* | _registers[2]: r13 |
* | _registers[3]: r12 |
* | _registers[4]: r9 |
* | _registers[5]: r8 |
* | _registers[6]: rbp |
* | _registers[7]: rdi |
* | _registers[8]: rsi |
* | _registers[9]: ret |
* | _registers[10]: rdx |
* | _registers[11]: rcx |
* | _registers[12]: rbx |
* hig | _registers[13]: rsp |
*
*/
class Context final {
public:
using Callback = void (*)(Coroutine *);
using Word = void *;
constexpr static size_t STACK_SIZE = 1 << 17;
constexpr static size_t RDI = 7;
constexpr static size_t RSI = 8;
constexpr static size_t RET = 9;
constexpr static size_t RSP = 13;
public:
void prepare(Callback ret, Word rdi);
void switch_from(Context *previous);
bool test();
private:
Word get_stack_pointer();
void fill_registers(Word sp, Callback ret, Word rdi, ...);
private:
/**
* We must ensure that registers are at the top of the memory layout.
*
* So the Context must have no virtual method, and len at least 14!
*/
Word _registers[14];
char _stack[STACK_SIZE];
};
} // namespace stack_co
#endif //COROUTINE_CONTEXT_H
對應的 C++ 文件:
stack_co/context.cc
#include "context.h"
extern "C" {
extern void switch_context(stack_co::Context *, stack_co::Context *) asm("switch_context");
}
namespace stack_co {
void Context::switch_from(Context *previous) {
switch_context(previous, this);
}
void Context::prepare(Context::Callback ret, Context::Word rdi) {
Word sp = get_stack_pointer();
fill_registers(sp, ret, rdi);
}
bool Context::test() {
char current;
ptrdiff_t diff = std::distance(std::begin(_stack), ¤t);
return diff >= 0 && diff < STACK_SIZE;
}
Context::Word Context::get_stack_pointer() {
auto sp = std::end(_stack) - sizeof(Word);
sp = decltype(sp)(reinterpret_cast<size_t>(sp) & (~0xF));
return sp;
}
void Context::fill_registers(Word sp, Callback ret, Word rdi, ...) {
::memset(_registers, 0, sizeof _registers);
auto pRet = (Word *) sp;
*pRet = (Word) ret;
_registers[RSP] = sp;
_registers[RET] = *pRet;
_registers[RDI] = rdi;
}
} // namespace stack_co
其中,C++ 中的實現使用了彙編:
stack_co/switch_context.S
.globl switch_context
.type switch_context, @function
switch_context:
movq %rsp, %rax
movq %rax, 104(%rdi)
movq %rbx, 96(%rdi)
movq %rcx, 88(%rdi)
movq %rdx, 80(%rdi)
movq 0(%rax), %rax
movq %rax, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
movq 48(%rsi), %rbp
movq 104(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 56(%rsi), %rdi
movq 72(%rsi), %rax
movq 80(%rsi), %rdx
movq 88(%rsi), %rcx
movq 96(%rsi), %rbx
movq 64(%rsi), %rsi
movq %rax, (%rsp)
xorq %rax, %rax
ret
上面的 Context 的核心功能 switch_context
主要就是通過彙編 stack_co/switch_context.S
實現的,主要核心就是一個switch
過程。
這裏調用時rdi
(previous)和rsi
(next)分別指向Context
實例的地址。首先是保存當前的寄存器上下文到 previous 的 _registers 中:
-
真實的
rsp
存放到 previous 的 104 中(13*8,可能是位於棧區的 rsp,也可能是協程僞造的 rsp),而返回地址放到 previous 的 72 中; -
其餘的按部就班賦值到 previous 的 _registers 中;
恢復過程則是從next
的_registers 中恢復:
-
首次啓動時已經由
preapre
方法把必要的 ret 和函數傳參 rdi 被寫到_registers 上了,因此恢復時相當於直接調用對應函數(既調用callWhenFinish(next)
,裏面嵌套着實際的用戶回調); -
如果非首次啓動,那麼 ret 就是協程執行中的控制流;
-
不管怎樣,在恢復時,把當前
rsp
覆蓋return address
,然後使用ret
指令執行後就能切換到對應的控制流。
具體的彙編含義在前文中已經完完整整講述了,這裏不再贅述;
而其他的方法,如:prepare
、get_stack_pointer
、fill_registers
實際上都是爲了獲取當前調用棧的上下文信息;
補充:彙編擴展名的差異
上文中的彙編文件命名爲:
switch_context.S
,在 Unix/Linux 系統中:
.a
是靜態庫的常用擴展 (也就是用多個.o
文件製作的檔案ar(1)
),動態庫,即共享對象,使用.so
;
.s
用於 asm 編譯器輸出 (gcc -S foo.c
生成 asm 輸出,默認文件名爲foo.s
);
.S
用於手寫的 asm 源文件,並且.S
適用於 GNUas
語法中的 asm,無論是否使用任何 C 預處理器功能;例如,在 glibc 的源代碼樹中使用
.S
的所有 ASM 源文件;通常情況下:
具有 gcc 背景的人可能將他們的 MIPS asm 放入文件
.S
或.s
文件中;而具有更多 NASM/YASM 經驗 (或 Windows) 的人可能會選擇
.asm
擴展名;但是建議不要使用
.s
文件,因爲它很容易被覆蓋:gcc -S foo.c
;
測試代碼
測試代碼如下:
stack_co/stack_co_test.cc
#include "coroutine.h"
#include "environment.h"
#include "utils.h"
#include <iostream>
namespace stack_co {
namespace this_coroutine {
inline void yield() {
return ::stack_co::Coroutine::yield();
}
} // namespace this_coroutine
inline bool test() {
return Coroutine::test();
}
inline Environment &open() {
return Environment::instance();
}
} // namespace stack_co
void where() {
std::cout << "running code in a "
<< (stack_co::test() ? "coroutine" : "thread")
<< std::endl;
}
void print1() {
std::cout << 1 << std::endl;
stack_co::this_coroutine::yield();
std::cout << 2 << std::endl;
}
void print2(int i, stack_co::Coroutine *co1) {
std::cout << i << std::endl;
co1->resume();
where();
std::cout << "bye" << std::endl;
}
int main() {
auto &env = stack_co::open();
auto co1 = env.create_coroutine(print1);
auto co2 = env.create_coroutine(print2, 3, co1.get());
co1->resume();
co2->resume();
where();
return 0;
}
上面的代碼首先在 main 函數中創建了一個 Environment,隨後加入了兩個函數:
auto co1 = env.create_coroutine(print1);
auto co2 = env.create_coroutine(print2, 3, co1.get());
隨後啓動兩個協程;
首先進入 print1
,打印 1
;
然後, print1
釋放 CPU,切換至 print2
打印 3
;
然後,在 print2
函數中回覆協程 1,繼續進入 print1
中執行,並打印 2
;
然後,print1
函數退出,調用棧返回至 print2
中,調用 where
函數;
然後,在 print2
函數中打印 bye
;
最後,print2
函數返回,在 main 函數中調用 where
;
代碼執行後,輸出結果如下:
1
3
2
running code in a coroutine
bye
running code in a thread
可以看到,跟隨着代碼來看協程的調用棧的切換是很清晰的!
實際上微信開源的 libco 不僅提供了一套類 pthread 的協程通信機制,同時可以零改造地將三方庫的阻塞 IO 調用進行協程化;
感興趣的可以看:微信 libco 協程庫源碼分析
基於ucontext
實現的無棧協程
上面的例子是使用匯編實現的有棧協程,相對應的,我們繼續使用 ucontext
庫來實現一個無棧協程;
源代碼:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/stackless_co
協程調用函數的定義
爲了簡單起見,我們這裏定義的協程可以調用的函數簽名爲:
stackless_co/utils.h
typedef void (*coroutine_func)(Schedule *, void *ud);
在調用時,可以傳入一個 arg 結構體,來使用;
例如:
struct args {
int n;
};
作爲參數 *ud
;
同時,考慮到參數的通用性,這裏使用了
void*
作爲入參和返回值;
協程實例
協程的定義如下:
stackless_co/coroutine.h
#ifndef COROUTINE_COROUTINE_H
#define COROUTINE_COROUTINE_H
#include "utils.h"
#include "schedule.h"
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <cstdint>
#if __APPLE__ && __MACH__
#include <sys/ucontext.h>
#else
#include <ucontext.h>
#endif
namespace stackless_co {
class Schedule;
class Coroutine {
public:
static Coroutine *new_co(Schedule *s, coroutine_func func, void *ud);
void delete_co();
inline coroutine_func get_func() {
return func;
}
inline ucontext_t *get_ctx() {
return &ctx;
}
inline int get_status() {
return status;
}
inline ptrdiff_t get_size() {
return this->size;
}
inline char *get_stack() {
return this->stack;
}
inline ptrdiff_t get_cap() {
return this->cap;
}
inline void *get_ud() {
return this->ud;
}
inline void set_status(int status) {
this->status = status;
}
inline void set_stack(char *stack) {
this->stack = stack;
}
inline void set_cap(ptrdiff_t cap) {
this->cap = cap;
}
inline void set_size(ptrdiff_t size) {
this->size = size;
}
private:
coroutine_func func;
void *ud;
ucontext_t ctx;
ptrdiff_t cap;
ptrdiff_t size;
int status;
char *stack;
};
} // namespace stackless_co
#endif //COROUTINE_COROUTINE_H
協程 Coroutine 的定義比較簡單,主要用於存放一些協程的信息,並無特殊邏輯;
具體的幾個成員變量定義如下:
-
func
:協程所用的函數; -
ud
:協程參數; -
ctx
:協程上下文; -
cap
:已經分配的內存大小; -
size
:當前協程運行時棧,保存起來後的大小; -
status
:協程當前的狀態; -
stack
:當前協程的保存起來的運行時棧。
並且:
-
方法
coroutine_new
負責創建並初始化一個新協程對象,同時將該協程對象放到協程調度器裏面; -
方法
delete_co
用於關閉當前協程,並釋放協程中的資源。
對應的實現:
stackless_co/coroutine.cc
#include "utils.h"
#include "coroutine.h"
#include "schedule.h"
namespace stackless_co {
Coroutine *Coroutine::new_co(Schedule *s, coroutine_func func, void *ud) {
auto *co = new(Coroutine);
co->func = func;
co->ud = ud;
co->cap = 0;
co->size = 0;
co->status = Schedule::COROUTINE_READY;
co->stack = nullptr;
return co;
}
void Coroutine::delete_co() {
free(this->stack);
free(this);
}
} // namespace stackless_co
方法實現非常簡單,這裏不再贅述了;
協程調度 Schedule
我們通過 Schedule 類來調度協程;
stackless_co/schedule.h
#ifndef COROUTINE_SCHEDULE_H
#define COROUTINE_SCHEDULE_H
#include "utils.h"
#include "coroutine.h"
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <cstring>
#include <cstdint>
#if __APPLE__ && __MACH__
#include <sys/ucontext.h>
#else
#include <ucontext.h>
#endif
namespace stackless_co {
class Coroutine;
class Schedule {
private:
static void _save_stack(Coroutine *C, char *top);
public:
static Schedule *coroutine_open();
static void main_func(uint32_t low32, uint32_t hi32);
void coroutine_close();
int coroutine_new(coroutine_func, void *ud);
void coroutine_resume(int id);
int coroutine_status(int id);
int coroutine_running() const;
void coroutine_yield();
public:
constexpr static int COROUTINE_DEAD = 0;
constexpr static int COROUTINE_READY = 1;
constexpr static int COROUTINE_RUNNING = 2;
constexpr static int COROUTINE_SUSPEND = 3;
private:
constexpr static int STACK_SIZE = 1024 * 1024;
constexpr static int DEFAULT_COROUTINE = 16;
private:
char stack[STACK_SIZE];
ucontext_t main;
int nco;
int cap;
int running;
Coroutine **co;
};
} // namespace stackless_co
#endif //COROUTINE_SCHEDULE_H
協程調度器 Schedule
負責管理用其創建的所有協程,其中有幾個成員變量非常重要:
-
Coroutine **co
:是一個一維數組,存放了目前其管理的所有協程; -
ucontext_t main
:主協程的上下文,方便後面協程執行完後切回到主協程; -
char stack[STACK_SIZE]
:所有協程的運行時棧,具體共享棧的原理會在下文講到。
此外:
-
coroutine_open
是Schedule
的工廠方法,負責創建並初始化一個協程調度器; -
coroutine_close
負責銷燬協程調度器以及清理其管理的所有協程; -
coroutine_resume/coroutine_resume
分別用於釋放、恢復當前協程。
對應實現如下:
stackless_co/schedule.cc
#include "utils.h"
#include "schedule.h"
#include "coroutine.h"
#include <cstdlib>
#include <cassert>
#include <cstring>
#include <cstdint>
#if __APPLE__ && __MACH__
#include <sys/ucontext.h>
#else
#include <ucontext.h>
#endif
namespace stackless_co {
Schedule *Schedule::coroutine_open() {
auto *s = new(Schedule);
s->nco = 0;
s->cap = DEFAULT_COROUTINE;
s->running = -1;
s->co = (Coroutine **) malloc(sizeof(Coroutine) * s->cap);
memset(s->co, 0, sizeof(struct coroutine *) * s->cap);
return s;
}
void Schedule::main_func(uint32_t low32, uint32_t hi32) {
uintptr_t ptr = (uintptr_t) low32 | ((uintptr_t) hi32 << 32);
auto *s = (Schedule *) ptr;
int id = s->running;
Coroutine *c = s->co[id];
c->get_func()(s, c->get_ud());
c->delete_co();
s->co[id] = nullptr;
--s->nco;
s->running = -1;
}
void Schedule::coroutine_close() {
int i;
for (i = 0; i < this->cap; i++) {
Coroutine *inner_co = this->co[i];
if (inner_co) {
inner_co->delete_co();
}
}
free(this->co);
this->co = nullptr;
free(this);
}
int Schedule::coroutine_new(coroutine_func func, void *ud) {
Coroutine *inner_co = Coroutine::new_co(this, func, ud);
if (this->nco >= this->cap) {
int id = this->cap;
this->co = (Coroutine **) realloc(this->co, this->cap * 2 * sizeof(Coroutine));
memset(this->co + this->cap, 0, sizeof(struct coroutine *) * this->cap);
this->co[this->cap] = inner_co;
this->cap *= 2;
++this->nco;
return id;
} else {
int i;
for (i = 0; i < this->cap; i++) {
int id = (i + this->nco) % this->cap;
if (this->co[id] == nullptr) {
this->co[id] = inner_co;
++this->nco;
return id;
}
}
}
return 0;
}
void Schedule::coroutine_resume(int id) {
assert(this->running == -1);
assert(id >= 0 && id < this->cap);
Coroutine *c = this->co[id];
if (c == nullptr) return;
int status = c->get_status();
auto ptr = (uintptr_t) this;
switch (status) {
case COROUTINE_READY:
getcontext(c->get_ctx());
c->get_ctx()->uc_stack.ss_sp = this->stack;
c->get_ctx()->uc_stack.ss_size = STACK_SIZE;
c->get_ctx()->uc_link = &this->main;
this->running = id;
c->set_status(COROUTINE_RUNNING);
makecontext(c->get_ctx(), (void (*)()) main_func, 2, (uint32_t) ptr, (uint32_t) (ptr >> 32));
swapcontext(&this->main, c->get_ctx());
break;
case COROUTINE_SUSPEND:
memcpy(this->stack + STACK_SIZE - c->get_size(), c->get_stack(), c->get_size());
this->running = id;
c->set_status(COROUTINE_RUNNING);
swapcontext(&this->main, c->get_ctx());
break;
default:
assert(0);
}
}
int Schedule::coroutine_status(int id) {
assert(id >= 0 && id < this->cap);
if (this->co[id] == nullptr) {
return COROUTINE_DEAD;
}
return this->co[id]->get_status();
}
int Schedule::coroutine_running() const {
return this->running;
}
void Schedule::coroutine_yield() {
int id = this->running;
assert(id >= 0);
Coroutine *c = this->co[id];
assert((char *) &c > this->stack);
_save_stack(c, this->stack + STACK_SIZE);
c->set_status(COROUTINE_SUSPEND);
this->running = -1;
swapcontext(c->get_ctx(), &this->main);
}
void Schedule::_save_stack(Coroutine *c, char *top) {
char dummy = 0;
assert(top - &dummy <= STACK_SIZE);
if (c->get_cap() < top - &dummy) {
free(c->get_stack());
c->set_cap(top - &dummy);
c->set_stack(static_cast<char *>(malloc(c->get_cap())));
}
c->set_size(top - &dummy);
memcpy(c->get_stack(), &dummy, c->get_size());
}
} // namespace stackless_co
下面分別來看;
協程的創建: coroutine_new
int Schedule::coroutine_new(coroutine_func func, void *ud) {
Coroutine *inner_co = Coroutine::new_co(this, func, ud);
if (this->nco >= this->cap) {
int id = this->cap;
this->co = (Coroutine **) realloc(this->co, this->cap * 2 * sizeof(Coroutine));
memset(this->co + this->cap, 0, sizeof(struct coroutine *) * this->cap);
this->co[this->cap] = inner_co;
this->cap *= 2;
++this->nco;
return id;
} else {
int i;
for (i = 0; i < this->cap; i++) {
int id = (i + this->nco) % this->cap;
if (this->co[id] == nullptr) {
this->co[id] = inner_co;
++this->nco;
return id;
}
}
}
return 0;
}
coroutine_new
負責創建並初始化一個新協程對象,同時將該協程對象放到協程調度器裏面。
這裏的實現有兩個非常值得學習的點:
-
擴容:當目前尚存活的線程個數
nco
已經等於協程調度器的容量cap
了,這個時候需要對協程調度器進行擴容,這裏直接就是非常經典簡單的 2 倍擴容; -
如果無需擴容,則需要找到一個空的位置,放置初始化好的協程;這裏從第
nco
位開始尋找(nco
代表當前存活的個數;因爲一般來說,前面幾位最開始都是存活的,從第nco
位開始找,效率會更高。
這樣,一個協程對象就被創建好,此時該協程的狀態是 READY
,但尚未正式執行。
我們需要調用 coroutine_resume
方法啓動協程,下面來看 coroutine_resume
方法。
coroutine_resume(READY -> RUNNING)
調用 coroutine_resume
方法會切入到指定協程中執行,此時,當前正在執行的協程的上下文會被保存起來,同時上下文替換成新的協程,並將該協程的狀態置爲 RUNNING
。
進入 coroutine_resume
函數的前置狀態有兩個 READY
和 SUSPEND
,這兩個狀態下 coroutine_resume
的處理方法也是有很大不同。我們先看下協程在 READY 狀態下進行 coroutine_resume
的流程:
這塊代碼比較短,但是非常重要,所以我就直接貼代碼了:
// 初始化 ucontext_t 結構體,將當前的上下文放到 C->ctx 裏面
getcontext(c->get_ctx());
// 將當前協程的運行時棧的棧頂設置爲 s->stack,每個協程都這麼設置,這就是所謂的共享棧。(注意,這裏是棧頂)
c->get_ctx()->uc_stack.ss_sp = this->stack;
c->get_ctx()->uc_stack.ss_size = STACK_SIZE;
c->get_ctx()->uc_link = &this->main;
this->running = id;
c->set_status(COROUTINE_RUNNING);
// 設置執行 c->ctx 函數, 並將 s 作爲參數傳進去
makecontext(c->get_ctx(), (void (*)()) main_func, 2, (uint32_t) ptr, (uint32_t) (ptr >> 32));
// 將當前的上下文放入 s->main 中,並將 c->ctx 的上下文替換到當前上下文
swapcontext(&this->main, c->get_ctx());
這段函數非常的重要,有幾個不可忽視的點:
-
getcontext(c->get_ctx());
初始化 ucontext_t 結構體,將當前的上下文放到c->ctx
裏面; -
c->get_ctx()->uc_stack.ss_sp = this->stack;
設置當前協程的運行時棧,也是共享棧; -
c->get_ctx()->uc_link = &this->main;
如果協程執行完,則切換到s->main
主協程中進行執行;如果不設置,則默認爲 nullptr,那麼協程執行完,整個程序就結束了。
接下來是 makecontext
,這個函數用來設置對應 ucontext 的執行函數;如上,將 c->ctx
的執行函數體設置爲了 mainfunc
。
makecontext
後面的兩個參數也非常有意思:
可以看出來其入參是把一個指針掰成了兩個 int 作爲參數傳給 mainfunc
;而在 mainfunc
的實現可以看到,其又會把這兩個 int 拼成 Schedule*
。
爲什麼不直接傳 Schedule*
,而要這麼做,通過先拆兩半,再在函數中拼起來呢?
這是因爲 makecontext
的函數指針的參數是 uint32_t
類型,在 64 位系統下,一個 uint32_t
沒法承載一個指針, 所以基於兼容性的考慮,才採用了這種做法;
接下來調用了 swapcontext
函數,這個函數比較簡單,但也非常核心:
其作用是將當前的上下文內容放入 s->main
中,並使用 c->ctx
的上下文替換到當前上下文(類似於前文彙編的作用)。
這樣的話,就會執行新的上下文對應的程序了;(在 coroutine 中, 也就是開始執行 mainfunc
這個函數,mainfunc
是對用戶提供的協程函數的封裝)。
協程的切出:coroutine_yield
調用 coroutine_yield
可以使當前正在運行的協程切換到主協程中運行;此時,該協程會進入 SUSPEND
狀態。
coroutine_yield
的具體實現依賴於兩個行爲:
-
調用
_save_stack
將當前協程的棧保存起來,本例中協程的實現是基於共享棧的,所以協程的棧內容需要單獨保存起來; -
swapcontext
將當前上下文保存到當前協程的 ucontext 裏面,同時替換當前上下文爲主協程的上下文;這樣的話,當前協程會被掛起,主協程會被繼續執行。
這裏有個點極其關鍵,就是:如何保存當前協程的運行時棧,即如何獲取整個棧的內存空間;
**注:**我們都知道,調用棧的生長方向是從高地址往低地址;
因此,我們只要找到棧的棧頂和棧底的地址,就可以找到整個棧內存空間了。
因爲協程的運行時棧的內存空間是自己分配的(在 coroutine_resume 階段設置了 c->ctx.uc_stack.ss_sp = s.this->stack
)。
根據以上理論,棧的生長方向是高地址到低地址,因此:
棧底的就是內存地址最大的位置,即 s->stack + STACK_SIZE
就是棧底位置。
那麼,如何找到棧頂的位置呢?是通過下面的方法做的:
void Schedule::_save_stack(Coroutine *c, char *top) {
char dummy = 0;
assert(top - &dummy <= STACK_SIZE);
if (c->get_cap() < top - &dummy) {
free(c->get_stack());
c->set_cap(top - &dummy);
c->set_stack(static_cast<char *>(malloc(c->get_cap())));
}
c->set_size(top - &dummy);
memcpy(c->get_stack(), &dummy, c->get_size());
}
這裏特意使用到了一個 dummy 變量,這個 dummy 的作用非常關鍵也非常巧妙。
因爲 dummy 變量是剛剛分配到棧上的,因此,此時就位於 棧的最頂部位置。
並且,此時整個內存佈局如下圖所示:
因此整個棧的大小就是從棧底到棧頂,s->stack + STACK_SIZE - &dummy
。
最後又調用了 memcpy 將當前運行時棧的內容,拷貝到了 c->stack
中保存了起來。
coroutine_resume(SUSPEND -> RUNNING)
當協程被 yield
之後會進入 SUSPEND
階段,對該協程調用 coroutine_resume
會再次切入該協程。
這部分的代碼如下:
memcpy(this->stack + STACK_SIZE - c->get_size(), c->get_stack(), c->get_size());
this->running = id;
c->set_status(COROUTINE_RUNNING);
swapcontext(&this->main, c->get_ctx());
這裏的實現有兩個重要的點:
-
memcpy(this->stack + STACK_SIZE - c->get_size(), c->get_stack(), c->get_size());
:我們知道,在yield
的時候,協程的棧內容保存到了 C->stack 數組中;這個時候,就是用memcpy
把協程的之前保存的棧內容,重新拷貝到運行時棧裏面;這裏有個點,拷貝的開始位置,需要簡單計算下s->stack + STACK_SIZE - c->size
這個位置就是之前協程的棧頂位置; -
swapcontext(&this->main, c->get_ctx());
交換上下文;這點在上文有具體描述。
補充:共享棧
共享棧這個詞在 libco 中提到的多,其實 coroutine 也是用的共享棧模型;
共享棧這個東西說起來很玄乎,實際原理不復雜:本質就是所有的協程在運行的時候都使用同一個棧空間。
有共享棧自然就有非共享棧,也就是每個協程的棧空間都是獨立的,固定大小:
-
好處是協程切換的時候,內存不用拷貝來拷貝去;
-
壞處則是 內存空間浪費;
因爲棧空間在運行時不能隨時擴容,否則如果有指針操作執行了棧內存,擴容後將導致指針失效。
因此,爲了防止棧內存不夠,每個協程都要預先開一個足夠的棧空間使用;當然很多協程在實際運行中也用不了這麼大的空間,就必然造成內存的浪費和開闢大內存造成的性能損耗。
共享棧則是提前開了一個足夠大的棧空間(如上面的實現 STACK_SIZE = 1024 * 1024;
,即1M
大小);所有的棧運行的時候,都使用這個棧空間。
並且設置每個協程的運行時棧:
c->ctx.uc_stack.ss_sp = s->stack;
c->ctx.uc_stack.ss_size = STACK_SIZE;
對協程調用 yield
的時候,該協程棧內容暫時保存起來,保存的時候需要用到多少內存就開多少,這樣就減少了內存的浪費(即_save_stack 函數的內容)。
當 resume
該協程的時候,協程之前保存的棧內容,會被重新拷貝到運行時棧中,這就是所謂的共享棧的原理。
測試代碼
具體的測試代碼如下:
stackless_co/stackless_co_test.cc
#include "schedule.h"
#include <cstdio>
struct args {
int n;
};
void foo(stackless_co::Schedule *s, void *ud) {
args *arg = static_cast<args *>(ud);
int start = arg->n;
for (int i = 0; i < 5; i++) {
printf("coroutine %d : %d\n", s->coroutine_running(), start + i);
s->coroutine_yield();
}
}
void test(stackless_co::Schedule *s) {
struct args arg1 = {0};
struct args arg2 = {100};
int co1 = s->coroutine_new(foo, &arg1);
int co2 = s->coroutine_new(foo, &arg2);
printf("main start\n");
while (s->coroutine_status(co1) && s->coroutine_status(co2)) {
s->coroutine_resume(co1);
s->coroutine_resume(co2);
}
printf("main end\n");
}
int main() {
auto *s = stackless_co::Schedule::coroutine_open();
test(s);
s->coroutine_close();
return 0;
}
上面的代碼首先利用 coroutine_open
創建了協程調度器 s
,用來統一管理全部的協程。同時,在 test 函數中,創建了兩個協程 co1 和 co2,不斷的反覆 yield
和 resume
協程,直至兩個協程執行完畢。
執行後輸出:
main start
coroutine 0 : 0
coroutine 1 : 100
coroutine 0 : 1
coroutine 1 : 101
coroutine 0 : 2
coroutine 1 : 102
coroutine 0 : 3
coroutine 1 : 103
coroutine 0 : 4
coroutine 1 : 104
main end
可以看到,我們實現的代碼也可以完成協程的功能。
總結
本文首先介紹了:
-
什麼是協程;
-
協程的優勢;
-
協程的一些特點等;
隨後溫習了線程上下文相關的知識,包括:
-
線程時間、內存消耗;
-
如何保存上下文:
setjmp/longjmp
、ucontext
、彙編;
緊接着介紹了幾種協程的實現種類:
-
有棧和無棧協程;
-
對稱與非對稱協程;
然後探討了關於協程的一些其他內容,包括:
-
N:1 & N:M 協程;
-
協程的組成;
-
協程的調度;
-
協程相關的工具:鎖、條件變量等;
-
協程和線程的對比;
-
協程對 CPU/IO 的影響;
隨後是實踐部分,包括:
-
使用 C++20 標準中的協程;
-
動手實踐協程:
-
基於彙編實現的有棧協程;
-
基於
ucontext
實現的無棧協程;
附錄
源代碼:
- https://github.com/JasonkayZK/cpp-learn/tree/coroutine
參考文章:
-
https://www.zhihu.com/question/20511233
-
https://www.cnblogs.com/lidabo/p/15745123.html
-
https://eli.thegreenplace.net/2018/measuring-context-switching-and-memory-overheads-for-linux-threads/
-
https://zhuanlan.zhihu.com/p/27409164
-
https://www.zhihu.com/question/52193579/answer/447612082
-
http://www.caturra.cc/2022/05/29/%E5%AE%9E%E7%8E%B0%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84%E5%8D%8F%E7%A8%8B/
-
https://zhuanlan.zhihu.com/p/347445164
-
https://www.cyhone.com/articles/analysis-of-libco/
-
https://www.cyhone.com/articles/analysis-of-cloudwu-coroutine/
參考項目:
-
https://github.com/cloudwu/Coroutine
-
https://github.com/Caturra000/co
-
https://github.com/yyrdl/libco-code-study
-
https://github.com/Tencent/libco
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/SyWjLg3lYx3pIJQfEtik8Q