Linux C-- 協程池原理分析及代碼實現

導語


本文介紹了協程的作用、結構、原理,並使用 C++ 和彙編實現了 64 位系統下的協程池。文章內容避免了協程晦澀難懂的部分,用大量圖文來分析原理,適合新手閱讀學習。

  1. Web 服務器問題

現代分佈式 Web 後臺服務邏輯通常由一系列 RPC 請求組成,若串行則耗時比較長。

此時一般都會使用線程池並行運行 RPC 請求,如圖中 GetData 函數

假設請求數據包不大,那麼可假設 GetData 耗時組成如下圖所示。在非阻塞讀情況下,CPU 將在 Wait 環節空轉浪費資源 (不斷地 read,得到返回碼 - 1)。

  1. 協程的引入

有沒有辦法只用一個線程並行執行 GetData 呢?答案是:可以!我們假設有 3 個並行的 GetData 任務,下圖線程 1 通過跳轉控制流,減少 CPU 資源浪費。執行流爲①~⑦,在 Wait 階段則跳到其他任務如①~⑤。運行結束後也跳到其他任務如⑥~⑦。通過這種方式,3 個 GetData 能用一個線程以 52ms 的耗時並行執行。

如果 GetData 任務可以被這樣分配,則可以減少線程切換的消耗。因爲協程的調度是線程內用戶態執行的,CPU 消耗非常小。

  1. 協程的原理

從上文可知,協程之間的切換本質是函數的跳轉,即如何讓正在執行的函數跳轉到另一個新的函數上,以及下次如何又跳轉回來。如下面代碼所示:

void func1() {
   printf("① 跳轉到func2");
   Coroutine::CoYield(); // 通過該函數跳到func2
   printf("③ func2跳轉回func1");
}

void func2() {
   printf("② func2執行完畢");
}

要實現這種能力,需要結合彙編知識。首先研究如下簡單函數的彙編語言

#include <iostream>
using namespace std;

class Object {
public:    
   int val[12];
};

int func(Object *pObj1, Object *pObj2) {     
   pObj1->val[0] = 1;
   pObj1->val[11] = 11;
   pObj2->val[0] = 2;
   pObj2->val[11] = 12;
   int arr[100];
   arr[0] = 3;
   arr[99] = 99;
   return pObj1->val[0];
}

int main() {
   Object obj, obj2;
   int a = func(&obj, &obj2);
   return a;
}

下面看看在 64 位系統彙編中,func 函數是如何執行的。push %rbp 是進入 func 函數執行的第一個指令,作用是把 rbp 的地址壓到棧頂。因爲 rsp 始終指向棧頂,所以壓棧後,rsp 的地址下移 8 字節。rdi 和 rsi 相差 48 個字節,該空間被 class Object 內的 int val[12] 佔用。

前兩個指令讓 rbp 指向 rsp 往下 296 字節的位置。後面兩個指令把 rdi 和 rsi 地址保存在最下面。

爲什麼 rsp 下移 296 字節?首先,上述代碼使用了臨時變量 int arr[100],需要有 400 個字節的棧空間;其次,x64 系統存有 128 字節的紅色區域可使用;最後,rdi 和 rsi 地址共佔 16 字節。因此,rbp 到紅色區域底部的空間一共是 288 + 8 + 104 + 8 + 8 = 416 字節。接下來纔開始執行 func 函數第一行代碼,給 val[0] 賦值。

然後分別給 pObj1 和 pObj2 的成員變量賦值

接下來給臨時變量 arr 賦值

最後讓 eax 指向返回值,恢復函數棧的棧底和棧頂。

  1. 協程的結構

從前面我們知道,每個函數在內存中都有棧頂 rsp 和棧底 rbp。這兩個值決定了函數可操作的內存範圍,如下圖所示

