純 C 語言實現協程框架,底層原理與性能分析

協程不是系統級線程,很多時候協程被稱爲 “輕量級線程”、“微線程”、“纖程(fiber)” 等。簡單來說可以認爲協程是線程裏不同的函數,這些函數之間可以相互快速切換。

協程和用戶態線程非常接近,用戶態線程之間的切換不需要陷入內核,但部分操作系統中用戶態線程的切換需要內核態線程的輔助。

協程是編程語言(或者 lib)提供的特性(協程之間的切換方式與過程可以由編程人員確定),是用戶態操作。協程適用於 IO 密集型的任務。常見提供原生協程支持的語言有:c++20、golang、python 等,其他語言以庫的形式提供協程功能,比如 C++20 之前騰訊的 fiber 和 libco 等等。

一、協程 (Coroutine) 簡介

協程,又稱微線程,纖程。英文名 Coroutine。

協程的概念很早就提出來了,但直到最近幾年纔在某些語言(如 Lua)中得到廣泛應用。

子程序,或者稱爲函數,在所有語言中都是層級調用,比如 A 調用 B,B 在執行過程中又調用了 C,C 執行完畢返回,B 執行完畢返回,最後是 A 執行完畢。所以子程序調用是通過棧實現的,一個線程就是執行一個子程序。

子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同,協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然後轉而執行別的子程序,在適當的時候再返回來接着執行 (注意,在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似 CPU 的中斷)。

比如子程序 A、B:def A():

print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'

假設由協程執行,在執行 A 的過程中,可以隨時中斷,去執行 B,B 也可能在執行過程中中斷再去執行 A,結果可能是:

1
2
x
y
3
z

但是在 A 中是沒有調用 B 的,所以協程的調用比函數調用理解起來要難一些。

看起來 A、B 的執行有點像多線程,但協程的特點在於是一個線程執行,那和多線程比,協程有何優勢?

最大的優勢就是協程極高的執行效率。因爲子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。

第二大優勢就是不需要多線程的鎖機制,因爲只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

因爲協程是一個線程執行,那怎麼利用多核 CPU 呢?最簡單的方法是多進程 + 協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

Python 對協程的支持還非常有限,用在 generator 中的 yield 可以一定程度上實現協程。雖然支持不完全,但已經可以發揮相當大的威力了。

來看例子:

傳統的生產者 - 消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。

如果改用協程,生產者生產消息後,直接通過 yield 跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:import time

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)

執行結果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK=

注意到 consumer 函數是一個 generator(生成器),把一個 consumer 傳入 produce 後:

整個流程無鎖,由一個線程執行,produce 和 consumer 協作完成任務,所以稱爲 “協程”,而非線程的搶佔式多任務。

二、C/C++ 協程

c++ 做爲一個相對古老的語言,曾經是步履蹣跚,直到 c++11 才奮起直追,但是對新技術的整體演進,其實 c++ 仍然是保守的。現在 c++20 的標準雖然已經實現了協程,但目前能比較好支持 c++20 的編譯器幾乎都和整體的環境不太兼容。換句話說,還需要繼續等待整個 c++ 的迭代版本,可能到了 c++23,整體的環境就會跟上去,協程纔會真正的飛入程序員的 “尋常百姓家”。

正如前面提到的,協程一般來說是不需要鎖的,但是如果協程的底層操作是跨越線程動態操作,仍然是需要鎖的存在的。這也是爲什麼要求儘量把協和的調度放到一個線程中去的原因。

首先需要聲明的是,這裏不打算花時間來介紹什麼是協程,以及協程和線程有什麼不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協程的。現有的 C++ 協程庫均基於兩種方案:利用匯編代碼控制協程上下文的切換,以及利用操作系統提供的 API 來實現協程上下文切換。

典型的例如:

  1. libco,Boost.context:基於彙編代碼的上下文切換

  2. phxrpc:基於 ucontext/Boost.context 的上下文切換

  3. libmill:基於 setjump/longjump 的協程切換

一般而言,基於彙編的上下文切換要比採用系統調用的切換更加高效,這也是爲什麼 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的原因。關於 phxrpc 和 libmill 具體的協程實現方式,以後有時間再詳細介紹。

2.1 協程的原理

既然協程如此厲害,那麼它實現的原理到底是什麼呢?協程最重要的應用方式就是把線程在內核上的開銷轉到了應用層的開銷,避開或者屏蔽(對應用者)線程操作的難度。那多線程操作的複雜性在哪兒呢?線程切換的隨機性和線程 Context 的跟隨,出入棧的保存和恢復,相關數據的鎖和讀寫控制。這纔是多線程的複雜性,如果再加異步引起的數據的非連續性和事件的非必然性操作,就更加增強了多線程遇到問題的判別和斷點的準確。

好,既然是這樣,那麼上框架,封裝不就得了。

協程和線程一樣,同樣需要做好兩個重點:第一個是協程的調度;第二是上下文的切換。而這兩點在 OS 的相關書籍中的介紹海了去了,這裏就不再贅述,原理基本都是一樣的。

如果以協程的關係來區分,協程也可以劃分爲對稱和非對稱協程兩種。協程間是平等關係的,就是對稱的;反之爲非對稱的。名字越起越多,但事兒還是那麼兩下子,大家自己體會即可。

只要能保證上面所說的對上下文數據的安全性保證又能夠實現協程在具體線程上的操作(某一個線程上執行的所有協程是串行的),那麼鎖的操作,從理論上講是不需要的(但實際開發中,因爲協程的應用還是少,所以還需要具體的問題具體分析)。協程的動作集中在應用層,而把複雜的內核調度的線程屏蔽在下層框架上(或者以後會不會出現 OS 進行封裝),從而大幅的降低了編程的難度,但卻擁有了線程快速異步調用的效果。

2.2 協程實現機制

協程的實現有以下幾種機制:

1、基於彙編的實現:這個對彙編編程得要求有兩下子,這個網上也有不少例子,就不再這裏搬門弄斧了。

