【c--】循環數組無鎖隊列的原理與實現

前言


本文介紹基於循環數組的無鎖隊列的原理與實現。源碼:https://github.com/gopherWxf/c-c-linux-LearningCode/tree/master/3.2.4%E6%97%A0%E9%94%81%E9%98%9F%E5%88%97freequeue

在 ZMQ 無鎖隊列的原理與實現一文中,我們已經知道了 ypipe 可以實現一線程寫一線程讀的無鎖隊列,那麼其劣勢就很明顯了,無法適應多寫多讀的場景,因爲其在讀的時候沒有對 r 指針加鎖,在寫的時候沒有對 w 指針加鎖。那麼如何實現一個多讀多寫的線程安全的無鎖隊列呢?

  1. 互斥鎖:mutexqueue(太簡單不介紹了)

  2. 互斥鎖 + 條件變量:blockqueue(太簡單不介紹了)

  3. 內存屏障:lockfreequeue(SimpleLockFreeQueue.h 暫時未寫文章介紹)

  4. 雙重 CAS 原子操作:arraylockfreequeue(本文)

  1. ArrayLockFreeQueue 的類接⼝和變量

該程序使用 gcc 內置的__sync_bool_compare_and_swap,但重新做了宏定義封裝。

#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

所謂循環數組,就是 RingBuffer

#define QUEUE_INT unsigned long
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16

template<typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue {
public:

    ArrayLockFreeQueue();

    virtual ~ArrayLockFreeQueue();

    QUEUE_INT size();

    bool enqueue(const ELEM_T &a_data);// ⼊隊列

    bool dequeue(ELEM_T &a_data);// 出隊列

private:

    ELEM_T m_thequeue[Q_SIZE];

    volatile QUEUE_INT m_count;// 隊列內有多少元素
    volatile QUEUE_INT m_writeIndex;//新元素⼊列時存放位置在數組中的下標

    volatile QUEUE_INT m_readIndex;//下⼀個出列元素在數組中的下標

    volatile QUEUE_INT m_maximumReadIndex;最後⼀個已經完成⼊列操作的元素在數組中的下標

    //取餘
    inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};

2.1 變量介紹

必須指明的是使⽤ 3 種不同的下標都是必須的,因爲隊列允許任意數量的⽣產者和消費者圍繞着它⼯作。已經存在⼀種基於循環數組的⽆鎖隊列,使得唯⼀的⽣產者和唯⼀的消費者可以良好的⼯作。它的實現相當簡潔⾮常值得閱讀。

2.2 函數介紹

2.2.1 取餘函數 QUEUE_INT countToIndex(QUEUE_INT a_count);

這個函數非常有用,因爲我們實現的是循環隊列,所以一定要對數組長度取餘。

template<typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count) {
    return (a_count % Q_SIZE);        // 取餘的時候
}

隊列已滿判斷如下

// (m_writeIndex + 1) %/Q_SIZE == m_readIndex
 countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)

隊列爲空判斷如下

//m_readIndex == m_maximumReadIndex
countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)

2.2.2 文字舉例

2.2.2.1 入隊函數 bool enqueue(const ELEM_T &a_data);

下面以文字舉例說明函數,後續再圖示舉例。

假設現在有兩個線程都進入了 enqueue 這個函數,m_writeIndex,m_readIndex 和 m_maximumReadIndex 都爲 0。

currentWriteIndex = 0
currentReadIndex = 0
CAS(0,0,1)----> m_writeIndex = 1
m_maximumReadIndex = 0
currentWriteIndex = 0
currentReadIndex = 0
CAS(1,0,1)----> m_writeIndex = m_writeIndex  false

while再次循環

