萬字解析 golang netpoll 底層原理

1 基礎理論鋪墊

1.1 io 多路複用

在正式開始,我們有必要作個預熱,提前理解一下所謂io多路複用的概念.

拆解多路複用一詞,所謂多路,指的是存在多個待服務目標,而複用,指的是重複利用一個單元來爲上述的多個目標提供服務.

聊到 io 多路複用時,我比較希望舉一個經營餐廳的例子——一個餐館在運營過程中,考慮到人力成本,一個服務員往往需要同時爲多名不同的顧客提供服務,這個服務過程本質上就屬於多路複用.

下面我們就以這個餐廳的例子作爲輔助,來一起從零到一地推演一遍 io 多路複用技術的形成思路.

1)單點阻塞 io 模型

在 linux 系統中,一切皆爲文件,即一切事物都可以抽象化爲一個文件句柄 file descriptor,後續簡稱 fd.

比如服務端希望接收來自客戶端的連接,其中一個實現方式就是讓線程 thread 以阻塞模式對 socket fd 發起 accept 系統調用,這樣當有連接到達時,thread 即可獲取結果;當沒有連接就緒事件時,thread 則會因 accept 操作而陷入阻塞態.

這樣阻塞模式的好處就在於,thread 可以在所依賴事件未就緒時,通過阻塞的模式讓渡出 cpu 執行權,在後續條件就緒時再被喚醒,這樣就能做到忙閒有度,提高 cpu 的利用率.

這樣表述完,大家未必能直接感受到該方式存在的侷限,我們將其翻譯成餐廳的例子——這就好比是餐廳爲每名顧客提供一位專屬服務員進行一對一服務的(單點),專屬服務員只關注特定顧客的指令,在此之前完全處於沉默待命狀態(阻塞態),對其他客人的傳喚也是充耳不聞.

而上述方式存在的不足之處就在於人力成本. 我們一名服務員只能爲一名顧客提供服務,做不到複用,顯得有點兒浪費. 於是接下來演進的方向所需要圍繞的目標就是——降本增效.

2)多點輪詢 + 非阻塞 io 模型

要複用,就得做到讓一個 thread 能同時監聽多個 fd,只要任意其一有就緒事件到達,就能被 thread 接收處理. 在此前提下,accept 的阻塞調用模式就需要被摒棄,否則一旦某個 fd 連接未就緒時,thread 就會立刻被 block 住,而無法兼顧到其他 fd 的情況.

於是我們可以令 thread 採用非阻塞輪詢的方式,一一對每個 fd 執行非阻塞模式下的 accept 指令:此時倘若有就緒的連接,就能立即獲得並做處理;若沒有就緒事件,accept 也會立刻返回錯誤結果(EAGAIN) ,thread 可以選擇忽略跳過,並立即開始下一次輪詢行爲.

上述方式倒是實現複用了,但其背後存在什麼問題呢?

同樣用餐廳的例子加以說明. 餐廳規定一個服務員需要同時爲多名指定的顧客提供服務,但這名服務員需要輾轉騰挪各餐桌之間,輪流不間斷地對每名客人進行主動問詢,即便得到回覆基本都是否定的,但他也一刻都不允許停歇. 這樣的操作模式下,即使客人不嫌煩,這個服務員自己也會被這種高強度的無效互動行爲給折騰到筋疲力盡.

相信這樣解釋完,大家也能看出問題所在. 在這種模式下,thread 不間斷地在對每個 fd 發起非阻塞系統調用,倘若各 fd 都沒有就緒事件,那麼 thread 就只會一直持續着無意義的空轉行爲,這無疑是一種對 cpu 資源的浪費.

3)io 多路複用

到了這裏,大家可能就會問了,餐廳能否人性化一些,雖然我們希望讓服務生與顧客之間建立一對多的服務關係,但是服務生可以基於顧客的主動招呼再採取響應,而在客人沒有明確訴求時,服務生可以小憩一會兒,一方面養足體力,另一方面也避免對客人產生打擾.

是的,這個解決方案聽起來似乎是順理成章的,然而放到計算機領域可能就並非如此了. 用戶態 thread 是一名視聽能力不好的服務生,他無法同時精確接收到多名顧客的主動傳喚,只能通過一一向顧客問詢的方式(系統調用)來獲取信息,這就是用戶態視角的侷限性.

於是爲了解決上述問題,io 多路複用技術就應運而生了. 它能在單個指令層面支持讓用戶態 thread 同時對多個 fd 發起監聽,調用模式還可以根據使用需要調整爲非阻塞、阻塞或超時模式.

在 linux 系統中,io 多路複用技術包括 select、poll、epoll. 在隨後的章節中我們將重點針對 epoll 展開介紹,並進一步揭示 golang io 模型底層對 epoll 的應用及改造.

1.2 epoll 核心知識

epoll 全稱 EventPoll,顧名思義,是一種以事件回調機制實現的 io 多路複用技術.

epoll 是一個指令組,其中包含三個指令:

以上述三個指令作爲主線,我們通過流程串聯的方式來揭示 epoll 底層實現原理.

1)epoll_create

extern int epoll_create (int __size) __THROW;

通過 epoll_create 可以開闢一片內核空間用於承載 epoll 事件表,在表中可以註冊一系列關心的 fd 、相應的監聽事件類型以及回調時需要攜帶的數據.

epoll 事件表是基於紅黑樹實現的 key-value 有序表,其中 key 是 fd,value 是監聽事件類型以及使用方自定義拓展數據.

