DPDK 環形緩衝區(Ring)詳解及性能優化

前言:此篇文章主要用來學習和記錄 DPDK 中無鎖環形隊列相關內容,結合了官方文檔說明和源碼中的實現,供大家交流和學習。

一、DPDK 中的環形數據結構

DPDK 中的環形結構經常用於隊列管理,因此又稱爲環形隊列。它不同於大小可靈活變化的鏈表,它的空間大小是固定的。DPDK 中的 rte_ring 擁有如下特性:

rte_ring 環形隊列與基於鏈表的隊列相比,擁有如下優點:

rte_ring 環形隊列與基於鏈表的隊列相比,缺點如下:

rte_ring 環形隊列描述:

環形隊列

DKDP 源碼中的環形隊列數據結構如下:

struct rte_ring {
	char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */
	int flags;                       /**< Flags supplied at creation. */
	const struct rte_memzone *memzone;
	/**< Memzone, if any, containing the rte_ring */

	/** Ring producer status. */
	struct prod {
		uint32_t watermark;      /**< Maximum items before EDQUOT. */
		uint32_t sp_enqueue;     /**< True, if single producer. */
		uint32_t size;           /**< Size of ring. */
		uint32_t mask;           /**< Mask (size-1) of ring. */
		volatile uint32_t head;  /**< Producer head. */
		volatile uint32_t tail;  /**< Producer tail. */
	} prod __rte_cache_aligned;

	/** Ring consumer status. */
	struct cons {
		uint32_t sc_dequeue;     /**< True, if single consumer. */
		uint32_t size;           /**< Size of the ring. */
		uint32_t mask;           /**< Mask (size-1) of ring. */
		volatile uint32_t head;  /**< Consumer head. */
		volatile uint32_t tail;  /**< Consumer tail. */
	} cons __rte_cache_aligned;

	void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
	                                     * not volatile so need to be careful
	                                     * about compiler re-ordering */
};

每一個環形數據結構都包含兩對(head,tail)指針:一對用於生產者(prod),另一隊用於消費者(cons)。文章後面通過 prod_head, prod_tail, cons_head, cons_tail 來分別表示這四個指針。

head,tail 的範圍都是 0~2^32;,它恰恰是利用了 unsigned int 溢出的性質。

在 DPDK 實現中,rte_ring 是通過 “name” 字段來唯一標識的,我們可以通過rte_ring_create()來創建環形隊列,他可以保證創建的隊列 name 的唯一性。

二、環形隊列區別

2.1 環形隊列:單生產者 / 單消費者模式

本節內容主要爲單生產者下的入隊操作以及單消費者下的出隊操作。

爲了方便後續表達和理解,這裏有必要統一一下描述:

關於臨時變量可以這樣理解:每一個 CPU 都有獨佔 cache, 這些臨時變量 l_xxx_xxx 則是相應 CPU 存儲在本地 cache 中尚未更新到全局環形隊列上的值。而 g_xxx_xxx 則表示存儲中的共享的環形隊列值。

生產者–入隊

下面舉一個例子:只有一個生產者的入隊操作。

將環形隊列的 g_prod_head、g_cons_tail 存儲在臨時變量 l_prod_head、l_cons_tail 中記錄位置;臨時變量 l_prod_next 則根據插入對象的個數移動了相應的位置。如果沒有足夠的空間來執行入隊操作,則返回錯誤。

將環形隊列的 g_prod_head 移動到 l_prod_next 的位置;然後將對象添加到環形緩衝區中。

注: l_prod_next: 用來提前預定位置,g_prod_head 則是真正改變環形隊列指針,佔用位置生效。

一旦對象被添加到環形隊列中,g_prod_tail 將會被修改,指向 g_prod_head 的位置。

至此,入隊操作完成。說實話,只看到這段描述,是在看不出所以然,以及它的特點。下面我們通過查看源碼來加深對入隊操作的理解。(文字言簡意賅,讀不懂;代碼深奧,看不懂。O(∩_∩)O 哈哈~)

static inline int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
			 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t prod_head, cons_tail;
	uint32_t prod_next, free_entries;
	unsigned i;
	uint32_t mask = r->prod.mask;
	int ret;

	prod_head = r->prod.head;
	cons_tail = r->cons.tail;
	/* The subtraction is done between two unsigned 32bits value
	 * (the result is always modulo 32 bits even if we have
	 * prod_head > cons_tail). So 'free_entries' is always between 0
	 * and size(ring)-1. */
	free_entries = mask + cons_tail - prod_head;

	/* check that we have enough room in ring */
	if (unlikely(n > free_entries)) {
		if (behavior == RTE_RING_QUEUE_FIXED) {
			__RING_STAT_ADD(r, enq_fail, n);
			return -ENOBUFS;
		}
		else {
			/* No free entry available */
			if (unlikely(free_entries == 0)) {
				__RING_STAT_ADD(r, enq_fail, n);
				return 0;
			}

			n = free_entries;
		}
	}

	prod_next = prod_head + n;
	r->prod.head = prod_next;

	/* write entries in ring */
	ENQUEUE_PTRS();
	rte_smp_wmb();

	/* if we exceed the watermark */
	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
			(int)(n | RTE_RING_QUOT_EXCEED);
		__RING_STAT_ADD(r, enq_quota, n);
	}
	else {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
		__RING_STAT_ADD(r, enq_success, n);
	}

	r->prod.tail = prod_next;
	return ret;
}