currentWriteIndex = 1
currentReadIndex = 0
CAS(1,1,2)----> m_writeIndex = 2
m_maximumReadIndex = 0
CAS(0,1,2) -----> m_maximumReadIndex = m_maximumReadIndex false
yidld讓出CPU
此時線程1執行↓
CAS(0,0,1) ------>m_maximumReadIndex = 1
執行結束,此時線程2恢復執行
CAS(1,1,2) ------>m_maximumReadIndex = 2
執行結束
template<typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) {
    QUEUE_INT currentWriteIndex;        // 獲取寫指針的位置
    QUEUE_INT currentReadIndex;
    // 1. 獲取可寫入的位置
    do {
        currentWriteIndex = m_writeIndex;
        currentReadIndex = m_readIndex;
        if (countToIndex(currentWriteIndex + 1) ==
            countToIndex(currentReadIndex)) {
            return false;    // 隊列已經滿了
        }
        // 目的是爲了獲取一個能寫入的位置
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // 獲取寫入位置後 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
    // We know now that this index is reserved for us. Use it to save the data
    m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數據更新到對應的位置

    // 2. 更新可讀的位置,按着m_maximumReadIndex+1的操作
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();        // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
    }
    //printf("m_writeIndex:%d currentWriteIndex:%d m_maximumReadIndex:%d\n",m_writeIndex,currentWriteIndex,m_maximumReadIndex);
    AtomicAdd(&m_count, 1);
    
    return true;
}

2.2.2.2 出隊函數 bool dequeue(ELEM_T &a_data);

下面以文字舉例說明函數,後續再圖示舉例。

假設現在有兩個線程都進入了 dequeue 這個函數,currentReadIndex 爲 0,currentMaximumReadIndex 爲 2。

currentReadIndex = 0
currentMaximumReadIndex = 2
data = m_thequeue[0]
CAS(0,0,1) ----> m_readIndex = 1
currentReadIndex = 0
currentMaximumReadIndex = 2
data = m_thequeue[0]
CAS(1,0,1) ----> m_readIndex = m_readIndex  false

while再循環

currentReadIndex = 1
currentMaximumReadIndex = 2
data = m_thequeue[1]
CAS(1,1,2) ----> m_readIndex = 2

如果沒有新數據寫入,再次讀取數據,則 currentReadIndex(2)==currentMaximumReadIndex(2) 相等,return false,沒有數據可讀。

template<typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data) {
    QUEUE_INT currentMaximumReadIndex;
    QUEUE_INT currentReadIndex;

    do {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;

        if (countToIndex(currentReadIndex) ==
            countToIndex(currentMaximumReadIndex))        // 如果不爲空,獲取到讀索引的位置
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
        // retrieve the data from the queue
        a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的

        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
            //printf("m_readIndex:%d currentReadIndex:%d m_maximumReadIndex:%d\n",m_readIndex,currentReadIndex,m_maximumReadIndex);
            AtomicSub(&m_count, 1);    // 真正讀取到了數據,元素-1
            return true;
        }
    } while (true);

    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}

2.2.3 圖示舉例

2.2.3.1 入隊函數 bool enqueue(const ELEM_T &a_data);

下面的圖我就不分目錄了,直接一口氣說明完。

以下插圖展示了對隊列執⾏操作時各下標是如何變化的。如果⼀個位置被標記爲 X,標識這個位置⾥存放了數據。空⽩表示位置是空的。對於下圖的情況, 隊列中存放了兩個元素。WriteIndex 指示的位置是新元素將會被插⼊的位置。ReadIndex 指向的位置中的元素將會在下⼀次 pop 操作中被彈出。

當⽣產者準備將數據插⼊到隊列中, 它⾸先通過增加 WriteIndex 的值來申請空間。MaximumReadIndex 指向最後⼀個存放有效數據的位置 (也就是實際的隊列尾)。

⼀旦空間的申請完成, ⽣產者就可以將數據拷⻉到剛剛申請到的位置中。完成之後增加 MaximumReadIndex 使得它與 WriteIndex 的⼀致。

現在隊列中有 3 個元素,接着⼜有⼀個⽣產者嘗試向隊列中插⼊元素。

在第⼀個⽣產者完成數據拷⻉之前,⼜有另外⼀個⽣產者申請了⼀個新的空間準備拷⻉數據。現在有兩個⽣產者同時向隊列插⼊數據。

