Go netpoll (下篇)- 數據接收發送和關閉
接收 TCP 連接流程
TCP 連接對象
type TCPConn struct {
conn
}
type conn struct {
fd *netFD
}
Conn 接口
Conn 表示通用的面向流的網絡連接。
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
接收 TCP 連接
TCPListener (TCP 監聽對象) 的 Accept 方法返回一個 TCP 連接對象。
func (l *TCPListener) Accept() (Conn, error) {
...
c, err := l.accept()
...
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
...
tc := newTCPConn(fd)
...
return tc, nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
...
return netfd, nil
}
FD.Accept 方法內部不斷輪詢調用 accept 方法獲取 TCP 連接並處理相應的錯誤。
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
...
for {
// 輪詢調用 accept 方法獲取 TCP 連接
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
...
}
return -1, nil, errcall, err
}
}
accept 方法內部封裝了一層 系統調用 accept
,返回一個非阻塞的文件描述符。
func accept(s int) (int, syscall.Sockaddr, string, error) {
// 先嚐試 accept4 調用,如果報錯了,改用 accept
// nonblock: 設置爲非阻塞模式
// accept4 通過 1 次系統調用完成 accept 和設置 nonblock 兩個操作
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
switch err {
case nil:
return ns, sa, "", nil
...
}
// accept 通過 2 次系統調用完成 accept 和設置 nonblock 兩個操作
ns, sa, err = AcceptFunc(s)
...
if err = syscall.SetNonblock(ns, true); err != nil {
...
}
return ns, sa, "", nil
}
newTCPConn 方法返回一個包裝好的 TCP 連接對象。
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}
接收 TCP 連接流程圖
TCPAccept
數據接收和發送
接收方法
接收數據的對象是具體的 TCP 連接,所以從 conn.Read 方法開始。
func (c *conn) Read(b []byte) (int, error) {
...
n, err := c.fd.Read(b)
...
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
// 文件描述符保活機制
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
FD.Read 方法內部不斷輪詢 系統調用 Read
並處理相應的錯誤。
func (fd *FD) Read(p []byte) (int, error) {
...
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果沒有可用數據,拋出 syscall.EAGAIN
// 將當前連接所在的 goroutine 休眠
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
...
// runtime_pollWait 通過鏈接器指向了 poll_runtime_pollWait
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
poll_runtime_pollWait 方法等待網絡文件描述符準備好讀或寫 (讀寫取決於參數 mode)。
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}
netpollblock 方法用於檢測網絡文件描述符準備好讀或寫。
// 如果 IO 已經準備好,返回 true
// 如果 IO 已經超時或關閉,返回 false
// 如果 waitio 參數爲 true, 阻塞等待 IO 完成, 忽略錯誤
// 禁止使用同一種模式併發調用 netpollblock
// 因爲 pollDesc 只能爲每種模式保存 1 個等待的 goroutine
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// 休眠 goroutine, 等待 IO 完成
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
...
return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// 增加等待網絡輪詢器的 goroutine 數量
// 調度器使用這個值決定是否阻塞,如果沒有其他工作的情況下,調度器會阻塞等待網絡輪詢器的 IO 事件
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
發送方法
發送數據的對象是具體的 TCP 連接,所以從 conn.Write 方法開始。
func (c *conn) Write(b []byte) (int, error) {
...
n, err := c.fd.Write(b)
...
return n, err
}
func (fd *netFD) Write(p []byte) (nn int, err error) {
nn, err = fd.pfd.Write(p)
// 文件描述符保活機制
runtime.KeepAlive(fd)
return nn, wrapSyscallError(writeSyscallName, err)
}
FD.Write 方法內部不斷輪詢 系統調用 Write
並處理相應的錯誤。
func (fd *FD) Write(p []byte) (int, error) {
...
var nn int
for {
...
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
...
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
...
}
}
代碼執行到這裏,後面的流程就和 Read 接收數據
流程一樣了,這裏不再贅述。
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
小結
數據發送和接收流程圖
網絡輪詢器
netpoll 方法用於檢測網絡輪詢器並返回已經就緒的 goroutine 列表。
// 輪詢檢測準備就緒的網絡連接
// 返回一個可運行 (可讀/可寫/可讀寫) 的 goroutine 列表
// 參數規則:
// delay < 0: 無限阻塞
// delay == 0: 非阻塞
// delay > 0: 阻塞時間 (單位: 納秒)
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
...
// 每次讀取 128 個 IO 事件
var events [128]epollevent
retry:
// 調用 epoll_wait 獲取接收到的 IO 事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
...
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
...
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode)
}
}
return toRun
}
netpollready 方法表示網絡文件描述符關聯的 IO 事件已經就緒,並將參數 pd 網絡文件描述符內部的 goroutine 添加到參數隊列中。
// 參數 toRun 是一個 goroutine 列表
// 參數 mode 規則
// 'r': IO 讀
// 'w': IO 寫
// 'r'+'w': IO 讀寫
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
netpollunblock 方法將網絡文件描述符中的讀信號或者寫信號轉換爲 pdReady 狀態,然後返回存儲在內部的 goroutine。
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
...
for {
...
var new uintptr
if ioready {
new = pdReady
}
if gpp.CompareAndSwap(old, new) {
if old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
小結
網絡輪詢器調用關係圖
netpoll 方法會返回一個可運行的 goroutine
列表,然後調用方會將返回的 goroutine
逐個加入處理器的本地隊列或者全局隊列。 從圖中可以看到調用方主要有 4 個,其中調度線程 schedule
和監控線程 sysmon
在 GMP 調度器一文中已經講過了,這裏不再贅述,剩下的 GC 和 STW 後面有機會再講。
超時控制
接收數據超時
conn.SetReadDeadline 方法設置連接的接收數據超時時間。
func (c *conn) SetReadDeadline(t time.Time) error {
...
if err := c.fd.SetReadDeadline(t); err != nil {
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
}
return nil
}
func (fd *netFD) SetWriteDeadline(t time.Time) error {
return fd.pfd.SetWriteDeadline(t)
}
func (fd *FD) SetWriteDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, 'w')
}
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
...
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
return nil
}
poll_runtime_pollSetDeadline 方法會設置參數 pd 網絡文件描述符內部的定時器 (goroutine 持有),並在定時器到期後進行相關的操作。
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
// 主要是對 pd 進行定時器的相關設置,這裏直接跳過這部分內容
...
// 如果截止時間已經過期,取消等待 IO 而導致的阻塞
var rg, wg *g
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false)
}
if pd.wd < 0 {
wg = netpollunblock(pd, 'w', false)
}
// 如果有取消讀事件的 goroutine, 則進行喚醒
if rg != nil {
netpollgoready(rg, 3)
}
// 如果有取消寫事件的 goroutine, 則進行喚醒
if wg != nil {
netpollgoready(wg, 3)
}
}
func netpollgoready(gp *g, traceskip int) {
atomic.Xadd(&netpollWaiters, -1)
goready(gp, traceskip+1)
}
發送數據超時
發送數據超時和接收數據流程基本一致,只是調用的方法不同,這裏就不再展開了。
關閉連接
conn.Close 方法用於關閉網絡連接。
func (c *conn) Close() error {
...
err := c.fd.Close()
...
return err
}
func (fd *netFD) Close() error {
runtime.SetFinalizer(fd, nil)
return fd.pfd.Close()
}
func (fd *FD) Close() error {
...
fd.pd.evict()
...
return err
}
evict 方法會關閉網絡文件描述符,並取消所有阻塞在等待該文件描述符的 IO 事件。
func (pd *pollDesc) evict() {
...
runtime_pollUnblock(pd.runtimeCtx)
}
func poll_runtime_pollUnblock(pd *pollDesc) {
...
pd.closing = true
var rg, wg *g
rg = netpollunblock(pd, 'r', false)
wg = netpollunblock(pd, 'w', false)
...
if rg != nil {
netpollgoready(rg, 3)
}
if wg != nil {
netpollgoready(wg, 3)
}
}
流程圖
關閉連接流程圖
小結
本文用一個基礎的服務器網絡程序爲示例,分析了網絡標準庫中的端口監聽、接收連接、發送 / 接收數據, 關閉連接 4 個主要流程的 Linux 版本實現代碼。 Go 網絡標準庫通過在底層封裝 epoll
實現了 IO 多路複用,通過網絡輪詢器加 GMP 調度器避免了傳統網絡編程中的線程切換和 IO 阻塞,兩者的完美配合是 Go 網絡編程高性能的基石。
鏈接
[1]
RPC 漫談: 連接問題: https://blog.joway.io/posts/deep-into-rpc-connection/
[2]
RPC 漫談:序列化問題: https://blog.joway.io/posts/deep-into-rpc-serialization/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/_yn2Efytq8F8ovf43di23g