2、基於 switch-case 來實現:這個其實更像是一個 C 語言的技巧,利用不同的狀態 Case 來達到目的,或者說大家認知中的對編程語言的一種內卷使用,網上有一個開源的項目:

https://github.com/georgeredinger/protothreads

3、基於操作系統提供的接口:Linux 的 ucontext,Windows 的 Fiber

Fiber 可能很多人都不熟悉,這其實就是微軟原來提供的纖程,有興趣的可以去網上查找一下,有幾年這個概念炒得還是比較火的。ucontext 是 Linux 上的一種操作,這兩個都可以當作是一種類似特殊的應用存在。遊戲界的大佬雲風(《遊戲之旅:我的編程感悟》作者)的 coroutine 就是類似於這種。興趣是編程的動力,大家如果對這些有興趣可以看看這本書,雖然其中很多的東西都比較老了,但是整體的思想還是非常有借鑑的。

4、基於接口 setjmp 和 longjmp 同時使用 static local 的變量來保存協程內部的數據

這兩個函數是 C 語言的一個非常有意思的應用,一般寫 C 好長時間的人,都沒接觸過這兩個 API 函數,這個函數的定義是:

int setjmp(jmp_buf envbuf);
void longjmp(jmp_buf envbuf, int val);

它們兩個的作用,前者是用來將棧楨(上下文)保存在 jmp_buf 這個數據結構中,然後可以通過後者 longjmp 在指定的位置恢復出來。這就類似於使用 goto 語句跳轉到任意的地方,然後再把相關的數據恢復出來。看一下個《C 專家編程》中的例子:

#include <stdio.h>
#include <setjmp.h>

jmp_buf buf;

banana()
{
    printf("in banana() \n");
    longjmp(buf,1);
    printf("you'll never see this,because i longjmp'd");
}

main()
{
    if(setjmp(buf))
        printf("back in main\n");
    else
    {
        printf("first time through\n");
        banana();
    }
}

看完了上述的幾種方法,其實網上還有幾種實現的方式,但都是比較刻板,有興趣的可以搜索一下,這裏就不提供鏈接了。

協程的實現,按理說還是 OS 搞定最好,其實是框架底層,但 C/C++ 的複雜性,以及不同的平臺和不同編譯器、庫之間的長期差異,導致這方面能做好的可能性真心是覺得不會太大。

三、libco 協程的創建和切換

在介紹 coroutine 的創建之前,我們先來熟悉一下 libco 中用來表示一個 coroutine 的數據結構,即定義在 co_routine_inner.h 中的 stCoRoutine_t:

struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 協程運行環境
pfn_co_routine_t pfn; // 協程執行的邏輯函數
void *arg; // 函數參數
coctx_t ctx; // 保存協程的下文環境
...
char cEnableSysHook; // 是否運行系統 hook,即非侵入式邏輯
char cIsShareStack; // 是否在共享棧模式
void *pvEnv;
stStackMem_t* stack_mem; // 協程運行時的棧空間
char* stack_sp; // 用來保存協程運行時的棧空間
unsigned int save_size;
char* save_buffer;
};

我們暫時只需要瞭解表示協程的最簡單的幾個參數,例如協程運行環境,協程的上下文環境,協程運行的函數以及運行時棧空間。後面的 stack_sp,save_size 和 save_buffer 與 libco 共享棧模式相關,有關共享棧的內容我們後續再說。

四、協程的實現與原理剖析

4.1 協程的起源

問題:協程存在的原因?協程能夠解決哪些問題?

在我們現在 CS,BS 開發模式下,服務器的吞吐量是一個很重要的參數。其實吞吐量是 IO 處理時間加上業務處理。爲了簡單起見,比如,客戶端與服務器之間是長連接的,客戶端定期給服務器發送心跳包數據。客戶端發送一次心跳包到服務器,服務器更新該新客戶端狀態的。心跳包發送的過程,業務處理時長等於 IO 讀取(RECV 系統調用)加上業務處理(更新客戶狀態)。吞吐量等於 1s 業務處理次數。

業務處理(更新客戶端狀態)時間,業務不一樣的,處理時間不一樣,我們就不做討論。

那如何提升 recv 的性能。若只有一個客戶端,recv 的性能也沒有必要提升,也不能提升。若在有百萬計的客戶端長連接的情況,我們該如何提升。以 Linux 爲例,在這裏需要介紹一個 “網紅” 就是 epoll。服務器使用 epoll 管理百萬計的客戶端長連接,代碼框架如下:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);
 
    for (i = 0;i < nready;i ++) {
 
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);
           
            setnonblock(connfd);
 
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
 
        } else {
            handle(sockfd);
        }
    }
}

對於響應式服務器,所有的客戶端的操作驅動都是來源於這個大循環。來源於 epoll_wait 的反饋結果。

對於服務器處理百萬計的 IO。Handle(sockfd) 實現方式有兩種。

第一種,handle(sockfd) 函數內部對 sockfd 進行讀寫動作。代碼如下