針對 epoll 事件表的數據結構選型,可能部分同學會在心中存有疑惑——爲什麼不基於哈希表而選擇了紅黑樹這種有序表結構呢?針對該問題,我在此僅提供一些個人觀點:

  •  內存連續性:哈希表底層基於桶數組 + 鏈表實現時,桶數組部分在存儲上需要爲連續空間;而紅黑樹節點之間通過鏈表指針關聯,可以是非連續空間,在空間分配上比較靈活

  •  操作性能:雖然哈希表的時間複雜度是 O(1),但是常數係數很高;而紅黑樹雖爲 O(logN),但在 N 不大的情況下(fd 數量相對收斂),O(logN) 相對於 O(1)差距並不大,此時哈希表的高常數係數反而會導致性能瓶頸

2)epoll_ctl

epoll_ctl 指令用於對 epoll 事件表內的 fd 執行變更操作,進一可分爲:

extern int epoll_ctl (int __epfd, int __op, int __fd,
              struct epoll_event *__event) __THROW;

由於 epoll 事件表是紅黑樹結構,所以上述操作時間複雜度都是 O(logN) 級別

3)epoll_wait

執行 epoll_wait 操作時,會傳入一個固定容量的就緒事件列表,當註冊監聽的 io 事件就緒時,內核中會基於事件回調機制將其添加到就緒事件列表中並進行返回.

值得一提的是 epoll_wait 操作還能夠支持非阻塞模式、阻塞模式以及超時模式的多種調用方式.

extern int epoll_wait (int __epfd, struct epoll_event *__events,
               int __maxevents, int __timeout);

我們回頭總結一下epoll 中存在的優勢,這裏主要與 select 指令進行對比(本文中沒有對 select 展開介紹,這部分需要大家自行了解):

凡事都需要辯證看待,在不同的條件與語境下,優劣勢的地位可能會發生轉換. 以 epoll 而言,其主要適用在監聽 fd 基數較大且活躍度不高的場景,這樣 epoll 事件表的空間複用以及 epoll_wait 操作的精準返回才能體現出其優勢 ;反之,如果 fd 數量不大且比較活躍時,反而適合 select 這樣的簡單指令,此時 epoll 核心優勢體現不充分,其底層紅黑樹這樣的複雜結構實現反而徒增累贅.

2 go netpoll 原理

2.1 整體架構設計

在 linux 系統下,golang 底層依賴 epoll 作爲核心基建來實現其 io 模型,但在此基礎上,golang 還設計了一套因地制宜的適配方案,通常被稱作 golang netpoll 框架.

下面我們從流程拆解的方式,來對 netpoll 框架展開介紹:

2.2 net server 流程設計

以啓動 net server 的流程爲例,來觀察其底層與 netpoll 流程的依賴關係:

2.3 因地制宜的策略選型

我在學習初始階段,常常對 golang netpoll 中的 poll_wait 流程和 epoll_wait 流程產生定位混淆,事實上兩者是完全獨立的流程.

在 golang 的 poll_wait 流程中,並沒有直接調用到 epoll_wait,而是通過 gopark 操作實現將當前 g 只爲阻塞態的操作;而真正調用 epoll_wait 操作是 gmp 輪詢調用的 netpoll 流程中,並通常是以非阻塞模式來執行 epoll_wait 指令,在找到就緒的 pollDesc 後,進一步獲取其中存儲的 g 實例,最後通過 goready 操作來喚醒 g.

上述在阻塞方式實現上的差異,正是 golang netpoll 在 epoll 基礎上所作出的最核心的改造項. 在這裏,可能有部分同學可能會產生疑惑,爲什麼 golang 不利用阻塞模式的 epoll_wait 指令來直接控制 g 的阻塞與喚醒呢?

這個問題的答案就是——epoll_wait 做不到. epoll_wait 的調用單元是 thread,及 gmp 中的 m,而非 g. 而我們都知道 golang 是門天然支持高併發的語言,它通過一套 gmp 架構,爲使用方屏蔽了有關線程 thread 的所有細節,保證語言層面的併發粒度都控制在更精細的 g 之上. 因此在 golang io 模型的設計實現中,需要儘可能避免 thread 級別的阻塞,因此當 g 因 io 未就緒而需要阻塞時,應該通過 gopark 實現用戶態下 g 粒度的阻塞,而非簡單地基於阻塞模式進行 epoll_wait 指令的調用.

建構了上述這一點認知後,大家再回頭梳理一遍有關 golang poll_wait 和 net_poll 流程的設計思路,相信大家就能夠釋然了.

然而,到這裏爲止,可能有部分同學又會產生疑問了——在本文 1.1 小節推演 io 多路複用模型時提過,這種輪詢 + 非阻塞 io 的調用模式是存在缺陷的,問題就在於輪詢單元可能因 io 事件未就緒而持續無意義的空轉,最終導致 cpu 資源的浪費.

哈哈上述問題也許只是我個人一廂情願的自說自話,但若確實有同學有在此處拋出和我一樣的問題,那請在此接收我的誇獎,你的思維很 nice,這是一個很好的問題,保持辯證思維是我們在求學一門新知識時應該持有的良好態度.

正如 2.1 小節中所說,驅動 net_poll 流程的時機主要發生在 gmp 調度流程中,因此這個問題的答案是和 gmp 底層原理息息相關的:

● 一方面,p 本就是基於輪詢模型不斷尋找合適的 g 進行調度,而 net_poll 恰好是其尋找 g 的諸多方式的其中一種,因此這個輪詢機制是與 gmp 天然契合的,並非是 golang netpoll 機制額外產生的成本;

● 再者,這種輪詢不是墨守成規,而是隨機應變的. 如果一個 p 經歷了一系列檢索操作後,仍找不到合適的 g 進行調度,那麼它不會無限空轉,而是會適時地進行縮容操作——首先保證全局會留下一個 p 進行 netpoll 留守,其會通過阻塞或超時模式觸發執行 epoll_wait 操作,保證有 io 事件就緒時不產生延遲(具體細節參見 3.8 小節);而在有留守 p 後,其它空閒的 p 會將 m 和 p 自身都置爲 idle 態,讓出 cpu 執行權,等待後續有新的 g 產生時再被重新喚醒