現在⽣產者開始拷⻉數據,在完成拷⻉之後,對 MaximumReadIndex 的遞增操作必須嚴格遵循⼀個順序:第⼀個⽣產者線程⾸先遞增 MaximumReadIndex,接着才輪到第⼆個⽣產者。這個順序必須被嚴格遵守的原因是,我們必須保證數據被完全拷⻉到隊列之後才允許消費者線程將其出列。讓出 cpu 的⽬的也是爲了讓排在最前⾯的⽣產者完成 m_maximumReadIndex 的更新

while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
        sched_yield();        // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
    }

第⼀個⽣產者完成了數據拷⻉,並對 MaximumReadIndex 完成了遞增,現在第⼆個⽣產者可以遞增 MaximumReadIndex 了。

第⼆個⽣產者完成了對 MaximumReadIndex 的遞增, 現在隊列中有 5 個元素。

template<typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) {
    QUEUE_INT currentWriteIndex;        // 獲取寫指針的位置
    QUEUE_INT currentReadIndex;
    // 1. 獲取可寫入的位置
    do {
        currentWriteIndex = m_writeIndex;
        currentReadIndex = m_readIndex;
        if (countToIndex(currentWriteIndex + 1) ==
            countToIndex(currentReadIndex)) {
            return false;    // 隊列已經滿了
        }
        // 目的是爲了獲取一個能寫入的位置
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // 獲取寫入位置後 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
    // We know now that this index is reserved for us. Use it to save the data
    m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數據更新到對應的位置

    // 2. 更新可讀的位置,按着m_maximumReadIndex+1的操作
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();        // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
    }
//    printf("m_writeIndex:%d currentWriteIndex:%d m_maximumReadIndex:%d\n",m_writeIndex,currentWriteIndex,m_maximumReadIndex);
    AtomicAdd(&m_count, 1);

    return true;
}

2.2.3.2 出隊函數 bool dequeue(ELEM_T &a_data);

以下插圖展示了元素出列的時候各種下標是如何變化的, 隊列中初始有 2 個元素。WriteIndex 指示的位置是新元素將會被插⼊的位置。ReadIndex 指向的位置中的元素將會在下⼀次 pop 操作中被彈出。

消費者線程拷⻉數組 ReadIndex 位置的元素,然後嘗試⽤ CAS 操作將 ReadIndex 加 1。如果操作成功消費者成功的將數據出列。因爲 CAS 操作是原⼦的,所以只有唯⼀的線程可以在同⼀時刻更新 ReadIndex 的值。如果操作失敗,讀取新的 ReadIndex 值,以重複以上操作 (copy 數據,CAS)。

現在⼜有⼀個消費者將元素出列,隊列變成空。

現在有⼀個⽣產者正在向隊列中添加元素。它已經成功的申請了空間,但尚未完成數據拷⻉。任何其它企圖從隊列中移除元素的消費者都會發現隊列⾮空 (因爲 writeIndex 不等於 readIndex)。但它不能讀取 readIndex 所指向位置中的數據,因爲 readIndex 與 MaximumReadIndex 相等 (相等 break)。直到⽣產者完成數據拷⻉增加 MaximumReadIndex 的值才能讀取這個數據。

當⽣產者完成數據拷⻉,隊列的⼤⼩是 1,消費者線程可以讀取這個數據了。

template<typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data) {
    QUEUE_INT currentMaximumReadIndex;
    QUEUE_INT currentReadIndex;

    do {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;

        if (countToIndex(currentReadIndex) ==
            countToIndex(currentMaximumReadIndex))        // 如果不爲空,獲取到讀索引的位置
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
        // retrieve the data from the queue
        a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的

        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
            //printf("m_readIndex:%d currentReadIndex:%d m_maximumReadIndex:%d\n",m_readIndex,currentReadIndex,m_maximumReadIndex);
            AtomicSub(&m_count, 1);    // 真正讀取到了數據,元素-1
            return true;
        }
    } while (true);

    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
}

2.2.4 計算隊列的大小 size 函數的 ABA 問題與解決方案