我們依然按照剛纔的三步走來說明代碼:

第 9、10 行:用來保存 prod_head, cons_tail 變量。它的目的嘛,準備開房。
第 13~34 行,確定麗晶大酒店是否還有剩餘房間可供咱們開!(確保環形隊列剩餘空間足夠入隊操作,如果空間不足,則提示相應信息並返回錯誤碼)。

第 36 行:打電話 (使用臨時變量 l_prod_next) 預定麗晶大酒店房間的位置。

第 37 行:房間已經預定成功,可以直接開車去訂好的房間 (l_prod_next)。

第 40 行:將對象(object)拉進房間中辦事

第 41 行:等事情辦完 (內存屏障) 再走

第 54 行:事情已經辦完,對象仍在房間回味呢,我收拾乾淨移步到下一個房間,繼續等待另一個對象 (object) 的到來。

到此爲止,單生產者入隊完畢。我特麼懷疑我自己講了一個特別有內涵的段子,我太有才了 O(∩_∩)O。

消費者–出隊

本節介紹一個消費者出隊操作在環形隊列中如何實現的。在這個例子中,只有消費者的 head、tail(即 cons_head、cons_tail)會被修改。在初始狀態時,cons_head 和 cons_tail 指向相同的內存空間。

下面舉一個例子:只有一個消費者的出隊操作。

將環形隊列的 g_prod_head、g_cons_tail 存儲在臨時變量 l_prod_head、l_cons_tail 中記錄位置;臨時變量 l_cons_next 則根據出隊對象的個數移動了相應的位置。如果沒有足夠的對象來執行出隊操作,則返回錯誤。

將環形隊列的 g_cons_head 移動到 l_cons_next 的位置;然後將對象添加到環形緩衝區中。

注: l_cons_next: 用來提前預定位置,g_cons_head 則是真正改變環形隊列指針,佔用位置生效。

出隊完成後,g_cons_tail 將會被修改,指向 g_prod_head 的位置。

至此,但消費者的出隊操作便完成了。那接下來我們繼續講解我的小段子:

static inline int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
		 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t cons_head, prod_tail;
	uint32_t cons_next, entries;
	unsigned i;
	uint32_t mask = r->prod.mask;

	cons_head = r->cons.head;
	prod_tail = r->prod.tail;
	/* The subtraction is done between two unsigned 32bits value
	 * (the result is always modulo 32 bits even if we have
	 * cons_head > prod_tail). So 'entries' is always between 0
	 * and size(ring)-1. */
	entries = prod_tail - cons_head;

	if (n > entries) {
		if (behavior == RTE_RING_QUEUE_FIXED) {
			__RING_STAT_ADD(r, deq_fail, n);
			return -ENOENT;
		}
		else {
			if (unlikely(entries == 0)){
				__RING_STAT_ADD(r, deq_fail, n);
				return 0;
			}

			n = entries;
		}
	}

	cons_next = cons_head + n;
	r->cons.head = cons_next;

	/* copy in table */
	DEQUEUE_PTRS();
	rte_smp_rmb();

	__RING_STAT_ADD(r, deq_success, n);
	r->cons.tail = cons_next;
	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

我們依然按照剛纔的三步走來說明代碼:

第 9、10 行:用來保存 prod_head, cons_tail 變量。它的目的嘛,事都辦完了,準備退房。
第 16~31 行,確定麗晶大酒店開了幾間房,不能多退(出隊個數檢查,不得超過緩衝緩衝區中存儲的個數)。
第 33 行:打電話通知前臺,準備要退的房間 (使用臨時變量 l_cons_next 記錄)

第 34 行:上一個房間已退,還好我叫了好幾個對象,我可以準備去下一個對象的房間 (l_prod_next)。

第 37 行:上一個房間裏的對象收拾好行李,神清氣爽、精神飽滿、幸福感十足的走出房間。

第 38 行:等這個對象真的走遠,真的走遠了才能行動(內存屏蔽)。

第 41 行:確認完畢方纔對象真的走了,我開心的進入了下一個對象房間。

到此爲止,小故事已經講完,單消費者出隊操作完畢。

2.3 環形隊列:多生產者 / 多消費者模式