既然協程切換是從一個函數切換到另一個函數,那麼就需要知道兩個函數的 rbp 和 rsp。然而,函數的 rbp 和 rsp 是執行時設定的,代碼層面難以獲得。既然如此,我們可以實現騰出空間,讓函數在預期的 rbp 和 rsp 內。定義一個類如下:

class Coroutine {
    void* m_pRegister[14];
    char m_pStack[1024];
    std::function<void()> m_func;
};

那麼在內存模型中,該類的佈局如下所示

這樣的協程在能被使用前需要做初始化,如下圖所示

在其他協程切換過來時,cpu 寄存器可按 m_pRegister 預設的地址賦值,開始執行 DoWork 函數,函數代碼如下:

static void Coroutine::DoWork(Coroutine *pThis) {
    pThis->m_func();
    pThis->Yield(); // 轉讓控制流給同線程的其他協程
}

由於是靜態函數,需令參數 pThis 爲協程地址。所以,初始化時需要設置 m_pRegister 中的 rdi 爲 this。上述第二行代碼執行時,rbp 會設爲 this。所以執行 m_func 時,如下圖所示:

  1. 協程間的切換

下面以 Coroutine1 切換到 Coroutine2 爲例。主要分爲兩步:1. 保存 Coroutine1 的上下文

  1. 加載 Coroutine2 的上下文

切換代碼可見源代碼 Coroutine::Switch

  1. 協程池的實現

本文實現協程池比較簡單,初始化創建線程並設置 thread_local 變量以保存協程隊列狀態。並且,每個線程額外創建一個 main 協程用作 Guard。在執行時,每個線程通過輪詢的方式切換協程,若協程無任務則嘗試 CAS 獲取 Job,否則直接執行已有 Job。當 Job 執行完或主動 CoYield 時,切換到下一個協程。爲了避免 CAS 空轉,在沒有任務時會阻塞休眠。當任務來臨時則 Notify 所有線程的協程。

  1. 源代碼

example.cpp

/**
 * @file example.cpp
 * @author souma
 * @brief 使用協程池的示例,編譯命令如下
 * g++ example.cpp coroutine.cpp -lpthread -O3
 * @version 0.1
 * @date 2023-06-06
 * 
 * @copyright Copyright (c) 2023
 * 
 */
#include <iostream>
#include <array>
#include "coroutine.h"

using namespace std;
using namespace comm;

void func(const string &sTaskName, uint32_t uWaitSeconds) {
    printf("[%ld] [%s start], wait seconds[%u]\n", time(nullptr), sTaskName.c_str(), uWaitSeconds);
    time_t iStartSec = time(nullptr);
    // 默認可用65535字節的棧內存,具體可看CO_STACK_SIZE
    uint32_t uArrSize = 65535/4;
    int arr[uArrSize];
    while (time(nullptr) - iStartSec < uWaitSeconds) {
        // 操作棧內存
        for (int i = 0; i < uArrSize; ++i) {
            arr[i] = i;
        }

        // 切換控制流
        printf("[%ld] [%s] -> [協程池]\n", time(nullptr), sTaskName.c_str());
        usleep(100);
        Coroutine::CoYield(); // 只需這一個函數即可切換控制流
        printf("[%ld] [協程池] -> [%s]\n", time(nullptr), sTaskName.c_str());
    }

    // 檢查棧內存是否正確
    for (int i = 0; i < uArrSize; ++i) {
        if (arr[i] != i) {
            printf("棧內存錯誤\n");
            exit(-1);
        }
    }
    printf("[%ld] [%s end], expect_timecost[%d], real_timecost[%ld]\n", time(nullptr), sTaskName.c_str(), uWaitSeconds, time(nullptr) - iStartSec);
}

