釋放無鎖隊列的力量:探索用循環數組實現無鎖隊列

一、前言

在計算機科學領域,隊列是一種常見的數據結構,用於在多線程或多進程環境中進行有效的消息傳遞和任務調度。然而,傳統的隊列實現通常使用鎖來保護共享資源,這可能導致性能瓶頸和可伸縮性問題。

爲了克服這些限制,無鎖隊列應運而生。無鎖隊列通過採用特殊的算法和數據結構,使多個線程可以併發地訪問隊列,而無需使用鎖來保護共享資源。其中,基於循環數組的無鎖隊列是一種經典的實現方式。

本文將深入探討基於循環數組的無鎖隊列的原理和優勢。我們將介紹循環數組的基本概念,並解釋如何通過適當的算法和技術實現無鎖性。通過對比傳統的鎖保護隊列和無鎖隊列,我們將揭示無鎖隊列的性能提升和可伸縮性優勢。

此外,我們還將探討基於循環數組的無鎖隊列在實際應用中的挑戰和注意事項。我們將分享一些實際案例和經驗教訓,幫助讀者更好地理解和應用無鎖隊列。

通過閱讀本文,您將深入瞭解基於循環數組的無鎖隊列的強大功能和潛力,以及如何利用它們來提升系統性能和可伸縮性。無論您是系統設計師、開發人員還是對併發編程感興趣的研究人員,本文都將爲您帶來有價值的見解和啓發。

二、設計:類接口和變量

#ifndef _ARRAYLOCKFREEQUEUE_H___
#define _ARRAYLOCKFREEQUEUE_H___

#include <stdint.h>

#ifdef _WIN64
#define QUEUE_INT int64_t
#else
#define QUEUE_INT unsigned long
#endif

#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16

template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
class ArrayLockFreeQueue
{
public:

	ArrayLockFreeQueue();
	virtual ~ArrayLockFreeQueue();

	QUEUE_INT size();

	bool enqueue(const ELEM_T &a_data);//入隊

	bool dequeue(ELEM_T &a_data);// 出隊

    bool try_dequeue(ELEM_T &a_data);//嘗試入隊

private:

	ELEM_T m_thequeue[Q_SIZE];

	volatile QUEUE_INT m_count;//隊列的元素個數
	volatile QUEUE_INT m_writeIndex;//新元素入隊時存放位置在數組中的下標

	volatile QUEUE_INT m_readIndex;//下一個出隊元素在數組中的下標

	volatile QUEUE_INT m_maximumReadIndex;// 最後一個已經完成入隊操作的元素在數組中的下標

	inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};

#include "ArrayLockFreeQueueImp.h"

#endif

m_maximumReadIndex: 最後一個已經完成入列操作的元素在數組中的下標。如果它的值跟 m_writeIndex 不一致,表明有寫請求尚未完成。這意味着,有寫請求成功申請了空間但數據還沒完全寫進隊列。所以如果有線程要讀取,必須要等到寫線程將數據完全寫入到隊列之後。

必須指明的是使用 3 種不同的下標都是必須的,因爲隊列允許任意數量的生產者和消費者圍繞着它工作。

數組環形圖:

三、CAS 的使用

使用 gcc 內置的 syn_bool_compare_and_swap,但重新做了宏定義封裝。

#ifndef _ATOM_OPT_H___
#define _ATOM_OPT_H___

#ifdef __GNUC__
	#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
	#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
	#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
	#include <sched.h> // sched_yield()
#else

#include <Windows.h>
#ifdef _WIN64
	#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange64(a_ptr, a_newVal, a_oldVal))
	#define sched_yield()	SwitchToThread()
	#define AtomicAdd(a_ptr, num)	InterlockedIncrement64(a_ptr)
	#define AtomicSub(a_ptr, num)	InterlockedDecrement64(a_ptr)
#else
	#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal))
	#define sched_yield()	SwitchToThread()
	#define AtomicAdd(a_ptr, num)	InterlockedIncrement(a_ptr)
	#define AtomicSub(a_ptr, num)	InterlockedDecrement(a_ptr)
#endif

#endif

#endif

四、圖解:隊列的實現

4.1、 enqueue 入隊列

