深入理解無鎖隊列及其性能對比

一、無鎖隊列用在什麼樣的場景?

當需要處理的數據非常多,比如行情數據,一秒處理非常多的數據的時候,可以考慮用無鎖隊列。但是如果一秒只需要處理幾百或者幾千的數據,是沒有必要考慮用無鎖隊列的。用互斥鎖就能解決問題,數據量相對少的時候互斥鎖與無鎖隊列之間差別並不是很明顯。

二、爲什麼要用無鎖隊列?

有鎖隊列會有哪些問題?

1、Cache 的損壞,在線程間頻繁切換的時候會導致 Cache 中數據的丟失;

CPU 的運行速度比主存快 N 倍,所以大量的處理器時間被浪費在處理器與主存的數據傳輸上,這就是在處理器與主存之間引入 Cache 的原因。Cache 是一種速度更快但容量更小的內存,當處理器要訪問主存中的數據時,這些數據首先要被拷貝到 Cache 中,因爲這些數據在不久的將來可能又會被處理器訪問。Cache misses 對性能有非常大的影響,因爲處理器訪問 Cache 中的數據將比直接訪問主存快得多。

線程被頻繁搶佔產生的 Cache 損壞將導致應用程序性能下降。

2、在同步機制上爭搶隊列;

CPU 會將大量的時間浪費在保護隊列數據的互斥鎖,而不是處理隊列中的數據。

然後非阻塞的機制使用了 CAS 的特殊操作,使得任務之間可以不爭搶任何資源,然後在隊列中預定的位置上,插入或者提取數據。

3、多線程動態內存分配性能下降;

多線程同時分配內存時,會涉及到線程分配同一塊相同地址內存的問題,這個時候會用鎖來進行同步。顯然頻繁分配內存會導致應用程序性能下降。

三、無鎖隊列的實現

3.1 一讀一寫的無鎖隊列

yqueue 是用來設計隊列,ypipe 用來設計隊列的寫入時機、回滾以及 flush,首先我們來看 yqueue 的設計。

3.1.1 yqueue——無鎖隊列

1、內存分配

首先我們需要考慮隊列的內存分配,yqueue 中的數據結構使用的 chunk 塊機制,每次批量分配一批元素,這樣可以減少內存的分配和釋放:

template<typename T, int N>
    // 鏈表結點稱之爲chunk_t
    struct chunk_t
    {
        T values[N]; //每個chunk_t可以容納N個T類型的元素,以後就以一個chunk_t爲單位申請內存
        chunk_t *prev;
        chunk_t *next;
    };

當隊列不足的時候每次分配一個 chunk_t,每個 chunk_t 能存儲 N 個元素。

93     //  Adds an element to the back end of the queue.
94     inline void push()
95     {
96         back_chunk = end_chunk;
97         back_pos = end_pos; //
99         if (++end_pos != N) //end_pos!=N表明這個chunk節點還沒有滿
100             return;
101
102         chunk_t *sc = spare_chunk.xchg(NULL); // 爲什麼設置爲NULL? 因爲如果把之前值取出來了則沒有spare chunk了,所以設置爲NULL
103         if (sc)                               // 如果有spare chunk則繼續複用它
104         {
105             end_chunk->next = sc;
106             sc->prev = end_chunk;
107         }
108         else // 沒有則重新分配
109         {
110             // static int s_cout = 0;
111             // printf("s_cout:%d\n", ++s_cout);
112             end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一個chunk
113             alloc_assert(end_chunk->next);
114             end_chunk->next->prev = end_chunk;
115         }
116         end_chunk = end_chunk->next;
117         end_pos = 0;
118     }

可以看到 112 行,在要 push 一個元素的時候,首先看最後一個 chunk,也就是 back_chunk 的 back_pos 是不是該 chunk 的最後一個元素,如果是,則重新分配一個 chunk,將這個 chunk 加到 chunk 鏈表的下一個節點。

這個邏輯相對來說還是比較簡單的。唯一需要關注的,就是:

102         chunk_t *sc = spare_chunk.xchg(NULL);

這一行,這個 spare_chunk 是怎麼來的?

154     //  Removes an element from the front end of the queue.
155     inline void pop()
156     {
157         if (++begin_pos == N) // 刪除滿一個chunk纔回收chunk
158         {
159             chunk_t *o = begin_chunk;
160             begin_chunk = begin_chunk->next;
161             begin_chunk->prev = NULL;
162             begin_pos = 0;
163
164             //  'o' has been more recently used than spare_chunk,
165             //  so for cache reasons we'll get rid of the spare and
166             //  use 'o' as the spare.
167             chunk_t *cs = spare_chunk.xchg(o); //由於局部性原理,總是保存最新的空閒塊而釋放先前的空閒快
168             free(cs);
169         }
170     }

