如何在 Go 中實現百萬級 UDP 通信

近些年來,各種類型的產品得到充足的發展,交互性和複雜度都在迅速提高,都需要在極短的時間內將數據同 時投遞給大量用戶,因此傳輸技術自然變爲未來制約發展的一個重要因素。在此之前對於通信協議首選 TCP, 而今因爲 TCP 的種種限制,UDP 得到了很多開發人員的青睞,並在 UDP 的基礎上開發出了衆多的可靠算法,如 QUIC、KCP 等,在此基礎上對於 UDP 的關注面跨越了可靠性,進一步考慮放大 UDP 的通信能力,典型如何在短 時間內快速接收和處理超大量的數據包。在此之前,我曾研究 UDP 許多時日,試圖在 Go 中最大化 UDP 通信能力;本文內容源自於對於對高數量級 UDP 通 信能力的優化經驗,不同於形容 TCP 通信能力的單位,針對 UDP 的特性選擇以 “每秒多少個包”(PPS)來作爲通 信能力單位更具有現實意義。

實現最簡 epoll

對於 UDP 而言,嚴格來講並不需要自己額外再實現 epoll,但爲了利用多核性能配合端口重用做到 “多線程” 綁定 統一地址端口,實現簡化 epoll 是很必要的。與*unix相關的內容在golang.org/x/sys/unix包中,採用系統調用方式簡化 epoll。對於 epoll 的使用,主要 在三個 API 中:

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int max_events, int
timeout);

那麼只需要在 Go 中調用到這三個 API 即可。

func PollerInit() (*Poller, error) {
 fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
	return nil, os.NewSyscallError("epoll_create1", err)
 }
 poller := &Poller{
 fd: fd,
 }
return poller, nil
}

正常情況下使用 epoll 的流程是拿到 fd 之後可添加、修改或刪除觸發實現,爲了實現方便,將epoll_ctl的調用 改成了如下實現:

func (poller *Poller) Add(fd int, ev string) error {
 e := &unix.EpollEvent{
 Fd: int32(fd),
 }
switch ev {
case "r":
 e.Events = unix.EPOLLIN
case "w":
 e.Events = unix.EPOLLOUT
case "rw":
 e.Events = unix.EPOLLIN | unix.EPOLLOUT
default:
return fmt.Errorf("unknow epoll event type")
 }
return os.NewSyscallError("epoll_ctl add", unix.EpollCtl(poller.fd,
unix.EPOLL_CTL_ADD, fd, e))
}
func (poller *Poller) Mod(fd int, ev string) error {
 e := &unix.EpollEvent{
 Fd: int32(fd),
 }
switch ev {
case "r":
 e.Events = unix.EPOLLIN
case "w":
 e.Events = unix.EPOLLOUT
case "rw":
 e.Events = unix.EPOLLIN | unix.EPOLLOUT
default:
return fmt.Errorf("unknow epoll event type")
 }
return os.NewSyscallError("epoll_ctl mod", unix.EpollCtl(poller.fd,
unix.EPOLL_CTL_MOD, fd, e))
}
func (poller *Poller) Del(fd int) error {
return os.NewSyscallError("epoll_ctl del", unix.EpollCtl(poller.fd,
unix.EPOLL_CTL_DEL, fd, nil))
}
// 接下來是epoll_wait的調用,對應的事件通過回調函數返回給上層:
func (poller *Poller) Polling(eventHandler func(fd int32, ev uint32)) error {
 evs := make([]unix.EpollEvent, EPollEventSize)
for {
 n, err := unix.EpollWait(poller.fd, evs, 0)
if err != nil {
 log.Printf("epoll_wait err: %v", err)
 }
if n < 0 && err == unix.EINTR {
continue
 }
if err != nil {
return os.NewSyscallError("epoll_wait", err)
 }
for i := 0; i < n; i++ {
 eventHandler(evs[i].Fd, evs[i].Events)
 }
 }
}

端口重用:SO_REUSEPORT

SO_REUSEPORT 是 linux 3.9 版本新添加的,支持多個進程或線程綁定到同一地址端口。有了該選項之後,每個進 程或者線程都有屬於自己的 server socket,避免鎖的競爭,可以充分利用到 CPU 多核資源。有了該選項之後, 可以考慮這樣一種結構:每個 goroutine 擁有一個 server socket,對應的擁有 epoll fd,實際上就是 epoll-per-goroutine 結構,最大化利用 CPU 多核資源:代碼上實現起來相當簡單,遵照 SO_REUSEPORT 的使用規則即可:

