Go netpoll (上篇)- 數據結構和初始化

概述

下面是一個基礎的服務器網絡程序,主要包含如下功能:

package main

import (
 "log"
 "net"
)

func main() {
 // 初始化監聽
 listener, err := net.ListenTCP("tcp"&net.TCPAddr{
  IP:   []byte("127.0.0.1"),
  Port: 8888,
 })
 if err != nil {
  panic(err)
 }

 for {
  // 接收請求
  conn, err := listener.Accept()
  if err != nil {
   panic(err)
  }

  // 啓動 1 個 goroutine 處理請求
  go handle(conn)
 }
}

// 處理客戶端連接請求
func handle(conn net.Conn) {
 defer func() {
  _ = conn.Close()
 }()

 buf := make([]byte, 1024)
 for {
  // 接收數據
  n, err := conn.Read(buf[:])
  if err != nil {
   log.Printf("conn Read %v", err)
   break
  }
  // 如果接收到了數據,原樣返回
  if n > 0 {
   // 發送數據
   _, err = conn.Write(buf)
  }
 }
}

上述代碼採用了類似 同步模型 代碼的方式實現了功能,但是這種方式真的可以支撐高性能網絡編程嗎?答案就在隱藏在同步模型後面的底層系統調用和網絡輪詢器。

前置知識複習

在正式開始研究源代碼之前,先來複習兩個基礎知識點。

1. 多路複用接口

I/O 多路複用於處理同一個事件循環中的多個 I/O 事件,這裏的「多路」指多個 IO 事件,「複用」指處理事件的程序 (線程) 是同一個。

Go 網絡標準庫和一般的 接口 約束形式不同,並沒有明確給出具體的 多路複用接口,但是不同平臺上面都實現瞭如下幾個方法:

// 初始化網絡輪詢器 
func netpollinit() {}

// 檢測網絡文件描述符是否被網絡輪詢器使用
func netpollIsPollDescriptor(fd uintptr) bool {}

// 創建監聽事件並監聽網絡文件描述符
func netpollopen(fd uintptr, pd *pollDesc) int32 {}

// 刪除網絡文件描述符
func netpollclose(fd uintptr) int32 {}

// 檢測網絡輪詢器並返回已經就緒的 goroutine 列表
func netpoll(delay int64) gList {}

// 喚醒網絡輪詢器
func netpollBreak() {}

例如 Linux 實現直接複用了底層 epoll 的相關方法, 方法定義在 $GOROOT/src/runtime/netpoll_epoll.go 文件中,MacOS 實現直接複用了底層 kqueue, 方法定義在 $GOROOT/src/runtime/netpoll_kqueue.go 文件中,其他平臺以此類推。

最後,編譯器利用條件編譯規則,根據不同的平臺編譯對應的代碼,例如 Linux 直接編譯 $GOROOT/src/runtime/netpoll_epoll.go 文件。

2. epoll API

epoll 是 Linux 系統提供的一種 I/O 多路複用機制,它可以同時監聽多個文件描述符的 I/O 事件,當其中任意一個文件描述符發生 I/O 事件時,就會觸發相應的回調函數。與傳統的 select 和 poll 模型相比,epoll 的性能更好,具有更高的可擴展性和更好的業務邏輯處理能力。

epoll 的三個核心 API 如下:

  1. epoll_create: 創建一個新的 epoll 實例,返回一個 epoll 文件描述符,該文件描述符可用於 epoll_ctlepoll_wait 函數調用

  2. epoll_ctl   : 管理 epoll 實例中的所有文件描述符 (內部使用紅黑樹數據結構進行管理),可以註冊、修改或刪除要監聽的文件描述符,設置相應的事件類型和回調函數

  3. epoll_wait  : 等待任意文件描述符監聽的事件發生,當有事件觸發時,函數返回一個非零值,並將所有到達的事件按順序存入隊列 (數組) 中

內部實現

結合文章開頭的示例代碼,接下來我們一起探究 網絡輪詢器 的內部實現,相關文件目錄爲 $GOROOT/src/runtime,筆者的 Go 版本爲 go1.19 linux/amd64

本文着重分析一下 netpoll 的數據結構以及 IO 讀寫流程中涉及到的一些底層方法。

文件描述符數據結構

文件描述符

FD 對象表示最基礎的文件描述符抽象,net 和 os 包使用該類型來表示網絡連接或操作系統文件。

