c-- 異步框架 workflow 分析

簡述


workflow 項目地址 :https://github.com/sogou/workflow

workflow 是搜狗開源的一個開發框架。可以滿足絕大多數日常服務器開發,性能優異,給上層業務提供了易於開發的接口,卻只用了少量的代碼,舉重若輕,而且代碼整潔乾淨易讀。

搜狗官方宣傳強調,workflow 是一個異步任務調度編程範式,封裝了 6 種異步資源:CPU 計算、GPU 計算、網絡、磁盤 I/O、定時器、計數器,以回調函數模式提供給用戶使用,概括起來實際上主要是兩個功能:1、屏蔽阻塞調用的影響,使阻塞調用的開發接口變爲異步的,充分利用計算資源;2、框架管理線程池,使開發者迅速構建並行計算程序。

往往單臺機器要服務於千千萬萬終端,我們最希望服務器資源都能充分利用,然而計算資源和 I/O 資源天然的效率不對等,使我們不得不採用一些其他技術手段實現基礎資源充分利用。所謂 I/O 資源包括文件 I/O 和網絡 I/O,此外很多時候我們需要定時執行某段邏輯,同樣不希望等待時間阻塞計算資源的使用。

所以框架最基礎的功能,是要爲上層開發人員屏蔽底層資源的不對稱,使我們可以方便的開發業務邏輯而不需要把很多精力放在底層。

如何擬合計算資源和 io 資源

我們希望 io 等待或其他阻塞的時間,cpu 還能充分利用,執行一些任務。這要求發起 io 的線程不能調用阻塞接口原地等待,而是要切出去,往往採用 I/O 多路複用或者異步 I/O 的方式,分別對應 reactor 模型和 proactor 模型

對於網絡 I/O,linux 系統下缺乏對異步 I/O 的支持,即使近兩年有了 iouring,支持了異步 io,但性能上相對 epoll 未必會有多少提升,而且一切都交給系統調度,可控性上大大降低;另外開發難度也更大。反觀 epoll,無論系統的支持還是相關設計模型都非常成熟了,所以近一二十年底層大都採用 epoll,以 reactor 模式實現,reactor 統一處理請求,將就緒的任務轉給下游的處理器。根據業務不同,又有幾種不同實現方式,有的就單線程之內調度,單線程循環處理(如 redis),適合業務邏輯不復雜的場景;有的會單 reactor 處理請求,並通過消息隊列把請求轉發給下游多線程業務邏輯處理器處理;有的多線程多 reactor 處理請求,並通過消息隊列將任務分發給下游 handler,單 reactor 模式可以認爲是這種模式的特例,workflow 便以這種方式實現。

對於文件 I/O,linux 下有兩種異步 I/O 的支持,posix aio(glibcaio)和 linux 原生 aio,其中前者是一個通過多線程的異步,模擬的異步 io,性能極差;linux 原生 aio 是真正的 aio,但是要求 fd 只能以 O_DIRECT 方式打開,所以只適用於文件 I/O,workflow 中支持了這種方式處理文件 I/O。

對於定時器,常見的方式,有的通過 epoll 每次阻塞設置阻塞時間,用戶態管理定時器(如 redis);而 epoll 也支持時間事件,有的直接使用時間事件,workflow 便採用這種方式。

提供給用戶的接口

計算資源得以充分利用,還需要考慮給用戶提供什麼樣的接口,讓上層開發者能減少心智負擔,比如,以協程的方式,讓用戶像開發串行程序一樣開發異步程序,順序的寫邏輯;亦或者是提供讓用戶註冊回調的方式開發異步程序。workflow 中提出了子任務的概念,以任務的方式提供給用戶。

子任務定義了一種管理回調的方式,用串行並行來組織子任務調度。用戶可以把邏輯寫在任務裏,交給框架去調度。

把阻塞的任務交給 epoll 去異步調用,計算任務交給線程池去異步執行,以至於所有的任務都是異步調起的,這種設計思想,就是 workflow 被稱爲 “異步任務調度框架” 的原因。

代碼分析


根據上面的分析,對一般服務器框架結構已經有了一個整體認識。下面按這個順序,底層基礎數據結構——》純計算任務和 Reactor 層——》任務組織調度層——》用戶接口層,分四個層次逐步分析一下 workflow 的實現。

基礎數據結構

workflow 使用到的基礎數據結構:鏈表、紅黑樹、消息隊列、線程池,workflow 中這四個結構的設計都非常的精緻。

鏈表(見文件 list.h)

workflow 中的鏈表貌似引自 linux 內核,實現了一種非常非常靈活的鏈表,甚至鏈表串起的不同節點之間可以是不同的數據結構

一般來說一個普通的鏈表節點如下:

struct ListNode
{
    ListNode * prev_ = nullptr;
    ListNode * next_ = nullptr;
    void * p_value_ = nullptr;
};

定義節點時定義好數據段 p_value_,這樣的話數據結構的實現就會與業務邏輯結合在一起。

這裏不使用模板也實現了預定義獨立於業務邏輯的鏈表數據結構。

