dpdk 原理概述及核心源碼剖析
dpdk 原理
1、操作系統、計算機網絡誕生已經幾十年了,部分功能不再能滿足現在的業務需求。如果對操作系統做更改,成本非常高,所以部分問題是在應用層想辦法解決的,比如前面介紹的協程、quic 等,都是在應用層重新開發的框架,簡單回顧如下:
-
協程:server 多線程通信時,如果每連接一個客戶端就要生成一個線程去處理,對 server 硬件資源消耗極大!爲了解決多線程以及互相切換帶來的性能損耗,應用層發明了協程框架:單線程人爲控制跳轉到不同的代碼塊執行,避免了 cpu 浪費、線程鎖 / 切換等一系列耗時的問題!
-
quic 協議:tcp 協議已經深度嵌入了操作系統,更改起來難度很大,所以同樣也是在應用層基於 udp 協議實現了 tls、擁塞控制等,徹底讓協議和操作系統松耦合!
除了上述問題,操作還有另一個比較嚴重的問題:基於 os 內核的網絡數據 IO!傳統做網絡開發時,接收和發送數據用的是操作系統提供的 receive 和 send 函數,用戶配置一下網絡參數、傳入應用層的數據即可!操作系統由於集成了協議棧,會在用戶傳輸的應用層數據前面加上協議不同層級的包頭,然後通過網卡發送數據;接收到的數據處理方式類似,按照協議類型一層一層撥開,直到獲取到應用層的數據!整個流程大致如下:
網卡接受數據 -----> 發出硬件中斷通知 cpu 來取數據 ----->os 把數據複製到內存並啓動內核線程 ---> 軟件中斷 ---> 內核線程在協議棧中處理包 ---> 處理完畢通知用戶層
大家有沒有覺得這個鏈條忒長啊?這麼長的處理流程帶來的問題:
-
“中間商” 多,整個流程耗時;數據進入下一個環節時容易 cache miss
-
同一份數據在內存不同的地方存儲(緩存內存、內核內存、用戶空間的內存),浪費內存
-
網卡通過中斷通知 cpu,每次硬中斷大約消耗 100 微秒,這還不算因爲終止上下文所帶來的 Cache Miss(L1、L2、TLB 等 cpu 的 cache 可能都會更新)
-
用戶到內核態的上下文切換耗時
-
數據在內核態用戶態之間切換拷貝帶來大量 CPU 消耗,全局鎖競爭
-
內核工作在多核上,爲保障全局一致,即使採用 Lock Free,也避免不了鎖總線、內存屏障帶來的性能損耗
這一系列的問題都是內核處理網卡接收到的數據導致的。大膽一點想象:如果不讓內核處理網卡數據了?能不能避免上述各個環節的損耗了?能不能讓 3 環的應用直接控制網卡收發數據了?
2、如果真的通過 3 環應用層直接讀寫網卡,面臨的問題:
-
用戶空間的內存要映射到網卡,才能直接讀寫網卡
-
驅動要運行在用戶空間
(1)這兩個問題是怎麼解決的了?這一切都得益於 linux 提供的 UIO 機制!UIO 能夠攔截中斷,並重設中斷回調行爲(相當於 hook 了,這個功能還是要在內核實現的,因爲硬件中斷只能在內核處理),從而繞過內核協議棧後續的處理流程。這裏借用別人的一張圖:
UIO 設備的實現機制其實是對用戶空間暴露文件接口,比如當註冊一個 UIO 設備 uioX,就會出現文件 /dev/uioX(用於讀取中斷,底層還是要在內核處理,因爲硬件中斷只能發生在內核),對該文件的讀寫就是對設備內存的讀寫(通過 mmap 實現)。除此之外,對設備的控制還可以通過 /sys/class/uio 下的各個文件的讀寫來完成。所以 UIO 的本質:
- 讓用戶空間的程序攔截內核的中斷,更改中斷的 handler 處理函數,讓用戶空間的程序第一時間拿到剛從網卡接收到的 “一手、熱乎” 數據,減少內核的數據處理流程!由於應用程序拿到的是網絡鏈路層(也就是第二層)的數據,這就需要應用程序自己按照協議解析數據了!說個額外的:這個功能可以用來抓包!
簡化後的示意圖如下:原本網卡是由操作系統內核接管的,現在直接由 3 環的 dpdk 應用控制了!
這就是 dpdk 的第一個優點;除了這個,還有以下幾個:
(2)Huge Page 大頁:傳統頁面大小是 4Kb,如果進程要使用 64G 內存,則 64G/4KB=16000000(一千六百萬)頁,所有在頁表項中佔用 16000000 * 4B=62MB;但是 TLB 緩存的空間是有限的,不可能存儲這麼多頁面的地址映射關係,所以可能導致 TLB miss;如果改成 2MB 的 huge Page,所需頁面減少到 64G/2MB=2000 個。在 TLB 容量有限的情況下儘可能地多在 TLB 存放地址映射,極大減少了 TLB miss!下圖是採用不同大小頁面時 TLB 能覆蓋的內存對比!
(3)mempool 內存池:任何網絡協議都要處理報文,這些報文肯定是存放在內存的!申請和釋放內存就需要調用 malloc 和 free 函數了!這兩個是系統調用,涉及到上下文切換;同時還要用 buddy 或 slab 算法查找空閒內存塊,效率較低!dpdk 在用戶空間實現了一套精巧的內存池技術,內核空間和用戶空間的內存交互不進行拷貝,只做控制權轉移。當收發數據包時,就減少了內存拷貝的開銷!
(4)Ring 無鎖環:多線程 / 多進程之間互斥,傳統的方式就是上鎖!但是 dpdk 基於 Linux 內核的無鎖環形緩衝 kfifo 實現了自己的一套無鎖機制,支持多消費者或單消費者出隊、多生產者或單生產者入隊;
(5)PMD poll-mode 網卡驅動:網絡 IO 監聽有兩種方式,分別是
-
事件驅動,比如 epoll:這種方式進程讓出 cpu 後等數據;一旦有了數據,網卡通過中斷通知操作系統,然後喚醒進程繼續執行!這種方式適合於接收的數據量不大,但實時性要求高的場景;
-
輪詢,比如 poll:本質就是用死循環不停的檢查內存有沒有數據到來!這種方式適合於接收大塊數據,實時性要求不高的場景;
總的來說說:中斷是外界強加給的信號,必須被動應對,而輪詢則是應用程序主動地處理事情。前者最大的影響就是打斷系統當前工作的連續性,而後者則不會,事務的安排自在掌握!
dpdk 採用第二種輪詢方式:直接用死循環不停的地檢查網卡內存,帶來了零拷貝、無系統調用的好處,同時避免了網卡硬件中斷帶來的上下文切換(理論上會消耗 300 個時鐘週期)、cache miss、硬中斷執行等損耗!
(6)NUMA:dpdk 內存分配上通過 proc 提供的內存信息,使 CPU 核心儘量使用靠近其所在節點的內存,避免了跨 NUMA 節點遠程訪問內存的性能問題;其軟件架構去中心化,儘量避免全局共享,帶來全局競爭,失去橫向擴展的能力
(7)CPU 親和性: dpdk 利用 CPU 的親和性將一個線程或多個線程綁定到一個或多個 CPU 上,這樣在線程執行過程中,就不會被隨意調度,一方面減少了線程間的頻繁切換帶來的開銷,另一方面避免了 CPU L1、L2、TLB 等緩存的局部失效性,增加了 CPU cache 的命中率。
dpdk 核心源碼
dpdk 是 intel 主導開發的網絡編程框架, 有這麼多的優點,都是怎麼實現的了?
1、UIO 原理:dpdk 繞過了操作系統內核,直接接管網卡,用戶程序可以直接在 3 環讀寫網卡的數據,這就涉及到兩個關鍵技術點了:
-
地址映射:3 環的程序是怎麼定位到網卡數據存放在哪的了?
-
攔截硬件中斷:傳統數據處理流程是網卡收到數據後通過硬件中斷通知 cpu 來取數據,3 環的程序肯定要攔截這個中斷,然後通過輪詢方式取數據,這個又是怎麼實現的了?
(1)地址映射:3 環程序最常使用的就是內存地址了,一共 32 或 64bit;C/C++ 層面可以通過指針直接讀寫地址的值;除了內存,還有很多設備也需要和 cpu 交互數據,比如顯示器:要在屏幕顯示的內容肯定是需要用戶指定的,用戶程序可以把顯示的內容發送到顯示器指定的地方,然後再屏幕打印出來。爲了方便用戶程序發送數據,硬件層面會把顯示器的部分存儲空間映射到內存地址,做到了和內存條硬件的尋址方式一樣,用戶也可以直接通過指針往這裏寫數據(彙編層面直接通過 mov 指令操作即可)!網卡也類似:網卡是插在 pci 插槽的,網卡(或者說 pci 插槽)的存儲空間也會映射到內存地址,應用程序讀寫這塊物理地址就等同於讀寫網卡的存儲空間!實際寫代碼時,由於要深入驅動,pci 網卡預留物理的內存與 io 空間會保存到 uio 設備上,相當於將這些物理空間與 io 空間暴露給 uio 設備,應用程序訪問這些 uio 設備即可!幾個關鍵的函數如下:
將 pci 網卡的物理內存空間以及 io 空間保存在 uio 設備結構 struct uio_info 中的 mem 成員以及 port 成員中,uio 設備就知道了網卡的物理以及 io 空間。應用層訪問這個 uio 設備的物理空間以及 io 空間,就相當於訪問 pci 設備的物理以及 io 空間;本質上就是將 pci 網卡的空間暴露給 uio 設備。
int igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{
//將pci內存,端口映射給uio設備
struct rte_uio_pci_dev *udev;
err = igbuio_setup_bars(dev, &udev->info);
}
static int igbuio_setup_bars(struct pci_dev *dev, struct uio_info *info)
{
//pci內存,端口映射給uio設備
for (i = 0; i != sizeof(bar_names) / sizeof(bar_names[0]); i++)
{
if (pci_resource_len(dev, i) != 0 && pci_resource_start(dev, i) != 0)
{
flags = pci_resource_flags(dev, i);
if (flags & IORESOURCE_MEM)
{
//暴露pci的內存空間給uio設備
ret = igbuio_pci_setup_iomem(dev, info, iom, i, bar_names[i]);
}
else if (flags & IORESOURCE_IO)
{
//暴露pci的io空間給uio設備
ret = igbuio_pci_setup_ioport(dev, info, iop, i, bar_names[i]);
}
}
}
}
(2)攔截硬件中斷:爲了減掉內核中冗餘的數據處理流程,應用程序要 hook 網卡的中斷,從源頭開始攔截網卡數據!當硬件中斷觸發時,纔不會一直觸發內核去執行中斷回調。也就是通過這種方式,才能在應用層實現硬件中斷處理過程。注意:這裏說的中斷僅是控制中斷,而不是報文收發的數據中斷,數據中斷是不會走到這裏來的,因爲在 pmd 開啓中斷時,沒有設置收發報文的中斷掩碼,只註冊了網卡狀態改變的中斷掩碼;hook 中斷的代碼如下:
int igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{
//填充uio信息
udev->info.name = "igb_uio";
udev->info.version = "0.1";
udev->info.handler = igbuio_pci_irqhandler; //硬件控制中斷的入口,劫持原來的硬件中斷
udev->info.irqcontrol = igbuio_pci_irqcontrol; //應用層開關中斷時被調用,用於是否開始中斷
}
static irqreturn_t igbuio_pci_irqhandler(int irq, struct uio_info *info)
{
if (udev->mode == RTE_INTR_MODE_LEGACY && !pci_check_and_mask_intx(udev->pdev))
{
return IRQ_NONE;
}
//返回IRQ_HANDLED時,linux uio框架會喚醒等待uio中斷的進程。註冊到epoll的uio中斷事件就會被調度
/* Message signal mode, no share IRQ and automasked */
return IRQ_HANDLED;
}
static int igbuio_pci_irqcontrol(struct uio_info *info, s32 irq_state)
{
//調用內核的api來開關中斷
if (udev->mode == RTE_INTR_MODE_LEGACY)
{
pci_intx(pdev, !!irq_state);
}
else if (udev->mode == RTE_INTR_MODE_MSIX)\
{
list_for_each_entry(desc, &pdev->msi_list, list)
igbuio_msix_mask_irq(desc, irq_state);
}
}
2、內存池:傳統應用要使用內存時,一般都是調用 malloc 讓操作系統在堆上分配。這樣做有兩點弊端:
-
進入內核要切換上下文
-
操作系統通過 buddy&slab 算法找合適的空閒內存
所以頻繁調用 malloc 會嚴重拉低效率!如果不頻繁調用 malloc,怎麼處理頻繁收到和需要發送的報文數據了?dpdk 採用的是內存池的技術:即在 huge page 內存中開闢一個連續的大緩衝區當做內存池!同時提供 rte_mempool_get 從內存池中獲取內存空間。也可調用 rte_mempool_put 將不再使用的內存空間放回到內存池中。從這裏就能看出:dpdk 自己從 huge page 處維護了一大塊內存供應用程序使用,應用程序不再需要通過系統調用從操作系統申請內存了!
(1)內存池的創建,在 rte_mempool_create 接口中完成。這個接口主要是在大頁內存中開闢一個連續的大緩衝區當做內存池,然後將這個內存池進行分割,頭部爲 struct rte_mempool 內存池結構;緊接着是內存池的私有結構大小,這個由應用層自己設置,每個創建內存池的應用進程都可以指定不同的私有結構;最後是多個連續的對象元素,這些對象元素都是處於同一個內存池中。每個對象元素又有對象的頭部,對象的真實數據區域,對象的尾部組成。這裏所說的對象元素,其實就是應用層要開闢的真實數據空間,例如應用層自己定義的結構體變量等;本質上是 dpdk 自己實現了一套內存的管理辦法,其作用和 linux 的 buddy&slab 是一樣的,沒本質區別!整個內存池圖示如下:
每創建一個內存池,都會創建一個鏈表節點,然後插入到鏈表中,因此這個鏈表記錄着當前系統創建了多少內存池。核心代碼如下:
//創建內存池鏈表節點
te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
//內存池鏈表節點插入到內存池鏈表中
te->data = (void *) mp;
RTE_EAL_TAILQ_INSERT_TAIL(RTE_TAILQ_MEMPOOL, rte_mempool_list, te);
所以說內存池可能不止 1 個,會有多個!在內存池中,內存被劃分成了 N 多的對象。應用程序要申請內存時,怎麼知道哪些對象空閒可以用,哪些對象已經被佔用了?當對象元素初始化完成後,會把對象指針放入 ring 隊列,所以說 ring 隊列的所有對象指針都是可以使用的!應用程序要申請內存時,可以調用 rte_mempool_get 接口從 ring 隊列中獲取,也就是出隊;使用完畢後調用 rte_mempool_put 將內存釋放回收時,也是將要回收的內存空間對應的對象指針放到這個 ring 隊列中,也就是入隊!
(2)具體分配內存時的步驟:
-
現代 cpu 基本都是多核的,多個 cpu 同時在內存池申請內存時無法避免涉及到互斥,會在一定程度上影響分配的效率,所以每個 cpu 自己都有自己的 “自留地”,會優先在自己的“自留地” 申請內存;
-
如果 “自留地” 的內存已耗盡,纔會繼續去內存池申請內存!核心代碼如下:
int rte_mempool_get(struct rte_mempool *mp, void **obj_table, unsigned n)
{
#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
//從當前cpu應用層緩衝區中獲取
cache = &mp->local_cache[lcore_id];
cache_objs = cache->objs;
for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
{
*obj_table = cache_objs[len];
}
return 0;
#endif
/* get remaining objects from ring */
//直接從ring隊列中獲取
ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
}
釋放內存的步驟和申請類似:
-
先查看 cpu 的 “自留地” 是否還有空間。如果有,就先把釋放的對象指針放在“自留地”;
-
如果 “自留地” 沒空間了,再把釋放的對象指針放在內存池!核心代碼如下:
int rte_mempool_put(struct rte_mempool *mp, void **obj_table, unsigned n)
{
#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
//在當前cpu本地緩存有空間的場景下, 先放回到本地緩存。
cache = &mp->local_cache[lcore_id];
cache_objs = &cache->objs[cache->len];
for (index = 0; index < n; ++index, obj_table++)
{
cache_objs[index] = *obj_table;
}
//緩衝達到閾值,刷到隊列中
if (cache->len >= flushthresh)
{
rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size], cache->len - cache_size);
cache->len = cache_size;
}
return 0
#endif
//直接放回到ring隊列
rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
}
注意:這裏的 ring 是環形無鎖隊列!
3、Poll mode driver:不論何總形式的 io,接收方獲取數據的方式有兩種:
-
被動接收中斷的喚醒:典型如網卡收到數據,通過硬件中斷通知操作系統去處理;操作系統收到數據後會喚醒休眠的進程繼續處理數據
-
輪詢 poll:寫個死循環不停的檢查內存地址是否有新數據到了!
在 x86 體系結構中,一次中斷處理需要將 CPU 的狀態寄存器保存到堆棧,並運行中斷 handler,最後再將保存的狀態寄存器信息從堆棧中恢復,整個過程需要至少 300 個處理器時鐘週期!所以 dpdk 果斷拋棄了中斷,轉而使用輪詢方式!整個流程大致是這樣的:內核態的 UIO Driver hook 了網卡發出的中斷信號,然後由用戶態的 PMD Driver 採用主動輪詢的方式。除了鏈路狀態通知仍必須採用中斷方式以外(因爲網卡發出硬件中斷才能觸發執行 hook 代碼的嘛,這個容易理解吧?),均使用無中斷方式直接操作網卡設備的接收和發送隊列。整體流程大致如下:UIO hook 了網卡的中斷,網卡收到數據後 “被迫” 執行 hook 代碼!先是通過 UIO 把網卡的存儲地址映射到 / dev/uio 文件,而後應用程序通過 PMD 輪詢檢查文件是否有新數據到來!期間也使用 mmap 把應用的虛擬地址映射到網卡的物理地址,減少數據的拷貝轉移!
總的來說:UIO+PMD,前者旁路了內核,後者主動輪詢避免了硬中斷,DPDK 從而可以在用戶態進行收發包的處理。帶來了零拷貝(Zero Copy)、無系統調用(System call)的優化。同時,還避免了軟中斷的異步處理,也減少了上下文切換帶來的 Cache Miss!輪詢收報核心代碼如下:
/*PMD輪詢接收數據包*/
uint16_t
eth_em_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
uint16_t nb_pkts)
{
/* volatile防止編譯器優化,每次使用必須又一次從memory中取而不是用寄存器的值 */
volatile struct e1000_rx_desc *rx_ring;
volatile struct e1000_rx_desc *rxdp;//指向rx ring中某個e1000_rx_desc描述符
struct em_rx_queue *rxq;//整個接收隊列
struct em_rx_entry *sw_ring;//指向描述符隊列的頭部,根據rx tail來偏移
struct em_rx_entry *rxe;//指向sw ring中具體的entry
struct rte_mbuf *rxm;//entry裏的rte mbuf
/*是new mbuf,新申請的mbuf,當rxm從ring中取出後,需要用nmb再掛上去,
更新對應rx ring和sw ring中的值,爲下一次收包做準備*/
struct rte_mbuf *nmb;
struct e1000_rx_desc rxd;//具體的非指針描述符
uint64_t dma_addr;
uint16_t pkt_len;
uint16_t rx_id;
uint16_t nb_rx;
uint16_t nb_hold;
uint8_t status;
rxq = rx_queue;
nb_rx = 0;
nb_hold = 0;
//初始化臨時變量,要開始遍歷隊列了
rx_id = rxq->rx_tail;
rx_ring = rxq->rx_ring;
sw_ring = rxq->sw_ring;
/* 一次性收32個報文 */
while (nb_rx < nb_pkts) {
/*
* The order of operations here is important as the DD status
* bit must not be read after any other descriptor fields.
* rx_ring and rxdp are pointing to volatile data so the order
* of accesses cannot be reordered by the compiler. If they were
* not volatile, they could be reordered which could lead to
* using invalid descriptor fields when read from rxd.
*/
/* 當前報文的descriptor */
rxdp = &rx_ring[rx_id];
status = rxdp->status; /* 結束標記,必須首先讀取 */
/*檢查狀態是否爲dd, 不是則說明驅動還沒有把報文放到接收隊列,直接退出*/
if (! (status & E1000_RXD_STAT_DD))
break;
rxd = *rxdp; /* 複製一份 */
/*
* End of packet.
*
* If the E1000_RXD_STAT_EOP flag is not set, the RX packet is
* likely to be invalid and to be dropped by the various
* validation checks performed by the network stack.
*
* Allocate a new mbuf to replenish the RX ring descriptor.
* If the allocation fails:
* - arrange for that RX descriptor to be the first one
* being parsed the next time the receive function is
* invoked [on the same queue].
*
* - Stop parsing the RX ring and return immediately.
*
* This policy do not drop the packet received in the RX
* descriptor for which the allocation of a new mbuf failed.
* Thus, it allows that packet to be later retrieved if
* mbuf have been freed in the mean time.
* As a side effect, holding RX descriptors instead of
* systematically giving them back to the NIC may lead to
* RX ring exhaustion situations.
* However, the NIC can gracefully prevent such situations
* to happen by sending specific "back-pressure" flow control
* frames to its peer(s).
*/
PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
"status=0x%x pkt_len=%u",
(unsigned) rxq->port_id, (unsigned) rxq->queue_id,
(unsigned) rx_id, (unsigned) status,
(unsigned) rte_le_to_cpu_16(rxd.length));
nmb = rte_mbuf_raw_alloc(rxq->mb_pool);
if (nmb == NULL) {
PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
"queue_id=%u",
(unsigned) rxq->port_id,
(unsigned) rxq->queue_id);
rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
break;
}
/* 表示當前descriptor被上層軟件佔用 */
nb_hold++;
/* 當前收到的mbuf */
rxe = &sw_ring[rx_id];
/* 收包位置,假設超過環狀數組則回滾 */
rx_id++;
if (rx_id == rxq->nb_rx_desc)
rx_id = 0;
/* mbuf加載cache下次循環使用 */
/* Prefetch next mbuf while processing current one. */
rte_em_prefetch(sw_ring[rx_id].mbuf);
/*
* When next RX descriptor is on a cache-line boundary,
* prefetch the next 4 RX descriptors and the next 8 pointers
* to mbufs.
*/
/* 取下一個descriptor,以及mbuf指針下次循環使用 */
/* 一個cache line是4個descriptor大小(64字節) */
if ((rx_id & 0x3) == 0) {
rte_em_prefetch(&rx_ring[rx_id]);
rte_em_prefetch(&sw_ring[rx_id]);
}
/* Rearm RXD: attach new mbuf and reset status to zero. */
rxm = rxe->mbuf;
rxe->mbuf = nmb;
dma_addr =
rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
rxdp->buffer_addr = dma_addr;
rxdp->status = 0;/* 重置當前descriptor的status */
/*
* Initialize the returned mbuf.
* 1) setup generic mbuf fields:
* - number of segments,
* - next segment,
* - packet length,
* - RX port identifier.
* 2) integrate hardware offload data, if any:
* - RSS flag & hash,
* - IP checksum flag,
* - VLAN TCI, if any,
* - error flags.
*/
pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.length) -
rxq->crc_len);
rxm->data_off = RTE_PKTMBUF_HEADROOM;
rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);
rxm->nb_segs = 1;
rxm->next = NULL;
rxm->pkt_len = pkt_len;
rxm->data_len = pkt_len;
rxm->port = rxq->port_id;
rxm->ol_flags = rx_desc_status_to_pkt_flags(status);
rxm->ol_flags = rxm->ol_flags |
rx_desc_error_to_pkt_flags(rxd.errors);
/* Only valid if PKT_RX_VLAN set in pkt_flags */
rxm->vlan_tci = rte_le_to_cpu_16(rxd.special);
/*
* Store the mbuf address into the next entry of the array
* of returned packets.
*/
/* 把收到的mbuf返回給用戶 */
rx_pkts[nb_rx++] = rxm;
}
/* 收包位置更新 */
rxq->rx_tail = rx_id;
/*
* If the number of free RX descriptors is greater than the RX free
* threshold of the queue, advance the Receive Descriptor Tail (RDT)
* register.
* Update the RDT with the value of the last processed RX descriptor
* minus 1, to guarantee that the RDT register is never equal to the
* RDH register, which creates a "full" ring situtation from the
* hardware point of view...
*/
nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
if (nb_hold > rxq->rx_free_thresh) {
PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
"nb_hold=%u nb_rx=%u",
(unsigned) rxq->port_id, (unsigned) rxq->queue_id,
(unsigned) rx_id, (unsigned) nb_hold,
(unsigned) nb_rx);
rx_id = (uint16_t) ((rx_id == 0) ?
(rxq->nb_rx_desc - 1) : (rx_id - 1));
E1000_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
nb_hold = 0;
}
rxq->nb_rx_hold = nb_hold;
return nb_rx;
}
接收報文的整理流程梳理如下圖所示:
-
DMA 控制器控制報文一個個寫到 rx ring 中接收描述符指定的 IO 虛擬內存中,對應的實際內存應該就是 mbuf;
-
接收函數用 rx tail 變量控制不停地讀取 rx ring 中的描述符和 sw ring 中的 mbuf,並申請新的 mbuf 放入 sw ring 中,更新 rx ring 中的 buffer addr
-
最後把讀取的 mbuf 返回給應用程序。
4、線程親和性
一個 cpu 上可以運行多個線程, 由 linux 內核來調度各個線程的執行。內核在調度線程時,會進行上下文切換,保存線程的堆棧等信息, 以便這個線程下次再被調度執行時,繼續從指定的位置開始執行。然而上下文切換是需要耗費 cpu 資源的的。多核體系的 CPU,物理核上的線程來回切換,會導致 L1/L2 cache 命中率的下降。同時 NUMA 架構下,如果操作系統調度線程的時候,跨越了 NUMA 節點,將會導致大量的 L3 cache 的丟失。Linux 對線程的親和性是有支持的, 如果將線程和 cpu 進行綁定的話,線程會一直在指定的 cpu 上運行,不會被操作系統調度到別的 cpu 上,線程之間互相獨立工作而不會互相擾完,節省了操作系統來回調度的時間。目前 DPDK 通過把線程綁定到 cpu 的方法來避免跨核任務中的切換開銷。
線程綁定 cpu 物理核的函數如下:
/* set affinity for current EAL thread */
static int
eal_thread_set_affinity(void)
{
unsigned lcore_id = rte_lcore_id();
/* acquire system unique id */
rte_gettid();
/* update EAL thread core affinity */
return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset);
}
繼續往下走:
/*
根據前面的rte_cpuset_t ,設置tid的綁定關係
存儲thread local socket_id
存儲thread local rte_cpuset_t
*/
int
rte_thread_set_affinity(rte_cpuset_t *cpusetp)
{
int s;
unsigned lcore_id;
pthread_t tid;
tid = pthread_self();//得到當前線程id
//綁定cpu和線程
s = pthread_setaffinity_np(tid, sizeof(rte_cpuset_t), cpusetp);
if (s != 0) {
RTE_LOG(ERR, EAL, "pthread_setaffinity_np failed\n");
return -1;
}
/* store socket_id in TLS for quick access */
//socketid存放到線程本地空間,便於快速讀取
RTE_PER_LCORE(_socket_id) =
eal_cpuset_socket_id(cpusetp);
/* store cpuset in TLS for quick access */
//cpu信息存放到cpu本地空間,便於快速讀取
memmove(&RTE_PER_LCORE(_cpuset), cpusetp,
sizeof(rte_cpuset_t));
lcore_id = rte_lcore_id();//獲取線程綁定的CPU
if (lcore_id != (unsigned)LCORE_ID_ANY) {//如果不相等,就更新lcore配置
/* EAL thread will update lcore_config */
lcore_config[lcore_id].socket_id = RTE_PER_LCORE(_socket_id);
memmove(&lcore_config[lcore_id].cpuset, cpusetp,
sizeof(rte_cpuset_t));
}
return 0;
}
繼續往下走:
int
pthread_setaffinity_np(pthread_t thread, size_t cpusetsize,
const rte_cpuset_t *cpuset)
{
if (override) {
/* we only allow affinity with a single CPU */
if (CPU_COUNT(cpuset) != 1)
return POSIX_ERRNO(EINVAL);
/* we only allow the current thread to sets its own affinity */
struct lthread *lt = (struct lthread *)thread;
if (lthread_current() != lt)
return POSIX_ERRNO(EINVAL);
/* determine the CPU being requested */
int i;
for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
if (!CPU_ISSET(i, cpuset))
continue;
break;
}
/* check requested core is allowed */
if (i == LTHREAD_MAX_LCORES)
return POSIX_ERRNO(EINVAL);
/* finally we can set affinity to the requested lcore
前面做了大量的檢查和容錯,這裏終於開始綁定cpu了
*/
lthread_set_affinity(i);
return 0;
}
return _sys_pthread_funcs.f_pthread_setaffinity_np(thread, cpusetsize,
cpuset);
}
綁定 cpu 的方法也簡單:本質就是個上下文切換
/*
* migrate the current thread to another scheduler running
* on the specified lcore.
*/
int lthread_set_affinity(unsigned lcoreid)
{
struct lthread *lt = THIS_LTHREAD;
struct lthread_sched *dest_sched;
if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
return POSIX_ERRNO(EINVAL);
DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);
dest_sched = schedcore[lcoreid];
if (unlikely(dest_sched == NULL))
return POSIX_ERRNO(EINVAL);
if (likely(dest_sched != THIS_SCHED)) {
lt->sched = dest_sched;
lt->pending_wr_queue = dest_sched->pready;
//真正切換線程到指定cpu運行的代碼
_affinitize();
return 0;
}
return 0;
}
tatic __rte_always_inline void
_affinitize(void);
static inline void
_affinitize(void)
{
struct lthread *lt = THIS_LTHREAD;
DIAG_EVENT(lt, LT_DIAG_LTHREAD_SUSPENDED, 0, 0);
ctx_switch(&(THIS_SCHED)->ctx, <->ctx);
}
void
ctx_switch(struct ctx *new_ctx __rte_unused, struct ctx *curr_ctx __rte_unused)
{
/* SAVE CURRENT CONTEXT */
asm volatile (
/* Save SP */
"mov x3, sp\n"
"str x3, [x1, #0]\n"
/* Save FP and LR */
"stp x29, x30, [x1, #8]\n"
/* Save Callee Saved Regs x19 - x28 */
"stp x19, x20, [x1, #24]\n"
"stp x21, x22, [x1, #40]\n"
"stp x23, x24, [x1, #56]\n"
"stp x25, x26, [x1, #72]\n"
"stp x27, x28, [x1, #88]\n"
/*
* Save bottom 64-bits of Callee Saved
* SIMD Regs v8 - v15
*/
"stp d8, d9, [x1, #104]\n"
"stp d10, d11, [x1, #120]\n"
"stp d12, d13, [x1, #136]\n"
"stp d14, d15, [x1, #152]\n"
);
/* RESTORE NEW CONTEXT */
asm volatile (
/* Restore SP */
"ldr x3, [x0, #0]\n"
"mov sp, x3\n"
/* Restore FP and LR */
"ldp x29, x30, [x0, #8]\n"
/* Restore Callee Saved Regs x19 - x28 */
"ldp x19, x20, [x0, #24]\n"
"ldp x21, x22, [x0, #40]\n"
"ldp x23, x24, [x0, #56]\n"
"ldp x25, x26, [x0, #72]\n"
"ldp x27, x28, [x0, #88]\n"
/*
* Restore bottom 64-bits of Callee Saved
* SIMD Regs v8 - v15
*/
"ldp d8, d9, [x0, #104]\n"
"ldp d10, d11, [x0, #120]\n"
"ldp d12, d13, [x0, #136]\n"
"ldp d14, d15, [x0, #152]\n"
);
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/b8YssI7D4iJ8DyCDQvHfuA