template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
	return (a_count % Q_SIZE);		// 取餘的時候
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
	QUEUE_INT currentWriteIndex;		// 獲取寫指針的位置
	QUEUE_INT currentReadIndex;
	// 1. 獲取可寫入的位置
	do
	{
		currentWriteIndex = m_writeIndex;
		currentReadIndex = m_readIndex;
		if(countToIndex(currentWriteIndex + 1) ==
			countToIndex(currentReadIndex))
		{
			return false;	// 隊列已經滿了	
		}
		// 目的是爲了獲取一個能寫入的位置
	} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
	// 獲取寫入位置後 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
	// We know now that this index is reserved for us. Use it to save the data
	m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數據更新到對應的位置

	// 2. 更新可讀的位置,按着m_maximumReadIndex+1的操作
 	// update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
	// inserting in the queue. It might fail if there are more than 1 producer threads because this
	// operation has to be done in the same order as the previous CAS
	while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
	{
		 // this is a good place to yield the thread in case there are more
		// software threads than hardware processors and you have more
		// than 1 producer thread
		// have a look at sched_yield (POSIX.1b)
		sched_yield();		// 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
	}

	AtomicAdd(&m_count, 1);

	return true;

}

分析:
(1)對於下圖,隊列中存放了兩個元素。WriteIndex 指示的位置是新元素將會被插入的位置。ReadIndex 指向的位置中的元素將會在下一次 pop 操作中被彈出。

(2)當生產者準備將數據插入到隊列中, 它首先通過增加 WriteIndex 的值來申請空間。MaximumReadIndex 指向最後一個存放有效數據的位置 (也就是實際的隊列尾)。

(3)一旦空間的申請完成, 生產者就可以將數據拷貝到剛剛申請到的位置中。完成之後增加 MaximumReadIndex 使得它與 WriteIndex 的一致。

(4)現在隊列中有 3 個元素,接着又有一個生產者嘗試向隊列中插入元素。

(5)在第一個生產者完成數據拷貝之前,又有另外一個生產者申請了一個新的空間準備拷貝數據。現在有兩個生產者同時向隊列插入數據。

(6)現在生產者開始拷貝數據,在完成拷貝之後,對 MaximumReadIndex 的遞增操作必須嚴格遵循一個順序:第一個生產者線程首先遞增 MaximumReadIndex,接着才輪到第二個生產者。 這個順序必須被嚴格遵守的原因是,我們必須保證數據被完全拷貝到隊列之後才允許消費者線程將其出列。
第一個生產者完成了數據拷貝,並對 MaximumReadIndex 完成了遞增。

(7)現在第二個生產者可以遞增 MaximumReadIndex 了;第二個生產者完成了對 MaximumReadIndex 的遞增, 現在隊列中有 5 個元素。

4.2、dequeue 出隊列

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
	QUEUE_INT currentMaximumReadIndex;
	QUEUE_INT currentReadIndex;

	do
	{
		 // to ensure thread-safety when there is more than 1 producer thread
       	// a second index is defined (m_maximumReadIndex)
		currentReadIndex = m_readIndex;
		currentMaximumReadIndex = m_maximumReadIndex;

		if(countToIndex(currentReadIndex) ==
			countToIndex(currentMaximumReadIndex))		// 如果不爲空,獲取到讀索引的位置
		{
			// the queue is empty or
			// a producer thread has allocate space in the queue but is 
			// waiting to commit the data into it
			return false;
		}
		// retrieve the data from the queue
		a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的

		// try to perfrom now the CAS operation on the read index. If we succeed
		// a_data already contains what m_readIndex pointed to before we 
		// increased it
		if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
		{
			AtomicSub(&m_count, 1);	// 真正讀取到了數據,元素-1
			return true;
		}
	} while(true);

	assert(0);
	 // Add this return statement to avoid compiler warnings
	return false;

}

分析:
(1)以下插圖展示了元素出列的時候各種下標是如何變化的, 隊列中初始有 2 個元素。WriteIndex 指示的位置是新元素將會被插入的位置。ReadIndex 指向的位置中的元素將會在下一次 pop 操作中被彈出。

(2)消費者線程拷貝數組 ReadIndex 位置的元素,然後嘗試用 CAS 操作將 ReadIndex 加 1。如果操作成功消費者成功的將數據出列。因爲 CAS 操作是原子的,所以只有唯一的線程可以在同一時刻更新 ReadIndex 的值。如果操作失敗,讀取新的 ReadIndex 值,以重複以上操作 (copy 數據,CAS)。