鏈表的節點:// 這是一個雙鏈表

struct list_head {
    list_head *next, *prev;
};

可以把鏈表嵌入到任何一個數據結構中,

那如何通過鏈表節點拿到當前所在結構呢?

通過一個宏來實現:

#define list_entry(ptr, type, member) \
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))

簡單解釋下這個宏:ptr 表示鏈表節點指針,type 是當前節點數據結構類型名,member 是鏈表節點在數據結構中的成員名

&((type *)0)->member) 把指向地址空間起點的指針(空指針)轉化成指向節點數據結構的指針,然後取鏈表節點成員名,再取地址,就可以取到鏈表節點在這個數據結構中的偏移量。

ptr 是鏈表節點指針,按 (char *) 減去偏移量,就可以回退到結構起始位置。再把這個位置轉化成(type *). 就取到了指向當前數據結構的指針。

看接口甚至可以發現,當我想把當前數據結構從鏈表裏刪除的時候,甚至不需要拿到鏈表,而是直接通過 list_del(list_head * current_node) 函數傳入當前節點就可以刪除,靈活的一塌糊塗。

並且提供了遍歷鏈表的接口宏:

#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)

每一行代碼都極其簡潔乾淨,妙到毫巔!

其他鏈表基礎知識不多贅述。

紅黑樹(見 rbtree.h/.c)

與鏈表類似,紅黑樹也使用了內核紅黑樹。

相同的風格,每個節點只有鏈接指針和節點顏色字段,而沒有數據。

struct rb_node
{
    struct rb_node *rb_parent;
    struct rb_node *rb_right;
    struct rb_node *rb_left;
    char rb_color;
#define RB_RED      0
#define RB_BLACK    1
};

當把紅黑樹 node 嵌入數據結構中之後,使用同樣原理的宏,來獲取節點所在結構的指針:

#define rb_entry(ptr, type, member) \       // 包含ptr的結構體指針
    ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))

比較特別的是,由於節點不包含數據,數據結構不知道節點之間如何比較大小,所以需要用戶自己定義查找、插入函數,但給出了例子。

消息隊列(見 msgqueue.h/.c)

這裏實現了一個消息隊列,也是正常的提供一個 put 接口,供生產者 reactor 生產數據插入消息,一個 get 接口,傳遞給下游 handler 消費,消息隊列有消息上限,並提供阻塞和非阻塞兩種模式,阻塞模式下,當消息超過上限生產線成阻塞,等待消息小於上限了再插入。通過條件變量使沒有待處理的消息時,阻塞消費線程,於內核態等待消息出現。這裏的生產者和消費者都是多線程的,所以需要考慮線程安全,消息隊列的常見實現是一個數據存儲段,一個鎖,一個條件變量,而 workflow 中的消息隊列的高妙之處就在於,他有兩個鎖,兩個條件變量,兩個數據空間,雙倍快樂。

struct __msgqueue
{
    size_t msg_max;
    size_t msg_cnt;
    int linkoff;
    int nonblock;
    void *head1;
    void *head2;
    void **get_head;
    void **put_head;
    void **put_tail;
    pthread_mutex_t get_mutex;
    pthread_mutex_t put_mutex;
    pthread_cond_t get_cond;
    pthread_cond_t put_cond;
};

這裏使用了一個小技巧,大幅提升消息隊列性能,兩個數據段一個專門用來 get,一個專門用來 put,兩把鎖兩個條件變量,分別 put 時候和 get 時候使用。這樣的好處就是 get 和 put 操作之間幾乎互不干擾。put 操作不會鎖消費線程。get 操作絕大多數情況下不會鎖生產線程。

只有當 get 鏈表爲空時,纔會把 put 和 get 全鎖住,對兩個鏈表頭進行交換,極大的減少了生產線程和消費線程之間爭奪鎖產生的相互影響。

這裏還有一個點就是消息隊列要求節點是自帶鏈表字段的,並指定鏈接節點相對於結構頭的偏移量(linkoff)。所以插進來的節點 msg 的結構是 poller_result 但是實際結構是 poller_node 強轉過來的,再對比這兩個結構體,發現前三個成員是一致的,而第四個成員就是鏈接節點。

struct poller_result
{
    int state;
    int error;
    struct poller_data data;
};
struct __poller_node
{
    int state;
    int error;
    struct poller_data data;
#pragma pack(1)
    union
    {
        struct list_head list;
        struct rb_node rb;
    };
#pragma pack()
    ...
};

線程池(見 thrdpool.h/.c)

線程池實現的功能往往是創建一系列工作線程,工作線程執行線程回調函數,從消息隊列中取任務並執行,當消息隊列中沒有任務時,等待任務出現。

workflow 中的線程池就是這樣一個很標準的線程池,同時很靈活的讓邏輯脫離於線程池,線程回調函數並非實際要執行的邏輯,而是從消息隊列裏 get 出的 task,是一個包含了要執行的回調和上下文的 task,線程回調函數執行了這個 task。

struct thrdpool_task
{
  void (*routine)(void *);
  void *context;
};