當 pop 的時候,如果刪除一個 chunk 裏面沒有元素了,這個時候會需要將這個 chunk 所開闢的空間釋放掉,但是這裏使用了一個技巧即:將這個 chunk 先不釋放,先放到 spare_chunk 裏面,等到下次需要開闢新的空間的時候再把這個 spare_chunk 拿來用。

我們再來看 ypipe。

3.1.2 ypipe——yqueue 的封裝

yqueue 負責元素內存的分配與釋放,入隊以及出隊列;ypipe 負責 yqueue 讀寫指針的變化。

ypipe 是在 yqueue_t 的基礎上再構建一個單讀單寫的無鎖隊列。

這裏有三個指針:

ypipe 的定義:

37     //  Initialises the pipe.
 38     inline ypipe_t()
 49     //  The destructor doesn't have to be virtual. It is mad virtual
 50     //  just to keep ICC and code checking tools from complaining.


 51     inline virtual ~ypipe_t()
 52     {
 53     }

 67     // 寫入數據,incomplete參數表示寫入是否還沒完成,在沒完成的時候不會修改flush指針,即這部分數據不會讓讀線程看到。
 68     inline void write(const T &value_, bool incomplete_);

 92     inline bool unwrite(T *value_);

104     // 刷新所有已經完成的數據到管道,返回false意味着讀線程在休眠,在這種情況下調用者需要喚醒讀線程。
105     // 批量刷新的機制, 寫入批量後喚醒讀線程;
106     // 反悔機制 unwrite
107     inline bool flush();


136     //  Check whether item is available for reading.
137     // 這裏面有兩個點,一個是檢查是否有數據可讀,一個是預取
138     inline bool check_read();
163     //  Reads an item from the pipe. Returns false if there is no value.
164     //  available.
165     inline bool read(T *value_)
178     //  Applies the function fn to the first elemenent in the pipe
179     //  and returns the value returned by the fn.
180     //  The pipe mustn't be empty or the function crashes.
181     inline bool probe(bool (*fn)(&))
189 protected:
190     //  Allocation-efficient queue to store pipe items.
191     //  Front of the queue points to the first prefetched item, back of
192     //  the pipe points to last un-flushed item. Front is used only by
193     //  reader thread, while back is used only by writer thread.
194     yqueue_t<T, N> queue;
195
196     //  Points to the first un-flushed item. This variable is used
197     //  exclusively by writer thread.
198     T *w; //指向第一個未刷新的元素,只被寫線程使用
199
200     //  Points to the first un-prefetched item. This variable is used
201     //  exclusively by reader thread.
202     T *r; //指向第一個還沒預提取的元素,只被讀線程使用
203
204     //  Points to the first item to be flushed in the future.
205     T *f; //指向下一輪要被刷新的一批元素中的第一個
206
207     //  The single point of contention between writer and reader thread.
208     //  Points past the last flushed item. If it is NULL,
209     //  reader is asleep. This pointer should be always accessed using
210     //  atomic operations.
211     atomic_ptr_t<T> c; //讀寫線程共享的指針,指向每一輪刷新的起點(看代碼的時候會詳細說)。當c爲空時,表示讀線程睡眠(只會在讀線程中被設置爲空)
212
213     //  Disable copying of ypipe object.
214     ypipe_t(const ypipe_t &);
215     const ypipe_t &operator=(const ypipe_t &);
3.1.3 ypipe 設計的目的

爲了批量讀寫,即用戶可以自主的決定寫了多少數據之後開啓讀。那因爲有了生產者和消費者,就會涉及到同步的問題,ypipe 這裏測試發現,用鎖和條件變量性能最佳。

我們來分兩種情況看一下讀寫的具體步驟。第一種情況:批量寫,第一輪寫:

在這個時候才能開始讀數據:

img

第二種方式:條件變量 + 互斥鎖:

flush 函數

