A Deep Dive into High-Concurrency Techniques: the DPDK Lock-Free Queue

Today I'd like to share how DPDK implements its high-performance lock-free queue. This is a genuinely practical, advanced data structure, and one that really tests your engineering ability. Many people claim algorithms and data structures are useless, but perhaps they only know the idealized structures from coding exercises, and not how the frameworks, virtual machines, standard libraries, and operating systems they use at work have quietly done all of this high-performance engineering for them.

I. An overview of DPDK's rte_ring

At its core, rte_ring is a lock-free FIFO ring queue, and the lock-free enqueue and dequeue operations are the key to its implementation. It is commonly used for communication between multiple threads or processes.

Characteristics of the ring (as listed in the DPDK Programmer's Guide):

- FIFO semantics, storing pointers to objects in a fixed-size table
- lock-free: both multi- and single-producer/consumer enqueue and dequeue
- bulk and burst operations that move many objects per call

How to use it:

1. Create a ring object.

2. Enqueue/dequeue.
   There are several enqueue/dequeue variants (single, bulk, burst), all declared in rte_ring.h.

For example: rte_ring_enqueue / rte_ring_dequeue.

Compared with a linked-list queue, this data structure has:

Advantages:

- no pointer chasing: objects sit in one contiguous array, which is cache-friendly
- lock-free multi-producer/multi-consumer access via CAS rather than mutexes
- bulk/burst operations that amortize the per-object synchronization cost

Disadvantages:

- the size is fixed at creation time and cannot grow dynamically
- a nearly empty ring still occupies its full pre-allocated memory

II. The rte_ring structure

The lock-free ring queue structure is defined as follows:

struct rte_ring {
    TAILQ_ENTRY(rte_ring) next;      /**< Next in list. */
    char name[RTE_MEMZONE_NAMESIZE]; /**< Name of the ring. */
    int flags;                       /**< Flags supplied at creation. */
    const struct rte_memzone *memzone; /**< Memzone, if any, containing the ring. */

    /** Ring producer status. */
    struct prod {
        uint32_t watermark;          /**< Max items before EDQUOT. */
        uint32_t sp_enqueue;         /**< True, if single producer. */
        uint32_t size;               /**< Size of the ring. */
        uint32_t mask;               /**< Mask (size - 1) of the ring. */
        volatile uint32_t head;      /**< Producer head. */
        volatile uint32_t tail;      /**< Producer tail. */
    } prod __rte_cache_aligned;

    /** Ring consumer status. */
    struct cons {
        uint32_t sc_dequeue;         /**< True, if single consumer. */
        uint32_t size;               /**< Size of the ring. */
        uint32_t mask;               /**< Mask (size - 1) of the ring. */
        volatile uint32_t head;      /**< Consumer head. */
        volatile uint32_t tail;      /**< Consumer tail. */
#ifdef RTE_RING_SPLIT_PROD_CONS
    } cons __rte_cache_aligned;
#else
    } cons;
#endif

#ifdef RTE_LIBRTE_RING_DEBUG
    struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif

    void *ring[] __rte_cache_aligned; /**< Memory space of the ring starts here. */
};

DPDK creates an rte_tailq_entry node in the rte_ring_list, and allocates a block of memory from a memzone sized according to the queue's count (sizeof(struct rte_ring) plus count * sizeof(void *)). The void * array immediately following the rte_ring structure holds the enqueued objects (only the pointer values are stored). The rte_ring structure contains the producer structure prod and the consumer structure cons; after the parameters are initialized, the data field of the rte_tailq_entry node is pointed at the address of the rte_ring structure.

