C-- 從零實現協程調度框架

0 前言

推進掌握一門知識的方法除了溫故知新之外,還可以是觸類旁通. 近期沉寂停更的時間裏,我在經歷自學 c++ 理論知識的入門階段,不過大家都知道“紙上得來終覺淺”的道理,因此我決定以復刻 golang gmp 協程調度設計思路爲目標,基於c++11的風格實現一個低配乞丐版協程調度框架,以此作爲我的首個 c++ 實踐開源項目,並希望以此爲契機,在提高 c++ 編程熟練度的同時,也能提供一波旁支輸入,反補提升我對gmp概念的理解.

該項目我已於 github 開源,cbricks 是我基於 c++11 從零實現的基礎工具開源庫:https://github.com/xiaoxuxiansheng/cbricks.
其中實現的內容包括但不僅限於 協程調度框架 workerpool協程 coroutine/線程 thread併發通信隊列 channel日誌打印組件 logger等基本工具類,而 協程調度框架 workerpool 正是我今天我要向大家介紹的主題.
這是我作爲 c++ 初學者推進的首個開源項目,完全出於學習實踐目的,難免存在水平不足以及重複造輪的問題,如有考慮不到位、不完善之處請多多包涵,也歡迎批評指正~

在開始正文前,致敬環節 必不可少. 在實現 cbricks 的編程過程中,在很大程度上學習借鑑了sylar-yin 老師的課程,在此特別感謝,也附上其開源項目傳送門供大家參考使用:https://github.com/sylar-yin/sylar . 正因爲有前輩們慷慨無私的傾囊分享,我的學習之路才得以更加平坦順暢. 正是以上種種鼓勵着我能有動力把技術分享以及這種開源精神繼續傳播下去.

1 基本概念

首先,需要大家一起理清楚有關協程的基本概念.

1.1 線程與協程

我們通常所熟知的最小調度單元稱爲線程(thread),亦指內核級線程,其創建、銷燬、調度過程需要交由操作系統內核負責. 線程進程可以是多對一關係,可以充分利用 CPU 多核的能力,提高程序運行的併發度.

協程(coroutine) 又稱爲用戶級線程,是在用戶態視角下對線程概念的二次封裝. 一方面,協程與線程關係爲多對一,因此在邏輯意義上屬於更細粒度的調度單元;另一方面,因爲協程的創建、銷燬、調度行爲都在用戶態中完成,而無需內核參與,因此協程是一個更加輕量化的概念. (對於內核來說,其最小調度單元始終爲線程不變,至於用戶態下對線程又作了怎樣的邏輯拆分,對於內核而言是完全透明無感知的,因此無需介入)

線程與協程

1.2 coroutine 與 goroutine

因爲我畢竟有着較長的 golang 開發使用經驗,需要在探討相關問題的時候是無法繞開對 golang 中對 goroutine 這一設計的對比與探討的.

我們把常規的協程稱爲 coroutine. 而在 golang 語言層面天然支持一種優化版協程模型,稱爲 goroutine,並運轉調度於 go 語言中知名的 gmp(goroutine-machine-processor) 架構之下.

gmp 架構

有關 gmp 相關內容更細緻的講解,可以參見我之前分享的文章:golang gmp 原理

在經歷了 cbricks workerpool 的開發實踐後,我也對 gmp 架構下 groutine 相較於普通 coroutine 存在的優勢有了一些更深刻的體會:

用戶視角下的 gmp 併發

2 快速上手

做完基本概念鋪墊後,下面我們開始介紹有關協程調度框架 cbricks workerpool 的具體實現內容.

2.1 使用方法

本章我們聚焦在如何快速上手使用 workerpool 這一問題. workerpool 類型聲明於 ./pool/workerpool.h 頭文件中,使用方通常只需關心其構造函數兩個公開方法

// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協程調度池
classWorkerPool: base::Noncopyable{
public:
// 構造函數  threads——使用的線程個數. 默認爲 8 個
WorkerPool(size_t threads =8);
// ...
public:
/**
        公開方法
    */
/**
        * submit: 向協程調度池中提交一個任務 (仿 golang 協程池 ants 風格)
            - task 會被隨機分配到線程手中,保證負載均衡
            - task 被一個線程取到之後,會創建對應協程實例,爲其分配本地棧,此時該任務和協程就固定屬於一個線程了
        * param:task——提交的任務  nonblock——是否爲阻塞模式
            - 阻塞模式:線程本地任務隊列滿時阻塞等待 
            - 非阻塞模式:線程本地隊列滿時直接返回 false
        * response:true——提交成功 false——提交失敗
    */
bool submit(task task, bool nonblock = false);

// 工作協程調度任務過程中,可以通過執行次方法主動讓出線程的調度權 (仿 golang runtime.Goched 風格)
void sched();
}}

2.2 使用示例

下面是關於 workerpool 的具體使用示例,其中演示瞭如何完成一個 workerpool 的初始化,並通過 submit 方法向其中批量投遞異步執行的任務,最後對執行結果進行驗收:

#include <iostream>

#include "sync/sem.h"
#include "pool/workerpool.h"

void testWorkerPool();

int main(int argc, char** argv){
// 測試函數
testWorkerPool();
}

void testWorkerPool(){
// 協程調度框架類型別名定義
typedef cbricks::pool::WorkerPool workerPool;
// 信號量類型別名定義
typedef cbricks::sync::Semaphore semaphore;

// 初始化協程調度框架,設置併發的 threads 數量爲 8
workerPool::ptr workerPoolPtr(new workerPool(8));

// 初始化一個原子計數器
    std::atomic<int> cnt{0};
// 初始化一個信號量實例
    semaphore sem;

// 投遞 10000 個異步任務到協程調度框架中,執行邏輯就是對 cnt 加 1
for(int i =0; i <10000; i++){
// 執行 submit 方法,將任務提交到協程調度框架中
        workerPoolPtr->submit([&cnt,&sem](){
            cnt++;
            sem.notify();
});
}

// 通過信號量等待 10000 個異步任務執行完成
for(int i =0; i <10000; i++){
        sem.wait();
}

// 輸出 cnt 結果(預期結果爲 10000)
    std::cout << cnt << std::endl;
}

3 架構設計

瞭解完使用方式後,隨後就來揭曉其底層實現原理. 本着由總到分的學習指導綱領,本章我們從全局視角縱覽 workerpool 的設計實現架構.

3.1 整體架構與核心概念

cbricks 協程調度架構

workerpool 自下而上,由粗到細可以分爲如下層級概念:

3.2 相比 gmp 的不足之處

我在實現 workerpool 時,一定程度上仿照了 gmp 的風格,包括 thread 本地任務隊列 taskq 的實現以及 workstealing 機制的設計.

cbricks 協程調度框架的不足之處

然而受限於我的個人水平以及語言層面的風格差異,相較於 gmpworkerpool 還存在幾個明顯的缺陷:

4 頭文件源碼

從第 4 章開始,我們正式進入源碼解析環節. 首先給出關於 workerpool 頭文件的完整代碼展示,包含其中的成員屬性以及公私方法定義. 下面的示意圖以及源碼中給出的註釋相對比較完備,在此不再贅述:

workerpool 類定義

代碼位於 ./pool/workerpool.h

// 保證頭文件內容不被重複編譯
#pragma once 

/**
 依賴的標準庫頭文件   
*/
// 標準庫智能指針相關
#include <memory>
// 標準庫函數編程相關
#include <functional>
// 標準庫原子量相關
#include <atomic>
// 標準庫——動態數組,以此作爲線程池的載體
#include <vector>

/**
    依賴的項目內部頭文件
*/
// 線程 thread 實現
#include "../sync/thread.h"
// 協程 coroutine 實現
#include "../sync/coroutine.h"
// 阻塞隊列 channel 實現 (一定程度上仿 golang channel 風格)
#include "../sync/channel.h"
// 信號量 semaphore 實現
#include "../sync/sem.h"
// 拷貝禁用工具,用於保證類實例無法被值拷貝和值傳遞
#include "../base/nocopy.h"

// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協程調度池 繼承 Noncopyable 保證禁用值拷貝和值傳遞功能
classWorkerPool: base::Noncopyable{
public:
// 協程池共享指針類型別名
typedef std::shared_ptr<WorkerPool> ptr;
// 一筆需要執行的任務
typedef std::function<void()> task;
// 一個線程持有的本地任務隊列
typedef sync::Channel<task> localq;
// 本地任務隊列指針別名
typedef localq::ptr localqPtr;
// 線程指針別名
typedef sync::Thread* threadPtr;
// 一個分配了運行任務的協程
typedef sync::Coroutine worker;
// 協程智能指針別名
typedef sync::Coroutine::ptr workerPtr;
// 讀寫鎖別名
typedef sync::RWLock rwlock;
// 信號量類型別名
typedef sync::Semaphore semaphore;

public:
/**
      構造/析構函數
    */
// 構造函數  threads——使用的線程個數. 默認爲 8 個
WorkerPool(size_t threads =8);
// 析構函數  
~WorkerPool();

public:
/**
        公開方法
    */
/**
        * submit: 向協程調度池中提交一個任務 (仿 golang 協程池 ants 風格)
            - task 會被隨機分配到線程手中,保證負載均衡
            - task 被一個線程取到之後,會創建對應協程實例,爲其分配本地棧,此時該任務和協程就固定屬於一個線程了
        * param:task——提交的任務  nonblock——是否爲阻塞模式
            - 阻塞模式:線程本地任務隊列滿時阻塞等待 
            - 非阻塞模式:線程本地隊列滿時直接返回 false
        * response:true——提交成功 false——提交失敗
    */
bool submit(task task, bool nonblock = false);

// 工作協程調度任務過程中,可以通過執行次方法主動讓出線程的調度權 (仿 golang runtime.Goched 風格)
void sched();

private:
/**
     * thread——workerPool 中封裝的線程類
     * - index:線程在線程池中的 index
     * - thr:真正的線程實例,類型爲 sync/thread.h 中的 Thread
     * - taskq:線程的本地任務隊列,其中數據類型爲閉包函數 void()
     * - lock:一把線程實例粒度的讀寫鎖. 用於隔離 submit 操作和 workstealing 操作,避免因任務隊列阻塞導致死鎖
     */
structthread{
typedef std::shared_ptr<thread> ptr;
int index;
        threadPtr thr;
        localqPtr taskq;
        rwlock lock;
/**
         *  構造函數
         * param: index: 線程在線程池中的 index; thr: 底層真正的線程實例; taskq:線程持有的本地任務隊列
        */
thread(int index,threadPtr thr, localqPtr taskq):index(index),thr(thr),taskq(taskq){}
~thread()=default;
};

private:
/**
        私有方法
    */
// work:線程運行主函數,持續不斷地從本地任務隊列 taskq 或本地協程隊列 t_schedq 中獲取任務/協程進行調度. 倘若本地任務爲空,會嘗試從其他線程本地任務隊列竊取任務執行
void work();
/**
     * readAndGo:從指定的任務隊列中獲取任務並執行
     * param:taskq——指定的任務隊列 nonblock——是否爲阻塞模式
     * reponse:true——成功 false——失敗
    */
bool readAndGo(localqPtr taskq, bool nonblock);
/**
     * goTask: 爲一筆任務創建一個協程實例,並調度該任務函數
     * param: cb——待執行任務
     * tip:如果該任務未一次性執行完成(途中使用了 sched 方法),則會在棧中封存好任務的執行信息,然後將該協程實例追加到線程本地的協程隊列 t_schedq 中,等待後續再被線程調度
     */
void goTask(task cb);
/**
     * goWorker:調度某個協程實例,其中已經分配好執行的任務函數
     * param: worker——分配好執行任務函數的協程實例
     * tip:如果該任務未一次性執行完成(途中使用了 sched 方法),則會在棧中封存好任務的執行信息,然後將該協程實例追加到線程本地的協程隊列 t_schedq 中,等待後續再被線程調度
    */
void goWorker(workerPtr worker);

/**
     * workStealing:當其他線程任務隊列 taskq 中竊取半數任務填充到本地隊列
     */
void workStealing();
/**
     * workStealing 重載:從線程 stealFrom 的任務隊列中竊取半數任務填充到線程 stealTo 本地隊列
     */
void workStealing(thread::ptr stealTo, thread::ptr stealFrom);
/**
     * getStealingTarget:隨機獲取一個線程作爲竊取目標
     */
thread::ptr getStealingTarget();

/**
     * getThreadByThreadName 通過線程名稱獲取對應的線程實例
     */
thread::ptr getThreadByThreadName(std::string threadName);
/**
     * getThread 獲取當前線程實例
     */
thread::ptr getThread();

private:
/**
     * 靜態私有方法
     */
// getThreadNameByIndex:通過線程 index 映射得到線程名稱
static const std::string getThreadNameByIndex(int index);
// getThreadIndex:獲取當前線程的 index
static const int getThreadIndex();
// getThreadName:獲取當前線程的名稱
static const std::string getThreadName();


private:
/**
     * 私有成員屬性
     */
// 基於 vector 實現的線程池,元素類型爲 WorkerPool::thread 對應共享指針
    std::vector<thread::ptr> m_threadPool;

// 基於原子變量標識 workerPool 是否已關閉
    std::atomic<bool> m_closed{false};
};

}}