(3)現在又有一個消費者將元素出列,隊列變成空。

(4)現在有一個生產者正在向隊列中添加元素。它已經成功的申請了空間,但尚未完成數據拷貝。任何其它企圖從隊列中移除元素的消費者都會發現隊列非空 (因爲 writeIndex 不等於 readIndex)。但它不能讀取 readIndex 所指向位置中的數據,因爲 readIndex 與 MaximumReadIndex 相等。消費者將會在 do 循環中不斷的反覆嘗試,直到生產者完成數據拷貝增加 MaximumReadIndex 的值,或者隊列變成空 (這在多個消費者的場景下會發生)。

(5)當生產者完成數據拷貝,隊列的大小是 1,消費者線程可以讀取這個數據了。

4.3、在多於一個生產者線程的情況下 “讓出”CPU 的必要性

enqueue 函數中使用了 sched_yiedld() 來主動的讓出 CPU,對於一個無鎖的算法而言,這個調用看起來有點奇怪。

多線程環境下影響性能的其中一個因素就是 Cache 損壞。而產生 Cache 損壞的一種情況就是一個線程被搶佔,操作系統需要保存被搶佔線程的上下文,然後將被選中作爲下一個調度線程的上下文載入。此時 Cache 中緩存的數據都會失效,因爲它是被搶佔線程的數據而不是新線程的數據。

所以,當此算法調用 sched_yield() 意味着告訴操作系統:“我要把處理器時間讓給其它線程,因爲我要等待某件事情的發生”。無鎖算法和通過阻塞機制同步的算法的一個主要區別在於無鎖算法不會阻塞在線程同步上。
那麼爲什麼在這裏我們要主動請求操作系統搶佔自己呢? 它與有多少個生產者線程在併發的往隊列中存放數據有關:每個生產者線程所執行的 CAS 操作都必須嚴格遵循 FIFO 次序,一個用於申請空間,另一個用於通知消費者數據已經寫入完成可以被讀取了。
如果應用程序只有唯一的生產者操作這個隊列,sche_yield() 將永遠沒有機會被調用,第 2 個 CAS 操作永遠不會失敗。因爲在一個生產者的情況下沒有人能破壞生產者執行這兩個 CAS 操作的 FIFO 順序。
而當多於一個生產者線程往隊列中存放數據的時候,問題就出現了。概括來說,一個生產者通過第 1 個
CAS 操作申請空間,然後將數據寫入到申請到的空間中,然後執行第 2 個 CAS 操作通知消費者數據準備完畢可供讀取了。這第 2 個 CAS 操作必須遵循 FIFO 順序,也就是說,如果 A 線程第首先執行完第一個 CAS 操作,那麼它也要第 1 個執行完第 2 個 CAS 操作,如果 A 線程在執行完第一個 CAS 操作之後停止,然後 B 線程執行完第 1 個 CAS 操作,那麼 B 線程將無法完成第 2 個 CAS 操作,因爲它要等待 A 先完成第 2 個 CAS 操作。而這就是問題產生的根源。

五、完整源碼

#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
#define _ARRAYLOCKFREEQUEUEIMP_H___
#include "ArrayLockFreeQueue.h"

#include <assert.h>
#include "atom_opt.h"