int handle(int sockfd) {
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

handle 的 io 操作(send,recv)與 epoll_wait 是在同一個處理流程裏面的。這就是 IO 同步操作。

優點:

缺點:

第二種,handle(sockfd) 函數內部將 sockfd 的操作,push 到線程池中,代碼如下:

int thread_cb(int sockfd) {
    // 此函數是在線程池創建的線程中運行。
    // 與handle不在一個線程上下文中運行
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}
 
int handle(int sockfd) {
    //此函數在主線程 main_thread 中運行
    //在此處之前,確保線程池已經啓動。
    push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運行。
}

Handle 函數是將 sockfd 處理方式放到另一個已經其他的線程中運行,如此做法,將 io 操作(recv,send)與 epoll_wait 不在一個處理流程裏面,使得 io 操作(recv,send)與 epoll_wait 實現解耦。這就叫做 IO 異步操作。

優點:

  1. 子模塊好規劃。

  2. 程序性能高。

缺點:

正因爲子模塊好規劃,使得模塊之間的 sockfd 的管理異常麻煩。每一個子線程都需要管理好 sockfd,避免在 IO 操作的時候,sockfd 出現關閉或其他異常。

上文有提到 IO 同步操作,程序響應慢,IO 異步操作,程序響應快。

下面來對比一下 IO 同步操作與 IO 異步操作。

代碼如下:

https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c

在這份代碼的 486 行,#if 1, 打開的時候,爲 IO 異步操作。關閉的時候,爲 IO 同步操作。

接下來把我測試接入量的結果粘貼出來。

對比項

Sockfd 管理

代碼邏輯

程序性能

有沒有一種方式,有異步性能,同步的代碼邏輯。來方便編程人員對 IO 操作的組件呢?有,採用一種輕量級的協程來實現。在每次 send 或者 recv 之前進行切換,再由調度器來處理 epoll_wait 的流程。

就是採用了基於這樣的思考,寫了 NtyCo,實現了一個 IO 異步操作與協程結合的組件。

4.2 協程的案例

問題:協程如何使用?與線程使用有何區別?

在做網絡 IO 編程的時候,有一個非常理想的情況,就是每次 accept 返回的時候,就爲新來的客戶端分配一個線程,這樣一個客戶端對應一個線程。就不會有多個線程共用一個 sockfd。每請求每線程的方式,並且代碼邏輯非常易讀。但是這只是理想,線程創建代價,調度代價就呵呵了。

先來看一下每請求每線程的代碼如下:

while(1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);
 
    pthread_t thread_id;
    pthread_create(&thread_id, NULL, client_cb, &clientfd);
 
}

這樣的做法,寫完放到生產環境下面,如果你的老闆不打死你,你來找我。我來幫你老闆,爲民除害。

如果我們有協程,我們就可以這樣實現。參考代碼如下:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c
 
while (1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);
        
    nty_coroutine *read_co;
    nty_coroutine_create(&read_co, server_reader, &cli_fd)}

這樣的代碼是完全可以放在生成環境下面的。如果你的老闆要打死你,你來找我,我幫你把你老闆打死,爲民除害。

線程的 API 思維來使用協程,函數調用的性能來測試協程。

NtyCo 封裝出來了若干接口,一類是協程本身的,二類是 posix 的異步封裝

協程 API:while

  1. 協程創建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
  1. 協程調度器的運行
void nty_schedule_run(void)

POSIX 異步封裝 API:

int nty_socket(int domain, int type, int protocol)
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)
int nty_recv(int fd, void *buf, int length)
int nty_send(int fd, const void *buf, int length)
int nty_close(int fd)

4.3 協程的實現之工作流程

問題:協程內部是如何工作呢?

先來看一下協程服務器案例的代碼, 代碼參考:https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

分別討論三個協程的比較晦澀的工作流程。第一個協程的創建;第二個 IO 異步操作;第三個協程子過程回調

(1) 創建協程

當我們需要異步調用的時候,我們會創建一個協程。比如 accept 返回一個新的 sockfd,創建一個客戶端處理的子過程。再比如需要監聽多個端口的時候,創建一個 server 的子過程,這樣多個端口同時工作的,是符合微服務的架構的。

創建協程的時候,進行了如何的工作?創建 API 如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)

協程不存在親屬關係,都是一致的調度關係,接受調度器的調度。調用 create API 就會創建一個新協程,新協程就會加入到調度器的就緒隊列中。

創建的協程具體步驟會在《協程的實現之原語操作》來描述。

(2) 實現 IO 異步操作

大部分的朋友會關心 IO 異步操作如何實現,在 send 與 recv 調用的時候,如何實現異步操作的。

先來看一下一段代碼:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);
 
    for (i = 0;i < nready;i ++) {
 
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);
           
            setnonblock(connfd);
 
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
 
        } else {
           
            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
            recv(sockfd, buffer, length, 0);
 
            //parser_proto(buffer, length);
 
            send(sockfd, buffer, length, 0);
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
        }
    }
}

在進行 IO 操作(recv,send)之前,先執行了 epoll_ctl 的 del 操作,將相應的 sockfd 從 epfd 中刪除掉,在執行完 IO 操作(recv,send)再進行 epoll_ctl 的 add 的動作。這段代碼看起來似乎好像沒有什麼作用。

如果是在多個上下文中,這樣的做法就很有意義了。能夠保證 sockfd 只在一個上下文中能夠操作 IO 的。不會出現在多個上下文同時對一個 IO 進行操作的。協程的 IO 異步操作正式是採用此模式進行的。

把單一協程的工作與調度器的工作的劃分清楚,先引入兩個原語操作 resume,yield 會在《協程的實現之原語操作》來講解協程所有原語操作的實現,yield 就是讓出運行,resume 就是恢復運行。

調度器與協程的上下文切換如下圖所示:

在協程的上下文 IO 異步操作(nty_recv,nty_send)函數,步驟如下:

IO 異步操作的上下文切換的時序圖如下:

(3) 回調協程的子過程

在 create 協程後,何時回調子過程?何種方式回調子過程?

首先來回顧一下 x86_64 寄存器的相關知識。彙編與寄存器相關知識還會在《協程的實現之切換》繼續深入探討的。x86_64 的寄存器有 16 個 64 位寄存器,分別是:

%rax, %rbx,%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。

以 NtyCo 的實現爲例,來分析這個過程。CPU 有一個非常重要的寄存器叫做 EIP,用來存儲 CPU 運行下一條指令的地址。我們可以把回調函數的地址存儲到 EIP 中,將相應的參數存儲到相應的參數寄存器中。實現子過程調用的邏輯代碼如下:

void _exec(nty_coroutine *co) {
    co->func(co->arg); //子過程的回調函數
}
 
void nty_coroutine_init(nty_coroutine *co) {
    //ctx 就是協程的上下文
    co->ctx.edi = (void*)co; //設置參數
    co->ctx.eip = (void*)_exec; //設置回調函數入口
    //當實現上下文切換的時候,就會執行入口函數_exec , _exec 調用子過程func
}

4.4 協程的實現之原語操作

問題:協程的內部原語操作有哪些?分別如何實現的?