這樣實現一個效果,就是可以運行時才動態決定要執行什麼邏輯,即每個 task 可以是不同的任務,靈活度大大提升。

基礎數據結構主要就這四種,這裏只分析了其設計中比較可圈可點的部分,而沒有仔細講一些簡單的基礎細節。

純計算任務和 Reactor 調度層

把阻塞的任務交給 epoll 去異步調用,計算任務交給線程池去異步執行,實現所有任務的異步調度,下面分別看看計算任務和 reactor。

純計算任務

WorkFlow 由框架統一管理原始任務線程池,單例__ExecManager 內有一個單的封裝,優雅的實現對線程池的管理。

這一層有三個新概念:

ExecQueue 是一個有鎖鏈表隊列;

ExecSession 的 execute() 接口由派生出來的任務自己去定義需要執行的邏輯。

Executor 類,創建並管理線程池,提供 request() 方法,request 方法把對應任務放入到線程池去執行。request 的參數有兩個,分別是當前 session 和所在的 ExecQueue,如果 queue 裏面只有這一個 session,則把這個 session 放入 Executor 管理的線程池裏裏執行,如果不是首個任務,則只要放入隊列裏就行了,線程 routine 會調度當前隊列中所有的任務進入線程池執行,並用 ExecQueue 中的鎖保持隊列中任務調度的同步性。

Executor::executor_thread_routine 是線程執行 routine,一共做了兩件事:

第一步會遞歸的調度所有當前 Queue 中的任務進線程池,並用 ExecQueue 中的鎖保持隊列中任務調度的同步性;

第二步是執行當前 session,並由 session 自己保持數據同步。

Reactor:

這裏主要涉及四個文件 poller.h/.c mpoller.h/.c Communicator.h/.cc CommScheduler.h/.cc

其中 poller 是對 epoll 的封裝,mpoller 又集成多個 poller 線程;Communicator 顧名思義,就是通信器,封裝了 mpoller 和線程池;CommScheduler 是對 Communicator 的封裝,全局唯一,最後創建在__CommManager 中,通過 WFGlobal 暴露出來。

這一層主要完成了右圖所示的工作,poller 線程把 epoll 事件做初加工處理,生成一個 poller_result,設置需要 handle 的類型,然後把處理結果 put() 進消息隊列,給工作線程去處理。handler 線程等待任務,當隊列裏有任務時,根據任務的 operation 類型做相應處理。

poller

poller.h/.c 提供了 poller 的創建、啓動、stop、poller_add、poller_del、poller_mod 和 add_timer 的接口。

poller_create 創建了 poller 數據結構,分配了 poller_node 的指針數組 nodes,這裏的 nodes 是一個以 fd 爲下標的數組,這時候只有一個指針數組,node 還沒有創建,node 是在 poller_add 的時候創建的,創建 node 的時候會檢查監聽的操作是否需要 result,需要的話同時分配 result 空間。但這時候 poller 線程還沒有跑起來,執行 poller_start 時將 poller 線程跑起來;poller_add、poller_del、poller_mod 分別是 epoll 的增加節點、刪除節點、改變監聽事件 三種操作的簡單封裝;add_timer 增加時間事件,

前面說過消息隊列裏面裝的是 poller_result(poller_node),poller_result 裏面都會有一個 poller_data。

#define PD_OP_READ          1
#define PD_OP_WRITE         2
#define PD_OP_LISTEN        3
#define PD_OP_CONNECT       4
#define PD_OP_SSL_READ      PD_OP_READ
#define PD_OP_SSL_WRITE     PD_OP_WRITE
#define PD_OP_SSL_ACCEPT    5
#define PD_OP_SSL_CONNECT   6
#define PD_OP_SSL_SHUTDOWN  7
#define PD_OP_EVENT         8
#define PD_OP_NOTIFY        9
#define PD_OP_TIMER         10
struct poller_data
{
    short operation;
    unsigned short iovcnt;
    int fd;
    union
    {
        SSL *ssl;
        void *(*accept)(const struct sockaddr *, socklen_t, int, void *);
        void *(*event)(void *);
        void *(*notify)(void *, void *);
    };
    void *context;//CommService或CommConnEntry
    union
    {
        poller_message_t *message;
        struct iovec *write_iov;
        void *result;
    };
};

poller_data 封裝了需要處理的 fd、對應的操作(operation)、上下文(可能是 CommService 或 CommConnEntry)。

poller 的核心是 poller_thread,poller_start 的時候啓動了是一個 poller_thread,poller_thread 處理的是 epoll_event,主流程是一個經典的雙循環,外層循環 epoll_wait,每次最多處理 256 個 fd,epoll 返回後,再根據每個 epoll_event 事件的類型,循環處理每個類型的事件,從枚舉可以看到對當前 node 的操作有讀、寫、listen、connect、timer 等等,不管是什麼類型的 epoll 事件,poller_thread 處理的結果會生成一個. poller_result,並把這個結果插入到消息隊列中。

具體的操作非常的多了,不適合靜態分析,後面再動態分析請求的全流程。