gmp 是整個 golang 知識體系的基石,我也在 23 年初也曾寫過一篇——Golang GMP 原理,不過當時同樣存在視野侷限問題,理解廣度與深度都有所不足,所以這裏也留個預告彩蛋,很快我將會針對 gmp 重啓一個篇章進行查缺補漏,爭取做到溫故知新.

3 go netpoll 源碼

下面我們圍繞着第 2 章中介紹的內容,開啓大家最喜聞樂見的源碼走讀環節.

此處使用的 golang 源碼版本爲 v1.19.3,操作系統爲 linux 系統,netpoll 底層基於 epoll 技術實現.

3.1 核心流程入口

這裏給出簡易版 tcp 服務器框架的實現示例,麻雀雖小五臟俱全,其中包含了 2.2 小節中介紹到的有關net server 幾大核心流程相關的代碼入口:

// 啓動 tcp server 代碼示例
func main(){
    /*
        - 創建 tcp 端口監聽器
            - 創建 socket fd,bind、accept
            - 創建 epoll 事件表(epoll_create)
            - socket fd 註冊到 epoll 事件表(epoll_ctl:add)

    */
    l, _ := net.Listen("tcp",":8080")

    for{
        /*
            - 等待 tcp 連接到達
                - loop + 非阻塞模式調用 accept
                - 若未就緒,則通過 gopark 進行阻塞
                - 等待 netpoller 輪詢喚醒
                     - 檢查是否有 io 事件就緒(epoll_wait——nonblock)
                     - 若發現事件就緒 通過 goready 喚醒 g
                - accept 獲取 conn fd 後註冊到 epoll 事件表(epoll_ctl:add)
                - 返回 conn
        */
        conn, _ := l.Accept()
        // goroutine per conn
        go serve(conn)
    }
}

// 處理一筆到來的 tcp 連接
func serve(conn net.Conn){
    /*
        - 關閉 conn
           - 從 epoll 事件表中移除該 fd(epoll_ctl:remove)
           - 銷燬該 fd
    */
    defer conn.Close()
    var buf []byte
    /*
        - 讀取連接中的數據
           - loop + 非阻塞模式調用 recv (read)
           - 若未就緒,則通過 gopark 進行阻塞
           - 等待 netpoller 輪詢喚醒
                - 檢查是否有 io 事件就緒(epoll_wait——nonblock)
                - 若發現事件就緒 通過 goready 喚醒 g
    */
    _, _ = conn.Read(buf)
    /*
        - 向連接中寫入數據
           - loop + 非阻塞模式調用 writev (write)
           - 若未就緒,則通過 gopark 進行阻塞
           - 等待 netpoller 輪詢喚醒
                - 檢查是否有 io 事件就緒(epoll_wait:nonblock)
                - 若發現事件就緒 通過 goready 喚醒 g
    */
    _, _ = conn.Write(buf)
}

3.2 pollDesc 存儲設計

在 golang netpoll 實現中,pollDesc 是一個重要的類型,定義位於 internel/poll/fd_poll_runtime.go 文件中:

type pollDesc struct {
    runtimeCtx uintptr
}

不同操作系統對 pollDesc 有着不同的底層實現,此處通過 runtimeCtx 指針指向其底層實現類型實例.

本文基於 linux 系統進行源碼走讀,有關 pollDesc 具體底層實現代碼位於runtime/netpoll.go 文件中,實現類型同樣叫做 pollDesc

// Network poller descriptor.
// No heap pointers.
// 網絡 poller 描述符
type pollDesc struct{
    // next 指針,指向其在pollCache 中相鄰的下一個 pollDesc 實例
    link *pollDesc 
    // 關聯的 fd 句柄
    fd   uintptr

    /*
        讀事件狀態標識器. 裏面可能存儲的內容包括:
            - pdReady:標識讀操作已就緒的狀態
            - pdWait:標識 g 阻塞等待讀操作就緒的狀態
            - g:阻塞等待讀操作就緒的  g
            - 0:無內容
    */
    rg atomic.Uintptr// pdReady, pdWait, G waiting for read or nil
    /*
        寫事件狀態標識器. 裏面可能存儲的內容包括:
            - pdReady:標識寫操作已就緒的狀態
            - pdWait:標識 g 阻塞等待寫操作就緒的狀態
            - g:阻塞等待寫操作就緒的  g
            - 0:無內容
    */
    wg atomic.Uintptr// pdReady, pdWait, G waiting for write or nil
    // ...
}

爲避免講解過程中產生歧義,此後我們統一將internel/poll/fd_poll_runtime.go 中的 pollDesc 類稱爲表層pollDescruntime/netpoll.go 文件中的 pollDesc 類則維持稱呼爲pollDesc裏層pollDesc.

在與 epoll 事件表交互前,需要爲每個 fd 分配一個 pollDesc 實例,進入事件表時,fd 作爲 key,pollDesc 則是與之關聯的 value.

在 pollDesc 中包含兩個核心字段——讀/寫事件狀態標識器 rg/wg,其用於標識 fd 的 io 事件狀態以及存儲因 io 事件未就緒而 park 的 g 實例. 後續在 io 事件就緒時,能通過 pollDesc 逆向追溯得到 g 實例,創造將其喚醒的機會.

在存儲結構上,golang 設計了一個名爲 pollCache 的緩衝池結構,用於實現 pollDesc 實例的複用,內部採用一個單向鏈表維繫 pollDesc 之間的拓撲關係.