101     //  Flush all the completed items into the pipe. Returns false if
102     //  the reader thread is sleeping. In that casecaller is obliged to
103     //  wake the reader up before using the pipe again.
104     // 刷新所有已經完成的數據到管道,返回false意味着讀線程在休眠,在這種情況下調用者需要喚醒讀線程。
105     // 批量刷新的機制, 寫入批量後喚醒讀線程;
106     // 反悔機制 unwrite
107     inline bool flush()
108     {
109         //  If there are no un-flushed items, do nothing.
110         if (w == f) // 不需要刷新,即是還沒有新元素加入
111             return true;
112
113         //  Try to set 'c' to 'f'.
114         // read時如果沒有數據可以讀取則c的值會被置爲NULL
115         if (c.cas(w, f) != w) // 嘗試將c設置爲f,即是準備更新w的位置
116         {
117
118             //  Compare-and-swap was unseccessful because 'c' is NULL.
119             //  This means that the reader is asleep. Therefore we don't
120             //  care about thread-safeness and update c in non-atomic
121             //  manner. We'll return false to let the caller know
122             //  that reader is sleeping.
123             c.set(f); // 更新爲新的f位置
124             w = f;
125             return false; //線程看到flush返回false之後會發送一個消息給讀線程,這需要寫業務去做處理
126         }
127         else  // 讀端還有數據可讀取
128         {
129             //  Reader is alive. Nothing special to do now. Just move
130             //  the 'first un-flushed item' pointer to 'f'.
131             w = f;             // 更新f的位置
132             return true;
133         }
134     }

flush 的目的就是將改變 w 的值,同時改變 c 的值,這裏有兩種情況:

1、c 的值與 w 的值相等

說明隊列的 w 值沒有更新,不對隊列的數據進行讀取:

這發生在 flush 第一次發生的時候以及 w 的值還未更新時,此時返回 true,表示隊列不可讀。

2、c 的值與 w 的值不相等

這發生在 c 在 w 位置後面,此時更新 c 與 w 的值,並返回 false,表示隊列可讀。

write 函數

write 函數相對簡單:

 64     //  Write an item to the pipe.  Don't flush it yet. If incomplete is
 65     //  set to true the item is assumed to be continued by items
 66     //  subsequently written to the pipe. Incomplete items are neverflushed down the stream.
 67     // 寫入數據,incomplete參數表示寫入是否還沒完成,在沒完成的時候不會修改flush指針,即這部分數據不會讓讀線程看到。
 68     inline void write(const T &value_, bool incomplete_)
 69     {
 70         //  Place the value to the queue, add new terminator element.
 71         queue.back() = value_;
 72         queue.push();
 73
 74         //  Move the "flush up to here" poiter.
 75         if (!incomplete_)
 76         {
 77             f = &queue.back(); // 記錄要刷新的位置
 78             // printf("1 f:%p, w:%p\n", f, w);
 79         }
 80         else
 81         {
 82             //  printf("0 f:%p, w:%p\n", f, w);
 83         }
 84     }

write 只更新 f 的位置。write 並不能決定該隊列是否能讀,因爲 write 並不能改變 w 指針,如果要隊列能讀,需要 w 指針改變位置纔行。

從 write 和 flush 可以看出,在更新 w 和 f 的時候並沒有互斥的保護,所以該無鎖隊列的設計並不適合多線程場景。

read 函數

138     inline bool check_read()
139     {
140         //  Was the value prefetched already? If so, return.
141         if (&queue.front() != r && r) //判斷是否在前幾次調用read函數時已經預取數據了return true;
142             return true;
143
144         //  There's no prefetched value, so let us prefetch more values.
145         //  Prefetching is to simply retrieve the
146         //  pointer from c in atomic fashion. If there are no
147         //  items to prefetch, set c to NULL (using compare-and-swap).
148         // 兩種情況
149         // 1. 如果c值和queue.front(), 返回c值並將c值置爲NULL,此時沒有數據可讀
150         // 2. 如果c值和queue.front(), 返回c值,此時可能有數據度的去
151         r = c.cas(&queue.front(), NULL); //嘗試預取數據
152
153         //  If there are no elements prefetched, exit.
154         //  During pipe's lifetime r should never be NULL, however,
155         //  it can happen during pipe shutdown when items are being deallocated.
156         if (&queue.front() == r || !r) //判斷是否成功預取數據
157             return false;
158
159         //  There was at least one value prefetched.
160         return true;
161     }
162
163     //  Reads an item from the pipe. Returns false if there is no value.
164     //  available.
165     inline bool read(T *value_)
166     {
167         //  Try to prefetch a value.
168         if (!check_read())
169             return false;
170
171         //  There was at least one value prefetched.
172         //  Return it to the caller.
173         *value_ = queue.front();
174         queue.pop();
175         return true;
176     }

這裏也是有兩種情況:

1、r 不爲空且 r 不等於 & queue.front()

說明此時隊列中有可讀數據,直接讀取即可。

2、r 指針指向隊頭元素 (r==&queue.front()) 或者 r 爲空

