使用 AF_XDP Socket 更高效的網絡傳輸
Linux 網絡棧並不缺乏功能,它的表現也很好 足以滿足大多數用途。但是,在高速網絡下,傳統網絡編程的額外開銷佔比太大了。在前一篇關於 syscall.Socket 的文章中,我們介紹了 AF_PACKET 類型的 socket,它的性能着實一般,所有的數據都得在用戶態和內核態之間做轉換,而且在高併發的情況下還有大量的中斷。使用 eBPF XDP 可以完美解決高性能的問題,我們在更早的文章中介紹了 XDP 的技術,Björn Töpel 在 Linux 4.18 版本中爲 Socket 增加了一個協議族 AF_XDP
, 可以利用 Socket 接口和 XDP 技術實現高性能的網絡讀寫。
2019 年 Intel 的 Björn Töpel, 也是他主要實現的 AF_XDP Socket, 在一次分享中介紹了 AF_XDP 和普通的 AF_PACKET 的三個場景下的性能對比:
沒有對比就沒有傷害,可以看到 AF_XDP 的性能遠遠大於 AF_PACKET。
AF_XDP Socket 介紹
AF_XDP(eXpress Data Path)是一種高性能網絡協議棧,可用於實現零拷貝數據傳輸和零中斷數據接收。AF_XDP socket 是一種 Linux 內核中支持 AF_XDP 協議的 socket 類型。
相較於傳統的 socket,AF_XDP socket 具有以下幾個顯著的特點:
-
零拷貝傳輸:在使用 AF_XDP socket 傳輸數據時,數據可以直接在內存中進行傳輸,而無需將數據從用戶空間複製到內核空間,從而減少了數據傳輸過程中的內存複製次數,提高了數據傳輸效率。
-
零中斷接收:在使用 AF_XDP socket 接收數據時,數據可以直接從網卡中接收,而無需通過中斷通知內核,從而減少了中斷處理的次數,提高了接收數據的效率。
-
支持多隊列:AF_XDP socket 支持多個隊列,可以將不同的網絡流量路由到不同的隊列中,從而實現更好的負載均衡和多核利用。
-
支持用戶空間協議棧:AF_XDP socket 可以與用戶空間中的協議棧結合使用,從而可以在用戶空間中實現網絡協議棧,提高了網絡應用程序的性能和靈活性。
總之,AF_XDP socket 是一種高性能的網絡數據傳輸方式,適用於需要處理大量數據的高性能網絡應用程序。
我們使用普通的 socket()
系統調用創建一個 AF_XDP 套接字(XSK)。每個 XSK 都有兩個 ring:RX RING
和 TX RING
。套接字可以在 RX RING 上接收數據包,並且可以在 TX RING 環上發送數據包。這些環分別通過 setockopts() 的 XDP_RX_RING 和 XDP_TX_RING 進行註冊和調整大小。每個 socket 必須至少有一個這樣的環。RX 或 TX 描述符環指向存儲區域(稱爲 UMEM)中的數據緩衝區。RX 和 TX 可以共享同一 UMEM,因此不必在 RX 和 TX 之間複製數據包。
UMEM 也有兩個 ring:FILL RING
和 COMPLETION RING
。應用程序使用 FILL RING
向內核發送可以承載報文的 addr
(該 addr 指向 UMEM 中某個 chunk),以供內核填充 RX 數據包數據。每當收到數據包,對這些 chunks 的引用就會出現在 RX 環中。另一方面,COMPLETION RING
包含內核已完全傳輸的 chunks 地址,可以由用戶空間再次用於 TX 或 RX。
可以看到,這裏有四個環,RX RING
和 TX RING
環中的數據是描述符 (xdp_desc),而FILL RING
和 COMPLETION RING
是地址 (u64)。
-
Rx Ring:接收環(Receive Ring)是由硬件網卡或 AF_XDP 驅動程序生成的,它存儲待處理的接收數據幀描述符(Receive Descriptor),並將這些描述符傳遞給內核或用戶空間程序。接收環通常由多個隊列組成,每個隊列都有一個獨立的 Rx Ring。Rx Ring 的生產者是 XDP 程序,消費者是用戶態程序;XDP 程序消耗 Fill Ring,獲取可以承載報文的 desc 並將報文拷貝到 desc 中指定的地址,然後將 desc 填充到 Rx Ring 中,並通過 socket IO 機制通知用戶態程序從 Rx Ring 中接收報文
-
Fill Ring:填充環(Fill Ring)是用戶空間程序爲接收環生成新描述符的環,以便接收環始終有足夠的描述符可供使用。填充環也可以由多個隊列組成,每個隊列都有一個獨立的 Fill Ring。Fill Ring 的生產者是用戶態程序,消費者是內核態中的 XDP 程序;用戶態程序通過 Fill Ring 將可以用來承載報文的 UMEM frames 傳到內核,然後內核消耗 Fill Ring 中的描述符 desc,並將報文拷貝到 desc 中指定地址(該地址即 UMEM frame 的地址)
-
Tx Ring:發送環(Transmit Ring)由用戶空間程序生成,用於存儲要發送的數據幀描述符(Transmit Descriptor)。發送環也可以由多個隊列組成,每個隊列都有一個獨立的 Tx Ring。Tx Ring 的生產者是用戶態程序,消費者是 XDP 程序;用戶態程序將要發送的報文拷貝 Tx Ring 中 desc 指定的地址中,然後 XDP 程序 消耗 Tx Ring 中的 desc,將報文發送出去,並通過 Completion Ring 將成功發送的報文的 desc 告訴用戶態程序;
-
Completion Ring:完成環(Completion Ring)是用於接收已經處理完的數據幀描述符的環。完成環由內核或用戶空間程序創建,可以由多個隊列組成,每個隊列都有一個獨立的 Completion Ring。Completion Ring 的生產者是 XDP 程序,消費者是用戶態程序
當內核完成 XDP 報文的發送,會通過 completion_ring 來通知用戶態程序,哪些報文已經成功發送,然後用戶態程序消耗 completion_ring 中 desc(只是更新 consumer 計數相當於確認);
通過這四個 ring 的協同工作,AF_XDP 可以實現高性能的網絡數據傳輸,以及在用戶空間實現網絡協議棧的功能。用戶空間程序可以通過 Fill Ring 爲 Rx Ring 生成新的接收數據描述符,然後使用 Tx Ring 將處理過的數據發送出去。內核或用戶空間程序可以從 Completion Ring 中獲取已經處理完的描述符,以便進行後續的處理。這些 ring 可以實現高效的數據處理和網絡負載均衡,從而提高了網絡應用程序的性能和吞吐量。
AF_XDP Socket 在高性能網絡應用中的應用場景,包括 DDoS 攻擊防禦、網絡流量監控、負載均衡等。在這些應用場景中,AF_XDP 可以通過實時處理大量網絡流量數據,快速識別惡意流量和負載均衡,提高網絡應用的性能和安全性。
Go AF_XDP 實戰
AF_XDP Socket 相對於傳統的 AF_PACKET 的使用的複雜程序至少要高一個數量級,因爲複雜,所以容易出錯,不過幸運的是,有一個第三方的庫對它進行了封裝,更方便我們使用,這個庫就是 asavie/xdp[1]。
它將 XSK 進行了封裝,提供了非常方便的方法進行數據的讀取和發送。
我們用它的兩個例子介紹它的功能。
發送的例子
下面是一個不斷髮送 DNS 查詢的示例:
package main
import (
"encoding/hex"
"flag"
"fmt"
"math"
"net"
"time"
"github.com/asavie/xdp"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/miekg/dns"
"github.com/vishvananda/netlink"
)
// ...
var (
NIC string
QueueID int
SrcMAC string
DstMAC string
SrcIP string
DstIP string
DomainName string
)
func main() {
flag.StringVar(&NIC, "interface", "enp3s0", "Network interface to attach to.")
flag.IntVar(&QueueID, "queue", 0, "The queue on the network interface to attach to.")
flag.StringVar(&SrcMAC, "srcmac", "b2968175b211", "Source MAC address to use in sent frames.")
flag.StringVar(&DstMAC, "dstmac", "ffffffffffff", "Destination MAC address to use in sent frames.")
flag.StringVar(&SrcIP, "srcip", "192.168.111.1", "Source IP address to use in sent frames.")
flag.StringVar(&DstIP, "dstip", "192.168.111.10", "Destination IP address to use in sent frames.")
flag.StringVar(&DomainName, "domain", "asavie.com", "Domain name to use in the DNS query.")
flag.Parse()
// 初始化XDP socket.
link, err := netlink.LinkByName(NIC)
if err != nil {
panic(err)
}
xsk, err := xdp.NewSocket(link.Attrs().Index, QueueID, nil)
if err != nil {
panic(err)
}
//-----------------
// 生成DNS查詢請求
srcMAC, _ := hex.DecodeString(SrcMAC)
dstMAC, _ := hex.DecodeString(DstMAC)
eth := &layers.Ethernet{
SrcMAC: net.HardwareAddr(srcMAC),
DstMAC: net.HardwareAddr(dstMAC),
EthernetType: layers.EthernetTypeIPv4,
}
ip := &layers.IPv4{
Version: 4,
IHL: 5,
TTL: 64,
Id: 0,
Protocol: layers.IPProtocolUDP,
SrcIP: net.ParseIP(SrcIP).To4(),
DstIP: net.ParseIP(DstIP).To4(),
}
udp := &layers.UDP{
SrcPort: 1234,
DstPort: 53,
}
udp.SetNetworkLayerForChecksum(ip)
query := new(dns.Msg)
query.SetQuestion(dns.Fqdn(DomainName), dns.TypeA)
payload, err := query.Pack()
if err != nil {
panic(err)
}
buf := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}
err = gopacket.SerializeLayers(buf, opts, eth, ip, udp, gopacket.Payload(payload))
if err != nil {
panic(err)
}
frameLen := len(buf.Bytes())
//-----------------
// 填充UMEM中所有的frame, 使用預先生成的DNS查詢
descs := xsk.GetDescs(math.MaxInt32, false)
for i := range descs {
frameLen = copy(xsk.GetFrame(descs[i]), buf.Bytes())
}
fmt.Printf("sending DNS queries from %v (%v) to %v (%v) for domain name %s...\n", ip.SrcIP, eth.SrcMAC, ip.DstIP, eth.DstMAC, DomainName)
// 每秒輸出發送的統計數據
go func() {
var err error
var prev xdp.Stats
var cur xdp.Stats
var numPkts uint64
for i := uint64(0); ; i++ {
time.Sleep(time.Duration(1) * time.Second)
cur, err = xsk.Stats()
if err != nil {
panic(err)
}
numPkts = cur.Completed - prev.Completed
fmt.Printf("%d packets/s (%d bytes/s)\n", numPkts, numPkts*uint64(frameLen))
prev = cur
}
}()
// 無盡的發送查詢蘇局
for {
descs := xsk.GetDescs(xsk.NumFreeTxSlots(), false)
for i := range descs {
descs[i].Len = uint32(frameLen)
}
xsk.Transmit(descs)
_, _, err = xsk.Poll(1)
if err != nil {
panic(err)
}
}
}
-
首先它基於網卡生成一個 XSK, 這個 XSK 的初始化隱藏了底層的很多的初始化動作,這是這個庫做的很好的地方
-
生成一個特定的 DNS 查詢請求包, 後面會一直使用這個數據往網絡上發送
-
得到所有可用的 Desc, 並使用 DNS 請求數據進行初始化
-
啓動一個 goroutine, 每秒打印出發送的包數和數據大小,以便觀察它的性能
-
在一個死循環中,先得到可以發送的 Desc, 然後調用
Transmit
將 Desc 寫入到 Tx ring。 -
然後調用
Poll
, 等待內核發送了數據或者接收到數據,再進行下一次的數據發送
得益於 XDP 庫的封裝,很多麻煩的細節比如 mmap 的創建、socket option 的設置, ring 的操作等等,都隱藏起來了,對外提供了易於使用的接口
接下來我們看一個同時讀寫的例子。
廣播的例子
下面這個例子接收所有的數據包,並把目的 Mac 地址改成廣播地址,再發送出去:
package main
import (
"os"
"os/signal"
"syscall"
"github.com/asavie/xdp"
"github.com/vishvananda/netlink"
)
func main() {
const LinkName = "enp6s0"
const QueueID = 0
link, err := netlink.LinkByName(LinkName)
if err != nil {
panic(err)
}
// 創建一個XDP程序1
program, err := xdp.NewProgram(QueueID + 1)
if err != nil {
panic(err)
}
if err := program.Attach(link.Attrs().Index); err != nil {
panic(err)
}
// 初始化一個XDP Socket
xsk, err := xdp.NewSocket(link.Attrs().Index, QueueID, nil)
if err != nil {
panic(err)
}
// 在XDP程序中註冊這個xsk
if err := program.Register(QueueID, xsk.FD()); err != nil {
panic(err)
}
// 退出的時候移除這個XDP BPF程序
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
program.Detach(link.Attrs().Index)
os.Exit(1)
}()
// 開始勞苦工作
for {
// Fill,等待內核把收到的程序修到Rx ring中
xsk.Fill(xsk.GetDescs(xsk.NumFreeFillSlots()))
numRx, _, err := xsk.Poll(-1) // 等待接收
if err != nil {
panic(err)
}
rxDescs := xsk.Receive(numRx) // 收到的數據
for i := 0; i < len(rxDescs); i++ {
// 把Mac地址修改爲廣播地址,也就是全ff
// ff:ff:ff:ff:ff:ff
frame := xsk.GetFrame(rxDescs[i])
for i := 0; i < 6; i++ {
frame[i] = byte(0xff)
}
}
xsk.Transmit(rxDescs) // 把修改的數據發送出去
}
}
如代碼中的註釋所示,
-
先 Fill
-
調用 Poll 等待有接收的數據
-
調用 Receive 讀取接收的數據
-
修改數據中的 mac 地址
-
再發送出去
如果你也測試這個程序,最好創建一個測試用的網絡,否則會把你的網絡搞掛
參考文檔
-
https://lwn.net/Articles/750845/
-
https://www.kernel.org/doc/html/latest/networking/af_xdp.html
-
https://github.com/asavie/xdp
-
https://blog.xuegaogg.com/posts/1933/
-
https://rexrock.github.io/post/af_xdp1/
-
https://elinux.org/images/9/96/Elce-af_xdp-topel-v3.pdf
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/CH7EtdTxTUkgvu6W0DyAyA