Note that cons.head, cons.tail, prod.head and prod.tail are all of type uint32_t, and that the queue size count is constrained to be a power of two. Together these two conditions create a very elegant situation. Since a queue will generally never be as large as 2^32, the queue can be treated as a window over the 32-bit index space; when the window size is a power of two, the 32-bit space contains a whole number of windows. The void * pointer array that stores the ring's objects therefore only needs to be one window in size. Thanks to the wrap-around behaviour of unsigned arithmetic, the number of produced items in the queue can be computed directly as (uint32_t)(prod_tail - cons_tail), which stays correct even across overflow; for instance, in 16-bit arithmetic, (uint16_t)(5 - 65535) = 6.

III. Inter-process communication with rte_ring

rte_ring is used together with rte_mempool, which provides the shared memory. The DPDK multi-process sample (examples/multi_process/simple_mp) shows how this works, with a master process and a slave process sending strings to each other:

int
main(int argc, char **argv)
{
    const unsigned flags = 0;
    const unsigned ring_size = 64;
    const unsigned pool_size = 1024;
    const unsigned pool_cache = 32;
    const unsigned priv_data_sz = 0;
    int ret;
    unsigned lcore_id;

    ret = rte_eal_init(argc, argv);
    if (ret < 0)
        rte_exit(EXIT_FAILURE, "Cannot init EAL\n");

    if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
        send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);
        recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);
        message_pool = rte_mempool_create(_MSG_POOL, pool_size,
                STR_TOKEN_SIZE, pool_cache, priv_data_sz,
                NULL, NULL, NULL, NULL,
                rte_socket_id(), flags);
    } else {
        recv_ring = rte_ring_lookup(_PRI_2_SEC);
        send_ring = rte_ring_lookup(_SEC_2_PRI);
        message_pool = rte_mempool_lookup(_MSG_POOL);
    }
    if (send_ring == NULL)
        rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");
    if (recv_ring == NULL)
        rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");
    if (message_pool == NULL)
        rte_exit(EXIT_FAILURE, "Problem getting message pool\n");

    RTE_LOG(INFO, APP, "Finished Process Init.\n");

    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
        rte_eal_remote_launch(lcore_recv, NULL, lcore_id);
    }

    struct cmdline *cl = cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp > ");
    if (cl == NULL)
        rte_exit(EXIT_FAILURE, "Cannot create cmdline instance\n");
    cmdline_interact(cl);
    cmdline_stdin_exit(cl);

    rte_eal_mp_wait_lcore();
    return 0;
}

In use, rte_mempool_get fetches an object from the mempool, and rte_ring_enqueue then places it on the ring; the other process takes it off with rte_ring_dequeue and, once finished with the object, returns it to the mempool with rte_mempool_put.

send:

static void cmd_send_parsed(void *parsed_result,
        __attribute__((unused)) struct cmdline *cl,
        __attribute__((unused)) void *data)
{
    void *msg = NULL;
    struct cmd_send_result *res = parsed_result;

    if (rte_mempool_get(message_pool, &msg) < 0)
        rte_panic("Failed to get message buffer\n");
    strlcpy((char *)msg, res->message, STR_TOKEN_SIZE);
    if (rte_ring_enqueue(send_ring, msg) < 0) {
        printf("Failed to send message - message discarded\n");
        rte_mempool_put(message_pool, msg);
    }
}

receive:

static int
lcore_recv(__attribute__((unused)) void *arg)
{
    unsigned lcore_id = rte_lcore_id();

    printf("Starting core %u\n", lcore_id);
    while (!quit) {
        void *msg;
        if (rte_ring_dequeue(recv_ring, &msg) < 0) {
            usleep(5);
            continue;
        }
        printf("core %u: Received '%s'\n", lcore_id, (char *)msg);
        rte_mempool_put(message_pool, msg);
    }

    return 0;
}

IV. Concurrent production/consumption by multiple producers/consumers (simultaneous enqueue and dequeue)

1. Multi-producer enqueue flow:

/**
 * @internal Enqueue several objects on the ring (multi-producers safe).
 *
 * This function uses a "compare and set" instruction to move the
 * producer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects).
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
 * @return
 *   Depends on the behavior value
 *   if behavior = RTE_RING_QUEUE_FIXED
 *   - 0: Success; objects enqueued.
 *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
 *     high water mark is exceeded.
 *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
 *   if behavior = RTE_RING_QUEUE_VARIABLE
 *   - n: Actual number of objects enqueued.
 */
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
             unsigned n, enum rte_ring_queue_behavior behavior)
{
    uint32_t prod_head, prod_next;
    uint32_t cons_tail, free_entries;
    const unsigned max = n;
    int success;
    unsigned i, rep = 0;
    uint32_t mask = r->prod.mask;
    int ret;

    /* Avoid the unnecessary cmpset operation below, which is also
     * potentially harmful when n equals 0. */
    if (n == 0)
        return 0;

    /* move prod.head atomically */
    do {
        /* Reset n to the initial burst count */
        n = max;

        /* 1. take a snapshot of prod.head to contend for */
        prod_head = r->prod.head;
        cons_tail = r->cons.tail;
        /* The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * prod_head > cons_tail). So 'free_entries' is always between 0
         * and size(ring)-1. */
        /* 2. check that there is enough free space */
        free_entries = (mask + cons_tail - prod_head);

        /* check that we have enough room in ring */
        if (unlikely(n > free_entries)) {
            if (behavior == RTE_RING_QUEUE_FIXED) {
                __RING_STAT_ADD(r, enq_fail, n);
                return -ENOBUFS;
            }
            else {
                /* No free entry available */
                if (unlikely(free_entries == 0)) {
                    __RING_STAT_ADD(r, enq_fail, n);
                    return 0;
                }
                n = free_entries;
            }
        }

        /* 3. move r->prod.head with a CAS, reserving room to produce */
        prod_next = prod_head + n;
        success = rte_atomic32_cmpset(&r->prod.head, prod_head,
                          prod_next);
    } while (unlikely(success == 0));

    /* write entries in ring */
    ENQUEUE_PTRS();
    rte_smp_wmb();

    /* if we exceed the watermark */
    /* 4. check whether the watermark is reached, and update the stats */
    if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
        ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
                (int)(n | RTE_RING_QUOT_EXCEED);
        __RING_STAT_ADD(r, enq_quota, n);
    }
    else {
        ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
        __RING_STAT_ADD(r, enq_success, n);
    }

    /*
     * If there are other enqueues in progress that preceded us,
     * we need to wait for them to complete
     */
    /* 5. wait for earlier enqueues to finish, then publish our entries */
    while (unlikely(r->prod.tail != prod_head)) {
        rte_pause();

        /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
         * for other thread finish. It gives pre-empted thread a chance
         * to proceed and finish with ring dequeue operation. */
        if (RTE_RING_PAUSE_REP_COUNT &&
            ++rep == RTE_RING_PAUSE_REP_COUNT) {
            rep = 0;
            sched_yield();
        }
    }
    r->prod.tail = prod_next;
    return ret;
}
The following walks through what happens when two producers add objects to the ring at the same time.

1) In the initial state, prod_head and prod_tail point to the same position:

On each of the two producer cores (a "core" here can be understood as a concurrently running thread or process), local variables take copies of ring->prod_head and ring->cons_tail. Each core's local prod_next index points to the element after ring->prod_head, or several elements further for a bulk enqueue. If the ring does not have enough space (determined by checking cons_tail), the enqueue function returns an error:

prod_head = r->prod.head;
cons_tail = r->cons.tail;
...
free_entries = (mask + cons_tail - prod_head);
...
prod_next = prod_head + n;

2) The second step is to modify the ring->prod_head index in the ring structure so that it points to the position of the local prod_next mentioned above:

This is done with a Compare And Swap (CAS); rte_atomic32_cmpset() performs exactly such a CAS operation, which is the key to the lock-free implementation. A Compare And Swap consists of the following steps, performed as one atomic operation:

- read the current value of the memory location (here r->prod.head)
- compare it with the expected value (the local prod_head)
- if they match, write the new value (prod_next) and report success; otherwise write nothing and report failure