int main() {
    // 如果想當線程池用,可以令第一個參數爲線程數,第二個參數爲1。
    // 在該場景下,使用小線程大協程不僅CPU消耗低,整體耗時也很低,可以自行測試。
    CoroutinePool oPool(2, 300);
    oPool.Run();

    time_t iStartTime = time(nullptr);
    const int iTaskCnt = 400;
    vector<shared_ptr<Future>> vecFuture;
    for (int i = 0; i < iTaskCnt; ++i) {
        // 模擬GetData中的Wait環節, 1 ~ 5秒等待
        shared_ptr<Future> pFuture = oPool.Submit([i](){func("Task" + to_string(i), random() % 5 + 1);});
        if (pFuture != nullptr) {
            vecFuture.emplace_back(pFuture);
        }
    }
    
    // 阻塞等待所有Task完成
    for (auto it = vecFuture.begin(); it != vecFuture.end(); ++it) {
        (*it)->Get();
    }

    printf("demo's finished, time cost[%ld]\n", time(nullptr) - iStartTime);
    return 0;
}

coroutine.h

/**
 * @file coroutine.h
 * @author souma
 * @brief 多線程無棧式協程池,請不要用-O0編譯否則會產生coredump
 * @version 0.1
 * @date 2023-06-06
 * 
 * @copyright Copyright (c) 2023
 * 
 */
#pragma once
#include <functional>
#include <memory>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <signal.h>
#include <pthread.h>
#include <condition_variable>
#include <unistd.h>

namespace comm {
    class Future;
    class CoroutinePool;
    class Coroutine;
    struct CoroutinePoolCtx;
    struct CoroutineTaskCtx;


    struct CoroutinePoolCtx {
        std::vector<std::shared_ptr<Coroutine>> m_vecCoroutine;
        std::shared_ptr<Coroutine> m_pMainCoroutine;
        uint32_t m_uCursor;
        uint32_t m_uWorkCnt;
    };

    struct CoroutineTaskCtx {
        std::function<void()> m_userFunc;
        std::shared_ptr<Future> m_pFuture;
    };

    // class ArraySyncQueue start
    template <class T>
    class ArraySyncQueue {
    public:
        ArraySyncQueue(uint32_t uCapacity, uint32_t uSleepUs = 100, uint32_t uRetryTimes = 3);
        bool Push(T *pObj);
        T* Pop();
        inline bool IsFull() const { return m_uPushCursor == m_uPopCursor - 1 || (m_uPopCursor == 0 && m_uPushCursor == m_vecQueue.size() - 1); }
        bool IsEmpty() const { return m_uPopCursor == m_uPushCursor; }

        ~ArraySyncQueue();

    private:
        uint32_t GetNextCursor(uint32_t uCursor);
    private:
        std::vector<T*> m_vecQueue;
        uint32_t m_uPushCursor = 0;
        uint32_t m_uPopCursor = 0;
        uint32_t m_uSleepUs;
        uint32_t m_uRetryTimes;
    };
    // class ArraySyncQueue end

    // class Coroutine start
    class Coroutine {
    public:
        
        friend class CoroutinePool;

        /**
         * @brief 調用該函數將執行流交給其他協程,僅在協程池環境下有效
         * 
         * @return true:協程切換成功, false:不在協程池環境中運行
         */
        static bool CoYield();
        
        Coroutine(const Coroutine &) = delete;
        Coroutine(Coroutine &&) = delete;
        Coroutine & operator=(const Coroutine &) = delete;
        Coroutine & operator=(Coroutine &&) = delete;

    private:
        // 4096是預留給庫使用的棧內存大小,後者是留給用戶使用的棧內存大小
        constexpr static uint32_t CO_STACK_SIZE = 4096 + 65535; 

        Coroutine();

        /**
         * @brief 當前協程是否綁定了任務
         *
         * @return true:是
         */
        inline bool HasTask() const { return m_pTaskCtx != nullptr; }

        /**
         * @brief 兩個協程切換,從pPrev切換到pNext
         */
        static void Switch(Coroutine *pPrev, Coroutine *pNext);

        /**
         * @brief 將控制流轉給同線程的其他協程
         */
        void Yield();

        /**
         * @brief 這個是給main協程用的
         */
        void Register();

