IO 多路複用與 Go 網絡庫的實現
IO 多路複用
Unix 網絡編程裏總結了 5 種 IO 模型,其中只有異步 IO 模型是異步 IO,因爲只有異步 IO 的 recvfrom 是不阻塞進程的。首先每個 IO 讀操作都包括以下兩個過程,書裏說的異步就是指發起讀操作時,數據從內核拷貝到用戶空間是否要阻塞進程這個問題。
-
等待網絡數據到達網卡並讀取到內核緩衝區,數據準備好。
-
從內核緩衝區複製數據到進程空間。
另外阻塞 IO 模型和阻塞 IO 調用是不同的。比如非阻塞 IO 模型當中其實包含了多次非阻塞 IO 調用和一次阻塞 IO 調用,非阻塞 IO 調用是指在內核無數據準備好時,recvfrom 不阻塞進程直接返回,在內核有數據時發起的 recvfrom 其實還是阻塞的。
io
IO 複用是指:進程阻塞於 select,等待多個 IO 中的任一個變爲可讀,select 調用返回,通知相應 IO 可以讀。它可以支持單線程響應多個請求這種模式。
它本質上是同步 I/O,因爲他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步 I/O 則無需自己負責進行讀寫,只需要把 buffer 提交給內核,內核會負責把數據從內核拷貝到用戶空間,然後告訴你已可讀。
select & poll
select poll epoll 這三個是常用的 IO 複用的系統調用。select 和 poll 本質相同,都對同時監聽的 fd 有數量限制,因爲他們涉及大量文件描述符的數組被整體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,因此它的開銷隨着文件描述符數量的增加而線性增大。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
-
每次調用 select,都需要把 fd 集合從用戶態拷貝到內核態,這個開銷在 fd 很多時會很大
-
同時每次調用 select 都需要在內核遍歷傳遞進來的所有 fd,把當前進程加入 fd 對應的設備等待隊列中
-
select 支持的文件描述符數量太小了,默認是 1024
epoll
epoll 涉及三個系統調用,epoll 用來創建 epollfd 文件描述符(之後要 close),epoll_ctl 用來註冊每個描述符及其等待的事件,epoll_wait 監聽 epollfd 上註冊的事件,內核負責把數據複製到這個 events 數組中。
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
-
將 fd 從用戶態到內核態拷貝的過程統一到 epoll_ctl,而不是在 epoll_wait 中每次循環調用都拷貝一次。
-
在 epoll_ctl 時把當前進程掛到 fd 對應的設備等待隊列中,併爲每個 fd 指定一個回調函數。當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的 fd 加入一個就緒鏈表。epoll_wait 的工作實際上就是在這個就緒鏈表中查看有沒有就緒的 fd,並喚醒在 epoll_wait 中進入睡眠的進程。
-
epoll 沒有最多 fd 的這個限制,它所支持的 fd 上限是最大可以打開文件的數目。
non-blocking IO
brpc-io.md 中有一段對兩種 blocking 的對比,覺得說的很好,轉載一下。
linux 一般使用 non-blocking IO 提高 IO 併發度。當 IO 併發度很低時,non-blocking IO 不一定比 blocking IO 更高效,因爲後者完全由內核負責,而 read/write 這類系統調用已高度優化,效率顯然高於一般得多個線程協作的 non-blocking IO。
但當 IO 併發度愈發提高時,blocking IO 阻塞一個線程的弊端便顯露出來:內核得不停地在線程間切換才能完成有效的工作,一個 cpu core 上可能只做了一點點事情,就馬上又換成了另一個線程,cpu cache 沒得到充分利用,另外大量的線程會使得依賴 thread-local 加速的代碼性能明顯下降,如 tcmalloc,一旦 malloc 變慢,程序整體性能往往也會隨之下降。
而 non-blocking IO 一般由少量 event dispatching 線程和一些運行用戶邏輯的 worker 線程組成,這些線程往往會被複用(換句話說調度工作轉移到了用戶態),event dispatching 和 worker 可以同時在不同的核運行(流水線化),內核不用頻繁的切換就能完成有效的工作。線程總量也不用很多,所以對 thread-local 的使用也比較充分。這時候 non-blocking IO 就往往比 blocking IO 快了。
不過 non-blocking IO 也有自己的問題,它需要調用更多系統調用,比如 epoll_ctl,由於 epoll 實現爲一棵紅黑樹,epoll_ctl 並不是一個很快的操作,特別在多核環境下,依賴 epoll_ctl 的實現往往會面臨棘手的擴展性問題。non-blocking 需要更大的緩衝,否則就會觸發更多的事件而影響效率。non-blocking 還得解決不少多線程問題,代碼比 blocking 複雜很多。
服務器常用編程模型
以下是深入理解計算機系統裏經典的 echo server。它的問題是這樣同步阻塞的服務端無法處理來自客戶端的併發請求。解決的方法有兩個:使用多線程,或者 IO 多路複用。多線程的問題是創建進程或線程需要時間和空間,連接多了之後,切換開銷很大,佔用內存多,多線程修改共享數據產生的競爭條件需要加鎖,容易造成死鎖。IO 複用的問題是不能充分利用多核 CPU,且它通常要求事件的回調函數必須是非阻塞的。
-
多線程:accept 了之後 new thread or process 來處理這個 connfd 上的請求。
-
IO 多路複用:non-blocking IO+IO multiplexing 這種 Reactor 模式。基本結構是一個 event loop,以事件驅動和事件回調的方式實現業務邏輯。
server:socket + bind + listen + accept + ——- + close client: socket + connect + ——— + close
//bind&listen
while(1) {
connfd = accept(listenfd, (struct) sockaddr *) &clientaddr, &clientlen);
// read(connfd, buf, BUFSIZE);
// write(connfd, buf, strlen(buf));
close(connfd);
}
reactor
實際上目前的高性能服務器很多都用的是 reactor 模式,即 non-blocking IO+IO multiplexing 的方式。通常主線程只做 event-loop,通過 epoll_wait 等方式監聽事件,而處理客戶請求是在其他工作線程中完成。小夥伴有個測試經羣效應的例子 thundering_herd_problem 就是用 epoll 來處理連接和請求,可參考。下圖來自 Golang 網絡層實現總結的傳統網絡實現。
-
server 端在
bind&listen
後,將 listenfd 註冊到 epollfd 中,最後進入epoll_wait
循環。循環過程中若有在 listenfd 上的 event 則調用socket.accept
,並將 connfd 加入到 epollfd 的 IO 複用隊列。 -
當 connfd 上有數據到來或寫緩衝有數據可以出發 epoll_wait 的返回,這裏讀寫 IO 都是非阻塞 IO,這樣纔不會阻塞 epoll 的下一個循環。然而,這樣容易割裂業務邏輯,不易理解和維護。
-
read 後數據進行解碼並放入隊列中,等待工作線程處理。
accept 連接以及 conn 上讀寫若是在主線程完成,則要求是非阻塞 IO,因爲 IO 操作不能阻塞 epoll_wait 循環。實際上 event loop 可能也可以是多線程的,只是單個線程裏只有一個 epoll_wait。
io
goroutine
Go 因爲有 goroutine,所以可以採用多協程來解決併發問題。accept 連接後,將連接丟給 goroutine 處理後續的讀寫操作。在開發者看到的這個 goroutine 中業務邏輯是同步的,也不用考慮 IO 是否阻塞。
func main() {
ln, err := net.Listen("tcp", ":8080")
for {
conn, _ := ln.Accept()
go echoFunc(conn)
}
}
可以肯定的是,在 linux 上 Go 語言寫的網絡服務器也是採用的 epoll 作爲最底層的數據收發驅動,Go 語言網絡的底層實現中同樣存在 “上下文切換” 的工作,只是這個切換工作由 runtime 的調度器來做了,減少了程序員的負擔。
go net core
golang 的 net 如何實現對 epoll 的封裝,在使用上看上去同步編程的呢,這是本節的問題。總結來說,所有的網絡操作都以網絡描述符 netFD 爲中心實現。netFD 與底層 PollDesc 結構綁定,當在一個 netFD 上讀寫遇到 EAGAIN 錯誤時,就將當前 goroutine 存儲到這個 netFD 對應的 PollDesc 中,同時將 goroutine 給 park 住,直到這個 netFD 上再次發生讀寫事件,纔將此 goroutine 給 ready 激活重新運行。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll 等事件驅動機制。
netFD
服務端通過 listen 建立起的Listener
是個實現了 Accept Close 等方法的接口。通過 listener 的Accept
方法返回的 Conn 是一個實現了 Read Write 等方法的接口。Listener 和 Conn 的實現都包含一個網絡文件描述符 netFD,它其中包含一個重要的數據結構 pollDesc,它是底層事件驅動的封裝。
-
服務端的 netFD 在
listen
時會創建 epoll 的實例,並將 listenFD 加入 epoll 的事件隊列 -
netFD 在
accept
時將返回的 connFD 也加入 epoll 的事件隊列 -
netFD 在讀寫時出現
syscall.EAGAIN
錯誤,通過 pollDesc 將當前的 goroutine park 住,直到 ready,從 pollDesc 的waitRead
中返回
type TCPListener struct {
fd *netFD
}
type netFD struct {
// 省略其他成員
pd pollDesc
}
在net.Listen
過程中,新建了描述 listenFD 的數據結構netFD
,並在 netFD 的listenStream
方法中實現了建立 socket 的 bind&listen 和 netFD 的初始化。polldesc 的 init 初始化了底層的 epoll 實例,並將 fd 添加到了 epoll 的事件隊列中。ln.Accept()
實際上通過 netFD 的 accept,用系統調用 accept 返回的 connFD 新建一個新的 netFD 並初始化,即把它也加入到 epoll 的事件隊列中。
func (fd *netFD) init() error {
if err := fd.pd.init(fd); err != nil {
return err
}
return nil
}
netFD 的Read
操作在系統調用 Read 後,當有 syscall.EAGAIN 錯誤發生時,WaitRead
將當前讀這個 connFD 的 goroutine 給 park 住,直到這個 connFD 上的讀事件再次發生爲止,waitRead
調用返回,繼續 for 循環的執行。netFD 的 Write 方法和 Read 的實現原理是一樣的,都是在碰到 EAGAIN 錯誤的時候將當前 goroutine 給 park 住直到 socket 再次可寫爲止。
這樣的實現,就讓調用 netFD 的 Read 的地方變成了同步阻塞方式。
func (fd *netFD) Read(p []byte) (n int, err error) {
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
break
}
return
}
polldesc
polldesc 的初始化通過 runtime 層封裝的 pollServerInit 實現了 epoll 實例的 sync.Once 的初始化。初始化時通過 netpollinit 實現的,在 linux 上更底層就是 epoll_create 創建了 epollFD。初始化的第二步是要加入 epoll 的事件隊列,通過runtime_pollOpen
實現。其中 netpollopen 在 linux 上就是 epoll_ctl。
func (pd *pollDesc) init(fd *netFD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
pd.runtimeCtx = ctx
return nil
}
func net_runtime_pollServerInit() {
netpollinit()
atomic.Store(&netpollInited, 1)
}
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
...
errno = netpollopen(fd, pd)
return pd, int(errno)
}
最後在 netFD 實現阻塞的 pd.waitRead 是通過 netpollblock 實現的,即 gopark 當前 goroutine,直到 IO ready 才從 netpollblock 中返回。
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}
func net_runtime_pollWait(pd *pollDesc, mode int) int {
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
}
return 0
}
如何實現 IO ready 的通知,讓陷入 IO wait 的 goroutine 重新被調度呢,這個跟 proc.go 裏的調度有關。在 goroutine 調度器
findrunnable
可運行的 goroutine 時,將不阻塞的執行 netpoll,即執行 epollwait,監聽等待在 epollFD 上的事件隊列裏的 FD 是否就緒,有則 wakeUp 這個 goutine 開始運行,也就使這個 netpollblock 可以返回,使 netFD 解除阻塞。
netpoll_epoll
上面提到的 net_runtime_pollServerInit 和 net_runtime_pollOpen 都是對底層事件驅動機制的封裝,封裝的意義在於屏蔽不同操作系統的實現細節。在 linux 上是通過 runtime 包中的netpoll_epoll.go
實現的。它封裝了 epoll 的三個系統調用,即 epoll_create epoll_ctl 和 epoll_wait。
它封裝成了四個 runtime 函數。netpollinit 使用 epoll_create 創建 epollfd,netpollopen 添加一個 fd 到 epoll 中,這裏的數據結構稱爲 pollDesc,它一開始都關注了讀寫事件,並且採用的是邊緣觸發。netpollclose 函數就是從 epoll 刪除一個 fd。
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
EPOLLRDHUP
,這個事件是在較新的內核版本添加的,目的是解決對端 socket 關閉,epoll 本身並不能直接感知到這個關閉動作的問題。
netpoll 就是從 epoll wait 得到所有發生事件的 fd,並將每個 fd 對應的 goroutine 通過鏈表返回。這個操作是在 goroutine 調度器中使用的,用來將因爲 IO wait 而阻塞的 goroutine 重新調度。
func netpoll(block bool) *g {
if epfd == -1 {
return nil
}
waitms := int32(-1)
if !block {
waitms = 0
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
goto retry
}
var gp guintptr
for i := int32(0); i < n; i++ {
ev := &events[i]
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready(&gp, pd, mode)
}
}
if block && gp == 0 {
goto retry
}
return gp.ptr()
轉自:
ninokop.github.io/2018/02/18/go-net/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/FgvUdzY4MoZmq6Jsc5u37g