Go netpoll (上篇)- 數據結構和初始化
概述
下面是一個基礎的服務器網絡程序,主要包含如下功能:
-
監聽 TCP 連接,綁定 8888 端口
-
收到新的客戶端連接後,啓動一個新的
goroutine
進行處理 -
收到客戶端的數據後,不做任何處理,原樣返回
package main
import (
"log"
"net"
)
func main() {
// 初始化監聽
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: []byte("127.0.0.1"),
Port: 8888,
})
if err != nil {
panic(err)
}
for {
// 接收請求
conn, err := listener.Accept()
if err != nil {
panic(err)
}
// 啓動 1 個 goroutine 處理請求
go handle(conn)
}
}
// 處理客戶端連接請求
func handle(conn net.Conn) {
defer func() {
_ = conn.Close()
}()
buf := make([]byte, 1024)
for {
// 接收數據
n, err := conn.Read(buf[:])
if err != nil {
log.Printf("conn Read %v", err)
break
}
// 如果接收到了數據,原樣返回
if n > 0 {
// 發送數據
_, err = conn.Write(buf)
}
}
}
上述代碼採用了類似 同步模型 代碼的方式實現了功能,但是這種方式真的可以支撐高性能網絡編程嗎?答案就在隱藏在同步模型後面的底層系統調用和網絡輪詢器。
前置知識複習
在正式開始研究源代碼之前,先來複習兩個基礎知識點。
1. 多路複用接口
I/O 多路複用於處理同一個事件循環中的多個 I/O 事件,這裏的「多路」指多個 IO 事件,「複用」指處理事件的程序 (線程) 是同一個。
Go 網絡標準庫和一般的 接口
約束形式不同,並沒有明確給出具體的 多路複用接口,但是不同平臺上面都實現瞭如下幾個方法:
// 初始化網絡輪詢器
func netpollinit() {}
// 檢測網絡文件描述符是否被網絡輪詢器使用
func netpollIsPollDescriptor(fd uintptr) bool {}
// 創建監聽事件並監聽網絡文件描述符
func netpollopen(fd uintptr, pd *pollDesc) int32 {}
// 刪除網絡文件描述符
func netpollclose(fd uintptr) int32 {}
// 檢測網絡輪詢器並返回已經就緒的 goroutine 列表
func netpoll(delay int64) gList {}
// 喚醒網絡輪詢器
func netpollBreak() {}
例如 Linux
實現直接複用了底層 epoll
的相關方法, 方法定義在 $GOROOT/src/runtime/netpoll_epoll.go
文件中,MacOS
實現直接複用了底層 kqueue
, 方法定義在 $GOROOT/src/runtime/netpoll_kqueue.go
文件中,其他平臺以此類推。
最後,編譯器利用條件編譯規則,根據不同的平臺編譯對應的代碼,例如 Linux
直接編譯 $GOROOT/src/runtime/netpoll_epoll.go
文件。
2. epoll API
epoll 是 Linux 系統提供的一種 I/O 多路複用機制,它可以同時監聽多個文件描述符的 I/O 事件,當其中任意一個文件描述符發生 I/O 事件時,就會觸發相應的回調函數。與傳統的 select 和 poll 模型相比,epoll 的性能更好,具有更高的可擴展性和更好的業務邏輯處理能力。
epoll 的三個核心 API 如下:
-
epoll_create
: 創建一個新的epoll
實例,返回一個epoll
文件描述符,該文件描述符可用於epoll_ctl
和epoll_wait
函數調用 -
epoll_ctl
: 管理epoll
實例中的所有文件描述符 (內部使用紅黑樹數據結構進行管理),可以註冊、修改或刪除要監聽的文件描述符,設置相應的事件類型和回調函數 -
epoll_wait
: 等待任意文件描述符監聽的事件發生,當有事件觸發時,函數返回一個非零值,並將所有到達的事件按順序存入隊列 (數組) 中
內部實現
結合文章開頭的示例代碼,接下來我們一起探究 網絡輪詢器
的內部實現,相關文件目錄爲 $GOROOT/src/runtime
,筆者的 Go 版本爲 go1.19 linux/amd64
。
本文着重分析一下 netpoll
的數據結構以及 IO 讀寫流程中涉及到的一些底層方法。
文件描述符數據結構
文件描述符
FD 對象表示最基礎的文件描述符抽象,net 和 os 包使用該類型來表示網絡連接或操作系統文件。
type FD struct {
// 對 Sysfd 加鎖,串行化 Read 和 Write
fdmu fdMutex
// 操作系統的文件描述符
Sysfd int
// 網絡輪詢 IO 描述符
pd pollDesc
// 描述符關閉信號
csema uint32
// 是否阻塞模式
isBlocking uint32
// 區分當前描述符是一個 stream, 還是一個基於包的描述符 (區分 TCP/UDP)
// 不可變
IsStream bool
// 讀取零字節是否表示 EOF
ZeroReadIsEOF bool
// 區分當前描述符是一個文件,還是一個 socket
isFile bool
}
文件描述符初始化方法如下:
func (fd *FD) Init(net string, pollable bool) error {
...
err := fd.pd.init(fd)
...
return err
}
網絡文件描述符
netFD 對象表示網絡文件描述符。
type netFD struct {
pfd poll.FD // 包裝了一個 FD 結構體
// 下列字段在 Close 之前不可變
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
}
newFD 方法實例化一個 netFD 對象,並返回該對象的指針。
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
...
}
return ret, nil
}
網絡文件描述符方法如下:
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
網絡輪詢 IO 文件描述符
pollDesc 對象表示網絡輪詢 IO 文件描述符,主要用於被 Go 的網絡輪詢器監聽狀態變化,是網絡底層實現中的核心對象。
這裏有一個需要學習的知識點: rg 字段和 wg 字段的數據類型都是 atomic.Uintptr
, 而且可以用來表示 4 種數據:
-
pdReady 信號
-
pdWait 信號
-
goroutine
-
nil
// pollDesc 包含兩種信號量,rg 和 wg, 可以表示多種狀態
// Tips: 通過將字段設置爲 atomic.Uintptr 類型 (效果和 guintptr 類似),可以支持多種類型表示
// 幾種信號量狀態:
// pdReady - IO 準備就緒
// pdWait - goroutine 準備休眠
// G pointer - goroutine 阻塞
const (
pdReady uintptr = 1
pdWait uintptr = 2
)
type pollDesc struct {
link *pollDesc // 鏈表結構 (後面的元素) 指針
fd uintptr
atomicInfo atomic.Uint32
rg atomic.Uintptr // 表示信號量,可能爲 pdReady、pdWait、等待文件描述符可讀的 goroutine 或者 nil
wg atomic.Uintptr // 表示信號量,可能爲 pdReady、pdWait、等待文件描述符可寫的 goroutine 或者 nil
lock mutex // 保護下面的字段
closing bool
rseq uintptr // 表示文件描述符被重用或者計時器被重置
rt timer // 可讀截至時間計時器
rd int64 // 等待文件描述符可讀截至時間,-1 表示過期 (goroutine 被喚醒)
wseq uintptr // 表示文件描述符被重用或者計時器被重置
wt timer // 可寫截至時間計時器
wd int64 // 等待文件描述符可寫截至時間,-1 表示過期 (goroutine 被喚醒)
}
輪詢文件描述符管理
pollCache 對象用來管理網絡 IO 文件描述符,內置了一個互斥鎖字段和一個 pollDesc 對象鏈表。
type pollCache struct {
lock mutex
// 指向一個 pollDesc 鏈表
first *pollDesc
}
數據結構圖
網絡文件描述符
Listen 流程
TCP 監聽流程圖
Listener 接口
type Listener interface {
// 返回一個實現了 Conn 接口的連接實例
Accept() (Conn, error)
Close() error
Addr() Addr
}
TCP 監聽對象
type TCPListener struct {
// 包裝了一個 netFD 對象
fd *netFD
lc ListenConfig
}
TCP 監聽
ListenTCP 方法返回一個 TCP 監聽對象的指針。
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
...
sl := &sysListener{network: network, address: laddr.String()}
ln, err := sl.listenTCP(context.Background(), laddr)
...
return ln, nil
}
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
...
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
...
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
獲取系統配置
listenerBacklog 方法緩存了系統全連接隊列配置參數值,內部通過內嵌 sync.Once 的方式,保證了僅調用一次 maxListenerBacklog 方法。
func listenerBacklog() int {
listenerBacklogCache.Do(func() { listenerBacklogCache.val = maxListenerBacklog() })
return listenerBacklogCache.val
}
maxListenerBacklog 方法用於獲取系統全連接隊列配置參數值。
// Linux 讀取配置文件
func maxListenerBacklog() int {
fd, err := open("/proc/sys/net/core/somaxconn")
...
n, _, ok := dtoi(f[0])
return n
}
創建 socket
socket 方法返回一個網絡文件描述符,該描述符使用網絡輪詢器異步接收數據。
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 使用系統調用創建一個 socket 文件描述符
// 並將 socket 文件描述符包裝爲 netFD 對象
s, err := sysSocket(family, sotype, proto)
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
...
if laddr != nil && raddr == nil {
switch sotype {
// 基於流: TCP
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 1. 獲取系統配置
// 2. 綁定並監聽端口
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
...
}
// 基於數據報: UDP
case syscall.SOCK_DGRAM:
...
}
}
return fd, nil
}
綁定並監聽端口
listenStream 方法內部實現了 TCP 的綁定端口和監聽端口,並完成了 epoll
的初始化工作。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
...
// 綁定端口由系統調用實現
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 監聽端口由系統調用實現
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 初始化 epoll
if err = fd.init(); err != nil {
return err
}
...
return nil
}
epoll 初始化
serverInit 類型是 sync.Once
的類型別名,保證了 poll_runtime_pollServerInit 方法只會被調用一次 (也就是單個進程全局只有一個 epoll 實例,避免驚羣效應)。
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 通過鏈接器指向了 poll_runtime_pollServerInit
// 初始化 epoll
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen 通過鏈接器指向了 poll_runtime_pollOpen
// 將文件描述符加入 epoll 監聽
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
....
return nil
}
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
...
if netpollInited == 0 {
netpollinit()
}
...
}
}
netpollinit 方法實現了 多路複用接口
,主要用於網絡輪詢器 epoll
具體的初始化工作,和上面的 netpollGenericInit 方法一樣,該方法也只會被調用一次。
var (
// epoll 全局對象 (也是一個文件描述符)
// 相當於調用 epoll_create 函數返回的對象
// 後續的 epoll_ctl, epoll_wait 函數都是基於這個對象操作的
epfd int32 = -1
// 數據讀寫管道
netpollBreakRd, netpollBreakWr uintptr
// 標識變量,避免重複調用 netpollBreak 方法
netpollWakeSig uint32
)
func netpollinit() {
// 創建 epoll 描述符,賦值到全局變量 epfd
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
...
}
// 創建一個通信管道
r, w, errno := nonblockingPipe()
...
// 將用於讀取數據的文件描述符轉換爲 epollevent 結構,進行監聽
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
...
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
netpollopen 方法實現了 多路複用接口
,將新的文件描述符和監聽事件加入到全局變量 epfd
表示的網絡輪詢文件描述符。
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
...
errno := netpollopen(fd, pd)
...
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
小結
ListenTCP 方法內部實現了創建 socket,綁定端口,監聽端口三個操作,相對於傳統的 C 系列語言編程,將初始化過程簡化爲一個方法 API, 當方法執行完成後,epoll
也已經完成初始化工作,進入輪詢狀態等待連接到來以及 IO 事件。
ListenTCP
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Zl66w4q3jDrn7uvA_tPmYA