Go Network Poller: The epoll Implementation of Listen
The Go runtime's network poller (netpoll) provides I/O multiplexing, with an implementation for each operating system:
- Linux epoll: src/runtime/netpoll_epoll.go
- Darwin (macOS) kqueue: src/runtime/netpoll_kqueue.go
- Windows iocp: src/runtime/netpoll_windows.go
- Solaris: src/runtime/netpoll_solaris.go
- AIX: src/runtime/netpoll_aix.go
- WebAssembly: src/runtime/netpoll_fake.go
This article uses Linux and looks at how the Go network poller builds its I/O multiplexing on top of epoll.
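Every network operation is built around the network file descriptor, netFD. Each netFD is bound to an underlying pollDesc structure. When a read or write on a netFD fails with EAGAIN, the current goroutine is recorded in that netFD's pollDesc and gopark parks it. Only when a read or write event occurs on the netFD again is the goroutine made ready and allowed to run. The mechanism that delivers those events to the runtime is the operating system's event-notification facility: epoll, kqueue, IOCP, and so on.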
First, take a look at src/runtime/netpoll_epoll.go, which defines several functions prefixed with netpoll:
- func netpollinit()
- func netpollopen(fd uintptr, pd *pollDesc) int32
- func netpoll(delay int64) gList
- func netpollclose(fd uintptr) int32
net.Listen
Whenever we write our own TCP server (for example https://github.com/smallnest/1m-go-tcp-server/blob/master/1_simple_tcp_server/server.go), we have to call net.Listen with a protocol and a listen address (port), so let's start with this function:
func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
	if err != nil {
		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
	}
	sl := &sysListener{
		ListenConfig: *lc,
		network:      network,
		address:      address,
	}
	var l Listener
	la := addrs.first(isIPv4)
	switch la := la.(type) {
	case *TCPAddr:
		l, err = sl.listenTCP(ctx, la)
	case *UnixAddr:
		l, err = sl.listenUnix(ctx, la)
	default:
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
	}
	if err != nil {
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
	}
	return l, nil
}
Following the sl.listenTCP method to its definition:
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
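Note that sl.ListenConfig.Control is passed down as the ctrlFn argument. A user-supplied Control callback is therefore how application code reaches the raw socket between its creation and bind(2). A minimal sketch, assuming Linux (listenWithControl is a hypothetical helper, and setting SO_REUSEADDR is just an illustrative option that net already sets on listeners):

```go
package main

import (
	"context"
	"fmt"
	"net"
	"syscall"
)

// listenWithControl (hypothetical helper) installs a Control callback; net
// invokes it as ctrlFn after socket(2) and before bind(2).
func listenWithControl(addr string) (net.Listener, string, error) {
	var seenNetwork string
	lc := net.ListenConfig{
		Control: func(network, address string, c syscall.RawConn) error {
			seenNetwork = network
			var sockErr error
			// c.Control runs the function with the raw descriptor (poll.FD.Sysfd).
			if err := c.Control(func(fd uintptr) {
				sockErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
			}); err != nil {
				return err
			}
			return sockErr
		},
	}
	ln, err := lc.Listen(context.Background(), "tcp", addr)
	return ln, seenNetwork, err
}

func main() {
	ln, network, err := listenWithControl("127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	fmt.Println(network, ln.Addr()) // an IPv4 address yields network "tcp4"
}
```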
The name of internetSocket's return value suggests a file descriptor. The function is defined in src/net/ipsock_posix.go:
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
		raddr = raddr.toLocal(net)
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
It returns a pointer to a netFD structure.
netFD
netFD is Go's own wrapper around a "network file descriptor":
type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}
The most important field is pfd, a poll.FD structure from the internal/poll package; this is where the poller comes into play:
type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex

	// System file descriptor. Immutable until Close.
	Sysfd int

	// I/O poller.
	pd pollDesc

	// Writev cache.
	iovecs *[]syscall.Iovec

	// Semaphore signaled when file is closed.
	csema uint32

	// Non-zero if this file has been set to blocking mode.
	isBlocking uint32

	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket. Immutable.
	IsStream bool

	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

	// Whether this is a file rather than a network socket.
	isFile bool
}
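poll.FD in turn holds two important fields. Sysfd is the real file descriptor allocated by the Linux kernel, and pd, a pollDesc, wraps the underlying event-notification mechanism. In internal/poll this pollDesc holds only runtimeCtx, an opaque handle to the runtime's own pollDesc: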
type pollDesc struct {
	runtimeCtx uintptr
}

func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		if ctx != 0 {
			runtime_pollUnblock(ctx)
			runtime_pollClose(ctx)
		}
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}
The real pollDesc structure is in src/runtime/netpoll.go:
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	everr   bool      // marks event scanning error happened
	user    uint32    // user settable cookie
	rseq    uintptr   // protects from stale read timers
	rg      uintptr   // pdReady, pdWait, G waiting for read or nil
	rt      timer     // read deadline timer (set if rt.f != nil)
	rd      int64     // read deadline
	wseq    uintptr   // protects from stale write timers
	wg      uintptr   // pdReady, pdWait, G waiting for write or nil
	wt      timer     // write deadline timer
	wd      int64     // write deadline
	self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
It contains a pointer to its own type (link), the telltale sign of a linked list. The list head is a pollCache structure:
type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}
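At first glance the method names of this list, headed by pollCache, seem to contradict their behavior: alloc removes the pollDesc node that the head points to, while free links a pollDesc node back onto the head. They make sense once you notice that pollCache is a free list of unused descriptors. alloc takes a descriptor out of the cache (allocating a fresh batch when the list is empty), and free returns a descriptor to the cache for reuse.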
socket
Back to the internetSocket function above: its return statement calls the socket function:
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}
	if fd, err = newFD(s, family, sotype, net); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}

	// This function makes a network file descriptor for the
	// following applications:
	//
	// - An endpoint holder that opens a passive stream
	//   connection, known as a stream listener
	//
	// - An endpoint holder that opens a destination-unspecific
	//   datagram connection, known as a datagram listener
	//
	// - An endpoint holder that opens an active stream or a
	//   destination-specific datagram connection, known as a
	//   dialer
	//
	// - An endpoint holder that opens the other connection, such
	//   as talking to the protocol stack inside the kernel
	//
	// For stream and datagram listeners, they will only require
	// named sockets, so we can assume that it's just a request
	// from stream or datagram listeners when laddr is not nil but
	// raddr is nil. Otherwise we assume it's just for dialers or
	// the other connection holders.

	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
		fd.Close()
		return nil, err
	}
	return fd, nil
}
The sysSocket function is clearly worth following; by now we must be close to the actual system call:
func sysSocket(family, sotype, proto int) (int, error) {
	s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
	// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
	// introduced in 2.6.27 kernel and on FreeBSD both flags were
	// introduced in 10 kernel. If we get an EINVAL error on Linux
	// or EPROTONOSUPPORT error on FreeBSD, fall back to using
	// socket without them.
	switch err {
	case nil:
		return s, nil
	default:
		return -1, os.NewSyscallError("socket", err)
	case syscall.EPROTONOSUPPORT, syscall.EINVAL:
	}

	// See ../syscall/exec_unix.go for description of ForkLock.
	syscall.ForkLock.RLock()
	s, err = socketFunc(family, sotype, proto)
	if err == nil {
		syscall.CloseOnExec(s)
	}
	syscall.ForkLock.RUnlock()
	if err != nil {
		return -1, os.NewSyscallError("socket", err)
	}
	if err = syscall.SetNonblock(s, true); err != nil {
		poll.CloseFunc(s)
		return -1, os.NewSyscallError("setnonblock", err)
	}
	return s, nil
}
// src/net/hook_unix.go (declared inside a var block there)
var socketFunc func(int, int, int) (int, error) = syscall.Socket
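Sure enough, this is where the Linux socket system call creates a non-blocking, close-on-exec socket and returns its file descriptor, which is then used to instantiate the netFD.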
Next, look at the fd.listenStream method:
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	var err error
	if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
		return err
	}
	var lsa syscall.Sockaddr
	if lsa, err = laddr.sockaddr(fd.family); err != nil {
		return err
	}
	if ctrlFn != nil {
		c, err := newRawConn(fd)
		if err != nil {
			return err
		}
		if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
			return err
		}
	}
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
	if err = fd.init(); err != nil {
		return err
	}
	lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
	fd.setAddr(fd.addrFunc()(lsa), nil)
	return nil
}
// src/net/hook_unix.go (declared inside a var block there)
var listenFunc func(int, int) error = syscall.Listen
This completes bind and listen on the socket: it has gone from creation to listening, and only accept remains.
Next comes netFD's initialization method, init:
func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true)
}
Here is the Init method of the poll.FD structure:
func (fd *FD) Init(net string, pollable bool) error {
	// We don't actually care about the various network types.
	if net == "file" {
		fd.isFile = true
	}
	if !pollable {
		fd.isBlocking = 1
		return nil
	}
	err := fd.pd.init(fd)
	if err != nil {
		// If we could not initialize the runtime poller,
		// assume we are using blocking mode.
		fd.isBlocking = 1
	}
	return err
}
It calls the init method of pollDesc, which we saw earlier:
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		if ctx != 0 {
			runtime_pollUnblock(ctx)
			runtime_pollClose(ctx)
		}
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}
At last we reach the epoll-related functions, runtime_pollServerInit and runtime_pollOpen:
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}
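The //go:linkname directive tells the compiler to link runtime_pollServerInit, which internal/poll declares without a body, to the poll_runtime_pollServerInit function in src/runtime/netpoll.go. And here is netpollinit, the first function listed at the start: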
func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	if epfd < 0 {
		epfd = epollcreate(1024)
		if epfd < 0 {
			println("runtime: epollcreate failed with", -epfd)
			throw("runtime: netpollinit failed")
		}
		closeonexec(epfd)
	}
	r, w, errno := nonblockingPipe()
	if errno != 0 {
		println("runtime: pipe failed with", -errno)
		throw("runtime: pipe failed")
	}
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
	if errno != 0 {
		println("runtime: epollctl failed with", -errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}
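netpollinit creates an epoll instance and stores the returned epoll file descriptor in the global variable epfd. It also creates a non-blocking pipe and registers the read end with epoll for _EPOLLIN; writing to the other end (netpollBreakWr) is how the runtime wakes up a thread blocked in epoll_wait.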
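The wake-up pipe can be imitated in user code. In this Linux-only sketch, wakeDemo is a hypothetical helper, and it stores the file descriptor in the event's Fd field, whereas the runtime stores a pointer (&netpollBreakRd) in ev.data. One goroutine blocks in epoll_wait on the pipe's read end, and another writes one byte to the write end to wake it, much as netpollBreak does.

```go
package main

import (
	"fmt"
	"syscall"
	"time"
)

// wakeDemo (hypothetical helper) blocks in epoll_wait until another goroutine
// writes to a pipe registered with epoll. It returns the fd reported by
// epoll_wait (got) and the pipe's read end (want).
func wakeDemo() (got, want int32, err error) {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return -1, -1, err
	}
	defer syscall.Close(epfd)

	var p [2]int
	if err := syscall.Pipe2(p[:], syscall.O_NONBLOCK|syscall.O_CLOEXEC); err != nil {
		return -1, -1, err
	}
	defer syscall.Close(p[0])
	defer syscall.Close(p[1])
	want = int32(p[0])

	// Register the read end for EPOLLIN, as netpollinit does with its pipe.
	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: want}
	if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, p[0], &ev); err != nil {
		return -1, want, err
	}

	// A second goroutine plays the role of netpollBreak.
	done := make(chan struct{})
	go func() {
		defer close(done)
		time.Sleep(10 * time.Millisecond)
		syscall.Write(p[1], []byte{0})
	}()
	defer func() { <-done }() // runs before the Close defers registered above

	events := make([]syscall.EpollEvent, 1)
	for {
		n, err := syscall.EpollWait(epfd, events, -1) // -1: block until an event arrives
		if err == syscall.EINTR {
			continue // interrupted by a signal; retry
		}
		if err != nil {
			return -1, want, err
		}
		if n > 0 {
			return events[0].Fd, want, nil
		}
	}
}

func main() {
	got, want, err := wakeDemo()
	fmt.Println(got == want, err) // true <nil>
}
```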
In the Go source, epollcreate1 is a short piece of assembly (https://github.com/golang/go/blob/go1.16.3/src/runtime/sys_linux_amd64.s#L696-L702):
#define SYS_epoll_create1	291

// int32 runtime·epollcreate1(int32 flags);
TEXT runtime·epollcreate1(SB),NOSPLIT,$0
	MOVL	flags+0(FP), DI
	MOVL	$SYS_epoll_create1, AX
	SYSCALL
	MOVL	AX, ret+8(FP)
	RET
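The assembly is terse but readable: it loads flags into DI (the register for the first system-call argument), puts the system call number SYS_epoll_create1 into AX, executes SYSCALL, and stores the result from AX as the return value. The number 291 matches epoll_create1 in the Linux x86-64 system call table.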
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	pd.self = pd
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}
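Likewise, the //go:linkname directive links runtime_pollOpen to the poll_runtime_pollOpen function in src/runtime/netpoll.go. It takes a pollDesc from pollcache, resets its state, and then calls netpollopen: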
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
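netpollopen registers the socket file descriptor with the epoll instance referred to by epfd. It asks for readable, writable and peer-hangup events in edge-triggered mode (_EPOLLET) and stores the pollDesc pointer in the event's data field, so that epoll_wait can hand it back later.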
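The same registration can be tried on a private epoll instance. In this Linux-only sketch, watchListener is a hypothetical helper, and it stores the fd in the event's Fd field instead of a *pollDesc. It registers a Go listener's socket with the same event mask netpollopen uses, dials it once, and checks that epoll_wait reports the listening socket as readable. The syscall package declares EPOLLET as a negative constant, so the sketch defines its own unsigned mask.

```go
package main

import (
	"fmt"
	"net"
	"syscall"
)

// epollET is EPOLLET as an unsigned mask; syscall.EPOLLET is declared as a
// negative constant and cannot be converted to uint32 directly.
const epollET uint32 = 1 << 31

// watchListener (hypothetical helper) registers a listener's socket with a
// private epoll instance using netpollopen's mask, then reports whether an
// incoming connection makes the socket readable.
func watchListener() (readable bool, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	defer ln.Close()

	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return false, err
	}
	defer syscall.Close(epfd)

	rc, err := ln.(*net.TCPListener).SyscallConn()
	if err != nil {
		return false, err
	}
	var lfd int
	var ctlErr error
	err = rc.Control(func(fd uintptr) {
		lfd = int(fd)
		ev := syscall.EpollEvent{
			Events: syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | epollET,
			Fd:     int32(fd), // the runtime stores a *pollDesc in ev.data instead
		}
		ctlErr = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int(fd), &ev)
	})
	if err == nil {
		err = ctlErr
	}
	if err != nil {
		return false, err
	}

	// The kernel completes the handshake and queues the connection for accept.
	c, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return false, err
	}
	defer c.Close()

	events := make([]syscall.EpollEvent, 1)
	for {
		n, err := syscall.EpollWait(epfd, events, 1000) // wait up to 1s
		if err == syscall.EINTR {
			continue
		}
		if err != nil {
			return false, err
		}
		return n == 1 && events[0].Fd == int32(lfd) && events[0].Events&syscall.EPOLLIN != 0, nil
	}
}

func main() {
	fmt.Println(watchListener()) // true <nil>
}
```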
epollctl is also written in assembly (https://github.com/golang/go/blob/go1.16.3/src/runtime/sys_linux_amd64.s#L704-#L713):
#define SYS_epoll_ctl		233

// func epollctl(epfd, op, fd int32, ev *epollEvent) int
TEXT runtime·epollctl(SB),NOSPLIT,$0
	MOVL	epfd+0(FP), DI
	MOVL	op+4(FP), SI
	MOVL	fd+8(FP), DX
	MOVQ	ev+16(FP), R10
	MOVL	$SYS_epoll_ctl, AX
	SYSCALL
	MOVL	AX, ret+24(FP)
	RET
Finally, here is the relevant documentation for the epoll_create1 and epoll_ctl system calls:
- epoll_create1
  If flags is 0, then, other than the fact that the obsolete size argument is dropped, epoll_create1() is the same as epoll_create().
  EPOLL_CLOEXEC: Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. See the description of the O_CLOEXEC flag in open(2) for reasons why this may be useful.
- epoll_ctl
  This system call is used to add, modify, or remove entries in the interest list of the epoll(7) instance referred to by the file descriptor epfd. It requests that the operation op be performed for the target file descriptor, fd.
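epoll_create1 (like epoll_create) creates an epoll instance and returns a file descriptor referring to it. epoll_ctl registers each file descriptor to monitor with that instance, together with the I/O events of interest (_EPOLLIN, _EPOLLOUT, _EPOLLRDHUP) and the edge-triggered flag _EPOLLET. epoll_wait will be covered in the article on Accept.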
Reposted from: blog.crazytaxii.com/posts/golang_linux_netpoll1/
Source: https://mp.weixin.qq.com/s/NUIueOOGkQyE2WV-ib2C3w