// pollDesc 緩衝池,用於實現 pollDesc 對象實例的複用
type pollCache struct {
    // 互斥鎖 保證操作的併發安全
    lock  mutex
    // 隊首的 pollDesc 實例
    first *pollDesc
}

pollCache 中包含兩個核心方法,alloc 和 free,分別實現從 cache 中獲取 pollDesc 實例以及將用完的 pollDesc 歸還給 cache 的操作.

// 從 pollCache 中分配得到一個 pollDesc 實例
func (c *pollCache) alloc()*pollDesc {
    lock(&c.lock)
    // 如果 pollCache 爲空,則需要進行初始化
    if c.first ==nil{
         // pdSize = 240
        const pdSize =unsafe.Sizeof(pollDesc{})
        // const pollBlockSize = 4 * 1024
        n := pollBlockSize / pdSize
        // ...

        // Must be in non-GC memory because can be referenced
        // only from epoll/kqueue internals.
        // 分配指定大小的內存空間
        mem := persistentalloc(n*pdSize,0,&memstats.other_sys)
        // 完成指定數量  pollDesc 的初始化
        for i :=uintptr(0); i < n; i++{
            pd :=(*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    // 取出 pollCache 隊首元素
    pd := c.first
    // pollCache 隊首指針指向下一個元素
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}
// 釋放一個 pollDesc 實例,將其放回到 pollCache 中
func (c *pollCache) free(pd *pollDesc) {
    lock(&c.lock)
    // 調整指針指向原本 pollCache 中的隊首元素
    pd.link = c.first
    // 成爲 pollCache 新的隊首
    c.first = pd
    unlock(&c.lock)
}

3.3 socket 創建流程

下面以net.Listen 方法爲入口,沿着創建 socket fd 的流程進行源碼走讀,該過程中涉及的方法調用棧關係如下:

ftS0aF

該流程中最核心的方法爲位於:net/sock_posix.go 文件的 socket 和 netFD.listenStream 方法,其核心執行步驟包括:

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn)error)(fd *netFD, err error){
// 通過 syscall socket,以 nonblock 模式創建 socket fd
    s, err := sysSocket(family, sotype, proto)
    fd, err = newFD(s, family, sotype, net)

    // ...
    /*
        - 通過 syscall bind 將 socket 綁定到指定地址
        - 通過 syscall listen 發起對 socket 監聽
        - 完成 epoll 事件表創建(全局只執行一次)
        - 將 socket fd 註冊到 epoll 事件表中,監聽讀寫就緒事件
    */
    fd.listenStream(laddr, listenerBacklog(), ctrlFn)}
    // ...
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn)error)error{
    // ...
    // 通過 syscall bind 將 socket 綁定到指定地址
    syscall.Bind(fd.pfd.Sysfd, lsa)
    // ...
    // 通過 syscall listen 發起對 socket 監聽
    listenFunc(fd.pfd.Sysfd, backlog)
    // ...
    /*
        - 完成 epoll 事件表創建(全局只執行一次)
        - 將 socket fd 註冊到 epoll 事件表中,監聽讀寫就緒事件
    */
     fd.init()
    // ...
 }

3.4 poll_init 流程

順着 3.3 小節的流程繼續往下,在表層 pollDesc 的init 方法中,會首先確保全局必須調用一次 poll_init 流程,完成 epoll 事件表的初始化,其方法調用棧如下:

E5cmfs

在表層 pollDesc.init 方法中,會通過 sync.Once 保證執行一次 runtime_pollServerInit 方法,該方法在 linux 系統下的實現爲位於 runtime/netpoll.go 中的 runtime.poll_runtime_pollServerInit 方法,最終通過調用 netpollinit 方法,執行epoll_create 指令,完成 epoll 事件表的創建:

// 單例工具
var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
    // 完成 epoll 事件表的創建——全局只執行一次
    serverInit.Do(runtime_pollServerInit)
    // ...
}

func runtime_pollServerInit()
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    // ...
}
func netpollinit() {
    // 通過 epoll_create 操作創建 epoll 事件表
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    // ...
    /*
        創建 pipe 管道,用於接收信號,如程序終止:
            - r:信號接收端,會註冊對應的 read 事件到 epoll 事件表中
            - w:信號發送端,當有信號到達時,會往 w 中發送信號,並對 r 產生讀就緒事件
    */
    r, w, errno := nonblockingPipe()
    // 在 epoll 事件表中註冊監聽 r 的讀就緒事件
    ev := epollevent{
        events: _EPOLLIN,
}
    *(**uintptr)(unsafe.Pointer(&ev.data))=&netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r,&ev)
    // ...
    // 使用全局變量緩存 pipe 的讀寫端
    netpollBreakRd =uintptr(r)
    netpollBreakWr =uintptr(w)
}

3.5 poll_open 流程

表層 pollDesc.init 方法中,在確保已完成 poll_init 流程後,就會執行 poll_open 流程,將當前 fd 及 pollDesc 註冊到 epoll 事件表中,方法調用棧如下:

V91qt6

在表層 pollDesc.init 方法中,執行完 poll_open 流程後,會獲取到裏層返回的 pollDesc 實例,將其引用存放在 runtimeCtx 字段中:

func (pd *pollDesc) init(fd *FD)error{
    // ...
    // 將 fd 註冊到 epoll 事件表中
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    // ...
    // 通過 runtimeCtx 關聯與之映射的 netpoll.pollDesc
    pd.runtimeCtx = ctx
}

func runtime_pollOpen(fd uintptr)(uintptr,int)