5 核心實現源碼

接下來針對 workerpool 中的核心流程進行詳細的源碼走讀,有關 workerpool 具體實現代碼位於 ./pool/workerpool.cpp 中.

5.1 依賴的頭文件與變量

依賴的外部變量

首先涉及到兩個核心變量的定義:

// 標準庫隊列實現. 依賴隊列作爲線程本地協程隊列的存儲載體
#include <queue>

// workerpool 頭文件
#include "workerpool.h"
// 本項目定義的斷言頭文件
#include "../trace/assert.h"

// namespace cbricks::pool
namespace cbricks{namespace pool{

/**
 * 全局變量 s_taskId:用於分配任務 id 的全局遞增計數器,通過原子變量保證併發安全
 * 每個任務函數會根據分配到的 id,被均勻地分發給各個線程,以此實現負載均衡
*/
static std::atomic<int> s_taskId{0};

/**
 * 線程本地變量  t_schedq:線程私有的協程隊列
 * 當線程下某個協程沒有一次性將任務執行完成時(任務調用了 sched 讓渡函數),則該協程會被暫存於此隊列中,等待後續被相同的線程繼續調度
 */
staticthread_local std::queue<WorkerPool::workerPtr> t_schedq;

// ...

}}

5.2 構造函數與析構函數

workerpool 構造函數

下面介紹workerpool 的構造函數,其任務很明確,就是初始化指定數量的 thread,爲其分配好對應的 taskq,並將 thread 一一投遞進入到線程池 threadPool 中.

此處值得一提的是,thread 啓動後異步運行的方法是 WorkerPool::work,其中會涉及到從 threadPool 中取出當前 thread 實例的操作,因此這裏需要通過信號量 semaphore 保證 thread 實例先被投遞進入 threadPool 後,對應 WorkerPool::work 方法才能被放行.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * workerpool 構造函數:
 * - 初始化好各個線程實例 thread
 * - 將各 thread 添加到線程池 m_threadPool 中
 */
WorkerPool::WorkerPool(size_t threads){
CBRICKS_ASSERT(threads >0,"worker pool init with nonpositive threads num");

// 爲線程池預留好對應的容量
this->m_threadPool.reserve(threads);

/**
     * 構造好對應於每個 thread 的信號量
     * 這是爲了保證 thread 實例先被添加進入 m_threadPool,thread 中的調度函數才能往下執行
     * 這樣做是因爲 thread 調度函數有依賴於從 m_threadPool 獲取自身實例的操作
    */
std::vector<semaphore> sems(threads);
// 另一個信號量,用於保證所有 thread 調度函數都正常啓動後,當前構造函數才能退出,避免 sems 被提前析構
    semaphore waitGroup;

// 初始化好對應數量的 thread 實例並添加進入 m_threadPool
for(int i =0; i < threads; i++){
// 根據 index 映射得到 thread 名稱
        std::string threadName =WorkerPool::getThreadNameByIndex(i);
// 將 thread 實例添加進入 m_threadPool
this->m_threadPool.push_back(thread::ptr(
// thread 實例初始化
newthread(
            i,
// 
new sync::Thread([this,&sems,&waitGroup](){
/**
                 * 此處 wait 操作是需要等待對應 thread 實例先被推送進入 m_threadPool
                 * 因爲一旦後續的 work 函數運行,就會涉及從 m_threadPool 中獲取 thread 實例的操作
                 * 因此先後順序不能顛倒
                 */
                sems[getThreadIndex()].wait();
/**
                 * 此處 notify 操作是配合外層的 waitGroup.wait 操作
                 * 保證所有 thread 都正常啓動後,workerPool 構造函數才能退出
                 * 這是爲了防止 sems 被提前析構
                 */
                waitGroup.notify();
// 異步啓動的 thread,最終運行的調度函數是 workerpool::work
this->work();

},
// 注入 thread 名稱,與 index 有映射關係
            threadName),
// 分配給 thread 的本地任務隊列
localqPtr(new localq))));
/**
         * 在 thread 實例被推送入 m_threadPool 後進行 notify
         * 這樣 thread 調度函數纔會被向下放行
         */
        sems[i].notify();
}

/**
     * 等待所有 thread 實例正常啓動後,構造函數再退出
     */
for(int i =0; i < threads; i++){
        waitGroup.wait();
}
}