poller 的操作都是線程安全的,mpoller 啓動多個線程的時候也可以直接使用。

mpoller

可以看到實際上使用的並不是 poller 而是 mpoller,mpoller 是對多線程 poller 的封裝,一個 mpoller 包括至少一個 poller,實際配幾個線程就創建幾個 poller,並統一分配 poller_node,所有 poller 共享 poller_node 數組。實際使用的時候可以根據運算核心數和業務邏輯的複雜程度調整 poller_thread 和 handler_thread 的配比。mpoller 的 add、del、mod 接口會對傳入的 fd 對線程數求模,將 fd 均勻的分配到各個 poller。

關於數據同步

可以看到對 fd 的 [] 操作並沒有加鎖,以 mpoller_add 爲例

static inline int mpoller_add(const struct poller_data *data, int timeout,
                              mpoller_t *mpoller)
{
    unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
    return poller_add(data, timeout, mpoller->poller[index]);
}

第 4 行計算 index,fd 和 nthreads 都是不會發生變化,不會修改的,線程之間無衝突,所以不需要加鎖。

第 5 行由 poller_add 來保證線程安全,每個 poller 中都有一個鎖,poller_add、poller_del、poller_mod 的操作都是加鎖的,因爲這三種操作都可能發生在不同的線程。

Communicator

Communicator 是通訊器,是底層和業務層的樞紐,創建了 mpoller 和 handler 線程池,初始化時候啓動兩個線程池,bind 的時候會把服務綁到 communicator 上,把服務創建的 listen_fd 放入到 poller 中開始監聽。handler_thread 就是在 Communicator 中啓動的,handler_thread 從消息隊列裏拿到的是 poller_result,handler_thread 做的是拿到任務以後根據 poller_result::poller_data::operation 類型做相應處理。

相關的結構有:

鏈接:

class WFConnection : public CommConnection 創建的鏈接

對端:

CommTarget 通訊目標,封裝了對端的地址、port、超時時間

消息:

struct __poller_message
{
    int (*append)(const void *, size_t *, poller_message_t *);
    char data[0];   // 柔性數組
};
class CommMessageIn : private __poller_message
{
private:
    virtual int append(const void *buf, size_t *size) = 0;
    struct CommConnEntry *entry;
};
class CommMessageOut
{
private:
    virtual int encode(struct iovec vectors[], int max) = 0;
};

很明顯 CommMessageIn 是一次通信中的輸入消息,CommMessageOut 是返回的消息的基類,輸入消息的基類是__poller_message,這裏又使用了一個 c 程序員常用的小技巧,成員 char data[0] 是一個柔性數組,把__poller_message 變成了一個變長結構體。

結構體中末尾成員是一個長度爲 0 的 char 數組,這樣聲明看起來和 char *data 是一樣的,但是這樣寫相對於 char 指針有一些優勢。

對比如下結構,考慮__poller_message_test 和__poller_message 有什麼區別

struct __poller_message_test
{
    int (*append)(const void *, size_t *, poller_message_t *);
    char *data;   // char指針
};

首先,數組長度是 0,說明沒分配空間。所以 64 位系統中,sizeof(struct __poller_message_test) == 16 而 sizeof(struct __poller_message) == 8。其次,如果使用一個 char 指針, 需要爲指針分配內存。而使用 data[0] 則不需要二次給指針分配內存,直接爲結構分配適量大小內存即可,成員 data 會自動指向結構尾部的下一個字節。

輸入消息有一個 append 的虛方法,子類自己去定義如何反序列化,輸出消息有一個 encode 的虛方法,子類消息自己去定義序列化發送消息。基類__poller_message 中的函數指針會被賦值爲 Communicator::append(const void *buf, size_t *size, poller_message_t *msg),實際運行時由函數指針 append 去調用各子類消息的 virtual int append(const void *buf, size_t *size) 對消息進行反序列化。

框架內已經定義好一些常用協議了:

會話:CommSession

CommSession 封裝了一次會話所有組成單位,包括輸入 / 輸出消息、CommConnection、CommTarget

定義了消息的生產方式

服務器:CommService

類圖:

class WFServerBase : protected CommService 服務器的抽象。封裝了服務器地址、監聽套接字、活躍鏈接和連接數、服務器參數。

基類定義了 newsession、newconnect 接口。WFServerBase 類中實現了服務啓動 start()、停止 stop()、創建 / 刪除鏈接 newconnect()。

WFServer 是一個模板類,模板參數是輸入輸出消息類型,可以實例化爲各種類型的服務器,不同類型的服務器就是消息類型不同的服務器實例化,因爲不同類型服務器實例消息類型不同,處理消息方式也不同,WFServer 中保存了處理消息方式的回調——processer,並在服務創建的時候初始化。在 WFServer 中定義 session 創建方式 new_session() 的時候,會用 processer 來創建 task,process 實際上是 task 的處理方式。

服務 Start() 的時候會被 bind() 到全局的 Communicator 上,包括創建 fd、bind、listen、放入 epoll 監聽,成爲 epoll 監聽的第一個 fd。服務實際上是交給 Communicator 創建的 handler_thread 線程池來驅動起來的。