runtime_pollOpen 方法在 linux 系統下的實現爲位於 runtime/netpoll.go 中的 runtime.poll_runtime_pollOpen 方法,其中會從 pollCache 中獲取一個 pollDesc 實例,並調用 netpollopen方法,執行epoll_ctl(ADD)指令將其添加到 epoll 事件表中:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr)(*pollDesc,int){
    // 從 pollcache 中分配出一個 pollDesc 實例
    pd := pollcache.alloc()
    lock(&pd.lock)
    // pollDesc 與 fd 關聯
    pd.fd = fd
    // ...
    /*
        讀就緒事件的狀態標識器初始化  
            - 0:無動作
            - 1:讀就緒
            - 2:阻塞等待讀就緒
    */
    pd.rg.Store(0)
    // ...
    /*
        寫就緒事件的狀態標識器初始化  
            - 0:無動作
            - 1:寫就緒
            - 2:阻塞等待寫就緒
    */
    pd.wg.Store(0)
    // ...
    unlock(&pd.lock)
    // ...
    // 將 fd 添加進入 epoll 事件表中
    errno := netpollopen(fd, pd)
    // ...
    // 返回 pollDesc實例
    return pd,0
}

值得一提的是,golang 在執行epoll_ctl(ADD)指令時,會同時將讀寫就緒事件(EPOLLIN/EPOLLOUT)設爲 fd 的監聽事件類型,而後續在 netpoll 輪詢環節中,則會通過pollDesc 的 rg 和wg 來甄別出 g 關心的具體事件類型究竟是讀事件還是寫事件.

func netpollopen(fd uintptr, pd *pollDesc) int32{
    /*
        通過 epollctl 操作,在 epoll 事件表中註冊針對 fd 監聽事件
          - 操作類型宏指令:_EPOLL_CTL_ADD —— 添加 fd 並註冊監聽事件
          - 事件類型:epollevent.events
                - _EPOLLIN:監聽讀就緒事件
                - _EPOLLOUT:監聽寫就緒事件
                - _EPOLLRDHUP:監聽中斷事件
                - _EPOLLET:採用 edge trigger 邊緣觸發模式進行監聽
          - 回調數據:epollevent.data —— pollDesc 實例指針
    */
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data))= pd
    return-epollctl(epfd, _EPOLL_CTL_ADD,int32(fd),&ev)
}

接下來梳理一下,有哪些流程中會觸發到poll open流程呢?

首先是 net.Listen 流程,在 socket fd 創建完成後,需要通過 poll open 流程將其註冊到 epoll 事件表中,完整的調用鏈路如下:

vtFDPU

接下來是在 net.Listener.Accept 流程中,當 accept 得到新連接後,會將連接封裝成表層 pollDesc 實例,並執行 poll open 流程將其註冊到 epoll 事件表中:

gkiKD2

func (fd *netFD) accept()(netfd *netFD, err error){
    // 通過 syscall accept 接收到來的 conn fd
    d, rsa, errcall, err := fd.pfd.Accept()
    // ...
    // 封裝到來的 conn fd
    netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
    // 將 conn fd 註冊到 epoll 事件表中
    err = netfd.init()
    // ...
    return netfd,nil
}

3.6 poll_close 流程

當一筆 conn 要被關閉時,會執行 poll close 流程,此時會通過表層 pollDesc的 runtimeCtx 字段獲取到裏層 pollDesc 的引用,並通過 epoll_ctl(DEL)指令實現從 epoll 事件表中移除指定 fd 及 pollDesc 的效果. 其核心方法調用棧如下:

0ZR6pr

runtime_pollClose 方法在 linux 系統下的實現爲位於 runtime/netpoll.go 中的 runtime.poll_runtime_pollClose 方法,其中會調用 epoll_ctl(DEL)指令將 fd 從 epoll 事件表中刪除並將 pollDesc 實例歸還到 pollCache 中.

func (pd *pollDesc) close() {
    // 通過 runtimeCtx 映射到netpoll.pollDesc
    runtime_pollClose(pd.runtimeCtx)
    pd.runtimeCtx = 0
}

func runtime_pollClose(ctx uintptr)
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
    // 通過 epoll_ctl_del 操作,從 epoll 事件表中移除指定 fd
    netpollclose(pd.fd)
    // 從 pollCache 中移除對應的 pollDesc 實例
    pollcache.free(pd)
}
func netpollclose(fd uintptr) int32 {
    var ev epollevent
    return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}

3.7 poll_wait 流程

接下來是 poll_wait 操作,其最終會通過 gopark 操作來使得當前 g 陷入到用戶態阻塞,源碼方法調用棧如下:

tPJXix

在表層 pollDesc.wait 方法中,會通過runtimeCtx獲取到裏層 pollDesc 引用,進而調用 linux 系統下位於 runtime/netpoll.go 文件的 poll_runtime_pollWait 方法,執行 epoll_ctl(DEL)指令.

/*
    - 標識出當前 g 關心的 io 事件
         - mode:r——等待讀就緒事件 w——等待寫就緒事件
    - gopark 當前g 陷入用戶態阻塞
*/
func (pd *pollDesc) wait(mode int, isFile bool)error{
    // 確保已經關聯映射到某個 netpoll.pollDesc
    if pd.runtimeCtx ==0{
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    // ...
}

func runtime_pollWait(ctx uintptr, mode int) int
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int)int{
    // ...
    for !netpollblock(pd,int32(mode),false){
        // ...  
    }
    // ...
}

在該流程最底層的 netpollblock 方法中,針對於依賴 io 事件未就緒的 g,會通過 gopark 操作令其陷入用戶態阻塞中,在 gopark 方法中會閉包調用 netpollblockcommit 方法,其中會根據 g 關心的事件類型將 g 實例存儲在 pollDesc 的 rg 或 wg 容器中.