析構函數中,要做的處理是將 workerpool 關閉標識 m_closed 置爲 true,並且一一關閉所有 thread 下的 taskq ,這樣運行中的 thread 在感知到這一信息後都會主動退出.

// 析構函數
WorkerPool::~WorkerPool(){
// 將 workpool 的關閉標識置爲 true,後續運行中的線程感知到此標識後會主動退出
this->m_closed.store(true);
// 等待所有線程都退出後,再退出 workpool 的析構函數
for(int i =0; i <this->m_threadPool.size(); i++){
// 關閉各 thread 的本地任務隊列
this->m_threadPool[i]->taskq->close();
// 等待各 thread 退出
this->m_threadPool[i]->thr->join();
}
}

// ...

}}

5.3 公有方法:提交任務

workerpool 提交任務流程

用戶通過 submit 方法,能夠將 task 提交到 workerpool 中. 在 submit 流程中:

這裏需要注意的是,在投遞任務到 thread 的 taskq 前,需要先加上該 thread 的讀鎖 readlock. 這是爲了和該 thread 下可能正在執行的 workStealing 操作進行互斥,避免因 taskq 空間不足而導致死鎖問題. 這個點在竊取流程的講解中詳細展開.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * submit: 提交一個任務到協程調度池中,任務以閉包函數 void() 的形式組裝
 * - 爲任務分配全局遞增且唯一的 taskId
 * - 根據 taskId 將任務均勻分發給指定 thread
 * - 將任務寫入到指定 thread 的本地任務隊列中
 */
bool WorkerPool::submit(task task, bool nonblock){
// 若 workerpool 已關閉,則提交失敗
if(this->m_closed.load()){
returnfalse;
}

// 基於任務 id 對 m_threadPool 長度取模,將任務映射到指定 thread
int targetThreadId =(s_taskId++)%(this->m_threadPool.size());
    thread::ptr targetThr =this->m_threadPool[targetThreadId];

// 針對目標 thread 加讀鎖,這是爲了防止和目標 thread 的 workstealing 操作併發最終因任務隊列 taskq 容量溢出而導致死鎖
rwlock::readLockGuard guard(targetThr->lock);

// 往對應 thread 的本地任務隊列中寫入任務
return targetThr->taskq->write(task, nonblock);
}

// ...

}}

5.4 公有方法:讓渡執行權

workerpool 協程讓渡流程

task 在運行過程中,可以通過調用 workerpool::sched 方法完成執行權的主動讓渡. 此時 task 對應 coroutine 會暫停運行,並將執行權切換回到 thread 主函數中,然後 thread 會將該 coroutine 暫存到本地協程隊列 schedq 中,等待後續再對其調度執行.

// namespace cbricks::pool
namespace cbricks{ namespace pool{
// ...
// sched:讓渡函數. 在任務執行過程中,可以通過該方法主動讓出線程的執行權,則此時任務所屬的協程會被添加到 thread 的本地協程隊列 t_schedq 中,等待後續再被調度執行
void WorkerPool::sched(){
    worker::GetThis()->sched();
}

// ...

}}

5.5 線程調度任務主流程

workerpool 線程調度主流程