協程的核心原語操作:create, resume, yield。協程的原語操作有 create 怎麼沒有 exit?以 NtyCo 爲例,協程一旦創建就不能有用戶自己銷燬,必須得以子過程執行結束,就會自動銷燬協程的上下文數據。以_exec 執行入口函數返回而銷燬協程的上下文與相關信息。co->func(co->arg) 是子過程,若用戶需要長久運行協程,就必須要在 func 函數里面寫入循環等操作。所以 NtyCo 裏面沒有實現 exit 的原語操作。

create:創建一個協程。

實現代碼如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {
 
    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();
 
    if (sched == NULL) {
        nty_schedule_create(0);
        
        sched = nty_coroutine_get_sched();
        if (sched == NULL) {
            printf("Failed to create schedulern");
            return -1;
        }
    }
 
    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
        printf("Failed to allocate memory for new coroutinen");
        return -2;
    }
 
    //
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
        printf("Failed to allocate stack for new coroutinen");
        free(co);
        return -3;
    }
 
    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
    co->id = sched->spawned_coroutines ++;
co->func = func;
 
    co->fd = -1;
co->events = 0;
 
    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;
 
    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);
 
    return 0;
}

yield:讓出 CPU。

void nty_coroutine_yield(nty_coroutine *co)

參數:當前運行的協程實例

調用後該函數不會立即返回,而是切換到最近執行 resume 的上下文。該函數返回是在執行 resume 的時候,會有調度器統一選擇 resume 的,然後再次調用 yield 的。resume 與 yield 是兩個可逆過程的原子操作。

resume:恢復協程的運行權

int nty_coroutine_resume(nty_coroutine *co)

參數:需要恢復運行的協程實例

調用後該函數也不會立即返回,而是切換到運行協程實例的 yield 的位置。返回是在等協程相應事務處理完成後,主動 yield 會返回到 resume 的地方。

4.5 協程的實現之切換

問題:協程的上下文如何切換?切換代碼如何實現?

首先來回顧一下 x86_64 寄存器的相關知識。x86_64 的寄存器有 16 個 64 位寄存器,分別是:

%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,%r13, %r14, %r15。

%rax 作爲函數返回值使用的。

%rsp 棧指針寄存器,指向棧頂

%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數參數,依次對應第 1 參數,第 2 參數。。。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作數據存儲,遵循調用者使用規則,換句話說,就是隨便用。調用子函數之前要備份它,以防它被修改

%r10, %r11 用作數據存儲,就是使用前要先保存原值。

上下文切換,就是將 CPU 的寄存器暫時保存,再將即將運行的協程的上下文寄存器,分別 mov 到相對應的寄存器上。此時上下文完成切換。如下圖所示:

切換_switch 函數定義:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);

我們 nty_cpu_ctx 結構體的定義,爲了兼容 x86,結構體項命令採用的是 x86 的寄存器名字命名。

typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;

_switch 返回後,執行即將運行協程的上下文。是實現上下文的切換

_switch 的實現代碼:

0: __asm__ (
1: "    .text                                  n"
2: "       .p2align 4,,15                                   n"
3: ".globl _switch                                          n"
4: ".globl __switch                                         n"
5: "_switch:                                                n"
6: "__switch:                                               n"
7: "       movq %rsp, 0(%rsi)      # save stack_pointer     n"
8: "       movq %rbp, 8(%rsi)      # save frame_pointer     n"
9: "       movq (%rsp), %rax       # save insn_pointer      n"
10: "       movq %rax, 16(%rsi)                              n"
11: "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       n"
12: "       movq %r12, 32(%rsi)                              n"
13: "       movq %r13, 40(%rsi)                              n"
14: "       movq %r14, 48(%rsi)                              n"
15: "       movq %r15, 56(%rsi)                              n"
16: "       movq 56(%rdi), %r15                              n"
17: "       movq 48(%rdi), %r14                              n"
18: "       movq 40(%rdi), %r13     # restore rbx,r12-r15    n"
19: "       movq 32(%rdi), %r12                              n"
20: "       movq 24(%rdi), %rbx                              n"
21: "       movq 8(%rdi), %rbp      # restore frame_pointer  n"
22: "       movq 0(%rdi), %rsp      # restore stack_pointer  n"
23: "       movq 16(%rdi), %rax     # restore insn_pointer   n"
24: "       movq %rax, (%rsp)                                n"
25: "       ret                                              n"
26: );

按照 x86_64 的寄存器定義,%rdi 保存第一個參數的值,即 new_ctx 的值,%rsi 保存第二個參數的值,即保存 cur_ctx 的值。X86_64 每個寄存器是 64bit,8byte。

Movq %rsp, 0(%rsi) 保存在棧指針到 cur_ctx 實例的 rsp 項

Movq %rbp, 8(%rsi)

Movq (%rsp), %rax #將棧頂地址裏面的值存儲到 rax 寄存器中。Ret 後出棧,執行棧頂

Movq %rbp, 8(%rsi) #後續的指令都是用來保存 CPU 的寄存器到 new_ctx 的每一項中

Movq 8(%rdi), %rbp #將 new_ctx 的值

Movq 16(%rdi), %rax #將指令指針 rip 的值存儲到 rax 中

Movq %rax, (%rsp) # 將存儲的 rip 值的 rax 寄存器賦值給棧指針的地址的值。

Ret # 出棧,回到棧指針,執行 rip 指向的指令。

上下文環境的切換完成。

4.6 協程的實現之定義

問題:協程如何定義? 調度器如何定義?

先來一道設計題:

設計一個協程的運行體 R 與運行體調度器 S 的結構體

這道設計題拆分兩個個問題,一個運行體如何高效地在多種狀態集合更換。調度器與運行體的功能界限。

運行體如何高效地在多種狀態集合更換

新創建的協程,創建完成後,加入到就緒集合,等待調度器的調度;協程在運行完成後,進行 IO 操作,此時 IO 並未準備好,進入等待狀態集合;IO 準備就緒,協程開始運行,後續進行 sleep 操作,此時進入到睡眠狀態集合。