說明隊列中並沒有可讀的數據,此時將 r 指針更新成 c 的值,這個過程我們叫做預取。預取的指令就是:

r=c;

c 在 flush 的時候會被設置爲 w。而 w 與 & queue.front() 之間都是有距離的。這一段距離中間的數據就是預取數據,所以每次 read 都能取出一段數據。

當 & queue.front() == c 時,代表數據被取完了,這時把 c 指向 NULL,接着讀線程會睡眠,這也是給寫線程檢查讀線程是否睡眠的標誌。

我們可以測試一下結果,對一個數據加 200 萬次,分別用環形數組、鏈表、互斥鎖、ypipe 隊列分別是什麼樣的性能。

通過測試發現在一讀一寫的情況下,ypipe 的優勢是非常大的。

那多讀多寫的場景呢?

四、多讀多寫的無鎖隊列實現

上面我們介紹的是一讀一寫的場景,用 ypipe 的方式會性能比較快,但是 ypipe 不適用於多讀多寫的場景,因爲在讀的時候是沒有對 r 指針加鎖,在寫的時候也沒有對 w 指針加鎖。

多讀多寫的線程安全隊列有以下幾種實現方式:

1、互斥鎖

2、互斥鎖 + 條件變量:BlockQueue

3、內存屏障:SimpleLockFreeQueue

4、CAS 原子操作:ArrayLockFreeQueue(也可以理解成 RingBuffer)

其中互斥鎖的性能是幾種方式裏面性能最低的,沒什麼講的必要,這裏就不對比這種實現方式了。

4.1 RingBuffer(ArrayLockFreeQueue)

下面我們來看基於循環數組的無鎖隊列,也就是 RingBuffer 如何解決多線程競爭的問題。

首先看下 RingBuffer 的數據結構如下:

 14 template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
 15 class ArrayLockFreeQueue
 16 {
 17 public:
 18
 19     ArrayLockFreeQueue();
 20     virtual ~ArrayLockFreeQueue();
 21
 22     QUEUE_INT size();
 23
 24     bool enqueue(const ELEM_T &a_data);//入隊列
 25
 26     bool dequeue(ELEM_T &a_data);//出隊列
 27
 28     bool try_dequeue(ELEM_T &a_data);
 29
 30 private:
 31
 32     ELEM_T m_thequeue[Q_SIZE];
 33
 34     volatile QUEUE_INT m_count;
 35     volatile QUEUE_INT m_writeIndex;
 36
 37     volatile QUEUE_INT m_readIndex;
 38
 39     volatile QUEUE_INT m_maximumReadIndex;
 40
 41     inline QUEUE_INT countToIndex(QUEUE_INT a_count);
 42 };

m_count: // 隊列的元素個數

我們先來看三種不同的下標:

以上三種不同的下標都是必須的,因爲隊列允許任意數量的生產者和消費者圍繞着它工作。已經存在一種基於循環數組的無鎖隊列,使得唯一的生產者和唯一的消費者可以良好的工作。它的實現相當簡潔非常值得閱讀。該程序使用 gcc 內置的__sync_bool_compare_and_swap,但重新做了宏定義封裝。

#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

隊列已滿判斷:

(m_writeIndex+1) % Q_SIZE == m_readIndex

對應代碼:

countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)

隊列爲空判斷:

 m_readIndex == m_maximumReadIndex

該 RingBuffer 的重點主要是以下四個方面的問題:

1、多線程寫入的時候,m_writeIndex 如何更新?

2、m_maximumReadIndex 這個變量爲什麼會需要?它有什麼作用?

3、多線程讀的惡時候,m_readIndex 如何更新?

4、m_maximumReadIndex 在什麼時候改變?

4.2 enqueue 入隊列
42 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 43 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
 44 {
 45     QUEUE_INT currentWriteIndex;        // 獲取寫指針的位置
 46     QUEUE_INT currentReadIndex;
 47     // 1. 獲取可寫入的位置
 48     do
 49     {
 50         currentWriteIndex = m_writeIndex;
 51         currentReadIndex = m_readIndex;
 52         if(countToIndex(currentWriteIndex + 1) ==
 53             countToIndex(currentReadIndex))
 54         {
 55             return false;   // 隊列已經滿了
 56         }
 57         // 目的是爲了獲取一個能寫入的位置
 58     } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
 59     // 獲取寫入位置後 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
 60     // We know now that this index is reserved for us. Use it to save the data
 61     m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數據更新到對應的位置
 62
 63     // 2. 更新可讀的位置,按着m_maximumReadIndex+1的操作
 64     // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
 65     // inserting in the queue. It might fail if there are more than 1 producer threads because this
 66     // operation has to be done in the same order as the previous CAS
 67     while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
 68     {
 69          // this is a good place to yield the thread in case there are more
 70         // software threads than hardware processors and you have more
 71         // than 1 producer thread
 72         // have a look at sched_yield (POSIX.1b)
 73         sched_yield();      // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
 74     }
 75
 76     AtomicAdd(&m_count, 1);
 77
 78     return true;
 79
 80 }