workerpool::work 方法是各 thread 循環運行的主函數,其中包含了 thread 調度 task 和 coroutine 的核心邏輯:

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * work: 線程運行的主函數
 * 1) 獲取需要調度的協程(下述任意步驟執行成功,則跳到步驟 2))
 *   - 從本地任務隊列 taskq 中取任務,獲取成功則爲之初始化協程實例
 *   - 從本地協程隊列 schedq 中取協程
 *   - 從其他線程的任務隊列 taskq 中偷取一半任務到本地任務隊列
 * 2) 調度協程執行任務
 * 3) 針對主動讓渡而退出的協程,添加到本地協程隊列
 * 4) 循環上述流程
 */
void WorkerPool::work(){
// 獲取到當前 thread 對應的本地任務隊列 taskq
    localqPtr taskq =this->getThread()->taskq;

// main loop
while(true){
// 如果 workerpool 已關閉 則主動退出
if(this->m_closed.load()){
return;
}

/**  
         * 執行優先級爲 本地任務隊列 taskq -> 本地協程隊列 t_t_schedq -> 竊取其他線程任務隊列 other_taskq
         * 爲防止飢餓,至多調度 10 次的 taskq 後,必須嘗試處理一次 t_schedq 
        */

// 標識本地任務隊列 taskq 是否爲空
bool taskqEmpty =false;
// 至多調度 10 次本地任務隊列 taskq
for(int i =0; i <10; i++){
// 以【非阻塞模式】從 taskq 獲取任務併爲之分配協程實例和調度執行
if(!this->readAndGo(taskq,false)){
// 如果 taskq 爲空,將 taskqEmpty 置爲 true 並直接退出循環
                taskqEmpty =true;
break;
}
}

// 嘗試從線程本地的協程隊列 t_schedq 中獲取協程並進行調度
if(!t_schedq.empty()){
// 從協程隊列中取出頭部的協程實例
            workerPtr worker = t_schedq.front();
            t_schedq.pop();
// 進行協程調度
this->goWorker(worker);
// 處理完成後直接進入下一輪循環
continue;
}

// 如果未發現 taskq 爲空,則無需 workstealing,直接進入下一輪循環
if(!taskqEmpty){
continue;
}

/** 
         * 走到這裏意味着 taskq 和 schedq 都是空的,則要嘗試發起竊取操作
         * 隨機選擇一個目標線程竊取半數任務添加到本地隊列中
        */
this->workStealing();

/**  
         * 以【阻塞模式】嘗試從本地任務獲取任務並調度執行
         * 若此時仍沒有可調度的任務,則當前 thread 陷入阻塞,讓出 cpu 執行權
         * 直到有新任務分配給當前 thread 時,thread 纔會被喚醒
        */
this->readAndGo(taskq,true);
}
}
// ...

}}

workerpool 單個任務處理流程

以 readAndGo 方法爲入口,thread 會嘗試從 taskq 中獲取一筆 task;獲取到後,會爲 task 構建一一對應的 coroutine 實例(至此 task/coroutine 與 thread 完全綁定),然後通過 coroutine::go 方法,將 thread 執行權切換至 coroutine 手中,由 coroutine 執行其中的 task. 只有在 task 執行結束或者主動讓渡時,執行權纔會返還到 thread 主函數中,此時 thread 會判斷 coroutine 是否是因爲主動讓渡而暫停執行,如果是的話,則會將該 coroutine 實例追加到 schedq 中,等待後續尋找合適時機再作調度執行.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
 * readAndGo:
 *   - 從指定任務隊列中獲取一個任務
 *   - 爲之分配協程實例並調度執行
 *   - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
 * param:taskq——任務隊列;nonblock——是否以非阻塞模式從任務隊列中獲取任務
 * response:true——成功;false,失敗(任務隊列 taskq 爲空)
 */
// 將一個任務包裝成協程並進行調度. 如果沒有一次性調度完成,則將協程實例添加到線程本地的協程隊列 t_schedq
bool WorkerPool::readAndGo(cbricks::pool::WorkerPool::localqPtr taskq, bool nonblock){
// 任務容器
    task cb;
// 從 taskq 中獲取任務
if(!taskq->read(cb,nonblock)){
returnfalse;
}

// 對任務進行調度
this->goTask(cb);
returntrue;
}