Entry:CommConnEntry

打包了所有一次會話需要的上下文,包括 poller、servide、session、target、socket 等,處理 accept 事件(handle_listen_result)的時候由 Communicator::accept_conn 創建,創建後放在 poller_data 中,mpoller_add 監聽

Communicator:

有了上面這些基礎結構,Communicator 就是一個完全體了,Communicator 初始化的時候,啓動了 poller_thread、handler_thread 驅動服務進行消息處理。

以示例代碼的 hello_world 程序爲例,觀察一次網絡請求過程,看看 poller_thread 和 handler_thread 分別都做了什麼。

從 hello_world 啓服到線程工作:

這裏特別看一下 poller_add 的時候創建了 poller_node 實體, poller_node 中有一個成員 struct __poller_node *res,__poller_data_get_event() 的時候會返回一個 bool 值,表示是否需要創建 res。可以看到操作類型爲 listen 的情況。是需要 res 的。

經過這個過程,服務器就啓動開始接受請求了,service 創建 listen_fd 交由 poller 管理,當監聽到有客戶端鏈接時,accept+read。下面分析接收到一個請求時,poller_thread 和 handler_thread 分別做了什麼。

poller_thread 知道 listenfd 可讀,則 accept 一個 readfd,創建了對端 target,把這個 poller_result(poller_node) 放進消息隊列。

handler_thread 拿到這個 poller_result 之後,主要是創建了完整的 CommConnEntry,並把負責 read 的 poller_node 放入 epoll 監聽,等待內核緩衝區有數據可讀。

這裏有個細節,readfd 是無阻塞模式,因爲使用了 epoll 的邊緣觸發模式,即每個 fd 的狀態變化只通知一次,這樣的話需要把 readfd 上的數據全讀完,所以 readfd 必須設置成無阻塞模式,否則循環讀到最後肯定會被阻塞。

如果遇到 errorno==EAGAIN 則直接返回,因爲對於 fd 阻塞調用 eagain 表示提示重試,對於非阻塞 fd,errorno==EAGAIN 則表示緩衝區已經寫滿,直接 return 本次處理結束。

readfd 放入 epoll 之後,readfd 上有數據到來後會被操作系統拷進內核緩衝區,然後 epoll 提示 readfd 可讀。poller_thread 會進入處理可讀事件 (handle_read)。

poller_thread 對可讀事件的處理主要是把字節流讀出來,並反序列化,放入隊列提供給 handler_thread,handler_thread 調 service 處理業務邏輯。

handler 對收到的消息的處理分兩種情況,如果是服務端,當做請求處理,如果是客戶端,當回覆處理,所以 hello_world 程序進入請求處理流程。

服務器對請求的處理是創建服務對應類型的 CommRequest,helloworld 中實際是執行了一個 WFHttpServerTask。

繼承關係:WFHttpServerTask——>WFServerTask——>WFNetworkTask——>CommRequest——>SubTask,CommSession。

SubTask 和 CommSession 後面再仔細分析,這裏先從字面理解,SubTask 就是任務,就是處理自定義邏輯的過程,CommSession 是會話。那 handle 的時候會先調用當前 Task 的 processor.dispatch() 執行任務,任務執行完自動 subtask_done() 的時候會調用 scheduler->reply(),將結果返回 Send_message()。可以看到 Send_message 是先嚐試同步寫,如果同步寫失敗了,再嘗試異步寫,異步寫的過程就是先把文件描述符加入 epoll 監聽,等待可寫信號出現後,再寫入。寫的時候使用 iovec,聚集寫儘量減少拷貝次數。

至此 poller 事件各種 operation 的處理,已經分析過 PD_OP_READ、PD_OP_WRITE、PD_OP_LISTEN,再通過 wget 看一下 PD_OP_CONNECT。

connect 主要是處理客戶端鏈接服務端時,服務端無法立刻建立鏈接時的等待,異步等待屏蔽等待時間。

request 的時候會優先檢查目標上有沒有 idle 鏈接,如果有的話直接複用,如果沒有會創建 connect,conn_fd 是非阻塞的,operation 設置爲 PD_OP_CONNECT,放在 epoll 中管理,等待 fd 可用。

可以看到,是一個簡單的發送請求,等待結果的過程。

poller 事件共有 10 種 operation,這裏分析過讀、寫、connect、listen 四種流程,PD_OP_SSL_ACCEPT、PD_OP_SSL_CONNECT、PD_OP_SSL_SHUTDOWN 三個只是使用 openssl 庫時的創建和關閉鏈接。還有另外兩種事件:PD_OP_EVENT、PD_OP_NOTIFY,這兩種分別是 linux 和 mac 環境下處理異步文件 I/O 用的。

異步文件 I/O:

TODO

任務組織調度層


下面分析任務線程是如何執行任務的邏輯。這個層次有兩個核心基礎概念,一個是任務的抽象,一個是會話 (session) 的抽象,二者是所有執行邏輯的祖爺爺和祖奶奶。

任務:

前面看到對於請求的處理,實際是執行了 CommRequest,CommRequest 既是一個 SubTask 又是一個 CommSession,最後是通過執行的是 SubTask 的接口 dispatch() 執行起來的,這裏最重要的概念——子任務。workflow 裏面所有的邏輯,最後都是通過子任務執行起來的;子任務又可以通過各種組合關係,串並聯的組織起來。

這裏有四個重要的基本元素:

1,SubTask——子任務,是一切任務的祖先。

2、ParallelTask——並行任務,並行任務裏面管理 SubTask 數組,啓動時會把自己管理的 SubTask 一個一個全部 dispatch 一遍。

3、SeriesWork——串聯工作組,裏面管理了一個數組的子任務,逐個執行。

4、ParallelWork——並聯工作組,裏面管理了一個 SeriesWork 數組,其本身的祖先是一個 SubTask,所以他可以被 SeriesWork 管理。

這樣就實現了任務的串並聯執行甚至以 DAG 的形式複合。

下面逐一分析:

SubTask:
class SubTask{
public:
    virtual void dispatch() = 0;
private:
    virtual SubTask *done() = 0;
protected:
    void subtask_done();
private:
    ParallelTask *parent;
    SubTask **entry;
    void *pointer;
};

SubTask 是一切執行任務的祖先,不同的任務實現,實現不同的 dispatch() 和 done() 接口,提供兩個接口留給用戶自定義:

1、dispatch() 接口 就是執行任務,用戶任務自定義執行邏輯,而在執行結束後,必須調用 subtask_done()。

2、done() 接口 在任務邏輯執行結束後,由 subtask_done() 調起 done(),這個接口是用戶自定義的結束回調,在 done() 接口裏面回收資源,銷燬任務。done() 函數還會返回一個子任務的指針,噹噹前任務執行完還要執行下一個任務的時候,返回下一個任務,如果沒有下一個任務,則返回 nullptr。爲什麼這麼約定呢?這需要看一下 subtask_done() 函數的工作方式。

需要知道成員變量的意思才能明白調度方式:

pointer 一般指向當前所在 SeriesWork,SubWork 最後也是放在 SeriesWork 之中啓動起來的;

parent 當一個子任務被 ParallelTask 任務管理的時候,parent 指向被管理的並行任務。

entry 指向待執行任務數組的首位。

subtask_done():仔細解讀一下 subtask_done() 的工作方式:

void SubTask::subtask_done()
{
    SubTask *cur = this;
    ParallelTask *parent;
    SubTask **entry;
    while (1){
        parent = cur->parent;
        entry = cur->entry;
        cur = cur->done();    
        if (cur){
            cur->parent = parent;
            cur->entry = entry;
            if (parent)
                *entry = cur;
            cur->dispatch(); 
        }
        else if (parent) {
            if (__sync_sub_and_fetch(&parent->nleft, 1) == 0) {
                cur = parent;
                continue;
            }
        }
        break;
    }
}

可以看到先保存了當前任務的 parent 和 entry,然後直接調用了當前任務的 done() 接口。如果又返回了一個子任務,則調用新任務的 dispatch(),使其運行起來,dispatch() 到最後必然又會調用新任務的 subtask_done();從而遞歸執行這條線上所有任務,直至 done() 不會再返回任務;當不再返回任務時,說明 parent 的孩子都執行完,就可以繼續再往上執行 (parent 也是一個 SubTask),直至根任務執行完。

ParallelTask:

ParallelTask 是 SubTask 的兒子,結構很簡單,管理了一個 SubTask 數組,ParallelTask::dispatch() 的時候會把數組內管理的所有 SubTask 逐一 dispatch() 一遍,這樣的話就實現了同級任務的並列執行,特別注意並列執行不一定是並行,是否並行取決於調度。任務本身是順序 dispatch() 的,如果 dispatch 調度的時候把任務放入線程池執行任務就是並行的。

SeriesWork:

SeriesWork 是一個有鎖的線程安全隊列,隊列中存儲了需要按順序執行的 SubTask,預分配 4 個空間,如果入隊時隊列已滿,則像 vector 一樣拓展二倍空間。

SubTask 都是放到 SeriesWork 中執行的。SeriesWork 是怎麼調度執行任務的?啓動函數 Start(),會從第一個 SubTask 開始 dispatch(),可以看到多數任務 Task 的 done() 的實現都是返回 return series->pop(); 意思就是當前任務執行完了,返回當前所在的 SeriesWork 中的下一個任務,繼續執行,直至所有任務執行完。

注意 SeriesWork 本身不是一個 SubTask,所以無法被 SeriesWork 管理。

ParallelWork:

ParallelWork 稍微複雜一點

繼承關係:ParallelWork——>ParallelTask——>SubTask

可見:1、ParallelWork 是一個 SubTask,所以可以被 SeriesWork 管理;2、ParallelWork 同時也是一個 ParallelTask,管理了一個數組的 SubTask;3、ParallelWork 管理了一個 SeriesWork 數組,這個數組的長度和 SubTask 數組的長度相同。並且讓 SubTask 指向同索引 SeriesWork 的首個 SubTask。