圖示(非常重要):

以下插圖展示了對隊列執行操作時各個下標時如何變化的。如果一個位置被標記爲 X,表示這個位置裏面存放了數據。空白表示位置是空的。對於下圖的情況,隊列中存放了兩個元素。WriteIndex 指示的位置是新元素將會被插入的位置。ReadIndex 指向的位置中的元素將會在下一次 pop 操作中被彈出。

當生產者準備將數據插入到隊列中時,它首先通過增加 WriteIndex 的值來申請空間。MaximumReadIndex 指向最後一個存放有效數據的位置(也就是實際的讀的隊列尾)。

一旦空間的申請完成,生產者就可以將數據拷貝到剛剛申請的位置中。完成之後增加 MaximumReadIndex 使得它與 WriteIndex 一致。

現在隊列中有 3 個元素,接着又有一個生產者嘗試向隊列中插入元素。

在第一個生產者完成數據拷貝之前,又有另外一個生產者申請了一個新的空間準備拷貝元素。現在有兩個生產者同時向隊列插入數據。

現在生產者開始拷貝數據,在完成拷貝之後,對 MaximumReadIndex 的遞增操作必須嚴格遵循一個順序:第一個生產者線程首先遞增 MaximumReadIndex,接着才輪到第二個生產者。這個順序必須被嚴格遵守的原因是,我們必須保證數據被完全拷貝到隊列之後才允許消費者線程將其出列。

while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)){
sched_yield();      // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
}

第一個生產者完成了數據拷貝,並對 MaximumReadIndex 完成了遞增,現在第二個生產者可以遞增 MaximumReadIndex 了。

第二個生產者完成了對 MaximumReadIndex 的遞增,現在隊列中有 5 個元素。

4.3 dequeue 出隊列
88 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 89 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
 90 {
 91     QUEUE_INT currentMaximumReadIndex;
 92     QUEUE_INT currentReadIndex;
 93
 94     do
 95     {
 96          // to ensure thread-safety when there is more than 1 producer thread
 97         // a second index is defined (m_maximumReadIndex)
 98         currentReadIndex = m_readIndex;
 99         currentMaximumReadIndex = m_maximumReadIndex;
100
101         if(countToIndex(currentReadIndex) ==
102             countToIndex(currentMaximumReadIndex))      // 如果不爲空,獲取到讀索引的位置
103         {
104             // the queue is empty or
105             // a producer thread has allocate space in the queue but is
106             // waiting to commit the data into it
107             return false;
108         }
109         // retrieve the data from the queue
110         a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的
111
112         // try to perfrom now the CAS operation on the read index. If we succeed
113         // a_data already contains what m_readIndex pointed to before we
114         // increased it
115         if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
116         {
117             AtomicSub(&m_count, 1); // 真正讀取到了數據,元素-1
118             return true;
119         }
120     } while(true);
121
122     assert(0);
123      // Add this return statement to avoid compiler warnings
124     return false;
125
126 }

以下插入展示了元素出列的時候各種下標是如何變化的,隊列中初始有 2 個元素。WriteIndex 指示的位置是新元素將會被插入的位置。ReadIndex 指向的位置中的元素將會在下一次 pop 操作中被彈出。

消費者線程拷貝數組 ReadIndex 位置的元素,然後嘗試 CAS 操作將 ReadIndex 加 1. 如果操作成功消費者成功地將數據出列。因爲 CAS 操作是原子的,所以只有唯一的線程可以在同一時刻更新 ReadIndex 的值。

如果操作失敗,讀取新的 ReadIndex 的值,重複以上操作 (copy 數據,CAS)。

現在又有一個消費者將元素出列,隊列變成空。

現在有一個生產者正在向隊列中添加元素。它已經成功的申請了空間,但尚未完成數據拷貝。任何其他企圖從隊列中移除元素的消費者都會發現隊列非空 (因爲 writeIndex 不等於 readIndex)。但它不能讀取 readIndex 所指向位置中的數據,因爲 readIndex 與 MaximumReadIndex 相等。這個時候讀數據失敗,需要等到生產者完成數據拷貝增加 MaximumReadIndex 的值纔可以讀。