func NewUDPSocket(network, addr string, reusePort bool) (int, unix.Sockaddr,
error) {
var sa unix.Sockaddr
 udpAddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
return 0, nil, fmt.Errorf("resolve addr err: %v", err)
 }
 netFamily := unix.AF_INET
if udpAddr.IP.To4() == nil {
 netFamily = unix.AF_INET6
 }
// listen socket fd
 syscall.ForkLock.Lock()
 fd, err := unix.Socket(netFamily,
unix.SOCK_DGRAM|unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC, unix.IPPROTO_UDP)
if err == nil {
 unix.CloseOnExec(fd)
 }
 syscall.ForkLock.Unlock()
defer func() {
if err != nil {
 _ = unix.Close(fd)
 }
 }()
switch network {
case "udp":
 sockaddr := &unix.SockaddrInet4{}
 sockaddr.Port = udpAddr.Port
 sa = sockaddr
case "udp4":
 sockaddr := &unix.SockaddrInet4{}
 sockaddr.Port = udpAddr.Port
copy(sockaddr.Addr[:], udpAddr.IP.To4())
 sa = sockaddr
case "udp6":
// IPv6 zone
 sockaddr := &unix.SockaddrInet6{}
copy(sockaddr.Addr[:], udpAddr.IP.To16())
if udpAddr.Zone != "" {
var iface *net.Interface
 iface, err = net.InterfaceByName(udpAddr.Zone)
if err != nil {
return 0, nil, fmt.Errorf("parse UDPAddr.Zone err: %v", err)
 }
 sockaddr.ZoneId = uint32(iface.Index)
 }
 sockaddr.Port = udpAddr.Port
 netFamily = unix.AF_INET6
default:
return 0, nil, fmt.Errorf("not support network")
 }
if reusePort {
if err = os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd,
unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)); err != nil {
return 0, nil, err
 }
 }
if err = os.NewSyscallError("bind", unix.Bind(fd, sa)); err != nil {
return 0, nil, err
 }
return fd, sa, nil
}

使用 recvmmsg 代替 recvmsg

我們知道在調用 recvmsg 時會將收到的數據從內核空間拷貝至用戶空間,每調用一次就會產生一次內核開銷, 短時間內接收超大量的數據包累積起來的內核開銷也很可觀了,所以從 linux 2.6.33 開始,新增了recvmmsg,允許用戶一次性接收多個數據包,對於recvmmsg的說明可以參考:recvmmsg document,這裏主要說下如何在 Go 中調用recvmmsg。``recvmmsg依賴以下幾個結構:

#include <sys/socket.h>
struct mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of received bytes for header */
};
struct iovec { /* Scatter/gather array items */
void *iov_base; /* Starting address */
size_t iov_len; /* Number of bytes to transfer */
};
struct msghdr {
void *msg_name; /* Optional address */
socklen_t msg_namelen; /* Size of address */
struct iovec *msg_iov; /* Scatter/gather array */
size_t msg_iovlen; /* # elements in msg_iov */
void *msg_control; /* Ancillary data, see below */
size_t msg_controllen; /* Ancillary data buffer len */
int msg_flags; /* Flags on received message */
};

帶外數據不在本文考慮範圍內,因此msg_controlmsg_controllen可以忽略,其中iovec爲接收數據緩衝 區,本質上recvmmsg的傳入參數就是 mmsghdr 數組,數據長度即爲期望收到多少個數據包,理解這個結構之 後就好辦了,我們可以構建在 Go 中構建對應的數據結構通過unix.syscall6實現調用recvmmsg。調用recvmmsg之前

func prepare(n, mtu int) ([]mmsghdr, [][]byte, [][]byte) {
 mms := make([]mmsghdr, n)
 buffers := make([][]byte, n)
 names := make([][]byte, n)
for i := range mms {
 buffers[i] = make([]byte, mtu)
 names[i] = make([]byte, sizeofSockaddrInet6)
 v := []iovec{
 {Base: (*byte)(unsafe.Pointer(&buffers[i][0])), Len:
uint64(len(buffers[i]))},
 }
 mms[i].Hdr.Iov = &v[0]
 mms[i].Hdr.Iovlen = uint64(len(v))
 mms[i].Hdr.Name = (*byte)(unsafe.Pointer(&names[i][0]))
 mms[i].Hdr.Namelen = uint32(len(names[i]))
// ignore mms[i].Hdr.Control and mms[i].Hdr.Controllen
 }
return mms, buffers, names
}