ParallelWork 是怎樣啓動和調度任務的:

ParallelWork 本身是一個 SubTask,所以啓動時把他放入一個 SeriesWork,作爲 SeriesWork 的 firsttask 被調起 dispatch();然後 ParallelWork 本身是一個 ParallelTask,dispatch 的時候會把其下管理的所有的 SubTask 逐個啓動 dispatch();如圖,SubTask 指向的實際是管理的 SeriesWork 的 first Task,所以實際上相當於啓動了管理的所有 SeriesWork。

這四個結構就是整個任務調度的基石,所有的邏輯都是作爲任務執行起來的。並行任務管理串行任務,串行任務管理 SubTask(並行任務也是 SubTask),這套設定使任務可以自由複合 DAG 複合。

這時可以明白這個框架名字所謂 WorkFlow,其核心就是組織任務的執行流,所有的執行邏輯都是任務。

會話 (session):

想要執行的邏輯,通過成爲 SubTask 可以啓動起來,並按一定的順序調度,那具體做的事,則被抽象爲會話。

基礎 session 有四種:CommSession、ExecSession、IOSession、SleepSession,分別代表網絡操作、運算操作、I/O 操作、睡眠操作,session 都需要實現 handle() 接口,所有最後執行的任務都是這四種操作派生出來的。

SubTask 這個大渣男分別和四種 session 結合生成了 CommRequest、ExecRequest、SleepRequest、IORequest,使得所有的 request 都可以被作爲子任務調度,都有 state 和 error。

四種 request 分別派生出了 WFNetWorkTask、WFThreadTask、WFTimerTask、WFFileTask。其中 WFNetWorkTask 和 WFThreadTask 都是兩個參數的模板類。對通信任務來說,參數是請求消息和回覆消息,對於計算任務來說參數是輸入和輸出,WFReduceTask、WFSortTask、WFMergeTask 是不用參數的的實例化,WFHttpTask、WFRedisTask、WFMysqlTask、WFKafkaTask 只不過是不同協議的 WFNetWorkTask 的實例化。

CommRequest 派生了 WFNetworkTask;ExecRequest 派生了 WFThreadTask,二者都加入了輸入輸出模板參數,和一些控制參數,提供了方便的啓動多線程任務和網絡任務的方式。更有 WFMultiThreadTask 任務,批量管理多線程任務。

這裏還有一個 WFTimerTask,實現了不佔線程的定時功能.。

WFTimerTask:

WFTimerTask 可以讓任務休眠一定時長後執行,不佔線程,達到時長之後返回執行回調,就是定時任務。

如果一個 WFTimerTask 被直接 start(),則創建一個 SeriesWork,並 dispatch() 起來,如果是串在其他的 SeriesWork,當執行到這個 task 的時候直接 dispatch()。

當 SleepRequest 被 dispatch() 時候,實際是調用當前 scheduler(即 communicator) 的 sleep(),實際是取出當前 WFTimerTask 的休眠時間,然後創建一個定時任務 mpoller_add_timer 交給 epoll 管理,等 epoll 提示時間到了,再切回來執行。

層次結構:

借用一張官圖非常清楚的表達清楚任務之間的層次關係。

用戶接口

至此,底層支持都分析過了,下面看看通過這些底層結構可以組織出什麼花樣。

其他 Tasks

WFCounterTask:

CounterTask 是一個計數器 Task,任務裏保存了一個原子的 unsigned 用來計數,初始化時候傳入需要記的個數,每次任務被 dispatch() 的時候,計數器減一,直到計數器爲 0 時,執行回調,配合一個阻塞信號量,可以實現一批並行任務的統一等待,如: WaitGroup。

可能是覺得手動創建 CounterTask 不夠優雅,框架還創建了 CounterTask 管理器,用一個紅黑樹以名字爲 key 統一管理 CounterTask,可以通過名字全局操作 CounterTask。

WaitGroup

既然說到了就順便說一下 WaitGroup。

WaitGroup 實現了阻塞等待多個任務完成的效果。

WaitGroup 由一個原子的等待個數,一個 WFCounterTask 和一個 std::future 組成。構造時創建一個 std::promise,並綁定到 future 上;創建一個計數 1 的 CounterTask 並註冊回調,回調中時給 promise->setvalue()。

每次調用 done 會給剩餘個數減一,當減完時,counter->done(),這時回調會告訴 futrue,所有任務都完成了,阻塞結束。

WFGraphNode 和 WFGraphTask:

WFGraphTask 實現了將任務迅速的組織成有向無環圖的方法,一個 WFGraphTask 管理了一張由多個 WFGraphNode 組成。

WFGraphNode 是一個 WFCounterTask,並加入了一個 WFGraphNode * 列表:follower,follower 表達了鄰接關係,保存的就是依賴當前任務的下游節點。因爲是 counter 任務,所以具有計數的功能,記的數就是當前 Node 的入度。在當前任務執行完之後,會把所有下游節點都 dispatch(計數)一次,當計數減少到 0 時,說明當前 Node 所有依賴已經完成了,就把當前 graphNode 上掛的 SeriesWork 執行起來。