template<typename ELEM_T, QUEUE_INT Q_SIZE>
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size() {
    QUEUE_INT currentWriteIndex = m_writeIndex;
    QUEUE_INT currentReadIndex = m_readIndex;

    if (currentWriteIndex >= currentReadIndex)
        return currentWriteIndex - currentReadIndex;
    else
        return Q_SIZE + currentWriteIndex - currentReadIndex;
}

下面的場景描述了 size 爲何會返回一個不正確的值:

1. 當 currentWriteIndex = m_writeIndex 執行之後,m_writeIndex=3,m_readIndex = 2 那麼實際 size 是 1;
2. 之後操作線程被搶佔,且在它停止運行的這段時間內,有 2 個元素被插入和從隊列中移除。所以 m_writeIndex=5,m_readIndex = 4,而 size 還是 1;
3. 現在被搶佔的線程恢復執行,讀取 m_readIndex 值,這個時候 currentReadIndex=4,currentWriteIndex=3;
4. currentReadIndex > currentWriteIndex'所以 m_totalSize + currentWriteIndex - currentReadIndex`被返回,這個值意味着隊列幾乎是滿的,而實際上隊列幾乎是空的。

解決方案:添加一個用於保存隊列中元素數量的成員 count. 這個成員可以通過 AtomicAdd/AtomicSub 來實現原子的遞增和遞減。

但需要注意的是這增加了一定開銷, 因爲原子遞增, 遞減操作比較昂貴也很難被編譯器優化。如果可以容忍 size 函數的 ABA 問題,則可以不用 count 與 AtomicAdd/AtomicSub。使用者可以根據自己的使用場合選擇是否承受額外的運行時開銷。

2.2.5 與智能指針一起使用,內存無法得到釋放

如果你打算用這個隊列來存放智能指針對象. 需要注意, 將一個智能指針存入隊列之後, 如果它所佔用的位置沒有被另一個智能指針覆蓋, 那麼它所指向的內存是無法被釋放的 (因爲它的引用計數器無法下降爲 0). 這對於一個操作頻繁的隊列來說沒有什麼問題, 但是程序員需要注意的是, 一旦隊列被填滿過一次那麼應用程序所佔用的內存就不會下降, 即使隊列被清空. 除非自己做改動,每次 pop 手動 delete。

  1. 多個⽣產者線程的情況下 yielding 處理器的必要性

讀者可能注意到了 enqueue 函數中使⽤了 sched_yield() 來主動的讓出處理器,對於⼀個聲稱⽆鎖的算法⽽⾔,這個調⽤看起來有點奇怪。正如⽂章開始的部分解釋過的,多線程環境下影響性能的其中⼀個因素就是 Cache 損壞。⽽產⽣ Cache 損壞的⼀種情況就是⼀個線程被搶佔,操作系統需要保存被搶佔線程的上下⽂,然後將被選中作爲下⼀個調度線程的上下⽂載⼊。此時 Cache 中緩存的數據都會失效,因爲它是被搶佔線程的數據⽽不是新線程的數據。

所以,當此算法調⽤ sched_yield() 意味着告訴操作系統:“我要把處理器時間讓給其它線程,因爲我要等待某件事情的發⽣”。⽆鎖算法和通過阻塞機制同步的算法的⼀個主要區別在於⽆鎖算法不會阻塞在線程同步上,那麼爲什麼在這⾥我們要主動請求操作系統搶佔⾃⼰呢?這個問題的答案沒那麼簡單。它與有多少個⽣產者線程在併發的往隊列中存放數據有關:每個⽣產者線程所執⾏的 CAS 操作都必須嚴格遵循 FIFO 次序,⼀個⽤於申請空間,另⼀個⽤於通知消費者數據已經寫⼊完成可以被讀取了。

如果我們的應⽤程序只有唯⼀的⽣產者操作這個隊列,sche_yield() 將永遠沒有機會被調⽤,第 2 個 CAS 操作永遠不會失敗。因爲在⼀個⽣產者的情況下沒有⼈能破壞⽣產者執⾏這兩個 CAS 操作的 FIFO 順序。