當生產者完成數據拷貝,隊列的大小是 1,消費者線程就可以讀取這個數據了。

4.4 yielding 處理器的必要性
 67     while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
 68     {
 69          // this is a good place to yield the thread in case there are more
 70         // software threads than hardware processors and you have more
 71         // than 1 producer thread
 72         // have a look at sched_yield (POSIX.1b)
 73         sched_yield();      // 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
 74     }

在 enqueue 的第二個 CAS 裏面有一個 sched_yield() 來主動讓出處理器的操作,對於一個聲稱無鎖的算法而言,這個調用看起來有點兒奇怪。多線程環境下影響性能的其中一個因素就是 Cache 損壞。而產生 Cache 損壞的一種情況就是一個線程被搶佔,操作系統需要保存被搶佔線程的上下文,然後被選中作爲下一個調度線程的上下文載入。此時 Cache 中緩存的數據都會失效,因爲它是被搶佔線程的數據而不是新線程的數據。

無鎖算法和通過阻塞機制同步的算法的一個主要區別在於無鎖算法不會阻塞在線程同步上。那這裏的讓出 CPU,與阻塞在線程同步上有啥區別?爲什麼不直接自旋?

首先說下 sched_yield 的必要性:sched_yield 的調用與有多少個生產者線程在併發地往隊列中存放數據有關:每個生產者線程所執行的 CAS 操作都必須嚴格遵循 FIFO 次序, 一個用於申請空間, 另一個用於通知消費者數據已經寫入完成可以被讀取了. 如果我們的應用程序只有唯一的生產者這個操作隊列,sched_yield 將永遠沒有機會被調用,因爲 enqueue 的第二個 CAS 操作永遠不會失敗。因爲一個生產者的情況下沒人能破壞生產者執行這兩個 CAS 操作的 FIFO 順序。

而對於多個生產者線程往隊列中存放數據的時候,問題就出現了。概括來說,一個生產者通過第 1 個 CAS 操作申請空間, 然後將數據寫入到申請到的空間中, 然後執行第 2 個 CAS 操作通知消費者數據準備完畢可供讀取了. 這第 2 個 CAS 操作必須遵循 FIFO 順序, 也就是說, 如果 A 線程第首先執行完第一個 CAS 操作, 那麼它也要第 1 個執行完第 2 個 CAS 操作, 如果 A 線程在執行完第一個 CAS 操作之後停止, 然後 B 線程執行完第 1 個 CAS 操作, 那麼 B 線程將無法完成第 2 個 CAS 操作, 因爲它要等待 A 先完成第 2 個 CAS 操作. 而這就是問題產生的根源. 讓我們考慮如下場景, 3 個消費者線程和 1 個消費者線程:

  1. 線程 1,2,3 按順序調用第 1 個 CAS 操作申請了空間. 那麼它們完成第 2 個 CAS 操作的順序也應該與這個順序一致, 1,2,3;

  2. 線程 2 首先嚐試執行第 2 個 CAS, 但它會失敗, 因爲線程 1 還沒完成它的第 2 此 CAS 操作呢. 同樣對於線程 3 也是一樣的;

  3. 線程 2 和 3 將會不斷的調用它們的第 2 個 CAS 操作, 直到線程 1 完成它的第 2 個 CAS 操作爲止;

  4. 線程 1 最終完成了它的第 2 個 CAS, 現在線程 3 必須等線程 2 先完成它的第 2 個 CAS;

  5. 線程 2 也完成了, 最終線程 3 也完成。

在上面的場景中, 生產者可能會在第 2 個 CAS 操作上自旋一段時間, 用於等待先於它執行第 1 個 CAS 操作的線程完成它的第 2 次 CAS 操作. 在一個物理處理器數量大於操作隊列線程數量的系統上, 這不會有太嚴重的問題: 因爲每個線程都可以分配在自己的處理器上執行, 它們最終都會很快完成各自的第 2 次 CAS 操作. 雖然算法導致線程處理忙等狀態, 但這正是我們所期望的, 因爲這使得操作更快的完成. 也就是說在這種情況下我們是不需要 sche_yield() 的, 它完全可以從代碼中刪除。