        /**
         * @brief 這個是給執行用戶任務的協程用的
         */
        void Register(std::shared_ptr<CoroutineTaskCtx> pTaskCtx);

        /**
         * @return CoroutinePoolCtx& 當前線程的協程上下文
         */
        static CoroutinePoolCtx & GetCtx();

        /**
         * @brief 讓當前線程的cursor往後移,輪詢協程
         */
        static void MoveCursor();
        
        /**
         * @brief 協程包一層的函數
         */
        static void DoWork(Coroutine *pThis);

        /**
         * 
         * @return void* 獲得自建rsp地址
         */
        void* GetRsp();

        /**
         * 保存寄存器的值到m_pStack中
         */
        void SaveReg();

    private:
        void* m_pRegister[14];
        char m_pStack[CO_STACK_SIZE];
        std::shared_ptr<CoroutineTaskCtx> m_pTaskCtx;
    };
    // class Coroutine end

    // class CoroutinePool start
    class CoroutinePool {
    public:
        friend class Coroutine;
        /**
         * @brief 建立一個多線程協程池,即創建uThreadCnt個線程,每個線程含有uCoroutineCnt個協程
                  調用Run開始運行,調用Stop或直接析構結束
         * @param uThreadCnt 線程數,小於1則爲1
         * @param uCoroutineCnt 每個線程的協程數,小於1則爲1
         * @param uJobQueueSize 總任務隊列大小,小於1則爲1
         */
        CoroutinePool(uint32_t uThreadCnt, uint32_t uCoroutineCnt, uint32_t uJobQueueSize = 1024000);

        /**
         * @brief 線程安全,可重入
         * @return true:正常
         */
        bool Run();

        /**
         * @brief 停止協程池 (會先保證池中任務完成再停止),線程安全可重入
         * 
         */
        void Stop();

        /**
         * @param userFunc 用戶函數
         * @return std::shared_ptr<Future>  nullptr:協程池隊列滿了,提交不了
         */
        std::shared_ptr<Future> Submit(const std::function<void()> &userFunc);

        ~CoroutinePool();
        CoroutinePool(const CoroutinePool &) = delete;
        CoroutinePool(CoroutinePool &&) = delete;
        CoroutinePool & operator=(const CoroutinePool &) = delete;
        CoroutinePool & operator=(CoroutinePool &&) = delete;

    private:
        static void LoopWork(CoroutinePool &oPool);

    private:
        bool m_bStarted;
        uint32_t m_uThreadCnt;
        uint32_t m_uRoutineCnt;
        ArraySyncQueue<CoroutineTaskCtx> m_queueJob;
        std::vector<std::shared_ptr<std::thread>> m_vecThread;
        std::mutex m_oMutex;
        std::condition_variable m_oCondition;
    };
    // class CoroutinePool end

    // class Future start
    class Future {
    public:
        /**
        * @brief 阻塞獲得結果
        * 
        * @param uTimeoutMs 超時時間
        * @return true:成功, false:超時
        */
        bool Get(uint32_t uTimeoutMs = -1);

        /**
        * @brief 設置狀態爲完成
        */
        void SetFinished();

        Future();

        Future(const Future&) = delete;
        Future(Future&&) = delete;

        Future & operator=(const Future&) = delete;
        Future & operator=(Future&&) = delete;

    private:
        std::mutex m_oMutex;
        std::condition_variable m_oCondition;
        bool m_bFinished;
    };
    // class Future end
}

coroutine.cpp

/**
 * @file coroutine.cpp
 * @author souma
 * @brief 協程池的具體實現
 * @version 0.1
 * @date 2023-06-06
 * 
 * @copyright Copyright (c) 2023
 * 
 */

#include "coroutine.h"
#include <cstring>

using namespace std;
namespace comm {

    // class Coroutine start
    Coroutine::Coroutine() {
        m_pTaskCtx = nullptr;
    }

