Go 網絡輪詢器 epoll Listen 實現

Go 運行時中支持 I/O 多路複用的網絡輪詢器(netpoll)在各種操作系統中都有相應的實現:

本文將基於 Linux 操作系統,聚焦 Go 網絡輪詢器的 I/O 多路複用底層是如何基於 epoll 實現的。

所有的網絡操作都以網絡描述符 netFD 爲中心實現。netFD 與底層 PollDesc 結構綁定,當在一個 netFD 上讀寫遇到 EAGAIN 錯誤時,就將當前 goroutine 存儲到這個 netFD 對應的 PollDesc 中,同時調用 gopark 把當前 goroutine 給 park 住,直到這個 netFD 上再次發生讀寫事件,纔將此 goroutine 給 ready 激活重新運行。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅動機制。

首先看一眼 src/runtime/netpoll_epoll.go ,其中定義了幾個以 netpoll 爲前綴的函數:

net.Listen

我們自行編寫 TCP server(https://github.com/smallnest/1m-go-tcp-server/blob/master/1_simple_tcp_server/server.go) 時,一定要 net.Listen,並且傳入協議和監聽地址(端口),就先從這個函數入手:

func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    if err != nil {
        return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
    }
    sl := &sysListener{
        ListenConfig: *lc,
        network:      network,
        address:      address,
    }
    var l Listener
    la := addrs.first(isIPv4)
    switch la := la.(type) {
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    default:
        return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
    }
    if err != nil {
        return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
    }
    return l, nil
}

順着 sl.ListenTCP 方法找到其定義:

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

internetSocket 函數的返回值變量名稱看着和文件描述符有關,在 src/net/ipsock_posix.go 定義:

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
        raddr = raddr.toLocal(net)
    }
    family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

其返回值爲 netFD 結構的指針。

netFD

netFD 是 Golang 自己封裝的一個 “網絡文件描述符” 結構:

type netFD struct {
    pfd poll.FD

    // immutable until Close
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

最主要的就是名爲 pfd 的字段,是一個 poll.FD 結構,一眼就能看出來和 epoll 有關:

type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache.
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}

poll.FD 又包含了兩個重要字段,Sysfd 就是真正的 Linux 操作系統分配的文件描述符,而 pollDesc 結構則是對底層事件驅動的封裝:

type pollDesc struct {
    runtimeCtx uintptr
}

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

真正的 pollDesc 結構在 src/runtime/netpoll.go 文件中:

type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool      // marks event scanning error happened
    user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    rg      uintptr   // pdReady, pdWait, G waiting for read or nil
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline
    wseq    uintptr   // protects from stale write timers
    wg      uintptr   // pdReady, pdWait, G waiting for write or nil
    wt      timer     // write deadline timer
    wd      int64     // write deadline
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

包含一個自身類型的指針,這是要組鏈表的節奏。表頭是一個叫 pollCache 的結構:

type pollCache struct {
    lock  mutex
    first *pollDesc
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}

但是這條以 pollDesc 爲表頭的鏈表的方法名稱卻與行爲相反,alloc 方法從鏈表中取出表頭指向的 pollDesc 節點,而 free 方法反而是將新的 pollDesc 節點掛接到表頭。

socket

我們繼續回到上面 internetSocket 函數,它在返回時又調用了 socket 函數:

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }

    // This function makes a network file descriptor for the
    // following applications:
    //
    // - An endpoint holder that opens a passive stream
    //   connection, known as a stream listener
    //
    // - An endpoint holder that opens a destination-unspecific
    //   datagram connection, known as a datagram listener
    //
    // - An endpoint holder that opens an active stream or a
    //   destination-specific datagram connection, known as a
    //   dialer
    //
    // - An endpoint holder that opens the other connection, such
    //   as talking to the protocol stack inside the kernel
    //
    // For stream and datagram listeners, they will only require
    // named sockets, so we can assume that it's just a request
    // from stream or datagram listeners when laddr is not nil but
    // raddr is nil. Otherwise we assume it's just for dialers or
    // the other connection holders.

    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        }
    }
    if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    return fd, nil
}

sysSocket 函數當然是要追的,應該離真正的系統調用不遠了:

func sysSocket(family, sotype, proto int) (int, error) {
    s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
    // On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
    // introduced in 2.6.27 kernel and on FreeBSD both flags were
    // introduced in 10 kernel. If we get an EINVAL error on Linux
    // or EPROTONOSUPPORT error on FreeBSD, fall back to using
    // socket without them.
    switch err {
    case nil:
        return s, nil
    default:
        return -1, os.NewSyscallError("socket", err)
    case syscall.EPROTONOSUPPORT, syscall.EINVAL:
    }

    // See ../syscall/exec_unix.go for description of ForkLock.
    syscall.ForkLock.RLock()
    s, err = socketFunc(family, sotype, proto)
    if err == nil {
        syscall.CloseOnExec(s)
    }
    syscall.ForkLock.RUnlock()
    if err != nil {
        return -1, os.NewSyscallError("socket", err)
    }
    if err = syscall.SetNonblock(s, true); err != nil {
        poll.CloseFunc(s)
        return -1, os.NewSyscallError("setnonblock", err)
    }
    return s, nil
}

socketFunc        func(int, int, int) (int, error)  = syscall.Socket

果然在這裏就有 Linux 的 socket 系統調用創建一個非阻塞的 socket 並返回其文件描述符,在實例化 netFD 時要用的。

再往下看 fd.listenStream 方法:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

listenFunc        func(int, int) error              = syscall.Listen

完成了對 socket 的 bind 和 listen,至此 socket 從初始化到監聽都走完,就差 accept 了。

繼續往下是 netFD 初始化 init

func (fd *netFD) init() error {
    return fd.pfd.Init(fd.net, true)
}

找到 poll.FD 結構的 Init 方法:

func (fd *FD) Init(net string, pollable bool) error {
    // We don't actually care about the various network types.
    if net == "file" {
        fd.isFile = true
    }
    if !pollable {
        fd.isBlocking = 1
        return nil
    }
    err := fd.pd.init(fd)
    if err != nil {
        // If we could not initialize the runtime poller,
        // assume we are using blocking mode.
        fd.isBlocking = 1
    }
    return err
}

pollDesc 結構的 init 方法,剛纔就見過了:

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

終於看到和 epoll 相關的函數了 runtime_pollServerInit 還有 runtime_pollOpen

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lockInit(&netpollInitLock, lockRankNetpollInit)
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

go:linkname 註釋會引導編譯器將 runtime_pollServerInit 函數鏈接至 src/runtime/netpoll.go 中的 poll_runtime_pollServerInit 函數,一開始就看到的 netpollinit 來了:

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd < 0 {
        epfd = epollcreate(1024)
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

netpollinit 函數中創建了一個 epoll 實例,並將返回的 epoll 文件描述符賦值給全局的 epfd 變量。

epollcreate1 函數在 Golang 源碼中是一串彙編代碼 https://github.com/golang/go/blob/go1.16.3/src/runtime/sys_linux_amd64.s#L696-L702:

#define SYS_epoll_create1	291

// int32 runtime·epollcreate1(int32 flags);
TEXT runtime·epollcreate1(SB),NOSPLIT,$0
    MOVL	flags+0(FP), DI
    MOVL	$SYS_epoll_create1, AX
    SYSCALL
    MOVL	AX, ret+8(FP)
    RET

雖然看不太懂但是 SYS_epoll_create1 與 Linux 系統調用表中的 epoll_create1 系統調用編號是一致的(291)。

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

go:linkname 註釋會引導編譯器將 runtime_pollOpen 函數鏈接至 src/runtime/netpoll.go 中的 poll_runtime_pollOpen 函數。

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd)&ev)
}

netpollopen 將 socket 文件描述符註冊到 epfd 代表的 epoll 實例中。

epollctl 也是彙編代碼 https://github.com/golang/go/blob/go1.16.3/src/runtime/sys_linux_amd64.s#L704-#L713:

#define SYS_epoll_ctl		233

// func epollctl(epfd, op, fd int32, ev *epollEvent) int
TEXT runtime·epollctl(SB),NOSPLIT,$0
    MOVL	epfd+0(FP), DI
    MOVL	op+4(FP), SI
    MOVL	fd+8(FP), DX
    MOVQ	ev+16(FP), R10
    MOVL	$SYS_epoll_ctl, AX
    SYSCALL
    MOVL	AX, ret+24(FP)
    RET

最後我們來看一下 epoll_create1epoll_ctl 系統調用的相關文檔:

epoll_create 創建一個 epoll 實例並返回其文件描述符;epoll_ctl 註冊想要監控的文件描述符和關心的 I/O 事件(_EPOLLIN_EPOLLOUT_EPOLLRDHUP_EPOLLET)到 epoll 實例上。至於 epoll_wait,將在 Accept 篇中討論。

轉自:

blog.crazytaxii.com/posts/golang_linux_netpoll1/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NUIueOOGkQyE2WV-ib2C3w