關於變量命名規則可以參見第 2 章節。

多生產者–入隊

本節將介紹兩個生產者同時執行入隊在環形緩衝區是如何操作的。 在這個例子中,只有一個生產者的 head、tail(即 cons_head、cons_tail)會被修改。在初始狀態時,cons_head 和 cons_tail 指向相同的內存空間。

在兩個 CPU 上,環形隊列的 g_prod_head、 g_cons_tail 同時被兩個核存儲在本地臨時變量中。並同時將 l_prod_next 根據入隊個數預定位置,並指向預留好的位置後面。

修改 g_prod_head 指向 l_prod_next 位置,這部分操作完成後則說明環形隊列允許入隊操作。該操作是通過 CAS 指令來完成,它和內存屏蔽是無鎖環形隊列的核心和關鍵。這個操作一次只能在其中一個 core 上完成 (假設 CPU1 上成功執行了 CAS 操作)。而 CPU2 跳轉到第①步從頭開始執行。等 CPU2 執行完畢第二步時,結果如下圖所示

入隊操作第③步

CPU2 上 CAS 執行成功,CPU1 和 CPU2 開始進行真正的入隊操作,分別將對象添加到隊列中。

兩個 CPU 同時更新 prod_head 指針,如果 g_prod_tail == l_prod_head, 則更新 g_prod_tail 指針。從上圖中我們可以看出,這個操作最初只能在 CPU1 上執行成功。結果如下:

CPU1 將 g_prod_tail 指針進行了更新,此時 CPU2 上已經滿足了 g_prod_tail == l_prod_head。

入隊操作第⑤步

CPU 執行第④步操作,操作成功後,入隊操作便執行完畢。

DPDK 中的源碼實現如下:

static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
			 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t prod_head, prod_next;
	uint32_t cons_tail, free_entries;
	const unsigned max = n;
	int success;
	unsigned i, rep = 0;
	uint32_t mask = r->prod.mask;
	int ret;

	/* move prod.head atomically */
	do {
		/* Reset n to the initial burst count */
		n = max;

		prod_head = r->prod.head;
		cons_tail = r->cons.tail;
		/* The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * prod_head > cons_tail). So 'free_entries' is always between 0
		 * and size(ring)-1. */
		free_entries = (mask + cons_tail - prod_head);

		/* check that we have enough room in ring */
		if (unlikely(n > free_entries)) {
			if (behavior == RTE_RING_QUEUE_FIXED) {
				__RING_STAT_ADD(r, enq_fail, n);
				return -ENOBUFS;
			}
			else {
				/* No free entry available */
				if (unlikely(free_entries == 0)) {
					__RING_STAT_ADD(r, enq_fail, n);
					return 0;
				}

				n = free_entries;
			}
		}

		prod_next = prod_head + n;
		/*
		*	rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
		*
		* if(dst==exp) dst=src;
		* else 
		*		return false;
		*/
		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
					      prod_next);/*此操作應該會從內存中讀取值,並將不同核的修改寫回到內存中*/
	} while (unlikely(success == 0));/*如果失敗,更新相關指針重新操作*/

	/* write entries in ring */
	ENQUEUE_PTRS();
	rte_smp_wmb();/*寫內存屏障*/

	/* if we exceed the watermark */
	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
				(int)(n | RTE_RING_QUOT_EXCEED);
		__RING_STAT_ADD(r, enq_quota, n);
	}
	else {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
		__RING_STAT_ADD(r, enq_success, n);
	}

	/*
	 * If there are other enqueues in progress that preceded us,
	 * we need to wait for them to complete
	 */
	while (unlikely(r->prod.tail != prod_head)) {
		rte_pause();

		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
		 * for other thread finish. It gives pre-empted thread a chance
		 * to proceed and finish with ring dequeue operation. */
		if (RTE_RING_PAUSE_REP_COUNT &&
		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
			rep = 0;
			sched_yield();
		}
	}
	r->prod.tail = prod_next;
	return ret;
}

多消費者–出隊

多消費者出隊官方文檔並沒有說明,我也不再描述,直接附上源碼供大家學習。