    void Coroutine::Register() {
        m_pTaskCtx = make_shared<CoroutineTaskCtx>();
        m_pTaskCtx->m_userFunc = [](){};
        m_pTaskCtx->m_pFuture = nullptr;
        SaveReg();
    }

    void Coroutine::Register(shared_ptr<CoroutineTaskCtx> pTaskCtx) {
        m_pTaskCtx = pTaskCtx;
        SaveReg();
    }

    inline void Coroutine::Yield() {
        Coroutine::Switch(this, Coroutine::GetCtx().m_pMainCoroutine.get());
    }

    bool Coroutine::CoYield() {
        if (GetCtx().m_vecCoroutine.size() == 0) {
            return false;
        }
        GetCtx().m_vecCoroutine[GetCtx().m_uCursor]->Yield();
        return true;
    }

    CoroutinePoolCtx & Coroutine::GetCtx() {
        thread_local CoroutinePoolCtx coroutinePoolCtx;
        return coroutinePoolCtx;
    }

    void Coroutine::MoveCursor() {
        GetCtx().m_uCursor = GetCtx().m_uCursor == GetCtx().m_vecCoroutine.size() - 1 ? 0 : GetCtx().m_uCursor + 1;
    }

    extern "C" __attribute__((noinline, weak))
    void Coroutine::Switch(Coroutine *pPrev, Coroutine *pNext) {
        // 1.保存pPrev協程的上下文, rdi和pPrev同指向
        // 2.加載pNext協程的上下文, rsi和pNext同指向
        asm volatile(R"(
            movq %rsp, %rax
            movq %rbp, 104(%rdi)
            movq %rax, 96(%rdi)
            movq %rbx, 88(%rdi)
            movq %rcx, 80(%rdi)
            movq %rdx, 72(%rdi)
            movq 0(%rax), %rax
            movq %rax, 64(%rdi)
            movq %rsi, 56(%rdi)
            movq %rdi, 48(%rdi)
            movq %r8, 40(%rdi)
            movq %r9, 32(%rdi)
            movq %r12, 24(%rdi)
            movq %r13, 16(%rdi)
            movq %r14, 8(%rdi)
            movq %r15, (%rdi)

            movq (%rsi), %r15
            movq 8(%rsi), %r14
            movq 16(%rsi), %r13
            movq 24(%rsi), %r12
            movq 32(%rsi), %r9
            movq 40(%rsi), %r8
            movq 48(%rsi), %rdi
            movq 64(%rsi), %rax
            movq 72(%rsi), %rdx
            movq 80(%rsi), %rcx
            movq 88(%rsi), %rbx
            movq 96(%rsi), %rsp
            movq 104(%rsi), %rbp
            movq 56(%rsi), %rsi
            movq %rax, (%rsp)
            xorq %rax, %rax
        )");
    }

    void Coroutine::DoWork(Coroutine *pThis) {
        pThis->m_pTaskCtx->m_userFunc();
        pThis->m_pTaskCtx->m_pFuture->SetFinished();
        pThis->m_pTaskCtx.reset();
        Coroutine::GetCtx().m_uWorkCnt--;
        pThis->Yield();
    }

    void* Coroutine::GetRsp() {
        // m_pRegister和m_pStack中間預留一個指針空間
        auto sp = std::end(m_pStack) - sizeof(void*);
        // 預定Rsp的地址保證能夠整除8字節
        sp = decltype(sp)(reinterpret_cast<size_t>(sp) & (~0xF));
        return sp;
    }

    void Coroutine::SaveReg() {
        void *pStack = GetRsp();
        memset(m_pRegister, 0, sizeof m_pRegister);
        void **pRax = (void**)pStack;
        *pRax = (void*) DoWork;
        // rsp
        m_pRegister[12] = pStack;
        // rax
        m_pRegister[8] = *pRax;
        // rdi
        m_pRegister[6] = this;
    }
    // class Coroutine end