template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
	m_writeIndex(0),
	m_readIndex(0),
	m_maximumReadIndex(0)
{
	m_count = 0;
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
{

}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
{
	return (a_count % Q_SIZE);		// 取餘的時候
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
{
	QUEUE_INT currentWriteIndex = m_writeIndex;
	QUEUE_INT currentReadIndex = m_readIndex;

	if(currentWriteIndex>=currentReadIndex)
		return currentWriteIndex - currentReadIndex;
	else
		return Q_SIZE + currentWriteIndex - currentReadIndex;

}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
	QUEUE_INT currentWriteIndex;		// 獲取寫指針的位置
	QUEUE_INT currentReadIndex;
	// 1. 獲取可寫入的位置
	do
	{
		currentWriteIndex = m_writeIndex;
		currentReadIndex = m_readIndex;
		if(countToIndex(currentWriteIndex + 1) ==
			countToIndex(currentReadIndex))
		{
			return false;	// 隊列已經滿了	
		}
		// 目的是爲了獲取一個能寫入的位置
	} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
	// 獲取寫入位置後 currentWriteIndex 是一個臨時變量,保存我們寫入的位置
	// We know now that this index is reserved for us. Use it to save the data
	m_thequeue[countToIndex(currentWriteIndex)] = a_data;  // 把數據更新到對應的位置

	// 2. 更新可讀的位置,按着m_maximumReadIndex+1的操作
 	// update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
	// inserting in the queue. It might fail if there are more than 1 producer threads because this
	// operation has to be done in the same order as the previous CAS
	while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
	{
		 // this is a good place to yield the thread in case there are more
		// software threads than hardware processors and you have more
		// than 1 producer thread
		// have a look at sched_yield (POSIX.1b)
		sched_yield();		// 當線程超過cpu核數的時候如果不讓出cpu導致一直循環在此。
	}

	AtomicAdd(&m_count, 1);

	return true;

}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
{
    return dequeue(a_data);
}

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
	QUEUE_INT currentMaximumReadIndex;
	QUEUE_INT currentReadIndex;

	do
	{
		 // to ensure thread-safety when there is more than 1 producer thread
       	// a second index is defined (m_maximumReadIndex)
		currentReadIndex = m_readIndex;
		currentMaximumReadIndex = m_maximumReadIndex;

		if(countToIndex(currentReadIndex) ==
			countToIndex(currentMaximumReadIndex))		// 如果不爲空,獲取到讀索引的位置
		{
			// the queue is empty or
			// a producer thread has allocate space in the queue but is 
			// waiting to commit the data into it
			return false;
		}
		// retrieve the data from the queue
		a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的

		// try to perfrom now the CAS operation on the read index. If we succeed
		// a_data already contains what m_readIndex pointed to before we 
		// increased it
		if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
		{
			AtomicSub(&m_count, 1);	// 真正讀取到了數據,元素-1
			return true;
		}
	} while(true);

	assert(0);
	 // Add this return statement to avoid compiler warnings
	return false;

}

#endif

源碼測試

#include "ArrayLockFreeQueue.h"
ArrayLockFreeQueue<int> arraylockfreequeue;
void *arraylockfreequeue_producer_thread(void *argv)
{
  PRINT_THREAD_INTO();
  int count = 0;
  int write_failed_count = 0;

  for (int i = 0; i < s_queue_item_num;)
  {
    if (arraylockfreequeue.enqueue(count)) // enqueue的順序是無法保證的,我們只能計算enqueue的個數
    {
      count = lxx_atomic_add(&s_count_push, 1);
      i++;
    }
    else
    {
      write_failed_count++;
      // printf("%s %lu enqueue failed, q:%d\n", __FUNCTION__,  pthread_self(), arraylockfreequeue.size());
      sched_yield();
      // usleep(10000);
    }
  }
  // printf("%s %lu write_failed_count:%d\n", __FUNCTION__, pthread_self(), write_failed_count)
  PRINT_THREAD_LEAVE();
  return NULL;
}

void *arraylockfreequeue_consumer_thread(void *argv)
{
  int last_value = 0;
  PRINT_THREAD_INTO();
  int value = 0;
  int read_failed_count = 0;
  while (true)
  {

    if (arraylockfreequeue.dequeue(value))
    {
      if (s_consumer_thread_num == 1 && s_producer_thread_num == 1 && (last_value + 1) != value) // 只有一入一出的情況下才有對比意義
      {
        // printf("pid:%lu, -> value:%d, expected:%d\n", pthread_self(), value, last_value);
      }
      lxx_atomic_add(&s_count_pop, 1);
      last_value = value;
    }
    else
    {
      read_failed_count++;
      // printf("%s %lu no data, s_count_pop:%d, value:%d\n", __FUNCTION__, pthread_self(), s_count_pop, value);
      // usleep(100);
      sched_yield();
    }

    if (s_count_pop >= s_queue_item_num * s_producer_thread_num)
    {
      // printf("%s dequeue:%d, s_count_pop:%d, %d, %d\n", __FUNCTION__, last_value, s_count_pop, s_queue_item_num, s_consumer_thread_num);
      break;
    }
  }
  // printf("%s %lu read_failed_count:%d\n", __FUNCTION__, pthread_self(), read_failed_count)
  PRINT_THREAD_LEAVE();
  return NULL;
}

總結

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6blmalIVRiwNG6y6wAMz0A