C-- 從零實現協程調度框架
0 前言
推進掌握一門知識的方法除了溫故知新之外,還可以是觸類旁通. 近期沉寂停更的時間裏,我在經歷自學 c++ 理論知識的入門階段,不過大家都知道“紙上得來終覺淺”的道理,因此我決定以復刻 golang gmp 協程調度設計思路爲目標,基於c++11的風格實現一個低配乞丐版協程調度框架,以此作爲我的首個 c++ 實踐開源項目,並希望以此爲契機,在提高 c++ 編程熟練度的同時,也能提供一波旁支輸入,反補提升我對gmp概念的理解.
該項目我已於 github 開源,cbricks 是我基於 c++11 從零實現的基礎工具開源庫:https://github.com/xiaoxuxiansheng/cbricks.
其中實現的內容包括但不僅限於 協程調度框架 workerpool、協程 coroutine/線程 thread、併發通信隊列 channel、日誌打印組件 logger等基本工具類,而 協程調度框架 workerpool 正是我今天我要向大家介紹的主題.
這是我作爲 c++ 初學者推進的首個開源項目,完全出於學習實踐目的,難免存在水平不足以及重複造輪的問題,如有考慮不到位、不完善之處請多多包涵,也歡迎批評指正~
在開始正文前,
致敬環節必不可少. 在實現 cbricks 的編程過程中,在很大程度上學習借鑑了sylar-yin 老師的課程,在此特別感謝,也附上其開源項目傳送門供大家參考使用:https://github.com/sylar-yin/sylar . 正因爲有前輩們慷慨無私的傾囊分享,我的學習之路才得以更加平坦順暢. 正是以上種種鼓勵着我能有動力把技術分享以及這種開源精神繼續傳播下去.
1 基本概念
首先,需要大家一起理清楚有關協程的基本概念.
1.1 線程與協程
我們通常所熟知的最小調度單元稱爲線程(thread),亦指內核級線程,其創建、銷燬、調度過程需要交由操作系統內核負責. 線程與進程可以是多對一關係,可以充分利用 CPU 多核的能力,提高程序運行的併發度.
而協程(coroutine) 又稱爲用戶級線程,是在用戶態視角下對線程概念的二次封裝. 一方面,協程與線程關係爲多對一,因此在邏輯意義上屬於更細粒度的調度單元;另一方面,因爲協程的創建、銷燬、調度行爲都在用戶態中完成,而無需內核參與,因此協程是一個更加輕量化的概念. (對於內核來說,其最小調度單元始終爲線程不變,至於用戶態下對線程又作了怎樣的邏輯拆分,對於內核而言是完全透明無感知的,因此無需介入)
線程與協程
1.2 coroutine 與 goroutine
因爲我畢竟有着較長的 golang 開發使用經驗,需要在探討相關問題的時候是無法繞開對 golang 中對 goroutine 這一設計的對比與探討的.
我們把常規的協程稱爲 coroutine. 而在 golang 語言層面天然支持一種優化版協程模型,稱爲 goroutine,並運轉調度於 go 語言中知名的 gmp(goroutine-machine-processor) 架構之下.
gmp 架構
有關 gmp 相關內容更細緻的講解,可以參見我之前分享的文章:golang gmp 原理
在經歷了 cbricks workerpool 的開發實踐後,我也對 gmp 架構下 groutine 相較於普通 coroutine 存在的優勢有了一些更深刻的體會:
-
• 線程松耦合:經由
P的從中斡旋,goroutine能夠跨 M(thread)調度運行,真正實現 G 與 M 之間的松耦合. 而這一點我所實現的 c++ coroutine 中無法做到.(本文實現的 coroutine 底層依賴於 c 中的ucontext庫完成協程棧空間的分配,由於棧的線程私有性,一經分配便不允許被其他線程染指,因此 coroutine 在初始化後就必然是某個 thread 所獨有的) -
• 棧空間自適應擴縮:
goroutine 棧空間大小可以根據實際需要做到自適應擴縮,並且針對使用方完全屏蔽這一細節. 而我所實現的c++ coroutine需要在其初始化時就顯式設定好棧空間大小,並且在運行過程中不便於修改.
用戶視角下的 gmp 併發
- • 阻塞粒度適配:這一點非常重要. golang 爲使用方屏蔽了線程的概念,所有
併發操作都基於goroutine 粒度完成,這不僅限於調度,也包括與之相應的一系列併發阻塞工具,例如鎖 mutex,通道 channel等,都在語言層面天然支持goroutine 粒度的被動阻塞(go_park)操作,與 gmp 體系完美適配;而這一點在 c++ 中則有所不同,如鎖 mutex、信號量 semaphore等工具的最小阻塞粒度都是線程,這就會導致協程的優勢遭到削弱,因爲在一個 coroutine 中的阻塞行爲最終會上升到 thread 粒度,並進而導致 thread 下其他 coroutine 也無法得到正常調度.
2 快速上手
做完基本概念鋪墊後,下面我們開始介紹有關協程調度框架 cbricks workerpool 的具體實現內容.
2.1 使用方法
本章我們聚焦在如何快速上手使用 workerpool 這一問題. workerpool 類型聲明於 ./pool/workerpool.h 頭文件中,使用方通常只需關心其構造函數和兩個公開方法:
-
• 構造函數——WorkerPool:初始化
workerpool實例,其中唯一入參threads爲需要啓用的線程個數,默認爲 8 個 -
• 公開方法——submit:往
workerpool中投遞一個任務 task(以void() 閉包函數的形式) -
• 公開方法——sched:
主動讓渡當前task的執行權,以實現同線程下協程間的切換
// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協程調度池
classWorkerPool: base::Noncopyable{
public:
// 構造函數 threads——使用的線程個數. 默認爲 8 個
WorkerPool(size_t threads =8);
// ...
public:
/**
公開方法
*/
/**
* submit: 向協程調度池中提交一個任務 (仿 golang 協程池 ants 風格)
- task 會被隨機分配到線程手中,保證負載均衡
- task 被一個線程取到之後,會創建對應協程實例,爲其分配本地棧,此時該任務和協程就固定屬於一個線程了
* param:task——提交的任務 nonblock——是否爲阻塞模式
- 阻塞模式:線程本地任務隊列滿時阻塞等待
- 非阻塞模式:線程本地隊列滿時直接返回 false
* response:true——提交成功 false——提交失敗
*/
bool submit(task task, bool nonblock = false);
// 工作協程調度任務過程中,可以通過執行次方法主動讓出線程的調度權 (仿 golang runtime.Goched 風格)
void sched();
}}
2.2 使用示例
下面是關於 workerpool 的具體使用示例,其中演示瞭如何完成一個 workerpool 的初始化,並通過 submit 方法向其中批量投遞異步執行的任務,最後對執行結果進行驗收:
#include <iostream>
#include "sync/sem.h"
#include "pool/workerpool.h"
void testWorkerPool();
int main(int argc, char** argv){
// 測試函數
testWorkerPool();
}
void testWorkerPool(){
// 協程調度框架類型別名定義
typedef cbricks::pool::WorkerPool workerPool;
// 信號量類型別名定義
typedef cbricks::sync::Semaphore semaphore;
// 初始化協程調度框架,設置併發的 threads 數量爲 8
workerPool::ptr workerPoolPtr(new workerPool(8));
// 初始化一個原子計數器
std::atomic<int> cnt{0};
// 初始化一個信號量實例
semaphore sem;
// 投遞 10000 個異步任務到協程調度框架中,執行邏輯就是對 cnt 加 1
for(int i =0; i <10000; i++){
// 執行 submit 方法,將任務提交到協程調度框架中
workerPoolPtr->submit([&cnt,&sem](){
cnt++;
sem.notify();
});
}
// 通過信號量等待 10000 個異步任務執行完成
for(int i =0; i <10000; i++){
sem.wait();
}
// 輸出 cnt 結果(預期結果爲 10000)
std::cout << cnt << std::endl;
}
3 架構設計
瞭解完使用方式後,隨後就來揭曉其底層實現原理. 本着由總到分的學習指導綱領,本章我們從全局視角縱覽 workerpool 的設計實現架構.
3.1 整體架構與核心概念
cbricks 協程調度架構
workerpool 自下而上,由粗到細可以分爲如下層級概念:
-
• 線程池 threadPool:
workerpool初始化時就啓動指定數量的常駐線程 thread 實例. 這些 thread 數量固定不變,並且會持續運行,直到整個 workerpool 被析構爲止. 由這些 thread 組成的集合,我們稱爲線程池 threadPool. -
• 線程 thread:持續運營的
thread 單元,不斷執行着調度邏輯,依次嘗試從本地任務隊列 taskq、本地協程隊列 sched_q中獲取任務 task/協程 coroutine進行調度. 如果前兩者都空閒,則 thread 會仿照 gmp 中的workstealing機制,從其他 thread 的 taskq 中竊取 task 過來執行. 最後 steal 後仍缺少 task 供執行調度,則會利用channel的機制,使thread陷入阻塞,從而讓出 cpu 執行權 -
• 任務 task:用戶提交的
異步任務(對應爲void() 閉包函數類型). task 會被均勻分配到特定 thread 的 taskq中,但還存在被其他 thread 竊取的可能性,因此 task 本質上還是能夠跨 thread 傳遞使用的 -
• 協程 coroutine:在 workerpool 中,thread 不會直接執行 task,而是會爲 task
一對一構建出coroutine 實例,並切換至 coroutine 中完成對 task 的執行. coroutine 被創建出來後,會完成棧 stack 的初始化和分配,隨後 coroutine 就固定屬於一個 thread 了,終生不可再被其他 thread 染指 -
• 線程本地任務隊列 taskq:每個
thread 私有的緩存 task 的隊列,底層由併發安全的通信隊列channel實現. 當一筆 task 被投遞到 workerpool 時,會基於負載均衡策略投遞到特定 thread 的 taskq中,接下來會被該 thread 優先調度執行 -
• 線程本地協程隊列 schedq:每個
thread 私有的緩存 coroutine 的隊列,底層由普通隊列queue實現,但屬於線程本地變量thread_local,因此也是併發安全的. 當一個coroutine因主動讓渡 sched操作而暫停執行時,會將其暫存到schedq中,等待後續再找時機完成該 coroutine 的調度工作.
3.2 相比 gmp 的不足之處
我在實現 workerpool 時,一定程度上仿照了 gmp 的風格,包括 thread 本地任務隊列 taskq 的實現以及 workstealing 機制的設計.
cbricks 協程調度框架的不足之處
然而受限於我的個人水平以及語言層面的風格差異,相較於 gmp,workerpool 還存在幾個明顯的缺陷:
-
• coroutine 與 thread 強綁定:當一個 coroutine 被初始化時,我使用的是 c 語言中
ucontext.h完成stack 的分配,這樣 coroutine stack 就是thread 私有的,因此 coroutine 不能做到跨 thread 調度. -
• thread 級阻塞粒度:
c++中,併發工具因此的阻塞行爲都是以 thread 爲單位. 以互斥鎖 lock 爲例,哪怕觸發加鎖阻塞行爲的對象是 coroutine,但最終還是會引起整個 thread 對象陷入阻塞,從而導致 thread 下的其他已分配好的 coroutine 也無法得到執行.要解決這一問題,就必須連帶着對 lock、cond、semaphore 等工具進行改造,使得其能夠支持 coroutine 粒度的阻塞操作,這樣的成本無疑很高,本項目未予以實踐.
4 頭文件源碼
從第 4 章開始,我們正式進入源碼解析環節. 首先給出關於 workerpool 頭文件的完整代碼展示,包含其中的成員屬性以及公私方法定義. 下面的示意圖以及源碼中給出的註釋相對比較完備,在此不再贅述:
workerpool 類定義
代碼位於 ./pool/workerpool.h:
// 保證頭文件內容不被重複編譯
#pragma once
/**
依賴的標準庫頭文件
*/
// 標準庫智能指針相關
#include <memory>
// 標準庫函數編程相關
#include <functional>
// 標準庫原子量相關
#include <atomic>
// 標準庫——動態數組,以此作爲線程池的載體
#include <vector>
/**
依賴的項目內部頭文件
*/
// 線程 thread 實現
#include "../sync/thread.h"
// 協程 coroutine 實現
#include "../sync/coroutine.h"
// 阻塞隊列 channel 實現 (一定程度上仿 golang channel 風格)
#include "../sync/channel.h"
// 信號量 semaphore 實現
#include "../sync/sem.h"
// 拷貝禁用工具,用於保證類實例無法被值拷貝和值傳遞
#include "../base/nocopy.h"
// 命名空間 cbricks::pool
namespace cbricks{namespace pool{
// 協程調度池 繼承 Noncopyable 保證禁用值拷貝和值傳遞功能
classWorkerPool: base::Noncopyable{
public:
// 協程池共享指針類型別名
typedef std::shared_ptr<WorkerPool> ptr;
// 一筆需要執行的任務
typedef std::function<void()> task;
// 一個線程持有的本地任務隊列
typedef sync::Channel<task> localq;
// 本地任務隊列指針別名
typedef localq::ptr localqPtr;
// 線程指針別名
typedef sync::Thread* threadPtr;
// 一個分配了運行任務的協程
typedef sync::Coroutine worker;
// 協程智能指針別名
typedef sync::Coroutine::ptr workerPtr;
// 讀寫鎖別名
typedef sync::RWLock rwlock;
// 信號量類型別名
typedef sync::Semaphore semaphore;
public:
/**
構造/析構函數
*/
// 構造函數 threads——使用的線程個數. 默認爲 8 個
WorkerPool(size_t threads =8);
// 析構函數
~WorkerPool();
public:
/**
公開方法
*/
/**
* submit: 向協程調度池中提交一個任務 (仿 golang 協程池 ants 風格)
- task 會被隨機分配到線程手中,保證負載均衡
- task 被一個線程取到之後,會創建對應協程實例,爲其分配本地棧,此時該任務和協程就固定屬於一個線程了
* param:task——提交的任務 nonblock——是否爲阻塞模式
- 阻塞模式:線程本地任務隊列滿時阻塞等待
- 非阻塞模式:線程本地隊列滿時直接返回 false
* response:true——提交成功 false——提交失敗
*/
bool submit(task task, bool nonblock = false);
// 工作協程調度任務過程中,可以通過執行次方法主動讓出線程的調度權 (仿 golang runtime.Goched 風格)
void sched();
private:
/**
* thread——workerPool 中封裝的線程類
* - index:線程在線程池中的 index
* - thr:真正的線程實例,類型爲 sync/thread.h 中的 Thread
* - taskq:線程的本地任務隊列,其中數據類型爲閉包函數 void()
* - lock:一把線程實例粒度的讀寫鎖. 用於隔離 submit 操作和 workstealing 操作,避免因任務隊列阻塞導致死鎖
*/
structthread{
typedef std::shared_ptr<thread> ptr;
int index;
threadPtr thr;
localqPtr taskq;
rwlock lock;
/**
* 構造函數
* param: index: 線程在線程池中的 index; thr: 底層真正的線程實例; taskq:線程持有的本地任務隊列
*/
thread(int index,threadPtr thr, localqPtr taskq):index(index),thr(thr),taskq(taskq){}
~thread()=default;
};
private:
/**
私有方法
*/
// work:線程運行主函數,持續不斷地從本地任務隊列 taskq 或本地協程隊列 t_schedq 中獲取任務/協程進行調度. 倘若本地任務爲空,會嘗試從其他線程本地任務隊列竊取任務執行
void work();
/**
* readAndGo:從指定的任務隊列中獲取任務並執行
* param:taskq——指定的任務隊列 nonblock——是否爲阻塞模式
* reponse:true——成功 false——失敗
*/
bool readAndGo(localqPtr taskq, bool nonblock);
/**
* goTask: 爲一筆任務創建一個協程實例,並調度該任務函數
* param: cb——待執行任務
* tip:如果該任務未一次性執行完成(途中使用了 sched 方法),則會在棧中封存好任務的執行信息,然後將該協程實例追加到線程本地的協程隊列 t_schedq 中,等待後續再被線程調度
*/
void goTask(task cb);
/**
* goWorker:調度某個協程實例,其中已經分配好執行的任務函數
* param: worker——分配好執行任務函數的協程實例
* tip:如果該任務未一次性執行完成(途中使用了 sched 方法),則會在棧中封存好任務的執行信息,然後將該協程實例追加到線程本地的協程隊列 t_schedq 中,等待後續再被線程調度
*/
void goWorker(workerPtr worker);
/**
* workStealing:當其他線程任務隊列 taskq 中竊取半數任務填充到本地隊列
*/
void workStealing();
/**
* workStealing 重載:從線程 stealFrom 的任務隊列中竊取半數任務填充到線程 stealTo 本地隊列
*/
void workStealing(thread::ptr stealTo, thread::ptr stealFrom);
/**
* getStealingTarget:隨機獲取一個線程作爲竊取目標
*/
thread::ptr getStealingTarget();
/**
* getThreadByThreadName 通過線程名稱獲取對應的線程實例
*/
thread::ptr getThreadByThreadName(std::string threadName);
/**
* getThread 獲取當前線程實例
*/
thread::ptr getThread();
private:
/**
* 靜態私有方法
*/
// getThreadNameByIndex:通過線程 index 映射得到線程名稱
static const std::string getThreadNameByIndex(int index);
// getThreadIndex:獲取當前線程的 index
static const int getThreadIndex();
// getThreadName:獲取當前線程的名稱
static const std::string getThreadName();
private:
/**
* 私有成員屬性
*/
// 基於 vector 實現的線程池,元素類型爲 WorkerPool::thread 對應共享指針
std::vector<thread::ptr> m_threadPool;
// 基於原子變量標識 workerPool 是否已關閉
std::atomic<bool> m_closed{false};
};
}}
5 核心實現源碼
接下來針對 workerpool 中的核心流程進行詳細的源碼走讀,有關 workerpool 具體實現代碼位於 ./pool/workerpool.cpp 中.
5.1 依賴的頭文件與變量
依賴的外部變量
首先涉及到兩個核心變量的定義:
-
• 全局變量 s_taskId:
全局單調遞增的原子計數器,爲每個到來的 task 分配全局唯一 task id,並依據此 id 明確 task 應該指派給哪個 thread -
• 線程本地變量(thread_local) t_schedq:
線程私有的協程隊列. 運行過程因主動讓渡而暫停的 coroutine,會被暫存到其中,等待後續被相同的 thread繼續調度執行.
// 標準庫隊列實現. 依賴隊列作爲線程本地協程隊列的存儲載體
#include <queue>
// workerpool 頭文件
#include "workerpool.h"
// 本項目定義的斷言頭文件
#include "../trace/assert.h"
// namespace cbricks::pool
namespace cbricks{namespace pool{
/**
* 全局變量 s_taskId:用於分配任務 id 的全局遞增計數器,通過原子變量保證併發安全
* 每個任務函數會根據分配到的 id,被均勻地分發給各個線程,以此實現負載均衡
*/
static std::atomic<int> s_taskId{0};
/**
* 線程本地變量 t_schedq:線程私有的協程隊列
* 當線程下某個協程沒有一次性將任務執行完成時(任務調用了 sched 讓渡函數),則該協程會被暫存於此隊列中,等待後續被相同的線程繼續調度
*/
staticthread_local std::queue<WorkerPool::workerPtr> t_schedq;
// ...
}}
5.2 構造函數與析構函數
workerpool 構造函數
下面介紹workerpool 的構造函數,其任務很明確,就是初始化好指定數量的 thread,爲其分配好對應的 taskq,並將 thread 一一投遞進入到線程池 threadPool 中.
此處值得一提的是,thread 啓動後異步運行的方法是 WorkerPool::work,其中會涉及到從 threadPool 中取出當前 thread 實例的操作,因此這裏需要通過信號量 semaphore 保證 thread 實例先被投遞進入 threadPool 後,對應 WorkerPool::work 方法才能被放行.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* workerpool 構造函數:
* - 初始化好各個線程實例 thread
* - 將各 thread 添加到線程池 m_threadPool 中
*/
WorkerPool::WorkerPool(size_t threads){
CBRICKS_ASSERT(threads >0,"worker pool init with nonpositive threads num");
// 爲線程池預留好對應的容量
this->m_threadPool.reserve(threads);
/**
* 構造好對應於每個 thread 的信號量
* 這是爲了保證 thread 實例先被添加進入 m_threadPool,thread 中的調度函數才能往下執行
* 這樣做是因爲 thread 調度函數有依賴於從 m_threadPool 獲取自身實例的操作
*/
std::vector<semaphore> sems(threads);
// 另一個信號量,用於保證所有 thread 調度函數都正常啓動後,當前構造函數才能退出,避免 sems 被提前析構
semaphore waitGroup;
// 初始化好對應數量的 thread 實例並添加進入 m_threadPool
for(int i =0; i < threads; i++){
// 根據 index 映射得到 thread 名稱
std::string threadName =WorkerPool::getThreadNameByIndex(i);
// 將 thread 實例添加進入 m_threadPool
this->m_threadPool.push_back(thread::ptr(
// thread 實例初始化
newthread(
i,
//
new sync::Thread([this,&sems,&waitGroup](){
/**
* 此處 wait 操作是需要等待對應 thread 實例先被推送進入 m_threadPool
* 因爲一旦後續的 work 函數運行,就會涉及從 m_threadPool 中獲取 thread 實例的操作
* 因此先後順序不能顛倒
*/
sems[getThreadIndex()].wait();
/**
* 此處 notify 操作是配合外層的 waitGroup.wait 操作
* 保證所有 thread 都正常啓動後,workerPool 構造函數才能退出
* 這是爲了防止 sems 被提前析構
*/
waitGroup.notify();
// 異步啓動的 thread,最終運行的調度函數是 workerpool::work
this->work();
},
// 注入 thread 名稱,與 index 有映射關係
threadName),
// 分配給 thread 的本地任務隊列
localqPtr(new localq))));
/**
* 在 thread 實例被推送入 m_threadPool 後進行 notify
* 這樣 thread 調度函數纔會被向下放行
*/
sems[i].notify();
}
/**
* 等待所有 thread 實例正常啓動後,構造函數再退出
*/
for(int i =0; i < threads; i++){
waitGroup.wait();
}
}
在析構函數中,要做的處理是將 workerpool 關閉標識 m_closed 置爲 true,並且一一關閉所有 thread 下的 taskq ,這樣運行中的 thread 在感知到這一信息後都會主動退出.
// 析構函數
WorkerPool::~WorkerPool(){
// 將 workpool 的關閉標識置爲 true,後續運行中的線程感知到此標識後會主動退出
this->m_closed.store(true);
// 等待所有線程都退出後,再退出 workpool 的析構函數
for(int i =0; i <this->m_threadPool.size(); i++){
// 關閉各 thread 的本地任務隊列
this->m_threadPool[i]->taskq->close();
// 等待各 thread 退出
this->m_threadPool[i]->thr->join();
}
}
// ...
}}
5.3 公有方法:提交任務
workerpool 提交任務流程
用戶通過 submit 方法,能夠將 task 提交到 workerpool 中. 在 submit 流程中:
-
• 首先,爲 task 分配
全局唯一的taskId. -
• 然後,對
threadPool 長度取模後,找到 task從屬的 thread. -
• 接下來,將
task 投遞到該 thread 的taskq中即可.
這裏需要注意的是,在投遞任務到 thread 的 taskq 前,需要先加上該 thread 的讀鎖 readlock. 這是爲了和該 thread 下可能正在執行的 workStealing 操作進行互斥,避免因 taskq 空間不足而導致死鎖問題. 這個點在竊取流程的講解中詳細展開.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* submit: 提交一個任務到協程調度池中,任務以閉包函數 void() 的形式組裝
* - 爲任務分配全局遞增且唯一的 taskId
* - 根據 taskId 將任務均勻分發給指定 thread
* - 將任務寫入到指定 thread 的本地任務隊列中
*/
bool WorkerPool::submit(task task, bool nonblock){
// 若 workerpool 已關閉,則提交失敗
if(this->m_closed.load()){
returnfalse;
}
// 基於任務 id 對 m_threadPool 長度取模,將任務映射到指定 thread
int targetThreadId =(s_taskId++)%(this->m_threadPool.size());
thread::ptr targetThr =this->m_threadPool[targetThreadId];
// 針對目標 thread 加讀鎖,這是爲了防止和目標 thread 的 workstealing 操作併發最終因任務隊列 taskq 容量溢出而導致死鎖
rwlock::readLockGuard guard(targetThr->lock);
// 往對應 thread 的本地任務隊列中寫入任務
return targetThr->taskq->write(task, nonblock);
}
// ...
}}
5.4 公有方法:讓渡執行權
workerpool 協程讓渡流程
task 在運行過程中,可以通過調用 workerpool::sched 方法完成執行權的主動讓渡. 此時 task 對應 coroutine 會暫停運行,並將執行權切換回到 thread 主函數中,然後 thread 會將該 coroutine 暫存到本地協程隊列 schedq 中,等待後續再對其調度執行.
// namespace cbricks::pool
namespace cbricks{ namespace pool{
// ...
// sched:讓渡函數. 在任務執行過程中,可以通過該方法主動讓出線程的執行權,則此時任務所屬的協程會被添加到 thread 的本地協程隊列 t_schedq 中,等待後續再被調度執行
void WorkerPool::sched(){
worker::GetThis()->sched();
}
// ...
}}
5.5 線程調度任務主流程
workerpool 線程調度主流程
workerpool::work 方法是各 thread 循環運行的主函數,其中包含了 thread 調度 task 和 coroutine 的核心邏輯:
-
• 調度優先級一:從 thread 的本地任務隊列
taskq中獲取 task 並調度執行 -
• 調度優先級二:當
taskq 爲空或者連續獲取 10 次 taskq後(爲避免 schedq 產生飢餓),會主動獲取一次本地協程隊列 schedq中的 coroutine 進行調度 -
• 調度優先級三:如果
taskq和schedq都是空的,則進入workstealing流程,嘗試從其他 thread taskq中竊取半數 taskq填充到當前 thread taskq 中 -
• 必要性阻塞:如果經歷完上述流程,仍沒有合適的目標供 thread 調度,則 thread 會依賴
channel 的阻塞消費能力陷入阻塞,從而讓出 cpu 執行權,避免資源浪費
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* work: 線程運行的主函數
* 1) 獲取需要調度的協程(下述任意步驟執行成功,則跳到步驟 2))
* - 從本地任務隊列 taskq 中取任務,獲取成功則爲之初始化協程實例
* - 從本地協程隊列 schedq 中取協程
* - 從其他線程的任務隊列 taskq 中偷取一半任務到本地任務隊列
* 2) 調度協程執行任務
* 3) 針對主動讓渡而退出的協程,添加到本地協程隊列
* 4) 循環上述流程
*/
void WorkerPool::work(){
// 獲取到當前 thread 對應的本地任務隊列 taskq
localqPtr taskq =this->getThread()->taskq;
// main loop
while(true){
// 如果 workerpool 已關閉 則主動退出
if(this->m_closed.load()){
return;
}
/**
* 執行優先級爲 本地任務隊列 taskq -> 本地協程隊列 t_t_schedq -> 竊取其他線程任務隊列 other_taskq
* 爲防止飢餓,至多調度 10 次的 taskq 後,必須嘗試處理一次 t_schedq
*/
// 標識本地任務隊列 taskq 是否爲空
bool taskqEmpty =false;
// 至多調度 10 次本地任務隊列 taskq
for(int i =0; i <10; i++){
// 以【非阻塞模式】從 taskq 獲取任務併爲之分配協程實例和調度執行
if(!this->readAndGo(taskq,false)){
// 如果 taskq 爲空,將 taskqEmpty 置爲 true 並直接退出循環
taskqEmpty =true;
break;
}
}
// 嘗試從線程本地的協程隊列 t_schedq 中獲取協程並進行調度
if(!t_schedq.empty()){
// 從協程隊列中取出頭部的協程實例
workerPtr worker = t_schedq.front();
t_schedq.pop();
// 進行協程調度
this->goWorker(worker);
// 處理完成後直接進入下一輪循環
continue;
}
// 如果未發現 taskq 爲空,則無需 workstealing,直接進入下一輪循環
if(!taskqEmpty){
continue;
}
/**
* 走到這裏意味着 taskq 和 schedq 都是空的,則要嘗試發起竊取操作
* 隨機選擇一個目標線程竊取半數任務添加到本地隊列中
*/
this->workStealing();
/**
* 以【阻塞模式】嘗試從本地任務獲取任務並調度執行
* 若此時仍沒有可調度的任務,則當前 thread 陷入阻塞,讓出 cpu 執行權
* 直到有新任務分配給當前 thread 時,thread 纔會被喚醒
*/
this->readAndGo(taskq,true);
}
}
// ...
}}
workerpool 單個任務處理流程
以 readAndGo 方法爲入口,thread 會嘗試從 taskq 中獲取一筆 task;獲取到後,會爲 task 構建一一對應的 coroutine 實例(至此 task/coroutine 與 thread 完全綁定),然後通過 coroutine::go 方法,將 thread 執行權切換至 coroutine 手中,由 coroutine 執行其中的 task. 只有在 task 執行結束或者主動讓渡時,執行權纔會返還到 thread 主函數中,此時 thread 會判斷 coroutine 是否是因爲主動讓渡而暫停執行,如果是的話,則會將該 coroutine 實例追加到 schedq 中,等待後續尋找合適時機再作調度執行.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* readAndGo:
* - 從指定任務隊列中獲取一個任務
* - 爲之分配協程實例並調度執行
* - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
* param:taskq——任務隊列;nonblock——是否以非阻塞模式從任務隊列中獲取任務
* response:true——成功;false,失敗(任務隊列 taskq 爲空)
*/
// 將一個任務包裝成協程並進行調度. 如果沒有一次性調度完成,則將協程實例添加到線程本地的協程隊列 t_schedq
bool WorkerPool::readAndGo(cbricks::pool::WorkerPool::localqPtr taskq, bool nonblock){
// 任務容器
task cb;
// 從 taskq 中獲取任務
if(!taskq->read(cb,nonblock)){
returnfalse;
}
// 對任務進行調度
this->goTask(cb);
returntrue;
}
/**
* goTask
* - 爲指定任務分配協程實例
* - 執行協程
* - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
* param:cb——待執行的任務
*/
void WorkerPool::goTask(task cb){
// 初始化協程實例
workerPtr _worker(newworker(cb));
// 調度協程
this->goWorker(_worker);
}
/**
* goWorker
* - 執行協程
* - 若協程實例未一次性執行完成(執行了讓渡 sched),則將協程添加到線程本地的協程隊列 schedq 中
* param:worker——待運行的協程
*/
void WorkerPool::goWorker(workerPtr worker){
// 調度協程,此時線程的執行權會切換進入到協程對應的方法棧中
worker->go();
// 走到此處意味着線程執行權已經從協程切換回來
// 如果此時協程並非已完成的狀態,則需要將其添加到線程本地的協程隊列 schedq 中,等待後續繼續調度
if(worker->getState()!= sync::Coroutine::Dead){
t_schedq.push(worker);
}
}
// ...
}}
5.6 任務竊取流程
workerpool 跨線程任務竊取流程
當 thread 發現 taskq 和 schedq 都空閒時,則會嘗試執行竊取操作. 此時 thread 隨機選取另一個 thread 作爲竊取目標,竊取其 taskq 中的半數 task,追加到本地 taskq 中.
在執行竊取操作的過程中,需要對當前 thread 加寫鎖,以避免發生死鎖問題:
比如在竊取前,當前 thread 判定自己的 taskq 還有足夠空間用於承載竊取來的 task;但是此期間若有新的任務 submit 到來,則可能把 taskq 的空間佔據,最後導致沒有足夠容量承載竊取到的 task,最終導致 thread 調度流程 hang 死在 workstealing 流程無法退出.
上述問題的解法就是,在竊取前,先加 thread 寫鎖(這樣併發到來的 submit 操作就無法完成 task 投遞)然後再檢查一遍 taskq 並確認容量充足後,再發起實際的竊取操作.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
// 從某個 thread 中竊取一半任務給到本 thread 的 taskq
void WorkerPool::workStealing(){
// 選擇一個竊取的目標 thread
thread::ptr stealFrom =this->getStealingTarget();
if(!stealFrom){
return;
}
// 從目標 thread 中竊取半數任務添加到本 thread taskq 中
this->workStealing(this->getThread(),stealFrom);
}
// 從 thread:stealFrom 中竊取半數任務給到 thread:stealTo
void WorkerPool::workStealing(thread::ptr stealTo, thread::ptr stealFrom){
// 確定竊取任務數量:目標本地任務隊列 taskq 中任務總數的一半
int stealNum = stealFrom->taskq->size()/2;
if(stealNum <=0){
return;
}
// 針對 thread:stealTo 加寫鎖,防止因 workstealing 和 submit 行爲併發,導致線程因 taskq 容量溢出而發生死鎖
rwlock::lockGuard guard(stealTo->lock);
// 檢查此時 stealTo 中的 taskq 如果剩餘容量已不足以承載擬竊取的任務量,則直接退出
if(stealTo->taskq->size()+ stealNum > stealTo->taskq->cap()){
return;
}
// 創建任務容器,以非阻塞模式從 stealFrom 的 taskq 中竊取指定數量的任務
std::vector<task> containers(stealNum);
if(!stealFrom->taskq->readN(containers,true)){
return;
}
// 將竊取到的任務添加到 stealTo 的 taskq 中
stealTo->taskq->writeN(containers,false);
}
// 隨機選擇本 thread 外的一個 thread 作爲竊取的目標
WorkerPool::thread::ptr WorkerPool::getStealingTarget(){
// 如果線程池長度不足 2,直接返回
if(this->m_threadPool.size()<2){
returnnullptr;
}
// 通過隨機數,獲取本 thread 之外的一個目標 thread index
int threadIndex =WorkerPool::getThreadIndex();
int targetIndex =rand()%this->m_threadPool.size();
while( targetIndex == threadIndex){
targetIndex =rand()%this->m_threadPool.size();
}
// 返回目標 thread
returnthis->m_threadPool[targetIndex];
}
// ...
}}
6 總結
祝賀,至此本文結束. 本篇和大家探討了,如何基於 c++ 從零到一實現一個協程調度框架,其核心功能包括:
-
• 創建指定數量線程持續複用,調度後續到來的任務
-
• 以閉包函數的風格提交任務到框架中,由異步協程完成執行
-
• 任務運行過程中支持通過主動讓渡操作讓出調度執行權
-
• 支持線程間的任務竊取操作,使得各調度線程間忙閒有度、負載均衡
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/cWBRZ9Zha8qN_A9C_epn1A