    // class CoroutinePool start
    CoroutinePool::CoroutinePool(uint32_t uThreadCnt, uint32_t uCoroutineCnt, uint32_t uJobQueueSize) : m_queueJob(uJobQueueSize) {
        m_bStarted = false;
        m_uThreadCnt = max(uThreadCnt, 1u);
        m_uRoutineCnt = max(uCoroutineCnt, 1u);
    }

    bool CoroutinePool::Run() {
        if (!__sync_bool_compare_and_swap(&m_bStarted, false, true)) {
            return false;
        }
        
        for (decltype(m_uThreadCnt) i = 0; i < m_uThreadCnt; ++i) {
            m_vecThread.emplace_back(make_shared<thread>(CoroutinePool::LoopWork, ref(*this)));   
        }
        return true;
    }

    void CoroutinePool::Stop() {
        if (!__sync_bool_compare_and_swap(&m_bStarted, true, false)) {
            return;
        }
        
        m_oCondition.notify_all();
        for (auto it = m_vecThread.begin(); it != m_vecThread.end(); ++it) {
            (*it)->join();
        }
        m_vecThread.clear();
    }

    shared_ptr<Future> CoroutinePool::Submit(const function<void()> &userFunc) {
        shared_ptr<Future> pNewFuture = make_shared<Future>();
        CoroutineTaskCtx *pTaskCtx = new CoroutineTaskCtx;
        pTaskCtx->m_pFuture = pNewFuture;
        pTaskCtx->m_userFunc = userFunc;

        if (!m_queueJob.Push(pTaskCtx)) {
            delete pTaskCtx, pTaskCtx = nullptr;
            return nullptr;
        }
        m_oCondition.notify_all();
        return pNewFuture;
    }

    CoroutinePool::~CoroutinePool() {
        Stop();
    }

    void CoroutinePool::LoopWork(CoroutinePool &oPool) {
        Coroutine::GetCtx().m_uCursor = 0;
        Coroutine::GetCtx().m_uWorkCnt = 0;
        Coroutine::GetCtx().m_pMainCoroutine = shared_ptr<Coroutine>(new Coroutine);
        Coroutine::GetCtx().m_pMainCoroutine->Register();

        Coroutine::GetCtx().m_vecCoroutine.clear();
        for (decltype(oPool.m_uRoutineCnt) i = 0; i < oPool.m_uRoutineCnt; ++i) {
            Coroutine::GetCtx().m_vecCoroutine.emplace_back(shared_ptr<Coroutine>(new Coroutine));
        }

        Coroutine *pMainCoroutine, *pCurCoroutine;
        while (oPool.m_bStarted || Coroutine::GetCtx().m_uWorkCnt > 0 || !oPool.m_queueJob.IsEmpty()) {
            
            pMainCoroutine = Coroutine::GetCtx().m_pMainCoroutine.get();
            pCurCoroutine = Coroutine::GetCtx().m_vecCoroutine[Coroutine::GetCtx().m_uCursor].get();
            
            if (pCurCoroutine->HasTask()) {
                Coroutine::Switch(pMainCoroutine, pCurCoroutine);
                Coroutine::MoveCursor();
                continue;
            }

            CoroutineTaskCtx *pTaskCtx = oPool.m_queueJob.Pop();
            if (pTaskCtx == nullptr) {
                if (Coroutine::GetCtx().m_uWorkCnt > 0) {
                    Coroutine::MoveCursor();
                    continue;
                }
                unique_lock<mutex> oLock(oPool.m_oMutex);
                oPool.m_oCondition.wait(oLock);
                continue;
            }

            pCurCoroutine->Register(shared_ptr<CoroutineTaskCtx>(pTaskCtx));
            ++Coroutine::GetCtx().m_uWorkCnt;
            Coroutine::Switch(pMainCoroutine, pCurCoroutine);
            Coroutine::MoveCursor();
        }
    }
    // class CoroutinePool end

    // class Future start
    Future::Future() {
        m_bFinished = false;
    }

