Go 原生網絡輪詢器(netpoller)剖析

關於 C10K

C10K 問題是上個十年的問題,C10K 的意思就是服務端單機不能承受超過 1w 個客戶端連接,就算擴容對應的 CPU、內存的物理資源也無效,在 2022 年看來,這個問題看起來很奇怪,現在各個大廠吹噓的不都是百萬甚至千萬連接的服務器嗎?1w 連接算啥?

以史爲鑑, 可以知興替

我們可以瞭解下當初 C10K 問題的背景是啥,瓶頸在什麼地方,再用發展的眼光看待問題,可以收穫更多。一個服務器該怎麼處理一個來自客戶端的連接,最簡單的想法就是我們可以分配一個進程,讓每個進程和自己負責的連接做交互,連接關閉時就回收進程,對應資源的調度交給操作系統,實現這部分邏輯就是一個網絡服務器了。對應的瓶頸在於操作系統的進程數是有限的,外加操作系統自己的也有不少的進程數,對操作系統的壓力是很大的。

什麼是多路複用?

既然進程數是服務器的瓶頸,原來的進程和連接的關係是 1:1 的關係,那自然而然的想法就是把 1:1 變爲 1:n,這就是所謂的多路複用了。

多路複用的核心就是用一個線程來監聽多個網絡 io,網絡 io 是個複雜的東西,io 可以分爲阻塞 io 和非阻塞 io,何爲阻塞,簡單來說就是一個線程進行某種 io 操作的時候需要等待 io 操作完成纔會返回,一致阻塞在操作上,非阻塞 io 就是立馬返回,然後定時去輪詢 io 操作是否完成。

非阻塞 io 和阻塞 io 都有自己存在的問題,比如阻塞 io 等待的時候阻塞了線程,非阻塞 io 如何定義輪詢的頻次,如果輪詢頻次高了,會有 cpu 資源的消耗,頻次低了會增加客戶端的響應時間。

阻塞 io

非阻塞 io

通常多路複用需要和非阻塞 io 配合使用。

常見的多路複用的方法有 select、poll、epoll 三種,這裏簡單說下 epoll,epoll 底層的數據結構是紅黑樹和就緒鏈表,操作系統對用戶層提供了加入 epoll 的方法,當服務端的 fd 初始化完成之後通過調用加入 epoll 的方法把 fd 交給操作系統監聽,加入 epoll 方法調用後操作系統會把 fd 加入紅黑樹,同時會註冊一箇中斷處理函數,當 fd 處於就緒狀態(這裏可以分爲邊緣觸發水平觸發模式,兩種模式的就緒狀態的判斷有些差異增加 fd 的時候可以設置模式)的時候會觸發中斷處理函數把對應 fd 加入到就緒鏈表中,當用戶程序調用 epoll_wait 的時候就可以把就緒鏈表返回,用戶程序只需要處理就緒的 fd 即可。

很多初學者(包括我自己)可能很難理解 epoll,主要的原因感覺是網絡上對於 epoll 的講解魚龍混雜很容易看的一頭霧水,建議自己寫一個 demo 就可以理解了,這裏給個簡單的 demo。

#include<stdio.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<unistd.h>
#include<ctype.h>
#define MAXLEN 1024
#define SERV_PORT 8000
#define MAX_OPEN_FD 1024