In this scenario, producer core1's CAS succeeds first, and producer core2 succeeds after re-running the loop.

do {
    ...
    prod_head = r->prod.head;
    cons_tail = r->cons.tail;
    ...
    success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next);
    ...
} while (unlikely(success == 0));

3) Producer core2's CAS retry succeeds

Producer core1 writes object obj4 into the ring, and producer core2 writes object obj5 (after its CAS retry succeeded).

/* write entries in ring */
ENQUEUE_PTRS();
rte_smp_wmb();  /* memory barrier: prevents store reordering */

4) Now each producer core wants to update the ring->prod_tail index. In the producer code, ring->prod_tail may only be advanced by the core whose local prod_head equals it; at this point only producer core1 qualifies, so producer core1 completes its enqueue operation.

5) Once producer core1 has updated ring->prod_tail, producer core2 can update it in turn, completing its own enqueue operation.

Steps (4) and (5) correspond to this code:

/*
 * If there are other enqueues in progress that preceded us,
 * we need to wait for them to complete
 */
while (unlikely(r->prod.tail != prod_head)) {
    rte_pause();

    /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
     * for other thread finish. It gives pre-empted thread a chance
     * to proceed and finish with ring dequeue operation. */
    if (RTE_RING_PAUSE_REP_COUNT &&
        ++rep == RTE_RING_PAUSE_REP_COUNT) {
        rep = 0;
        sched_yield();
    }
}
r->prod.tail = prod_next;
2. Multi-consumer dequeue flow:
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
             unsigned n, enum rte_ring_queue_behavior behavior)
{
    uint32_t cons_head, prod_tail;
    uint32_t cons_next, entries;
    const unsigned max = n;
    int success;
    unsigned i, rep = 0;
    uint32_t mask = r->prod.mask;

    /* Avoid the unnecessary cmpset operation below, which is also
     * potentially harmful when n equals 0. */
    if (n == 0)
        return 0;

    /* move cons.head atomically:
     * 1. check that enough entries are available to consume
     * 2. reserve them for consumption with a CAS */
    do {
        /* Restore n as it may change every loop */
        n = max;

        cons_head = r->cons.head;
        prod_tail = r->prod.tail;
        /* The subtraction is done between two unsigned 32bits value
         * (the result is always modulo 32 bits even if we have
         * cons_head > prod_tail). So 'entries' is always between 0
         * and size(ring)-1. */
        entries = (prod_tail - cons_head);

        /* Set the actual entries for dequeue */
        if (n > entries) {
            if (behavior == RTE_RING_QUEUE_FIXED) {
                __RING_STAT_ADD(r, deq_fail, n);
                return -ENOENT;
            }
            else {
                if (unlikely(entries == 0)) {
                    __RING_STAT_ADD(r, deq_fail, n);
                    return 0;
                }
                n = entries;
            }
        }

        cons_next = cons_head + n;
        success = rte_atomic32_cmpset(&r->cons.head, cons_head,
                          cons_next);
    } while (unlikely(success == 0));

    /* copy in table */
    DEQUEUE_PTRS();
    rte_smp_rmb();

    /*
     * If there are other dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    while (unlikely(r->cons.tail != cons_head)) {
        rte_pause();

        /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
         * for other thread finish. It gives pre-empted thread a chance
         * to proceed and finish with ring dequeue operation. */
        if (RTE_RING_PAUSE_REP_COUNT &&
            ++rep == RTE_RING_PAUSE_REP_COUNT) {
            rep = 0;
            sched_yield();
        }
    }
    __RING_STAT_ADD(r, deq_success, n);
    r->cons.tail = cons_next;
    return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

Dequeue follows the same principle as enqueue; with the comments added, it is worth reading through the code above once more.

References:

https://blog.csdn.net/qq_15437629/article/details/78147874

Source: https://mp.weixin.qq.com/s/PqAKmmGFasPXo4NkTTb_Hg