    bool Future::Get(uint32_t uTimeoutMs) {
        unique_lock<mutex> oLock(m_oMutex);
        if (m_bFinished) {
            return true;
        }
        return m_oCondition.wait_for(oLock, chrono::milliseconds(uTimeoutMs)) == cv_status::no_timeout;
    }

    void Future::SetFinished() {
        {
            unique_lock<mutex> oLock(m_oMutex);
            m_bFinished = true;
        }
        m_oCondition.notify_all();
    }
    // class Future end

    // class ArraySyncQueue start
    template <class T>
    ArraySyncQueue<T>::ArraySyncQueue(uint32_t uCapacity, uint32_t uSleepUs, uint32_t uRetryTimes) {
        for (uint32_t i = 0; i < std::max(uCapacity, 1u); ++i) {
            m_vecQueue.emplace_back(nullptr);
        }
        m_uSleepUs = uSleepUs;
        m_uRetryTimes = uRetryTimes;
    }

    template <class T>
    bool ArraySyncQueue<T>::Push(T *pObj) {
        if (pObj == nullptr) {
            return false;
        }
        uint32_t uRetryTimes = 0;
        while (uRetryTimes <= m_uRetryTimes) {
            uint32_t uPushCursor = m_uPushCursor;
            if (uPushCursor == m_uPopCursor - 1 || (m_uPopCursor == 0 && uPushCursor == m_vecQueue.size() - 1)) {
                // 隊列滿了
                return false;
            }

            if (!__sync_bool_compare_and_swap(&m_vecQueue[uPushCursor], nullptr, pObj)) {
                uRetryTimes++;
                usleep(m_uSleepUs);
                continue;
            }

            m_uPushCursor = GetNextCursor(uPushCursor);
            return true;
        }
        // 競爭失敗
        return false;
    }

    template <class T>
    T* ArraySyncQueue<T>::Pop() {
        uint32_t uRetryTimes = 0;
        while (uRetryTimes <= m_uRetryTimes) {
            uint32_t uPopCursor = m_uPopCursor;
            if (uPopCursor == m_uPushCursor) {
                return nullptr;
            }

            T* pToReturn = m_vecQueue[uPopCursor];
            if (pToReturn == nullptr || !__sync_bool_compare_and_swap(&m_vecQueue[uPopCursor], pToReturn, nullptr)) {
                usleep(m_uSleepUs);
                uRetryTimes++;
                continue;
            }
            m_uPopCursor = GetNextCursor(uPopCursor);
            return pToReturn;
        }
        return nullptr;
    }

    template <class T>
    uint32_t ArraySyncQueue<T>::GetNextCursor(uint32_t uCursor) {
        if (uCursor == m_vecQueue.size() - 1) {
            return 0;
        }
        return uCursor + 1;
    }

    template <class T>
    ArraySyncQueue<T>::~ArraySyncQueue() {
        m_uRetryTimes = -1;
        do {
            T *pObj = Pop();
            if (pObj == nullptr) {
                return;
            }
            delete pObj, pObj = nullptr;
        } while (true);
    }
    // class ArraySyncQueue end
}
  1. 補充說明

8.1. 爲什麼不能 - O0 編譯?

在 - O0 的情況下,編譯器會給函數 (coroutine.cpp:57)Coroutine::Switch 包一層彙編指令,導致實際執行彙編指令不是期望的。具體可以分別用 - O0 和 - O3 在 GDB 下 disassemble 看到差異。

8.2. 如果函數使用棧很大怎麼辦?

源碼中定義的協程棧爲 CO_STACK_SIZE=4096 + 65535KB,若用戶函數使用的棧超過該範圍會產生 coredump。簡單可行的解法是:1. 儘量使用堆變量;2. 改大 CO_STACK_SIZE。

轉自:https://zhuanlan.zhihu.com/p/652275703

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QEVmm7BuMxIcmLhSAk0s5Q