調用recvmmsg

func (rw *ReaderWriter) read() (int, error) {
 n, _, err := unix.Syscall6(unix.SYS_RECVMMSG, uintptr(rw.fd),
uintptr(unsafe.Pointer(&rw.msgs[0])), uintptr(len(rw.msgs)),
unix.MSG_WAITFORONE,
0, 0,
 )
if err != 0 {
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
return 0, nil
 }
return 0, os.NewSyscallError("recvmmsg", fmt.Errorf("%v",
unix.ErrnoName(err)))
 }
return int(n), nil
}

至於mmsghr結構,可以參考:golang.org/x/net/internal/socket/zsys_linux_amd64.go。至於從mmsghdr結構中解析到遠端地址和數據,可翻閱 linux 文檔。

網卡多隊列綁定 CPU 核心優化

我們知道,如果一個 socket 的所有操作都固定在某個 CPU 核心上是能獲得一定的性能提升,如果網卡支持多隊 列可以嘗試這樣一種優化方案:將網卡多隊列均勻綁定到 CPU 多核心上,同時設置SO_INCOMING_CPU屬性,將 socket 的處理與某個 CPU 核心綁定,同時邏輯線程與某個 CPU 核心進行親和性綁定,最終的結果是:某個邏輯線 程上總是處理特定的 socket 操作,簡單來說就是路寬了,每條路上都井然有序,擁擠程度降級,性能得到提 升。那麼在 Go 中能否實現這種優化方案?很可惜我沒有找到明確的方法實施這種方案,主要原因是 Go 刻意弱化了線 程概念和操作,在 Go 中無法直接設置線程和 CPU 核心的親和性以實現上述目的,有線索的同學可以指點一下。即便如此,但將網絡多隊列均勻到 CPU 多核心上是具有意義的。在實際測試中發現,偶爾會出現吞吐量下降, 重現率不高,偶然發現是某個 CPU 核心壓力過高,查了網卡隊列數據流向之後發現某些核心比較繁忙,開啓網 卡多隊列綁定到各個 CPU 核心上之後再次測試各個核心的壓力都比較均勻,不至於會出現某個核心壓力過高影 響 runtime 調度。關於這部分內容,這篇文章說的比較好可以作爲優化參考:TCP 加速技術解決方案

考慮 CPU Cache

在我們的代碼結構上實現了類似事件循環(event-loop),在這個 event-loop 中調用epoll_wait;前面我們說 過epoll_wait的事件是通過回調函數回調到 event-loop 中,也就是說會 event-loop 會被頻繁的讀寫訪問,此時 就有可能會出現 event-loop 在 CPU 中的訪問命中率下降,其原理:單個 CPU 核在讀取一個變量時,以 cache line 的方式將後續的變量也讀取進來,緩存在自己這個核的 cache 中,而後續的變量也可能被其他 CPU 核並行緩存。當前面的 CPU 對前面的變量進行寫入時,該變量同樣是以 cache line 爲單位寫回內存。此時在其他核上,儘管緩 存的是該變量之後的變量,但是由於沒法區分自身變量是否被修改,所以它只能認爲自己的緩存失效,重新從 內存中讀取。爲了能夠讓 CPU 儘快從高速緩衝中訪問到 event-loop 變量,有必要讓 event-loop 結構恰好填滿一個 cache line, 避免重複寫回,至於手段上比較簡單,即在結構中按照 cache line 大小填充無意義數組變量。

系統調用分離

在壓測過程中,我們發現當 PPS 達到 70w/s 之後數據再也上不去了,通過 pprof 看到是系統調用開銷,我們所涉 及到的幾個系統調用均爲阻塞的,阻塞調用在一定程度上會影響吞吐量,解決辦法是再獨立出 goroutine 專門負 責系統調用,避免阻塞 event-loop。通過上述的優化內容,我實現了最簡代碼,在 12 核(E5-2640 2.5GHz)24G 機器上跑出了 128w PPS 的數據,還 有一些比較細化的優化點,如降低 GC 頻率在此不表,做此類優化的難點在於扣細節,結合 pprof 和實際測試數 據逐點分析哪部分可能會影響吞吐量,哪種優化方案能有效應對,需要反覆對比測試數據,有時候還需要考慮 到代碼結構上的實現,文中內容略淺顯,表述不當的地方請指正。本文測試代碼在這裏:fastudp

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BuZwjRNwuKx19PU6E95Oeg