type FD struct {
 // 對 Sysfd 加鎖,串行化 Read 和 Write
 fdmu fdMutex

 // 操作系統的文件描述符
 Sysfd int

 // 網絡輪詢 IO 描述符
 pd pollDesc

 // 描述符關閉信號
 csema uint32

 // 是否阻塞模式
 isBlocking uint32

 // 區分當前描述符是一個 stream, 還是一個基於包的描述符 (區分 TCP/UDP)
 // 不可變
 IsStream bool

 // 讀取零字節是否表示 EOF
 ZeroReadIsEOF bool

 // 區分當前描述符是一個文件,還是一個 socket
 isFile bool
}

文件描述符初始化方法如下:

func (fd *FD) Init(net string, pollable bool) error {
    ...
 err := fd.pd.init(fd)
    ...
 return err
}

網絡文件描述符

netFD 對象表示網絡文件描述符。

type netFD struct {
 pfd poll.FD // 包裝了一個 FD 結構體

 // 下列字段在 Close 之前不可變
 family      int
 sotype      int
 isConnected bool
 net         string
 laddr       Addr
 raddr       Addr
}

newFD 方法實例化一個 netFD 對象,並返回該對象的指針。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
 ret := &netFD{
  ...
 }
 return ret, nil
}

網絡文件描述符方法如下:

func (fd *netFD) init() error {
 return fd.pfd.Init(fd.net, true)
}

網絡輪詢 IO 文件描述符

pollDesc 對象表示網絡輪詢 IO 文件描述符,主要用於被 Go 的網絡輪詢器監聽狀態變化,是網絡底層實現中的核心對象。

這裏有一個需要學習的知識點: rg 字段和 wg 字段的數據類型都是 atomic.Uintptr, 而且可以用來表示 4 種數據:

  1. pdReady 信號

  2. pdWait 信號

  3. goroutine

  4. nil

// pollDesc 包含兩種信號量,rg 和 wg, 可以表示多種狀態
// Tips: 通過將字段設置爲 atomic.Uintptr 類型 (效果和 guintptr 類似),可以支持多種類型表示

// 幾種信號量狀態:
// pdReady - IO 準備就緒
// pdWait - goroutine 準備休眠  
// G pointer - goroutine 阻塞
const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

type pollDesc struct {
 link *pollDesc // 鏈表結構 (後面的元素) 指針
 fd   uintptr   
 
 atomicInfo atomic.Uint32
 
 rg atomic.Uintptr // 表示信號量,可能爲 pdReady、pdWait、等待文件描述符可讀的 goroutine 或者 nil
 wg atomic.Uintptr // 表示信號量,可能爲 pdReady、pdWait、等待文件描述符可寫的 goroutine 或者 nil

 lock    mutex     // 保護下面的字段
 closing bool
 rseq    uintptr   // 表示文件描述符被重用或者計時器被重置
 rt      timer     // 可讀截至時間計時器
 rd      int64     // 等待文件描述符可讀截至時間,-1 表示過期 (goroutine 被喚醒)
 wseq    uintptr   // 表示文件描述符被重用或者計時器被重置
 wt      timer     // 可寫截至時間計時器
 wd      int64     // 等待文件描述符可寫截至時間,-1 表示過期 (goroutine 被喚醒)
}

輪詢文件描述符管理

pollCache 對象用來管理網絡 IO 文件描述符,內置了一個互斥鎖字段和一個 pollDesc 對象鏈表。

type pollCache struct {
 lock  mutex
 // 指向一個 pollDesc 鏈表
 first *pollDesc
}

數據結構圖

網絡文件描述符

Listen 流程

TCP 監聽流程圖

Listener 接口

type Listener interface {
 // 返回一個實現了 Conn 接口的連接實例
 Accept() (Conn, error)
 
 Close() error
 
 Addr() Addr
}

TCP 監聽對象

type TCPListener struct {
 // 包裝了一個 netFD 對象
 fd *netFD   
 lc ListenConfig
}

TCP 監聽

ListenTCP 方法返回一個 TCP 監聽對象的指針。

func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
 ...
 
 sl := &sysListener{network: network, address: laddr.String()}
 ln, err := sl.listenTCP(context.Background(), laddr)
 
 ...

 return ln, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
 fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    
 ...
 
 return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
 ...
 
 family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
 return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

獲取系統配置

listenerBacklog 方法緩存了系統全連接隊列配置參數值,內部通過內嵌 sync.Once 的方式,保證了僅調用一次 maxListenerBacklog 方法。