static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
		 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t cons_head, prod_tail;
	uint32_t cons_next, entries;
	const unsigned max = n;
	int success;
	unsigned i, rep = 0;
	uint32_t mask = r->prod.mask;

	/* move cons.head atomically */
	do {
		/* Restore n as it may change every loop */
		n = max;

		cons_head = r->cons.head;
		prod_tail = r->prod.tail;
		/* The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * cons_head > prod_tail). So 'entries' is always between 0
		 * and size(ring)-1. */
		entries = (prod_tail - cons_head);

		/* Set the actual entries for dequeue */
		if (n > entries) {
			if (behavior == RTE_RING_QUEUE_FIXED) {
				__RING_STAT_ADD(r, deq_fail, n);
				return -ENOENT;
			}
			else {
				if (unlikely(entries == 0)){
					__RING_STAT_ADD(r, deq_fail, n);
					return 0;
				}

				n = entries;
			}
		}

		cons_next = cons_head + n;
		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
					      cons_next);
	} while (unlikely(success == 0));

	/* copy in table */
	DEQUEUE_PTRS();
	rte_smp_rmb();

	/*
	 * If there are other dequeues in progress that preceded us,
	 * we need to wait for them to complete
	 */
	while (unlikely(r->cons.tail != cons_head)) {
		rte_pause();

		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
		 * for other thread finish. It gives pre-empted thread a chance
		 * to proceed and finish with ring dequeue operation. */
		if (RTE_RING_PAUSE_REP_COUNT &&
		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
			rep = 0;
			sched_yield();
		}
	}
	__RING_STAT_ADD(r, deq_success, n);
	r->cons.tail = cons_next;

	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

三、緩衝區 Ring 實現

3.1ring 提供的接口

對於一個模塊而言,其對外提供的接口直接表明了它所提供的功能,也是我們分析一個模塊最初的入口。ring 是一個環形無鎖隊列,支持多生產者多消費者操作,所以對於隊列的操作構成了模塊的主要接口。ring 的實現在文件 rte_ring.c 和 rte_ring.h 中。

rte_ring_create() //ring的創建
 
rte_ring_init() //ring的初始化
 
rte_ring_lookup() //ring的查找
 
rte_ring_free() //ring的釋放
 
rte_ring_dump() //獲取ring的使用信息
 
rte_ring_set_water_mark() //設置ring的水標

以上的幾個大的接口提供了 ring 的開始和結束以及查找。同時對於一個隊列來說,還有更多的入隊,出隊操作。如函數

rte_ring_mp_enqueue_burst()//多生產者批量入隊

此處就省略其他很多單(多)生產者,單(多)消費者的操作函數接口了。

3.2ring 的創建及初始化

rte ring 的創建是通過 rte_ring_create() 函數來實現的,這個函數的原型 struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id,unsigned flags)

這裏需要注意的是 socket_id 和 flags,在多個進程同時訪問同一個 ring 時,要改善性能,可以創建多個 ring, 同時要注意多個進程綁定在同一個 socket 上。另一個參數 flags 則是表明創建的 ring 是否安全支持多生產者多消費者模型。接下來就來看看創建及初始化中的一些細節。

首先找到 ring_list,ring_list 是掛接在隊列中的,根據 ring 不跨 socket 的原則,推斷是每個 socket 都維護有一個這樣的隊列,具體就不去摳代碼了,先主後次。

接下來就是兩個準備操作:

struct rte_tailq_entry {
	TAILQ_ENTRY(rte_tailq_entry) next; 
	/**< Pointer entries for a tailq list */
	void *data; /**< Pointer to the data referenced by this 	tailq entry */
};

中的 data 指針就指向了創建的 ring 的地址。然後就是爲新創建的 ring 分配內存空間咯,使用了 rte_memzone_reserve() 函數分配,這個函數在內存部分詳細說明,但是需要注意:

rte_memzone_reserve() 只能用於 primary 進程中內存分配,也暗含了對於多生產者多消費者使用的 ring, 其 ring 的創建要在 primary 進程中。

最後就是把分配的 ring 初始化 -rte_ring_init(), 並填充 te 元素,把 te 掛接在隊列中。

3.3ring 的出入隊

ring 的出入隊操作,我們重點來關注幾個接口:

__rte_ring_mp_do_enqueue()
__rte_ring_sp_do_enqueue()
__rte_ring_mc_do_dequeue()
__rte_ring_sc_do_dequeue()

無論使用的哪個上層接口,最終調用的就是這 4 個函數。在使用多生產者多消費者時,函數中會有rte_pause()的操作,裏面使用了__mm_pause()指令,看註釋意思是避免忙等待,主要應用在短時的 loops。至於具體的頭和尾指針的移動。

3.4ring 的使用範圍以及潛在問題

1.ring 的調試信息在 non_EAL 線程中是不支持獲取的。

2.ring 支持多生產者入隊和多消費者出隊,但是都是不可搶佔的。不可搶佔的意思是:

這意味着如果兩個線程在同一個 core 上操作,那麼 2th 線程則必須等到 1th 線程調度後才能訪問,因此,儘量不要在同一個 core 上對同一個 ring 做多生產者同時入隊或者出隊。

3.5 關於水標的使用

在初始化 ring 的時候,可以設置對應的水標位置,但感覺它並未提供設置接口,用的地方不是很多。比如,當入隊已經達到水標位置時,就可以返回對應的錯誤值,上層調用就可以做些處理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/H9lczBd7egOvJAjjm5hKHA