A Deep Dive into DPDK's Lock-Free Ring Queue, a High-Concurrency Building Block
This post walks through the implementation of DPDK's high-performance lock-free queue, a genuinely practical, engineering-heavy data structure. People who claim algorithms and data structures are useless usually only know the idealized structures from coding exercises, without noticing how much the frameworks, virtual machines, standard libraries, and operating systems they rely on at work have already done behind the scenes to achieve this kind of performance.
1. An overview of DPDK's rte_ring
rte_ring is essentially a lock-free FIFO ring queue; the lock-free enqueue and dequeue operations are the heart of its implementation. It is commonly used for communication between threads or between processes.
Key properties of the ring:
- Lock-free enqueue and dequeue (apart from the CAS, compare-and-swap, operation)
- Multiple consumers/producers can enqueue and dequeue at the same time
How to use it:
1. Create a ring object.
2. Enqueue and dequeue.
There are several enqueue/dequeue variants (single, bulk, burst), all declared in rte_ring.h, for example rte_ring_enqueue and rte_ring_dequeue.
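As a quick orientation before digging into the internals, here is a minimal single-process sketch of that create/enqueue/dequeue flow (the ring name, size, and printed values are illustrative and not taken from the article; flags = 0 selects the default multi-producer/multi-consumer mode):

#include <stdio.h>
#include <rte_eal.h>
#include <rte_lcore.h>
#include <rte_ring.h>

int main(int argc, char **argv)
{
	if (rte_eal_init(argc, argv) < 0)
		return -1;

	/* The ring size must be a power of two; one slot is kept free
	 * internally, so a 1024-slot ring holds at most 1023 objects. */
	struct rte_ring *r = rte_ring_create("demo_ring", 1024,
					     rte_socket_id(), 0);
	if (r == NULL)
		return -1;

	int value = 42;
	void *obj = NULL;

	/* Enqueue a single object (only the pointer value is stored). */
	if (rte_ring_enqueue(r, &value) == 0)
		printf("enqueued %d\n", value);

	/* Dequeue it back out. */
	if (rte_ring_dequeue(r, &obj) == 0)
		printf("dequeued %d\n", *(int *)obj);

	rte_ring_free(r);
	return 0;
}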
Compared with a linked-list queue, this data structure has the following advantages:
- Faster: for void *-sized data it only needs a single compare-and-swap instruction rather than two
- Simpler than a fully lock-free queue
- Well suited to bulk enqueue/dequeue: because the pointers are stored in a table, dequeuing many objects does not trigger the flood of cache misses a linked list would, and dequeuing a batch costs little more than dequeuing a single object
- CAS (compare and swap) is an atomic operation
And the following drawbacks:
- The size is fixed
- A ring often costs more memory than a linked list: even an empty ring holds at least N pointers.
2. The rte_ring structure
The lock-free ring queue is defined by the following structure:
struct rte_ring {
	TAILQ_ENTRY(rte_ring) next;        /* link in the global rte_ring_list */
	char name[RTE_MEMZONE_NAMESIZE];   /* name of the ring */
	int flags;                         /* creation flags (single prod/cons, ...) */
	const struct rte_memzone *memzone; /* memzone the ring was allocated from */

	/* producer status */
	struct prod {
		uint32_t watermark;      /* high-water mark; -EDQUOT once exceeded */
		uint32_t sp_enqueue;     /* true if single-producer enqueue */
		uint32_t size;           /* size of the ring */
		uint32_t mask;           /* size - 1, used to wrap indices */
		volatile uint32_t head;  /* producer head (reservation index) */
		volatile uint32_t tail;  /* producer tail (publication index) */
	} prod __rte_cache_aligned;

	/* consumer status */
	struct cons {
		uint32_t sc_dequeue;     /* true if single-consumer dequeue */
		uint32_t size;           /* size of the ring */
		uint32_t mask;           /* size - 1, used to wrap indices */
		volatile uint32_t head;  /* consumer head (reservation index) */
		volatile uint32_t tail;  /* consumer tail (publication index) */
#ifdef RTE_RING_SPLIT_PROD_CONS
	} cons __rte_cache_aligned;
#else
	} cons;
#endif

#ifdef RTE_LIBRTE_RING_DEBUG
	struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif

	void *ring[] __rte_cache_aligned; /* object pointers, stored right after the header */
};
When a ring is created, DPDK inserts an rte_tailq_entry node into the rte_ring_list and allocates one block of memory from a memzone sized according to the element count: the size of struct rte_ring plus count * sizeof(void *). The void * array immediately following the rte_ring structure holds the enqueued objects (only the pointer values are stored). The rte_ring structure contains the producer structure prod and the consumer structure cons; after the fields are initialized, the data member of the rte_tailq_entry is pointed at the address of the rte_ring structure.
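As a rough sketch of that size computation (the real allocation additionally rounds up for cache-line alignment, and DPDK exposes rte_ring_get_memsize() for the exact figure):

#include <stdio.h>
#include <rte_ring.h>

/* Approximate size of the memzone backing a ring with 'count' slots:
 * the rte_ring header followed by count object pointers. */
static size_t approx_ring_memsize(unsigned int count)
{
	return sizeof(struct rte_ring) + (size_t)count * sizeof(void *);
}

int main(void)
{
	printf("a 1024-slot ring needs roughly %zu bytes\n",
	       approx_ring_memsize(1024));
	return 0;
}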
Note that cons.head, cons.tail, prod.head, and prod.tail are all of type uint32_t, and that the queue size count is required to be a power of two. Together these two constraints enable a neat trick. Since a queue is never anywhere near 2^32 entries, the queue can be viewed as a window onto the 32-bit index space; when the window size is a power of two, the 32-bit range contains a whole number of windows, so the void * array that stores the ring objects only needs to be one window in size. Thanks to the wrap-around behaviour of unsigned arithmetic, the number of objects currently in the queue can be computed directly as (uint32_t)(prod_tail - cons_tail); the result is correct even after the indices overflow, e.g. (uint32_t)(5 - 4294967295) = 6.
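A tiny standalone snippet demonstrating that wrap-around arithmetic (the index values are made up so as to force an overflow):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint32_t cons_tail = 4294967295u;  /* 2^32 - 1: consumer index about to wrap */
	uint32_t prod_tail = 5;            /* producer index has already wrapped */

	/* Number of entries in use: modulo-2^32 subtraction still gives 6. */
	uint32_t used = prod_tail - cons_tail;

	/* Free space, computed the same way the enqueue path does
	 * (there it uses prod.head): mask + cons_tail - prod index. */
	uint32_t mask = 1024 - 1;
	uint32_t free_entries = mask + cons_tail - prod_tail;

	printf("used=%u free=%u\n", used, free_entries); /* used=6 free=1017 */
	return 0;
}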
3. Inter-process communication with rte_ring
rte_ring is used together with rte_mempool, which provides the shared memory for the messages. The DPDK multi-process sample (examples/multi_process/simple_mp) shows how the primary and secondary processes send strings to each other:
int
main(int argc, char **argv)
{
	const unsigned flags = 0;
	const unsigned ring_size = 64;
	const unsigned pool_size = 1024;
	const unsigned pool_cache = 32;
	const unsigned priv_data_sz = 0;
	int ret;
	unsigned lcore_id;

	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Cannot init EAL\n");

	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* The primary process creates the rings and the message pool. */
		send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);
		recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);
		message_pool = rte_mempool_create(_MSG_POOL, pool_size,
				STR_TOKEN_SIZE, pool_cache, priv_data_sz,
				NULL, NULL, NULL, NULL,
				rte_socket_id(), flags);
	} else {
		/* Secondary processes look the shared objects up by name. */
		recv_ring = rte_ring_lookup(_PRI_2_SEC);
		send_ring = rte_ring_lookup(_SEC_2_PRI);
		message_pool = rte_mempool_lookup(_MSG_POOL);
	}
	if (send_ring == NULL)
		rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");
	if (recv_ring == NULL)
		rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");
	if (message_pool == NULL)
		rte_exit(EXIT_FAILURE, "Problem getting message pool\n");

	RTE_LOG(INFO, APP, "Finished Process Init.\n");

	/* Start the receive loop on every worker lcore. */
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		rte_eal_remote_launch(lcore_recv, NULL, lcore_id);
	}

	/* Interactive command line on the main lcore. */
	struct cmdline *cl = cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp > ");
	if (cl == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create cmdline instance\n");
	cmdline_interact(cl);
	cmdline_stdin_exit(cl);

	rte_eal_mp_wait_lcore();
	return 0;
}
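To try this out, two copies of the binary are started and the EAL --proc-type option selects which branch of the if above is taken, for example something like ./simple_mp -l 0-1 --proc-type=primary in one terminal and ./simple_mp -l 2-3 --proc-type=secondary in another (the core lists here are illustrative; --proc-type=auto also works). Typing a send command at the prompt of one process makes the string appear in the other.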
To send a message, rte_mempool_get fetches an object from the mempool and rte_ring_enqueue puts it on the ring; the other process takes it off with rte_ring_dequeue and, once it is done with it, returns the object to the mempool with rte_mempool_put:
Send side:
static void cmd_send_parsed(void *parsed_result,
__attribute__((unused)) struct cmdline *cl,
__attribute__((unused)) void *data)
{
void *msg = NULL;
struct cmd_send_result *res = parsed_result;
if (rte_mempool_get(message_pool, &msg) < 0)
rte_panic("Failed to get message buffer\n");
strlcpy((char *)msg, res->message, STR_TOKEN_SIZE);
if (rte_ring_enqueue(send_ring, msg) < 0) {
printf("Failed to send message - message discarded\n");
rte_mempool_put(message_pool, msg);
}
}
Receive side:
static int
lcore_recv(__attribute__((unused)) void *arg)
{
unsigned lcore_id = rte_lcore_id();
printf("Starting core %u\n", lcore_id);
while (!quit){
void *msg;
if (rte_ring_dequeue(recv_ring, &msg) < 0){
usleep(5);
continue;
}
printf("core %u: Received '%s'\n", lcore_id, (char *)msg);
rte_mempool_put(message_pool, msg);
}
return 0;
}
4. How multiple producers/consumers enqueue and dequeue concurrently
The core idea is a two-phase reserve-then-publish protocol (a simplified sketch follows this list):
- Moving prod.head reserves the number of slots this producer is going to fill
- Once this producer has finished writing, and all earlier producers have finished as well, prod.tail is moved to mark the position actually produced
- Likewise, moving cons.head reserves the number of entries this consumer is going to take
- Once this consumer has finished reading, and all earlier consumers have finished as well, cons.tail is moved to mark the position actually consumed
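Before reading the real DPDK function, here is a stripped-down sketch of that reserve-then-publish protocol for the producer side. It assumes the pre-17.05 structure layout shown in section 2 and leaves out the watermark and variable-count handling; it illustrates the idea and is not the library implementation:

#include <errno.h>
#include <rte_ring.h>
#include <rte_atomic.h>

/* Simplified multi-producer enqueue, fixed-count behaviour only. */
static int mp_enqueue_sketch(struct rte_ring *r, void * const *objs, uint32_t n)
{
	uint32_t head, next;

	/* Phase 1: reserve n slots by advancing prod.head with a CAS. */
	do {
		head = r->prod.head;
		uint32_t free_entries = r->prod.mask + r->cons.tail - head;
		if (n > free_entries)
			return -ENOBUFS;        /* not enough room in the ring */
		next = head + n;
	} while (!rte_atomic32_cmpset(&r->prod.head, head, next));

	/* Write the object pointers into the slots we reserved. */
	for (uint32_t i = 0; i < n; i++)
		r->ring[(head + i) & r->prod.mask] = objs[i];
	rte_smp_wmb();          /* make the stores visible before the tail moves */

	/* Phase 2: wait until all earlier producers have published, then
	 * advance prod.tail past our own slots. */
	while (r->prod.tail != head)
		rte_pause();
	r->prod.tail = next;
	return 0;
}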
1) Multi-producer enqueue flow:
/**
* @internal Enqueue several objects on the ring (multi-producers safe).
*
* This function uses a "compare and set" instruction to move the
* producer index atomically.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
* @param behavior
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
* @return
* Depend on the behavior value
* if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects enqueued.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
* if behavior = RTE_RING_QUEUE_VARIABLE
* - n: Actual number of objects enqueued.
*/
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
const unsigned max = n;
int success;
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;
int ret;
/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
if (n == 0)
return 0;
/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
n = max;
/* 1. Grab the current prod.head to reserve against */
prod_head = r->prod.head;
cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* prod_head > cons_tail). So 'free_entries' is always between 0
* and size(ring)-1. */
/* 2. Check that there is enough free space */
free_entries = (mask + cons_tail - prod_head);
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, enq_fail, n);
return -ENOBUFS;
}
else {
/* No free entry available */
if (unlikely(free_entries == 0)) {
__RING_STAT_ADD(r, enq_fail, n);
return 0;
}
n = free_entries;
}
}
/* 3. Reserve the slots: move r->prod.head with a CAS operation */
prod_next = prod_head + n;
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
} while (unlikely(success == 0));
/* write entries in ring */
ENQUEUE_PTRS();
rte_smp_wmb();
/* if we exceed the watermark */
/* 4. Check whether the watermark was reached and update the stats */
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
}
else {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
__RING_STAT_ADD(r, enq_success, n);
}
/*
* If there are other enqueues in progress that preceded us,
* we need to wait for them to complete
*/
/* 5. Wait for earlier enqueues to complete, then publish by moving prod.tail */
while (unlikely(r->prod.tail != prod_head)) {
rte_pause();
/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
r->prod.tail = prod_next;
return ret;
}
Let us walk through what happens when two producers add objects to the ring at the same time.
1) In the initial state, prod_head and prod_tail point to the same position.
Each of the two producer cores (a "core" here can be read as a concurrently running thread or process) copies ring->prod_head and ring->cons_tail into local variables. The local variable prod_next then points to the element after ring->prod_head, or several elements further on for a bulk enqueue. If the ring does not have enough free space (determined by looking at cons_tail), the enqueue function returns an error:
prod_head = r->prod.head;
cons_tail = r->cons.tail;
...
free_entries = (mask + cons_tail - prod_head);
...
prod_next = prod_head + n;
2) The second step is to update the ring->prod_head index in the ring structure so that it points to the position held in the local variable prod_next:
- If ring->prod_head no longer equals the local prod_head, the CAS fails and the code starts over from step one.
- If they are equal, ring->prod_head is set to prod_next, the CAS succeeds, and execution continues with the next step.
In this scenario, producer core 1 succeeds first; producer core 2 fails the CAS, retries, and then succeeds.
do {
...
prod_head = r->prod.head;
cons_tail = r->cons.tail;
...
success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next);
...
} while (unlikely(success == 0));
3) Producer core 2's CAS retry succeeds.
Producer core 1 writes object obj4 into the ring, and producer core 2 writes object obj5 (after its CAS retry succeeded).
/* write entries in ring */
ENQUEUE_PTRS();
rte_smp_wmb(); /* memory barrier: keep the stores from being reordered past the tail update */
4) Now each producer core wants to update the ring->prod_tail index. A producer may only update it once ring->prod_tail equals its own local prod_head; in this scenario only producer core 1 satisfies that condition, so producer core 1 completes its enqueue.
5) Once producer core 1 has updated ring->prod_tail, producer core 2 is allowed to update it as well, and producer core 2 completes its enqueue too.
Steps 4) and 5) correspond to this code:
/*
* If there are other enqueues in progress that preceded us,
* we need to wait for them to complete
*/
while (unlikely(r->prod.tail != prod_head)) {
rte_pause();
/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
r->prod.tail = prod_next;
2) Multi-consumer dequeue flow:
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail;
uint32_t cons_next, entries;
const unsigned max = n;
int success;
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;
/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
if (n == 0)
return 0;
/* move cons.head atomically:
 * 1. check that enough entries are available to consume
 * 2. reserve them with a CAS */
do {
/* Restore n as it may change every loop */
n = max;
cons_head = r->cons.head;
prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* cons_head > prod_tail). So 'entries' is always between 0
* and size(ring)-1. */
entries = (prod_tail - cons_head);
/* Set the actual entries for dequeue */
if (n > entries) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, deq_fail, n);
return -ENOENT;
}
else {
if (unlikely(entries == 0)){
__RING_STAT_ADD(r, deq_fail, n);
return 0;
}
n = entries;
}
}
cons_next = cons_head + n;
success = rte_atomic32_cmpset(&r->cons.head, cons_head,
cons_next);
} while (unlikely(success == 0));
/* copy in table */
DEQUEUE_PTRS();
rte_smp_rmb();
/*
* If there are other dequeues in progress that preceded us,
* we need to wait for them to complete
* (i.e. wait for earlier dequeues to finish before moving cons.tail)
*/
while (unlikely(r->cons.tail != cons_head)) {
rte_pause();
/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
__RING_STAT_ADD(r, deq_success, n);
r->cons.tail = cons_next;
return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
The dequeue path works on the same principle as the producer side; a few comments were added in the code above to mark the corresponding steps, so it is worth re-reading it with those in mind.
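To close the loop with section 1, the bulk/burst variants apply exactly the same protocol but move several pointers per reservation, which amortizes the CAS. A small sketch of how they are called (note: these are the DPDK 17.05-and-later signatures with an extra out-parameter reporting the remaining free space / available entries, where NULL means "don't care"; the older release whose internals are quoted above takes only the first three arguments):

#include <stdio.h>
#include <rte_ring.h>

static void burst_example(struct rte_ring *r,
			  void * const *to_send, void **received, unsigned int n)
{
	/* Try to enqueue up to n objects; returns how many actually went in. */
	unsigned int sent = rte_ring_enqueue_burst(r, to_send, n, NULL);

	/* Try to dequeue up to n objects; returns how many came out. */
	unsigned int got = rte_ring_dequeue_burst(r, received, n, NULL);

	printf("sent %u, received %u of %u\n", sent, got, n);
}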
Reference:
https://blog.csdn.net/qq_15437629/article/details/78147874