/**
 * goTask
 *   - 爲指定任務分配協程實例
 *   - 執行協程
 *   - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
 * param:cb——待執行的任務
 */
void WorkerPool::goTask(task cb){
// 初始化協程實例
    workerPtr _worker(newworker(cb));
// 調度協程
this->goWorker(_worker);
}

/**
 * goWorker
 *   - 執行協程
 *   - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
 * param:worker——待運行的協程
 */
void WorkerPool::goWorker(workerPtr worker){
// 調度協程,此時線程的執行權會切換進入到協程對應的方法棧中
    worker->go();
// 走到此處意味着線程執行權已經從協程切換回來
// 如果此時協程並非已完成的狀態,則需要將其添加到線程本地的協程隊列 schedq 中,等待後續繼續調度
if(worker->getState()!= sync::Coroutine::Dead){
        t_schedq.push(worker);
}
}
// ...

}}

5.6 任務竊取流程

workerpool 跨線程任務竊取流程

當 thread 發現 taskq 和 schedq 都空閒時,則會嘗試執行竊取操作. 此時 thread 隨機選取另一個 thread 作爲竊取目標,竊取其 taskq 中的半數 task,追加到本地 taskq 中.

在執行竊取操作的過程中,需要對當前 thread 加寫鎖,以避免發生死鎖問題:

比如在竊取前,當前 thread 判定自己的 taskq 還有足夠空間用於承載竊取來的 task;但是此期間若有新的任務 submit 到來,則可能把 taskq 的空間佔據,最後導致沒有足夠容量承載竊取到的 task,最終導致 thread 調度流程 hang 死在 workstealing 流程無法退出.

上述問題的解法就是,在竊取前,先加 thread 寫鎖(這樣併發到來的 submit 操作就無法完成 task 投遞)然後再檢查一遍 taskq 並確認容量充足後,再發起實際的竊取操作.

// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
// 從某個 thread 中竊取一半任務給到本 thread 的 taskq
void WorkerPool::workStealing(){
// 選擇一個竊取的目標 thread 
    thread::ptr stealFrom =this->getStealingTarget();
if(!stealFrom){
return;
}
// 從目標 thread 中竊取半數任務添加到本 thread taskq 中
this->workStealing(this->getThread(),stealFrom);
}

// 從 thread:stealFrom 中竊取半數任務給到 thread:stealTo
void WorkerPool::workStealing(thread::ptr stealTo, thread::ptr stealFrom){
// 確定竊取任務數量:目標本地任務隊列 taskq 中任務總數的一半
int stealNum = stealFrom->taskq->size()/2;
if(stealNum <=0){
return;
}

// 針對 thread:stealTo 加寫鎖,防止因 workstealing 和 submit 行爲併發,導致線程因 taskq 容量溢出而發生死鎖
rwlock::lockGuard guard(stealTo->lock);
// 檢查此時 stealTo 中的 taskq 如果剩餘容量已不足以承載擬竊取的任務量,則直接退出
if(stealTo->taskq->size()+ stealNum > stealTo->taskq->cap()){
return;
}

// 創建任務容器,以非阻塞模式從 stealFrom 的 taskq 中竊取指定數量的任務
std::vector<task> containers(stealNum);
if(!stealFrom->taskq->readN(containers,true)){
return;
}

// 將竊取到的任務添加到 stealTo 的 taskq 中
    stealTo->taskq->writeN(containers,false);
}

// 隨機選擇本 thread 外的一個 thread 作爲竊取的目標
WorkerPool::thread::ptr WorkerPool::getStealingTarget(){
// 如果線程池長度不足 2,直接返回
if(this->m_threadPool.size()<2){
returnnullptr;
}

// 通過隨機數,獲取本 thread 之外的一個目標 thread index
int threadIndex =WorkerPool::getThreadIndex();
int targetIndex =rand()%this->m_threadPool.size();
while( targetIndex == threadIndex){
        targetIndex =rand()%this->m_threadPool.size();
}

// 返回目標 thread
returnthis->m_threadPool[targetIndex];
}
// ...

}}

6 總結

祝賀,至此本文結束. 本篇和大家探討了,如何基於 c++ 從零到一實現一個協程調度框架,其核心功能包括:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cWBRZ9Zha8qN_A9C_epn1A