依賴處理:當一個 node1 依賴 Node2 時候,Node2 的下游節點列表里加入 Node1,Node1 的入度自增。

執行處理:當 Node2 執行完,Node1 的入度減一。

框架的重載了 GraphNode 的自增運算符和大於號、小於號,自增運算符返回 Node 本身。大於號、小於號運算符調用依賴關係函數。從而很形象的可以通過如下語法表達節點之間的依賴關係:

a-->b;
    a-->c;
    b-->d;
    c-->d;

是不是很秀?簡直妙不可言

再說一個細節:DAG 建立起來了,但是 Node 上是怎麼掛的任務呢?

答:創建 WFGraphNode 通過統一接口:WFGraphNode& WFGraphTask::create_graph_node(SubTask *task),創建的時候傳入你想要執行的任務,然後把要執行的任務和當前 Counter 任務串在一個 Series 裏面。噹噹前 Node 計數器第一次變 0 的時候,會調到 Done(),看一下關鍵的 done() 實現:

SubTask *WFGraphNode::done()
{
    SeriesWork *series = series_of(this);
    if (!this->user_data)//首次done會進這裏
    {
        this->value = 1;//value=1使該任務再執行一次就可以達到結束狀態
        this->user_data = (void *)1;//下次再進來就不進這個分支了,而是直接delete this;
    }
    else
        delete this;
    return series->pop();
}

首次 done() 的時候不析構,並將狀態置爲下次進來析構(value 賦 1&&user_data 非空)。

然後將本 series 裏面要執行的用戶任務執行起來。當用戶任務執行完,會再次執行到 GraphNode->Done(); 這時侯,Node 析構,並將所有 follower->dispatch() 起來。這就是圖任務的整體執行路徑。

WFRepeaterTask:

這是一個遞歸 Task,繼承自 GenericTask,也就是說啓動時,會創建一個 Series,並把 Series 啓動起來。創建的時候傳入創建任務的回調 Create,在 dispatch() 得時候,往當前 Series 裏傳入兩個任務,一個是 Create 回調創建出來的新任務,一個是當前任務。這樣的話,順序任務的調度就變成:執行任務—》創建任務—》執行任務。。。

WFConditional:

WFConditional 是條件任務包裝器,可以把其他任務包裝成條件任務,通過一個 atomic 變量實現。新增加一個 signal 接口,當 dispatch 和 signal 都執行後,任務會被執行。原理:當任務被 dispatch 或者 signal 時,都會去設置原子 bool 的值,並檢查狀態,如果設置過狀態,就調起任務,可見第一次不會調起,第二次纔會調起任務。

爲了避免發送 signal 者持有條件任務的裸指針,框架還提供了全局的命名的條件任務,發送者可以根據名字給 conditional 發 signal,內部是一個觀察者模式,以 cond 的名字爲 key 構建了一個紅黑樹管理,當 signal 某個 key 的時候,找到對應的條件任務發送 signal()。

WFModuleTask:

WFModuleTask 提供了一個模塊級的封裝,可以把一系列任務封裝到一個模塊裏,可以註冊一個模塊的回調函數。WFModuleTask 本質上還是一個 SeriesWork,把一系列任務封裝在一起,降低功能任務之間的耦合程度。

服務

基於 workflow 框架我們可以迅速的構建 http 服務器,只需要幾行代碼:

int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("<html>Hello World!</html>");
    });
    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }
    return 0;
}

可以看到構造一個 WFHttpServer,只要傳入一個處理 WFHttpTask 的回調函數即可。

下面分別看 WFHttpServer 、WFServerTask

WFHttpServer

首先 WFHttpServer 是 WFServer 的 http 消息時的特化版本。WFServer 在 BaseServer 的基礎上增加了輸入輸出模板參數,並增加了一個可以處理 WFNetworkTask 的回調函數,同時重寫了 new_session 方法;

poller 在 create_message 的時候會調到 new_session,創建 WFServerTask;

Communicator 並不知道 Service 是什麼類型的 service,在 create_message 的時候不管是什麼類型的 service,都調用 service 對應的 new_session 接口去生產 session 交給 Poller 去生成任務交由線程池執行。

WFServerTask

WFServerTask 繼承自 WFHttpTask,WFServerTask 內定義了兩個局部類,Processor 和 Series。

前者 Processor 保存着服務初始化時傳入的回調和當前 WFServerTask 的指針,dispatch 時執行回調處理當前任務。

後者 Series 本質上是一個 SeriesWork,把 Processor 和當前任務串起來,並先執行 Processor,最後執行當前 WFServerTask,當前任務負責 reply。同時負責引用計數,讓 service 知道有多少任務在引用。

服務小結

session 是被動產生的,服務是靜態定義的,服務定義了自己的服務類型、和產生任務的方法、處理任務的回調等等,然後在服務啓動的時候綁定地址創建 fd,把自己綁定到 Communicator 上,交給 Reactor 去調度。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/m53Yz7Q-RpNCUcNmSUJOXw