但是, 在一個物理處理器數量少於線程數量的系統上, sche_yield()就變得至關重要了. 讓我們再次考查上面 3 個線程的場景, 當線程 3 準備向隊列中插入數據: 如果線程 1 在執行完第 1 個 CAS 操作, 在執行第 2 個 CAS 操作之前被搶佔, 那麼線程 2,3 就會一直在它們的第 2 個 CAS 操作上忙等 (它們忙等, 不讓出處理器, 線程 1 也就沒機會執行, 它們就只能繼續忙等), 直到線程 1 重新被喚醒, 完成它的第 2 個 CAS 操作。這就是需要 sche_yield() 的場合了, 操作系統應該避免讓線程 2,3 處於忙等狀態. 它們應該儘快的讓出處理器讓線程 1 執行, 使得線程 1 可以把它的第 2 個 CAS 操作完成. 這樣線程 2 和 3 才能繼續完成它們的操作。

也就是說,如果不適用 sched_yield,一直自旋,那麼可能多個線程同時阻塞在第二個 CAS 那兒。

4.5 多讀多寫的 RingBuffer 存在的問題

1、多於一個生產者線程性能提升不明顯

如果有多於一個的生產者線程, 那麼將它們很可能花費大量的時間用於等待更新 MaximumReadIndex(第 2 個 CAS). 這個隊列最初的設計場景是滿足單一消費者, 所以不用懷疑在多生產者的情形下會比單一生產者有大幅的性能下降。

另外如果你只打算將此隊列用於單一生產者的場合, 那麼第 2 個 CAS 操作可以去除. 同樣 m_maximumReadIndex 也可以一同被移除了, 所有對 m_maximumReadIndex 的引用都改成 m_writeIndex. 所以, 在這樣的場合下 push 和 pop 可以被改寫如下:

template <typename ELEM_T>
    bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
    {
        uint32_t currentReadIndex;
        uint32_t currentWriteIndex;

        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
        if (countToIndex(currentWriteIndex + 1) ==
            countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }

        // save the date into the q
        m_theQueue[countToIndex(currentWriteIndex)] = a_data;

        // increment atomically write index. Now a consumer thread can read
        // the piece of data that was just stored
        AtomicAdd(&m_writeIndex, 1);

        return true;
    }

    template <typename ELEM_T>
    bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
    {
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
do
    {
        // m_maximumReadIndex doesn't exist when the queue is set up as
        // single-producer. The maximum read index is described by the current
        // write index
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_writeIndex;

        if (countToIndex(currentReadIndex) ==
            countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }

        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];

        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
            return true;
        }

        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation

    } while(1); // keep looping to try again!

    // Something went wrong. it shouldn't be possible to reach here
    assert(0);

    // Add this return statement to avoid compiler warnings
    return false;
    }

但是如果是單讀單寫的場景,沒有必要用這個無鎖隊列,可以看以上單讀單寫的無鎖隊列。

2、與智能指針一起使用,內存無法得到釋放

如果你打算用這個隊列來存放智能指針對象. 需要注意, 將一個智能指針存入隊列之後, 如果它所佔用的位置沒有被另一個智能指針覆蓋, 那麼它所指向的內存是無法被釋放的 (因爲它的引用計數器無法下降爲 0). 這對於一個操作頻繁的隊列來說沒有什麼問題, 但是程序員需要注意的是, 一旦隊列被填滿過一次那麼應用程序所佔用的內存就不會下降, 即使隊列被清空. 除非自己做改動,每次 pop 手動 delete。

3、計算隊列的大小存在 ABA 問題

size 函數可能會返回一個不正確的值, size 的實現如下:

template <typename ELEM_T>
    inline uint32_t ArrayLockFreeQueue<ELEM_T>::size()
    {
        uint32_t currentWriteIndex = m_writeIndex;
        uint32_t currentReadIndex  = m_readIndex;

        if (currentWriteIndex >= currentReadIndex)
        {
            return (currentWriteIndex - currentReadIndex);
        }
        else
        {
            return (m_totalSize + currentWriteIndex - currentReadIndex);
        }
    }

下面的場景描述了 size 爲何會返回一個不正確的值:

  1. 當 currentWriteIndex = m_writeIndex 執行之後, m_writeIndex=3,m_readIndex = 2 那麼實際 size 是 1;

  2. 之後操作線程被搶佔, 且在它停止運行的這段時間內, 有 2 個元素被插入和從隊列中移除。所以 m_writeIndex=5,m_readIndex = 4, 而 size 還是 1;

  3. 現在被搶佔的線程恢復執行, 讀取 m_readIndex 值, 這個時候 currentReadIndex=4,currentWriteIndex=3;

  4. currentReadIndex > currentWriteIndex'所以 m_totalSize + currentWriteIndex - currentReadIndex` 被返回, 這個值意味着隊列幾乎是滿的, 而實際上隊列幾乎是空的。

實際上也就是 ABA 的一個場景。與本文一起上傳的代碼中包含了處理這個問題的解決方案。