就緒 (ready),睡眠(sleep),等待(wait) 集合該採用如何數據結構來存儲?

就緒 (ready) 集合並不沒有設置優先級的選型,所有在協程優先級一致,所以可以使用隊列來存儲就緒的協程,簡稱爲就緒隊列(ready_queue)。

睡眠 (sleep) 集合需要按照睡眠時長進行排序,採用紅黑樹來存儲,簡稱睡眠樹 (sleep_tree) 紅黑樹在工程實用爲 < key, value>, key 爲睡眠時長,value 爲對應的協程結點。

等待 (wait) 集合,其功能是在等待 IO 準備就緒,等待 IO 也是有時長的,所以等待 (wait) 集合採用紅黑樹的來存儲,簡稱等待樹(wait_tree),此處借鑑 nginx 的設計。

Coroutine 就是協程的相應屬性,status 表示協程的運行狀態。sleep 與 wait 兩顆紅黑樹,ready 使用的隊列,比如某協程調用 sleep 函數,加入睡眠樹 (sleep_tree),status |= S 即可。比如某協程在等待樹(wait_tree) 中,而 IO 準備就緒放入 ready 隊列中,只需要移出等待樹(wait_tree),狀態更改 status &= ~W 即可。有一個前提條件就是不管何種運行狀態的協程,都在就緒隊列中,只是同時包含有其他的運行狀態。

(2) 調度器與協程的功能界限

每一協程都需要使用的而且可能會不同屬性的,就是協程屬性。每一協程都需要的而且數據一致的,就是調度器的屬性。比如棧大小的數值,每個協程都一樣的後不做更改可以作爲調度器的屬性,如果每個協程大小不一致,則可以作爲協程的屬性。

用來管理所有協程的屬性,作爲調度器的屬性。比如 epoll 用來管理每一個協程對應的 IO,是需要作爲調度器屬性。

按照前面幾章的描述,定義一個協程結構體需要多少域,我們描述了每一個協程有自己的上下文環境,需要保存 CPU 的寄存器 ctx;需要有子過程的回調函數 func;需要有子過程回調函數的參數 arg;需要定義自己的棧空間 stack;需要有自己棧空間的大小 stack_size;需要定義協程的創建時間 birth;需要定義協程當前的運行狀態 status;需要定當前運行狀態的結點(ready_next, wait_node, sleep_node);需要定義協程 id;需要定義調度器的全局對象 sched。

協程的核心結構體如下:

typedef struct _nty_coroutine {
 
    nty_cpu_ctx ctx;
    proc_coroutine func;
    void *arg;
    size_t stack_size;
 
    nty_coroutine_status status;
    nty_schedule *sched;
 
    uint64_t birth;
    uint64_t id;
 
    void *stack;
 
    RB_ENTRY(_nty_coroutine) sleep_node;
    RB_ENTRY(_nty_coroutine) wait_node;
 
    TAILQ_ENTRY(_nty_coroutine) ready_next;
    TAILQ_ENTRY(_nty_coroutine) defer_next;
 
} nty_coroutine;

調度器是管理所有協程運行的組件,協程與調度器的運行關係。

調度器的屬性,需要有保存 CPU 的寄存器上下文 ctx,可以從協程運行狀態 yield 到調度器運行的。從協程到調度器用 yield,從調度器到協程用 resume 以下爲協程的定義。

typedef struct _nty_coroutine_queue nty_coroutine_queue;
 
typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
 
typedef struct _nty_schedule {
    uint64_t birth;
nty_cpu_ctx ctx;
 
    struct _nty_coroutine *curr_thread;
    int page_size;
 
    int poller_fd;
    int eventfd;
    struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
    int nevents;
 
    int num_new_events;
 
    nty_coroutine_queue ready;
    nty_coroutine_rbtree_sleep sleeping;
    nty_coroutine_rbtree_wait waiting;
 
} nty_schedule;

4.7 協程的實現之調度器

問題:協程如何被調度?

調度器的實現,有兩種方案,一種是生產者消費者模式,另一種多狀態運行。

(1) 生產者消費者模式

邏輯代碼如下:

while (1) {
 
        //遍歷睡眠集合,將滿足條件的加入到ready
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            TAILQ_ADD(&sched->ready, expired);
        }
 
        //遍歷等待集合,將滿足添加的加入到ready
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            TAILQ_ADD(&sched->ready, wait);
        }
 
        // 使用resume回覆ready的協程運行權
        while (!TAILQ_EMPTY(&sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

(2) 多狀態運行

實現邏輯代碼如下:

while (1) {
 
        //遍歷睡眠集合,使用resume恢復expired的協程運行權
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            resume(expired);
        }
 
        //遍歷等待集合,使用resume恢復wait的協程運行權
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            resume(wait);
        }
 
        // 使用resume恢復ready的協程運行權
        while (!TAILQ_EMPTY(sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

4.8 協程性能測試

測試環境:4 臺 VMWare 虛擬機

操作系統:ubuntu 14.04

服務器端測試代碼:https://github.com/wangbojing/NtyCo

客戶端測試代碼:https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c

按照每一個連接啓動一個協程來測試。每一個協程棧空間 4096byte

6G 內存 –> 測試協程數量 100W 無異常。並且能夠正常收發數據。

五、協程創建和運行

由於多個協程運行於一個線程內部的,因此當創建線程中的第一個協程時,需要初始化該協程所在的環境 stCoRoutineEnv_t,這個環境是線程用來管理協程的,通過該環境,線程可以得知當前一共創建了多少個協程,當前正在運行哪一個協程,當前應當如何調度協程:

struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 記錄當前創建的協程
int iCallStackSize; // 記錄當前一共創建了多少個協程
stCoEpoll_t *pEpoll; // 該線程的協程調度器
// 在使用共享棧模式拷貝棧內存時記錄相應的 coroutine
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};

上述代碼表明 libco 允許一個線程內最多創建 128 個協程,其中 pCallStack[iCallStackSize-1] 也就是棧頂的協程表示當前正在運行的協程。當調用函數 co_create 時,首先檢查當前線程中的 coroutine env 結構是否創建。這裏 libco 對於每個線程內的 stCoRoutineEnv_t 並沒有使用 thread-local 的方式(例如 gcc 內置的 __thread,phxrpc 採用這種方式)來管理,而是預先定義了一個大的數組,並通過對應的 PID 來獲取其協程環境。

static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return g_arrCoEnvPerThread[ GetPid() ];
}

初始化 stCoRoutineEnv_t 時主要完成以下幾步:

爲 stCoRoutineEnv_t 申請空間並且進行初始化,設置協程調度器 pEpoll。

創建一個空的 coroutine,初始化其上下文環境 (有關 coctx 在後文詳細介紹),將其加入到該線程的協程環境中進行管理,並且設置其爲 main coroutine。這個 main coroutine 用來運行該線程主邏輯。

當初始化完成協程環境之後,調用函數 co_create_env 來創建具體的協程,該函數初始化一個協程結構 stCoRoutine_t,設置該結構中的各項字段,例如運行的函數 pfn,運行時的棧地址等等。需要說明的就是,如果使用了非共享棧模式,則需要爲該協程單獨申請棧空間,否則從共享棧中申請空間。棧空間表示如下:

struct stStackMem_t
{
stCoRoutine_t* occupy_co; // 使用該棧的協程
int stack_size; // 棧大小
char* stack_bp; // 棧的指針,棧從高地址向低地址增長
char* stack_buffer; // 棧底
};

使用 co_create 創建完一個協程之後,將調用 co_resume 來將該協程激活運行:

void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// 獲取當前正在運行的協程的結構
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
// 爲將要運行的 co 佈置上下文環境
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co; // 設置co爲運行的線程
co_swap( lpCurrRoutine, co );
}