需要注意,針對於同一個 fd 的同種事件類型,同一時刻有且只能有一個 g 被掛載在事件狀態標識器中,參見方法註釋

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// can hold only a single waiting goroutine for each mode.
/*
    針對某個 pollDesc 實例,監聽指定的mode 就緒事件
        - 返回true——已就緒  返回false——因超時或者關閉導致中斷
        - 其他情況下,會通過 gopark 操作將當前g 阻塞在該方法中
*/
func netpollblock(pd *pollDesc, mode int32, waitio bool)bool{
// 根據mode判斷關心的是讀就緒事件r 還是寫就緒事件w,取得對應的狀態標識器
    gpp :=&pd.rg
    if mode =='w'{
        gpp =&pd.wg
    }

    // loop 自旋模型
    for{
    // const pdRead = 1 
        /*
             關心的 io事件已就緒,則 cas更新狀態標識器,並直接返回
        */
        if gpp.CompareAndSwap(pdReady,0){
            returntrue
        }
        // const pdWait = 2
        /*
             關心的 io事件未就緒,則 cas更新狀態標識器爲阻塞等待狀態,並打破循環        
        */
        if gpp.CompareAndSwap(0, pdWait){
            break
        }
        // ...
    }

    // ...
    // gopark 進入阻塞態
    gopark(netpollblockcommit,unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet,5)

    // 當前g 從阻塞態被喚醒,把pollDesc 狀態標識器置爲 0,並判斷是否因爲所關心io 事件就緒而被喚醒
    old := gpp.Swap(0)
    // ...
    return old == pdReady
}
// 將 gpp 狀態標識器的值由 pdWait 修改爲當前 g 
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}

接下來觀察會觸發 poll_wait 的流程.

首先是在 listener.Accept 流程中,如果 socket fd 下尚無連接到達,則會執行 poll wait 將當前 g 阻塞並掛載到 socket fd 對應 pollDesc 的 rg 中:

xUBRUy

// Accept wraps the accept network call.
func (fd *FD)Accept()(int, syscall.Sockaddr,string,error){
    // ...
    for{
        // 以nonblock 模式發起一次 syscall accept 嘗試接收到來的 conn
        s, rsa, errcall, err := accept(fd.Sysfd)
        // 接收conn成功,直接返回結果
        if err ==nil{
            return s, rsa,"", err
        }
        switch err {
            // 中斷類錯誤直接忽略
            case syscall.EINTR:
                    continue
            // 當前未有到達的conn 
            case syscall.EAGAIN:
            // 走入 poll_wait 流程,並標識關心的是 socket fd 的讀就緒事件
            // (當conn 到達時,表現爲 socket fd 可讀)
                if fd.pd.pollable(){
                // 倘若讀操作未就緒,當前g 會 park 阻塞在該方法內部,直到因超時或者事件就緒而被 netpoll ready 喚醒
                    if err = fd.pd.waitRead(fd.isFile); err ==nil{
                        continue
                    }
                }
                // ...
        }
        // ...
    }
}
// 指定 mode 爲 r 標識等待的是讀就緒事件,然後走入更底層的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

其次是在 conn.Read 流程中,如果 conn fd 下讀操作尚未就緒(尚無數據到達),則會執行 poll wait 將當前 g 阻塞並掛載到 conn fd 對應 pollDesc 的 rg 中:

2ot11t