解決方案:添加一個用於保存隊列中元素數量的成員 count. 這個成員可以通過 AtomicAdd/AtomicSub 來實現原子的遞增和遞減。

但需要注意的是這增加了一定開銷, 因爲原子遞增, 遞減操作比較昂貴也很難被編譯器優化。

例如, 在 core 2 duo E6400 2.13 Ghz 的機器上, 單生產者單消費者, 隊列數組的初始大小是 1000, 測試執行 10,000k 次的插入, 沒有 count 成員的版本用時 2.64 秒, 而維護了 count 成員的版本用時 3.42 秒. 而對於 2 消費者, 1 生產者的情況, 沒有 count 成員的版本用時 3.98 秒, 維護 count 的版本用時 5.15 秒。

這也就是爲什麼我把是否啓用此成員變量的選擇交給實際的使用者. 使用者可以根據自己的使用場合選擇是否承受額外的運行時開銷。

在 array_lock_free_queue.h 中有一個名爲 ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE 的宏變量, 如果它被定義那麼將啓用 count 變量, 否則將 size 函數將有可能返回不正確的值。

4.6 多讀多寫 RingBuffer 的性能

無鎖 vs 阻塞隊列

併發的插入和移除 100W 元素所花費的時間 (越小越好, 隊列的數組大小初始爲 16384)。在單生產者的情況下, 無鎖隊列戰勝了阻塞隊列. 而隨着生產者數量的增加, 無鎖隊列的效率迅速下降。因爲在多個生產者的情況下,第 2 個 CAS 將對性能產生影響。

然後我們來看代碼中的情況:

再來看看消費者線程數量對性能的影響。

1、一個生產者線程

2、兩個生產者

3、三個生產者

4.7 RingBuffer 結論

1、CAS 操作是原子的,線程並行執行 push/pop 不會導致死鎖;

2、多生產者同時向隊列 push 數據的時候不會將數據寫入到同一個位置, 產生數據覆蓋;

3、多消費者同時執行 pop 不會導致一個元素被出列多於 1 次;

4、線程不能將數據 push 進已經滿的隊列中, 不能從空的隊列中 pop 元素;

5、push 和 pop 都沒有 ABA 問題。

但是,雖然這個隊列是線程安全的, 但是在多生產者線程的環境下它的性能還是不如阻塞隊列. 因此, 在符合下述條件的情況下可以考慮使用這個隊列來代替阻塞隊列:

1、只有一個生產者線程;

2、只有一個頻繁操作隊列的生產者, 但偶爾會有其它生產者向隊列 push 數據;

在 reactor 網絡框架中,如果只有一個 reactor 在處理 client 的話,用數組實現的 RingBuffer 來存儲消息是比較合適的。

4.8 四種線程安全隊列實現性能對比

互斥鎖隊列 vs 互斥鎖 + 條件變量隊列 vs 內存屏障鏈表 vs RingBuffer CAS 實現。

1、4 寫 1 讀

2、4 寫 4 讀

3、1 寫 4 讀

可以發現 RingBuffer 的實現性能在幾個場景中都是比較好的,但是相對而言,在 1 寫 4 讀的場景下性能是最明顯的,幾乎是內存屏障的 3 倍性能了。

爲什麼鏈表的方式性能相對 BlockQueue 沒有很大的提升呢?

1、鏈表的方式需要不斷的申請和釋放元素。當然,用內存池可以適當改善這個影響,但是內存池在分配內存與釋放內存的時候也會涉及到線程間的數據競爭,所以用鏈表的方式性能相對提升不多。

入隊:

74   template <typename U>
 75   inline bool enqueue(&&item)
 76   {
 77     idx_t nodeIdx = allocate_node_for(std::forward<U>(item));
 78
 79     auto tail_ = tail.load(std::memory_order_relaxed);
 80     while (!tail.compare_exchange_weak(tail_, nodeIdx, std::memory_order_release, std::memory_order_relaxed))
 81       continue;
 82     get_node_at(tail_)->next.store(nodeIdx, std::memory_order_release);
 83
 84     return true;
 85   }

出隊:

87   inline bool try_dequeue(&item) {
…….
125           add_node_to_free_list(head_, headNode);
}

2、鏈表需要不斷地去更新頭節點和尾節點指針的位置,在一個 while 循環裏面反覆去執行:

80     while (!tail.compare_exchange_weak(tail_, nodeIdx, std::memory_order_release, std::memory_order_relaxed))
81       continue;

參考:

https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular

https://zhuanlan.zhihu.com/p/33985732

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rQzi0FvavoKHCRVBy1JknQ