函數 co_swap 的作用類似於 Unix 提供的函數 swapcontext:將當前正在運行的 coroutine 的上下文以及狀態保存到結構 lpCurrRoutine 中,並且將 co 設置成爲要運行的協程,從而實現協程的切換。co_swap 具體完成三項工作:

記錄當前協程 curr 的運行棧的棧頂指針,通過 char c; curr_stack_sp=&c 實現,當下次切換回 curr 時,可以從該棧頂指針指向的位置繼續,執行完 curr 後可以順利釋放該棧。

處理共享棧相關的操作,並且調用函數 coctx_swap 來完成上下文環境的切換。注意執行完 coctx_swap 之後,執行流程將跳到新的 coroutine 也就是 pending_co 中運行,後續的代碼需要等下次切換回 curr 時纔會執行。

當下次切換回 curr 時,處理共享棧相關的操作。

對應於 co_resume 函數,協程主動讓出執行權則調用 co_yield 函數。co_yield 函數調用了 co_yield_env,將當前協程與當前線程中記錄的其他協程進行切換:

void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}

前面我們已經提到過,pCallStack 棧頂所指向的即爲當前正在運行的協程所對應的結構,因此該函數將 curr 取出來,並將當前正運行的協程上下文保存到該結構上,並切換到協程 last 上執行。接下來我們以 32-bit 的系統爲例來分析 libco 是如何實現協程運行環境的切換的。

六、協程上下文的創建和切換

libco 使用結構 struct coctx_t 來表示一個協程的上下文環境:

struct coctx_t
{

if defined(__i386__)
void *regs[ 8 ];

else
void *regs[ 14 ];

endif
size_t ss_size;
char *ss_sp;
};

結合上圖,我們需要知道關鍵的幾點:

函數調用棧是調用者和被調用者共同負責佈置的。Caller 將其參數從右向左反向壓棧,再將調用後的返回地址壓棧,然後將執行流程交給 Callee。

典型的編譯器會將 Callee 函數彙編成爲以 push %ebp; move %ebp, %esp; sub $esp N; 這種形式開頭的彙編代碼。這幾句代碼主要目的是爲了方便 Callee 利用 ebp 來訪問調用者提供的參數以及自身的局部變量(如下圖)。

當調用過程完成清除了局部變量以後,會執行 pop %ebp; ret,這樣指令會跳轉到 RA 也就是返回地址上面執行。這一點也是實現協程切換的關鍵:我們只需要將指定協程的函數指針地址保存到 RA 中,當調用完 coctx_swap 之後,會自動跳轉到該協程的函數起始地址開始運行。

瞭解了這些,我們就來看一下協程上下文環境的初始化函數 coctx_make:

int coctx_make( coctx_t ctx, coctx_pfn_t pfn, const void s, const void *s1 )
{
char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);
coctx_param_t param = (coctx_param_t)sp ;
param->s1 = s;
param->s2 = s1;
memset(ctx->regs, 0, sizeof(ctx->regs));
ctx->regs[ kESP ] = (char)(sp) - sizeof(void);
ctx->regs[ kEIP ] = (char*)pfn;
return 0;
}

這段代碼應該比較好理解,首先爲函數 coctx_pfn_t 預留 2 個參數的棧空間並對其到 16 字節,之後將實參設置到預留的棧上空間中。最後在 ctx 結構中填入相應的,其中記錄 reg[kEIP] 返回地址爲函數指針 pfn,記錄 reg[kESP] 爲獲得的棧頂指針 sp 減去一個指針長度,這個減去的空間是爲返回地址 RA 預留的。當調用 coctx_swap 時,reg[kEIP] 會被放到返回地址 RA 的位置,待 coctx_swap 執行結束,自然會跳轉到函數 pfn 處執行。

coctx_swap(ctx1, ctx2) 在 coctx_swap.S 中實現。這裏可以看到,該函數並沒有使用 push %ebp; move %ebp, %esp; sub $esp N; 開頭,因此棧空間分佈中不會出現 ebp 的位置。coctx_swap 函數主要分爲兩段,其首先將當前的上下文環境保存到 ctx1 結構中:

leal 4(%esp), %eax // eax = old_esp + 4
movl 4(%esp), %esp // 將 esp 的值設爲 &ctx1(即ctx1的地址)
leal 32(%esp), %esp // esp = (char*)&ctx1 + 32
pushl %eax // ctx1->regs[EAX] = %eax
pushl %ebp // ctx1->regs[EBP] = %ebp
pushl %esi // ctx1->regs[ESI] = %esi
pushl %edi // ctx1->regs[EDI] = %edi
pushl %edx // ctx1->regs[EDX] = %edx
pushl %ecx // ctx1->regs[ECX] = %ecx
pushl %ebx // ctx1->regs[EBX] = %ebx
pushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp

這裏需要注意指令 leal 和 movl 的區別。leal 將 eax 的值設置成爲 esp 的值加 4,而 movl 將 esp 的值設爲 esp+4 所指向的內存上的值,也就是參數 ctx1 的地址。之後該函數將 ctx2 中記錄的上下文恢復到 CPU 寄存器中,並跳轉到其函數地址處運行:

movl 4(%eax), %esp // 將 esp 的值設爲 &ctx2(即ctx2的地址)
popl %eax // %eax = ctx1->regs[EIP],也就是 &pfn
popl %ebx // %ebx = ctx1->regs[EBP]
popl %ecx // %ecx = ctx1->regs[ECX]
popl %edx // %edx = ctx1->regs[EDX]
popl %edi // %edi = ctx1->regs[EDI]
popl %esi // %esi = ctx1->regs[ESI]
popl %ebp // %ebp = ctx1->regs[EBP]
popl %esp // %esp = ctx1->regs[ESP],即(char)(sp) - sizeof(void)
pushl %eax // RA = %eax = &pfn,注意此時esp已經指向了新的esp
xorl %eax, %eax // reset eax
ret

上面的代碼看起來可能有些繞:

首先 line 1 將 esp 設置爲參數 ctx2 的地址,後續的 popl 操作均在 ctx2 的內存空間上執行。

line 2-9 將 ctx2->regs[] 中的內容恢復到相應的寄存器中。還記得在前面 coctx_make 中設置了 regs[EIP] 和 regs[ESP] 嗎?這裏剛好就對應恢復了相應的值。

當執行完 line 9 之後,esp 已經指向了 ctx2 中新的棧頂指針,由於在 coctx_make 中預留了一個指針長度的 RA 空間,line 10 剛好將新的函數指針 &pfn 設置到該 RA 上。

最後執行 ret 指令時,函數流程將跳到 pfn 處執行。這樣,整個協程上下文的切換就完成了。

七、如何使用 libco

我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應用程序如何使用 libco 來編寫服務端程序。在 example_echosvr.cpp 的 main 函數中,主要執行如下幾步:

創建 socket,監聽在本機的 1024 端口,並設置爲非阻塞;

主線程使用函數 readwrite_coroutine 創建多個讀寫協程,調用 co_resume 啓動協程運行直到其掛起。這裏我們忽略掉無關的多進程 fork 的過程;

主線程繼續創建 socket 接收協程 accpet_co,同樣調用 co_resume 啓動協程直到其掛起;

主線程調用函數 co_eventloop 實現事件的監聽和協程的循環切換;

函數 readwrite_coroutine 在外層循環中將新創建的讀寫協程都加入到隊列 g_readwrite 中,此時這些讀寫協程都沒有具體與某個 socket 連接對應,可以將隊列 g_readwrite 看成一個 coroutine pool。當加入到隊列中之後,調用函數 co_yield_ct 函數讓出 CPU,此時控制權回到主線程。

主線程中的函數 co_eventloop 監聽網絡事件,將來自於客戶端新進的連接交由協程 accept_co 處理,關於 co_eventloop 如何喚醒 accept_co 的細節我們將在後續介紹。accept_co 調用函數 accept_routine 接收新連接,該函數的流程如下:

檢查隊列 g_readwrite 是否有空閒的讀寫 coroutine,如果沒有,調用函數 poll 將該協程加入到 Epoll 管理的定時器隊列中,也就是 sleep(1000) 的作用;

調用 co_accept 來接收新連接,如果接收連接失敗,那麼調用 co_poll 將服務端的 listen_fd 加入到 Epoll 中來觸發下一次連接事件;

對於成功的連接,從 g_readwrite 中取出一個讀寫協程來負責處理讀寫;

再次回到函數 readwrite_coroutine 中,該函數會調用 co_poll 將新建立的連接的 fd 加入到 Epoll 監聽中,並將控制流程返回到 main 協程;當有讀或者寫事件發生時,Epoll 會喚醒對應的 coroutine ,繼續執行 read 函數以及 write 函數。

上面的過程大致說明了控制流程是如何在不同的協程中切換,接下來我們介紹具體的實現細節,即如何通過 Epoll 來管理協程,以及如何對系統函數進行改造以滿足 libco 的調用。

八、通過 Epoll 管理和喚醒協程

Epoll 監聽 FD

上一章節中介紹了協程可以通過函數 co_poll 來將 fd 交由 Epoll 管理,待 Epoll 的相應的事件觸發時,再切換回來執行 read 或者 write 操作,從而實現由 Epoll 管理協程的功能。co_poll 函數原型如下:

int co_poll(stCoEpoll_t *ctx, struct pollfd fds[],
nfds_t nfds, int timeout_ms)

stCoEpoll_t 是爲 libco 定製的 Epoll 相關數據結構,fds 是 pollfd 結構的文件句柄,nfds 爲 fds 數組的長度,最後一個參數表示定時器時間,也就是在 timeout 毫秒之後觸發處理這些文件句柄。這裏可以看到,co_poll 能夠同時將多個文件句柄同時加入到 Epoll 管理中。我們先看 stCoEpoll_t 結構:

struct stCoEpoll_t
{
int iEpollFd; // Epoll 主 FD
static const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監聽的句柄總數
struct stTimeout_t *pTimeout; // 時間輪定時器
struct stTimeoutItemLink_t *pstTimeoutList; // 已經超時的時間
struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件
co_epoll_res *result; // Epoll 返回的事件結果
};