// Read implements io.Reader.
func (fd *FD)Read(p []byte)(int,error){
    // ... 
    for{
        // 以非阻塞模式執行一次syscall read 操作 
        n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
        if err !=nil{
            n =0
            // 走入 poll_wait 流程,並標識關心的是該 fd 的讀就緒事件
            if err == syscall.EAGAIN && fd.pd.pollable(){
            // 倘若讀操作未就緒,當前g 會 park 阻塞在該方法內部,直到因超時或者事件就緒而被 netpoll ready 喚醒
                if err = fd.pd.waitRead(fd.isFile); err ==nil{
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

最後是 conn.Write 流程,如果 conn fd 下寫操作尚未就緒(緩衝區空間不足),則會執行 poll wait 將當前 g 阻塞並掛載到 conn fd 對應 pollDesc 的wg中:

lrcZOJ

// Write implements io.Writer.
func (fd *FD)Write(p []byte)(int,error){
    // ... 
    for{
    // ...
    // 以非阻塞模式執行一次syscall write操作
        n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
        if n >0{
            nn += n
        }
        // 緩衝區內容都已寫完,直接退出
        if nn ==len(p){
            return nn, err
        }

    // 走入 poll_wait 流程,並標識關心的是該 fd 的寫就緒事件
    if err == syscall.EAGAIN && fd.pd.pollable(){
        // 倘若寫操作未就緒,當前g 會 park 阻塞在該方法內部,直到因超時或者事件就緒而被 netpoll ready 喚醒
        if err = fd.pd.waitWrite(fd.isFile); err ==nil{
            continue
        }
    }
    // ...  
    
}
// 指定 mode 爲 r 標識等待的是讀就緒事件,然後走入更底層的 poll_wait 流程
func (pd *pollDesc) waitWrite(isFile bool) error {
    return pd.wait('w', isFile)
}

3.8 net_poll 流程

最後壓軸登場的是尤其關鍵的 net poll 流程.

3.7 小節中交待了,當 g 發現關心的 io 事件未就緒時,會通過 gopark 操作將自身陷入阻塞,並且將 g 掛載在 pollDesc 的 rg/wg 中.

而本小節介紹的 net_poll 流程就負責輪詢獲取已就緒 pollDesc 對應的 g,將其返回給上游的 gmp 調度系統,對其進行喚醒和調度.

在常規的 net poll 流程中,會採用非阻塞模式執行 epoll_wait 操作,但唯獨在 p 大面積空閒時,全局會有一個 p 負責留守 net_poll,此時其會以阻塞或超時模式執行 net_poll 流程並以同樣的模式調用 epoll_wait 指令.

net_poll 流程的調用棧如下,其本身只用於返回達到就緒條件的 g list,具體的喚醒和調度操作是由上游執行的:

hzg3aJ

net_poll 流程入口位於 runtime/netpoll_epoll.go 文件中,其中有幾個關鍵點我們作個概述,其他內容大家參考源碼以及其中給出的註釋:

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
/*
    - netpoll 流程用於輪詢檢查是否有就緒的 io 事件
    - 如果有就緒 io 事件,還需要檢查是否有 pollDesc 中的 g 關心該事件
    - 找到所有關心該就緒 io 事件的 g,添加到 list 中返回給上游進行 goready 喚醒
*/
func netpoll(delay int64) gList {
    /*
        根據傳入的 delay 參數,決定調用 epoll_wait 的模式
            - delay < 0:設爲 -1 阻塞模式(在 gmp 調度流程中,如果某個 p 遲遲獲取不到可執行的 g 時,會通過該模式,使得 thread 陷入阻塞態,但該情況全局最多僅有一例)
            - delay = 0:設爲 0 非阻塞模式(通常情況下爲此模式,包括 gmp 常規調度流程、gc 以及全局監控線程 sysmon 都是以此模式觸發的 netpoll 流程)
            - delay > 0:設爲超時模式(在 gmp 調度流程中,如果某個 p 遲遲獲取不到可執行的 g 時,並且通過 timer 啓動了定時任務時,會令 thread 以超時模式執行 epoll_wait 操作)
    */
    var waitms int32
    if delay <0{
        waitms =-1
    }elseif delay ==0{
        waitms =0
    // 針對 delay 時長取整
    }elseif delay <1e6{
        waitms =1
    }elseif delay <1e15{
        waitms =int32(delay /1e6)
    }else{
    // 1e9 ms == ~11.5 days.
        waitms =1e9
    }
    // 一次最多接收 128 個 io 就緒事件 
    var events [128]epollevent
retry:
    // 以指定模式,調用 epoll_wait 指令
    n := epollwait(epfd,&events[0],int32(len(events)), waitms)
    // ...

    // 遍歷就緒的每個 io 事件 
    var toRun gList
    for i :=int32(0); i < n; i++{
        ev :=&events[i]
        if ev.events ==0{
            continue
        }

        // pipe 接收端的信號量處理
        if*(**uintptr)(unsafe.Pointer(&ev.data))==&netpollBreakRd {
            // ...
        }

        /*
             根據 io 事件類型,標識出 mode:
                 - EPOLL_IN -> r;
                 - EPOLL_OUT -> w;
                 - 錯誤或者中斷事件 -> r & w;
        */
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR)!=0{
            mode +='r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR)!=0{
            mode +='w'
        }
        // 根據 epollevent.data 獲取到監聽了該事件的 pollDesc 實例
        if mode !=0{
            pd :=*(**pollDesc)(unsafe.Pointer(&ev.data))
        // ...   
        // 嘗試針對對應 pollDesc 進行喚醒操作
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}
/*
    epollwait 操作:
        - epfd:epoll 事件表 fd 句柄
        - ev:用於承載就緒 epoll event 的容器
        - nev:ev 的容量
        - timeout:
            - -1:阻塞模式
            - 0:非阻塞模式:
            - >0:超時模式. 單位 ms
        - 返回值 int32:就緒的 event 數量
*/
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
/*
    根據 pd 以及 mode 標識的 io 就緒事件,獲取需要進行 ready 喚醒的 g list
    對應 g 會存儲到 toRun 這個 list 容器當中
*/
func netpollready(toRun *gList, pd *pollDesc, mode int32){
    var rg, wg *g
    if mode =='r'|| mode =='r'+'w'{
    // 倘若到達事件包含讀就緒,嘗試獲取需要 ready 喚醒的 g
        rg = netpollunblock(pd,'r',true)
    }
    if mode =='w'|| mode =='r'+'w'{
    // 倘若到達事件包含寫就緒,嘗試獲取需要 ready 喚醒的 g
        wg = netpollunblock(pd,'w',true)
    }
    // 找到需要喚醒的 g,添加到 glist 中返回給上層
    if rg !=nil{
        toRun.push(rg)
    }
    if wg !=nil{
        toRun.push(wg)
    }
}
/*
    根據指定的就緒io 事件類型以及 pollDesc,判斷是否有 g 需要被喚醒. 若返回結果非空,則爲需要喚醒的 g
*/
func netpollunblock(pd *pollDesc, mode int32, ioready bool)*g {
// 根據 io 事件類型,獲取 pollDesc 中對應的狀態標識器
    gpp :=&pd.rg
    if mode =='w'{
        gpp =&pd.wg
    }

    for{
        // 從 gpp 中取出值,此時該值應該爲調用過 park 操作的 g
        old := gpp.Load()
        // ...  
        if ioready {
            new= pdReady
        }
        // 通過 cas 操作,將 gpp 值由 g 置換成 pdReady
        if gpp.CompareAndSwap(old,new){
            // 返回需要喚醒的 g   
            return(*g)(unsafe.Pointer(old))
        }
    }
}

那麼,net_poll 流程究竟會在哪個環節中被觸發呢?我們同樣通過源碼加以佐證.

1)gmp 調度流程

這是屬於最常規的 net poll 觸發流程,方法調用棧如下:

NOHQav

runtime.findrunnable 方法用於給 p 尋找合適的 g 進行調度. 檢索優先級可以參照下方給出的代碼註釋,這裏單獨強調兩個點:

// gmp 核心調度流程:g0 爲當前 p 找到下一個調度的  g
    /*
        pick g 的核心邏輯:
             1)每調度 61 次,需要專門嘗試處理一次全局隊列(防止飢餓)
             2)嘗試從本地隊列中獲取 g
             3)嘗試從全局隊列中獲取 g
             4)以【非阻塞模式】調度 netpoll 流程,獲取所有需要喚醒的 g 進行喚醒,並獲取其中的首個g
             5)從其他 p 中竊取一半的 g 填充到本地隊列
             6)仍找不到合適的 g,則協助 gc 
             7)以【阻塞或者超時】模式,調度netpoll 流程(全局僅有一個 p 能走入此分支)
             8)當前m 添加到全局隊列的空閒隊列中,停止當前 m
    */

func findRunnable()(gp *g, inheritTime, tryWakeP bool){
    // ..
    /*
        同時滿足下述三個條件,發起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 有 g 在等待io 就緒事件
            - 沒有空閒 p 在以【阻塞或超時】模式發起 netpoll 流程
    */
    if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{
        // 以非阻塞模式發起一輪 netpoll,如果有 g 需要喚醒,一一喚醒之,並返回首個 g 給上層進行調度
        if list := netpoll(0);!list.empty(){// non-blocking
            // 獲取就緒 g 隊列中的首個 g
            gp := list.pop()
            // 將就緒 g 隊列中其餘 g 一一置爲就緒態,並添加到全局隊列
            injectglist(&list)
            // 把首個g 也置爲就緒態
            casgstatus(gp,_Gwaiting,_Grunnable)
            // ...   
            //返回 g 給當前 p進行調度
            return gp,false,false
        }
    }

    // ...
    /*
        同時滿足下述三個條件,發起一次【阻塞或超時模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 有 g 在等待io 就緒事件
            - 沒有空閒 p 在以【阻塞或超時】模式發起 netpoll 流程
    */
    if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{
    // 默認爲阻塞模式  
        delay :=int64(-1)
        // 存在定時時間,則設爲超時模式
        if pollUntil !=0{
            delay = pollUntil - now
        // ...   
        }
        // 以【阻塞或超時模式】發起一輪 netpoll
        list := netpoll(delay)// block until new work is available 
    }
    // ...    
}

2)gc 併發標記流程:

爲了避免因 gc 而導致 io 事件的處理產生延時或者阻塞,當有 p 以空閒模式 idleMode(當前 p 因找不到合適的 g 進行調度,而選擇主動參與 gc 協作) 執行 gc 併發標記流程時,會間隔性地以非阻塞模式觸發 net_poll 流程:

Jm9OwE

// gc 
func gcDrain(gcw *gcWork, flags gcDrainFlags){
    // ...
    // 判斷是否以 idle 模式執行 gc 標記流程 
    idle := flags&gcDrainIdle !=0

    // ... 
    var check func()bool
    // ...
    if idle {
        check = pollWork
    }

    for(...some condition){
        // do something...
        // do check function
        if check !=nil&& check(){
            break
        }
        // ...
    }
    // ...
}
func pollWork() bool{
    // ...
    // 若全局隊列或 p 的本地隊列非空,則提前返回
    /*
        同時滿足下述三個條件,發起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 有 g 在等待io 就緒事件
            - 沒有空閒 p 在以【阻塞或超時】模式發起 netpoll 流程
    */
    if netpollinited()&& atomic.Load(&netpollWaiters)>0&& sched.lastpoll !=0{
    // 所有取得 g 更新爲就緒態並添加到全局隊列
        if list := netpoll(0);!list.empty(){
            injectglist(&list)
            return true
        }
    }
    // ...
}

此外,當程序在經歷過一次 STW(stop the world)後,隨後到來的 start the world 流程中也會執行 net_poll 操作,同樣也是採用非阻塞模式:

b6pMLF

func startTheWorldWithSema(emitTraceEvent bool) int64{
    // 斷言世界已停止
    assertWorldStopped()
    // ...
    // 如果 epoll 事件表初始化過,則以非阻塞模式執行一次 netpoll
    if netpollinited(){
    // 所有取得的 g 置爲就緒態並添加到全局隊列
        list := netpoll(0)// non-blocking
        injectglist(&list)
    }
    // ...
}

3)sysmon 流程:

在 golang 程序啓動時,有一個全局唯一的 sysmon thread 負責執行監控任務,比如因 g 執行過久或者 m syscall 時間過長而發起的搶佔調度流程都是由這個 sysmon 負責的. 在其中也會每隔 10 ms 發起一次非阻塞的 net_poll 流程:

i2jhEp

// The main goroutine.
func main(){
// ...
// 新建一個 m,直接運行 sysmon 函數
    systemstack(func(){
        newm(sysmon,nil,-1)
    })

    // ...
}

// 全局唯一監控線程的執行函數
func sysmon(){
// ...
for{
// ...
/*
        同時滿足下述三個條件,發起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 沒有空閒 p 在以【阻塞或超時】模式發起 netpoll 流程
            - 距離上一次發起 netpoll 流程的時間間隔已超過 10 ms
    */
        lastpoll :=int64(atomic.Load64(&sched.lastpoll))
        if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {
            // 以非阻塞模式發起 netpoll
            list := netpoll(0)// non-blocking - returns list of goroutines
            // 獲取到的  g 置爲就緒態並添加到全局隊列中
            if!list.empty(){
                // ...
                injectglist(&list)
                // ...
            }
        }
    // ...  
    }
}

4 總結

祝賀各位,至此我們已完成本系列的首篇內容的學習,在本篇中,我們介紹的知識點包括:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_FTvpvLIWfYzgNhOJgKypA