int main(int argc,char *argv[])
{
    int  listenfd,connfd,efd,ret;
    char buf[MAXLEN];
    struct sockaddr_in cliaddr,servaddr;
    socklen_t clilen = sizeof(cliaddr);
    struct epoll_event tep,ep[MAX_OPEN_FD];

    listenfd = socket(AF_INET,SOCK_STREAM,0);

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);
    bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
    listen(listenfd,20);
    // 創建一個epoll fd
    efd = epoll_create(MAX_OPEN_FD);
    tep.events = EPOLLIN;tep.data.fd = listenfd;
    // 把監聽socket 先添加到efd中
    ret = epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep);
    // 循環等待
    for (;;)
    {
        // 返回已就緒的epoll_event,-1表示阻塞,沒有就緒的epoll_event,將一直等待
        size_t nready = epoll_wait(efd,ep,MAX_OPEN_FD,-1);
        for (int i = 0; i < nready; ++i)
        {
            // 如果是新的連接,需要把新的socket添加到efd中
            if (ep[i].data.fd == listenfd )
            {
                connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);
                tep.events = EPOLLIN;
                tep.data.fd = connfd;
                ret = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep);
            }
            // 否則,讀取數據
            else
            {
                connfd = ep[i].data.fd;
                int bytes = read(connfd,buf,MAXLEN);
                // 客戶端關閉連接
                if (bytes == 0){
                    ret =epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL);
                    close(connfd);
                    printf("client[%d] closed\n", i);
                }
                else
                {
                    for (int j = 0; j < bytes; ++j)
                    {
                        buf[j] = toupper(buf[j]);
                    }
                    // 向客戶端發送數據
                    write(connfd,buf,bytes);
                }
            }
        }
    }
    return 0;
}

go netpoller 剖析

可以先簡單看個 tcp 協議的 echo server demo,可以用nc 127.0.0.1 8990去連接測試

package main

import (
 "log"
 "net"
 "strings"
)

func handler(conn net.Conn){
 defer conn.Close()
 const BUFF_SIZE = 1024
 var buff = make([]byte,BUFF_SIZE)
 for {
  n,err := conn.Read(buff)
  if err != nil{
   log.Println(err)
   break
  }
  if n > 0{
   if strings.Contains(string(buff),"exist") {
    log.Println("close connection",conn.LocalAddr())
    break
   }
   log.Println(string(buff[:n-1]))
  }
 }
}

func startServer(){
 l,err := net.Listen("tcp","127.0.0.1:8990")
 if err != nil{
  log.Println("err",err)
  return
 }
 for {
  c,err := l.Accept()
  if err != nil{
   log.Println(err)
   continue
  }
  log.Println("accept connection")
  go handler(c)
 }

}

func main() {
 startServer()
}

可以看到,核心的函數有三個,分別是net.Listenl.Acceptconn.Read,接下來將會一一解析。

在解析之前,我們可以明確一些信息:

  1. go net 包中 server 的實現是基於 epoll(這裏指的是 linux 平臺,其他平臺有其他的多路複用的實現例如 kqueue,go 目前已經能支持跨平臺了)

  2. go net 的連接模型會爲每個連接都建立一個 goroutinue,這個 goroutinue 在對應 fd 沒有被就緒的時候會被 park 住(什麼叫做被 park 住,簡單來說就是這個 goroutinue 被打包了之後找到一個地方存,直到 socket 就緒的時候會被喚醒),合適的時候會被喚醒(喚醒是由 runtime 層做的,需要先了解下 go 的 GMP)

net.Listen

我們初始化使用的是 tcp 的服務器,所以大概的調用路徑如下

listenTCP->internetSocket->socket->newFD,newFD這個函數是比較重要的,這裏引出了 net 包中兩個重要的數據結構 netFD 和 poll.FD,poll.FD 下面還有一個關鍵的數據結構 pollDesc(這三個數據結構可以理解爲對操作系統接口調用的層層封裝)。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
 ret := &netFD{
  pfd: poll.FD{
   Sysfd:         sysfd,
   IsStream:      sotype == syscall.SOCK_STREAM,
   ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
  },
  family: family,
  sotype: sotype,
  net:    net,
 }
 return ret, nil
}

在 pollDesc 中有個 init 方法

func (pd *pollDesc) init(fd *FD) error {
 serverInit.Do(runtime_pollServerInit)
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
 if errno != 0 {
  return errnoErr(syscall.Errno(errno))
 }
 pd.runtimeCtx = ctx
 return nil
}