以 stTimeout_ 開頭的數據結構與 libco 的定時器管理有關,我們在後面介紹。co_epoll_res 是對 Epoll 事件數據結構的封裝,也就是每次觸發 Epoll 事件時的返回結果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這裏也保留了 kevent 數據結構。

```clike
struct co_epoll_res
{
int size;
struct epoll_event *events; // for linux epoll
struct kevent *eventlist; // for Unix or MacOs kqueue
};

co_poll 實際是對函數 co_poll_inner 的封裝。我們將 co_epoll_inner 函數的結構分爲上下兩半段。在上半段中,調用 co_poll 的協程 CC 將其需要監聽的句柄數組 fds 都加入到 Epoll 管理中,並通過函數 co_yield_env 讓出 CPU;當 main 協程的事件循環 co_eventloop 中觸發了 CC 對應的監聽事件時,會恢復 CC 的執行。此時,CC 將開始執行下半段,即將上半段添加的句柄 fds 從 epoll 中移除,清理殘留的數據結構,下面的流程圖簡要說明了控制流的轉移過程:

有了上面的基本概念,我們來看具體的實現細節。co_poll 首先在內部將傳入的文件句柄數組 fds 轉化爲數據結構 stPoll_t,這一步主要是爲了方便後續處理。該結構記錄了 iEpollFd,ndfs,fds 數組,以及該協程需要執行的函數和參數。有兩點需要說明的是:

1、對於每一個 fd,爲其申請一個 stPollItem_t 來管理對應 Epoll 事件以及記錄回調參數。libco 在此做了一個小的優化,對於長度小於 2 的 fds 數組,直接在棧上定義相應的 stPollItem_t 數組,否則從堆中申請內存。這也是一種比較常見的優化,畢竟從堆中申請內存比較耗時;

2、函數指針 OnPollProcessEvent 封裝了協程的切換過程。當傳入指定的 stPollItem_t 結構時,即可喚醒對應於該結構的 coroutine,將控制權交由其執行;

co_poll 的第二步,也是最關鍵的一步,就是將 fd 數組全部加入到 Epoll 中進行監聽。協程 CC 會將每一個 epoll_event 的 data.ptr 域設置爲對應的 stPollItem_t 結構。這樣當事件觸發時,可以直接從對應的 ptr 中取出 stPollItem_t 結構,然後喚醒指定協程。

如果本次操作提供了 Timeout 參數,co_poll 還會將協程 CC 本次操作對應的 stPoll_t 加入到定時器隊列中。這表明在 Timeout 定時觸發之後,也會喚醒協程 CC 的執行。當整個上半段都完成後,co_poll 立即調用 co_yield_env 讓出 CPU,執行流程跳轉回到 main 協程中。

從上面的流程圖中也可以看出,當執行流程再次跳回時,表明協程 CC 添加的讀寫等監聽事件已經觸發,即可以執行相應的讀寫操作了。此時 CC 首先將其在上半段中添加的監聽事件從 Epoll 中刪除,清理殘留的數據結構,然後調用讀寫邏輯。

九、定時器實現

協程 CC 在將一組 fds 加入 Epoll 的同時,還能爲其設置一個超時時間。在超時時間到期時,也會再次喚醒 CC 來執行。libco 使用 Timing-Wheel 來實現定時器。關於 Timing-Wheel 算法,可以參考,其優勢是 O(1) 的插入和刪除複雜度,缺點是隻有有限的長度,在某些場合下不能滿足需求。

回過去看 stCoEpoll_t 結構,其中 pTimeout 代表時間輪,通過函數 AllocateTimeout 初始化爲一個固定大小(60 1000)的數組。根據 Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員 pstTimeoutList 記錄在 co_eventloop 中發生超時的事件,而 pstActiveList 記錄當前活躍的事件,包括超時事件。這兩個結構都將在 co_eventloop 中進行處理。

下面我們簡要分析一下加入定時器的實現:

int AddTimeout( stTimeout_t apTimeout, stTimeoutItem_t apItem,
unsigned long long allNow )
{
if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準時間
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0; // 當前時間輪指針指向數組0
}
// 1. 當前時間不可能小於時間輪的基準時間
// 2. 加入的定時器的超時時間不能小於當前時間
if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow )
{
return __LINE__;
}
int diff = apItem->ullExpireTime - apTimeout->ullStart;
if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時間輪的大小
{
return __LINE__;
}
// 插入到時間輪盤的指定位置
AddTail( apTimeout->pItems +
(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );
return 0;
}

定時器的超時檢查在函數 co_eventloop 中執行。

十、EPOLL 事件循環

main 協程通過調用函數 co_eventloop 來監聽 Epoll 事件,並在相應的事件觸發時切換到指定的協程執行。有關 co_eventloop 與 應用協程的交互過程在上一節的流程圖中已經比較清楚了,下面我們主要介紹一下 co_eventloop 函數的實現:

上文中也提到,通過 epoll_wait 返回的事件都保存在 stCoEpoll_t 結構的 co_epoll_res 中。因此 co_eventloop 首先爲 co_epoll_res 申請空間,之後通過一個無限循環來監聽所有 coroutine 添加的所有事件:

for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
...
}

對於每一個觸發的事件,co_eventloop 首先通過指針域 data.ptr 取出保存的 stPollItem_t 結構,並將其添加到 pstActiveList 列表中;之後從定時器輪盤中取出所有已經超時的事件,也將其全部添加到 pstActiveList 中,pstActiveList 中的所有事件都作爲活躍事件處理。

對於每一個活躍事件,co_eventloop 將通過調用對應的 pfnProcess 也就是上圖中的 OnPollProcessEvent 函數來切換到該事件對應的 coroutine,將流程跳轉到該 coroutine 處執行。

最後 co_eventloop 在調用時也提供一個額外的參數來供調用者傳入一個函數指針 pfn。該函數將會在每次循環完成之後執行;當該函數返回 -1 時,將會終止整個事件循環。用戶可以利用該函數來控制 main 協程的終止或者完成一些統計需求。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zC7Wa32aJo-22tAlEAf6dA