⽽當多於⼀個⽣產者線程往隊列中存放數據的時候,問題就出現了。概括來說,⼀個⽣產者通過第 1 個 CAS 操作申請空間,然後將數據寫⼊到申請到的空間中,然後執⾏第 2 個 CAS 操作通知消費者數據準備完畢可供讀取了。這第 2 個 CAS 操作必須遵循 FIFO 順序,也就是說,如果 A 線程第⾸先執⾏完第⼀個 CAS 操作,那麼它也要第 1 個執⾏完第 2 個 CAS 操作,如果 A 線程在執⾏完第⼀個 CAS 操作之後停⽌,然後 B 線程執⾏完第 1 個 CAS 操作,那麼 B 線程將⽆法完成第 2 個 CAS 操作,因爲它要等待 A 先完成第 2 個 CAS 操作。⽽這就是問題產⽣的根源。讓我們考慮如下場景,3 個消費者線程和 1 個消費者線程:

在上⾯的場景中,⽣產者可能會在第 2 個 CAS 操作上⾃旋⼀段時間,⽤於等待先於它執⾏第 1 個 CAS 操作的線程完成它的第 2 次 CAS 操作。在⼀個物理處理器數量⼤於操作隊列線程數量的系統上,這不會有太嚴重的問題:因爲每個線程都可以分配在⾃⼰的處理器上執⾏,它們最終都會很快完成各⾃的第 2 次 CAS 操作。雖然算法導致線程處理忙等狀態,但這正是我們所期望的,因爲這使得操作更快的完成。也就是說在這種情況下我們是不需要 sche_yield() 的,它完全可以從代碼中刪除。

但是,在⼀個物理處理器數量少於線程數量的系統上,sche_yield()就變得⾄關重要了。讓我們再次考查上⾯ 3 個線程的場景,當線程 3 準備向隊列中插⼊數據:如果線程 1 在執⾏完第 1 個 CAS 操作,在執⾏第 2 個 CAS 操作之前被搶佔,那麼線程 2,3 就會⼀直在它們的第 2 個 CAS 操作上忙等 (它們忙等,不讓出處理器,線程 1 也就沒機會執⾏,它們就只能繼續忙等,也就是說,如果不適用 sched_yield,一直自旋,那麼可能多個線程同時阻塞在第二個 CAS 那兒),直到線程 1 重新被喚醒,完成它的第 2 個 CAS 操作。這就是需要 sche_yield() 的場合了,操作系統應該避免讓線程 2,3 處於忙等狀態。它們應該儘快的讓出處理器讓線程 1 執⾏,使得線程 1 可以把它的第 2 個 CAS 操作完成。這樣線程 2 和 3 才能繼續完成它們的操作。

  1. 循環數組無鎖隊列的性能測試

4.1 性能測試

互斥鎖隊列 vs 互斥鎖 + 條件變量隊列 vs 內存屏障鏈表 vs RingBuffer CAS 實現。

4 寫 1 讀:性能中等

4 寫 4 讀:性能中上

1 寫 4 讀:性能最好

7 寫 7 讀:性能比互斥鎖隊列還差

4.2 分析

雖然沒有分析第三個內存屏障鏈表的代碼,但是我們不難看出互斥鎖 + 條件變量 與 內存屏障鏈表 的性能差別不大。爲什麼呢?鏈表的方式需要不斷的申請和釋放元素。當然,用內存池可以適當改善這個影響,但是內存池在分配內存與釋放內存的時候也會涉及到線程間的數據競爭,所以用鏈表的方式性能相對提升不多。

隨着生產者數量的增加, 無鎖隊列的效率迅速下降。因爲在多個生產者的情況下,第 2 個 CAS 將對性能產生影響。我們通過測試得出循環數組的⽆鎖隊列在 1 寫 4 讀的場景下性能提升是最高的,因爲只有一個生產者,那麼第二個 CAS 不會有 yield 的情況出現。由此我們可以得出一個結論,在一寫多讀的場景,我們可以優先使用循環數組的⽆鎖隊列,比如下圖的場景。

來源:https://gopher.blog.csdn.net/article/details/126295835

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/P9tHsYX2AzuGkXql7wMKTg