其中有個核心函數 runtime_pollOpen,我們仔細看看它的實現

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
        ...
 errno := netpollopen(fd, pd)
 if errno != 0 {
  pollcache.free(pd)
  return nil, int(errno)
 }
 return pd, 0
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev epollevent
 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd)&ev)
}

在_poll_runtime_pollOpen_ 函數中可以看到最終是調用了 epollctl 的操作系統接口把我們 fd 放入(epoll__create,epoll_waite,epoll 三個函數可以看上面的 epoll demo_)

l.Accept

這個函數的作用是 accept 得到一個連接(返回的連接初始化的時候會加入 epoll 中

沿着調用路徑查看下去,最終可以看到 poll.FD 中的 Accept 函數,我們需要重點關注fd.pd.waitRead這個函數

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
        ...
 for {
  s, rsa, errcall, err := accept(fd.Sysfd)
  if err == nil {
   return s, rsa, "", err
  }
  switch err {
  case syscall.EINTR:
   continue
  case syscall.EAGAIN:
                        // 重點看這裏
   if fd.pd.pollable() {
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  case syscall.ECONNABORTED:
   // This means that a socket on the listen
   // queue was closed before we Accept()ed it;
   // it's a silly error, so try again.
   continue
  }
  return -1, nil, errcall, err
 }
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 if waitio || netpollcheckerr(pd, mode) == 0 {
  gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
 }
}

最終會調用netpollblock,當 waitio 爲 false 的時候就會調用 gopark 暫停當前 goroutinue,也就是如果當前 socket 沒有處於就緒狀態,會把 goroutinue park 住,使其不使用 cpu 資源進行空轉之類的操作。

那這裏被 park 住了,那麼有個問題,這個 goroutinue 又是怎麼被重新調度起來的呢

對應 epoll_wait 的執行函數在 netpoll_epoll.go 文件中_

func netpoll(delay int64) gList {
...
}

netpoll函數的作用是通過 epoll_wait 去拿到就緒的 goroutinue 的列表,可以看到函數的返回是 gList。

這個netpoll在 findrunnable 函數中被調用,findrunnable 函數會被 schedule 函數調用,瞭解 GMP 模型的話可以知道,schedule 是爲了讓可執行的 goroutinue 再次調度起來。除了 schedule 函數 sysmon 函數中也會調用 netpoll,sysmon 是一個 golang runtime 自帶的監控任務,不需要額外的 P 就可以綁定運行,程序運行時會分配一個線程去執行 sysmon 任務,sysmon 可以用來搶佔一些陷入系統調用或者超時運行的協程,以及定時的 gc。

conn.Read

conn.Read 的邏輯和 Accept 類似,都是基於非阻塞 IO + 多路複用 + GMP 實現的,可以簡單總結一下

  1. 當調用 conn.Read 的時候先會執行 fd 的 read 方法,當 fd 沒有數據可以讀的時候會返回 EAGAIN

  2. 當返回 EAGAIN 的時候表示沒有數據,需要重試

  3. 這時候就會被 gopark 住

  4. 當 socket 處於就緒狀態的時候會把對應協程喚醒,執行業務邏輯,總的來說就是 event loop 不需要我們去維護,runtime 的 sysmon 以及 schedule 會幫我們維護 event loop

簡易的 go netpoller 模型

存在的問題

go netpoller 存在的問題是每個連接都需要一個 goroutinue,雖然 goroutinue 分配的棧空間和其他資源相比線程來說是很小的,但是依舊需要一定內存佔用,當應用程序的連接數上百萬的時候還是容易出現瓶頸。

其他高性能的網絡庫

基於上面存在的問題,目前比較成熟的方案是多 reactor 模型,大概一個 event loop 去做多路複用,然後把就緒的連接交給後續的工作線程 / 協程或者是線程 / 協程池去處理,包括 redis 和 nginx 都用上了類似模型,關於 go 語言的話,netpoller 和 gnet 都是多 reactor 模型的實現。

轉自:

zhuanlan.zhihu.com/p/463017601

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/oDLYJqkwF2Em_hcRNLZ9qg