func listenerBacklog() int {
 listenerBacklogCache.Do(func() { listenerBacklogCache.val = maxListenerBacklog() })
 return listenerBacklogCache.val
}

maxListenerBacklog 方法用於獲取系統全連接隊列配置參數值。

// Linux 讀取配置文件
func maxListenerBacklog() int {
 fd, err := open("/proc/sys/net/core/somaxconn")
 ...
 n, _, ok := dtoi(f[0])
 return n
}

創建 socket

socket 方法返回一個網絡文件描述符,該描述符使用網絡輪詢器異步接收數據。

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
 // 使用系統調用創建一個 socket 文件描述符
 // 並將 socket 文件描述符包裝爲 netFD 對象 
 s, err := sysSocket(family, sotype, proto)
 if fd, err = newFD(s, family, sotype, net); err != nil {
     poll.CloseFunc(s)
     return nil, err
 }
 
 ...

 if laddr != nil && raddr == nil {
  switch sotype {
   // 基於流: TCP
   case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
     // 1. 獲取系統配置
     // 2. 綁定並監聽端口
     if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
        ...
     }
  
   // 基於數據報: UDP
   case syscall.SOCK_DGRAM:
      ...
   }
 }

 return fd, nil
}

綁定並監聽端口

listenStream 方法內部實現了 TCP 的綁定端口和監聽端口,並完成了 epoll 的初始化工作。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
 ...
 
 var lsa syscall.Sockaddr
 if lsa, err = laddr.sockaddr(fd.family); err != nil {
  return err
 }

 ...
 
 // 綁定端口由系統調用實現
 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
  return os.NewSyscallError("bind", err)
 }
 // 監聽端口由系統調用實現
 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
  return os.NewSyscallError("listen", err)
 }
 // 初始化 epoll
 if err = fd.init(); err != nil {
    return err
 }
 
 ...
 
 return nil
}

epoll 初始化

serverInit 類型是 sync.Once 的類型別名,保證了 poll_runtime_pollServerInit 方法只會被調用一次 (也就是單個進程全局只有一個 epoll 實例,避免驚羣效應)。

func (pd *pollDesc) init(fd *FD) error {
 // runtime_pollServerInit 通過鏈接器指向了 poll_runtime_pollServerInit
 // 初始化 epoll
 serverInit.Do(runtime_pollServerInit)
 
 // runtime_pollOpen 通過鏈接器指向了 poll_runtime_pollOpen
 // 將文件描述符加入 epoll 監聽
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
 ....
 return nil
}
func poll_runtime_pollServerInit() {
 netpollGenericInit()
}

func netpollGenericInit() {
 if atomic.Load(&netpollInited) == 0 {
        ...
  if netpollInited == 0 {
   netpollinit()
  }
  ...
 }
}

netpollinit 方法實現了 多路複用接口,主要用於網絡輪詢器 epoll 具體的初始化工作,和上面的 netpollGenericInit 方法一樣,該方法也只會被調用一次。

var (
    // epoll 全局對象 (也是一個文件描述符)
    // 相當於調用 epoll_create 函數返回的對象
    // 後續的 epoll_ctl, epoll_wait 函數都是基於這個對象操作的
    epfd int32 = -1

    // 數據讀寫管道
    netpollBreakRd, netpollBreakWr uintptr

    // 標識變量,避免重複調用 netpollBreak 方法
    netpollWakeSig uint32
)

func netpollinit() {
    // 創建 epoll 描述符,賦值到全局變量 epfd
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd < 0 {
      epfd = epollcreate(1024)
      ...
    }

    // 創建一個通信管道
    r, w, errno := nonblockingPipe()

   ...

    // 將用於讀取數據的文件描述符轉換爲 epollevent 結構,進行監聽
    ev := epollevent{
      events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd

    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)

    ...

    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

netpollopen 方法實現了 多路複用接口,將新的文件描述符和監聽事件加入到全局變量 epfd 表示的網絡輪詢文件描述符。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
    errno := netpollopen(fd, pd)
    ...
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd)&ev)
}

小結

ListenTCP 方法內部實現了創建 socket,綁定端口,監聽端口三個操作,相對於傳統的 C 系列語言編程,將初始化過程簡化爲一個方法 API, 當方法執行完成後,epoll 也已經完成初始化工作,進入輪詢狀態等待連接到來以及 IO 事件。

ListenTCP

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Zl66w4q3jDrn7uvA_tPmYA