聊聊 go 語言對於 socket 的抽象
寫在文章開頭
go語言
對於網絡抽象做了非常通用且高性能的封裝,所以就從net
包源碼入手介紹一下go語言
對於socket
的抽象。
Hi,我是 sharkChili ,是個不斷在硬核技術上作死的 java coder ,是 CSDN 的博客專家 ,也是開源項目 Java Guide 的維護者之一,熟悉 Java 也會一點 Go ,偶爾也會在 C 源碼 邊緣徘徊。寫過很多有意思的技術博客,也還在研究並輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公衆號: 寫代碼的 SharkChili 。
詳解 go 語言對 socket 的抽象
服務端 socket 與客戶端的交互流程
在正式介紹源碼之前,我們需要簡單的過一下 socket 通信的流程:
-
服務端創建
socket
。 -
根據配置的端口號調用
bind
綁定端口監聽連接。 -
調用
accept
阻塞監聽連接。 -
客戶端
socket
通過connect
和服務端建立連接(其底層實際上會經歷一次TCP三次握手)
。 -
雙方進行數據收發。
-
完成通信後,客戶端調用
close
結束通信(這期間會經歷4次揮手)
。
代碼示例
我們給出下面這樣一段代碼,他通過Listen
創建服務端監聽socket
,然後通過Accept
阻塞接收新連接,一旦收到連接後開啓協程進行數據讀寫:
func main() {
//綁定8080端口
listen, err := net.Listen("tcp", "localhost:8080")
if err != nil {
fmt.Println("Error:", err.Error())
return
}
//設置程序結束後關閉監聽
defer listen.Close()
for {
//阻塞等待連接
conn, err := listen.Accept()
if err != nil {
fmt.Println("Error:", err.Error())
return
}
go func() {
defer conn.Close()
//讀取消息到buf並回復客戶端Message received.
buf := make([]byte, 1024)
conn.Read(buf)
fmt.Printf("收到消息:%s \r\n", string(buf))
conn.Write([]byte("Message received."))
}()
}
}
啓動後我們用 telnet 建立連接連接,並隨意出入一個字符串 a,該程序就會輸出如下消息:
收到消息:a
對應我們的終端也會收到程序的回覆,然後連接被斷開:
Message received.
遺失對主機的連接。
go 語言如何完成 socket 的創建
我們以Linux
系統爲例,我們通過 net.Listen
方法創建TCP
監聽socket
並綁定傳入的端口號,其底層會調用內核創建socket
並將這個socket
的文件描述符fd
封裝到go
語言的netFD
對象。
我們從上文的Listen
函數爲入口,可以看到其內部調用了ListenConfig
的Listen
方法:
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
//調用ListenConfig的Listen實現基於配置的TCP連接初始化
return lc.Listen(context.Background(), network, address)
}
步入Listen
就可以看到基於配置初始化監聽對象的核心邏輯:
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
//......
//封裝監聽對象
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
//創建監聽socket
l, err = sl.listenTCP(ctx, la)
//......
}
步入其內部邏輯查看,它會基於我們傳入 ip 端口號等配置封裝一個sysListener
並調用listenTCP
得到一個TCPListener
對象,這就是我們的監聽socket
對象。
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
//......
//封裝監聽對象
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
//生成TCP監聽對象
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
//......
default:
//......
return l, nil
}
最終步入listenTCP
查看邏輯,就可以看到它會通過internetSocket
創建socket
並基於這個socket
的文件描述符fd
封裝成TCPListener
返回:
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
//......
//調用internetSocket,其底層會根據操作系統調用不同的函數完成socket創建
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrlCtxFn)
if err != nil {
return nil, err
}
//基於socket的文件描述符fd封裝成TCPListener返回
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
接收新連接
完成監聽socket
創建之後就可以進行監聽並處理接入的連接,Accept
本上就調用socket
的accept
方法獲取socket
對象,如果沒有連接則直接將當前服務端監聽socket
的對應協程掛起,反之若收到新連接則基於內核函數封裝成一個establish
的socket
並將其封裝成TCPConn
對象返回:
我們查看Accept
函數內部,即可看到核心調用accept
,其底層本質就是調用當前TCPListener
對應socket
的accept
方法從而得到一個已建立連接且封裝establish socket
的對象TCPConn
:
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
//調用當前socket的accept方法得到一個新連接TCPConn
c, err := l.accept()
//......
return c, nil
}
查看accept
內部邏輯,如上文所說基於當前連接的socket
的文件描述符定位到socket
調用accept
阻塞監聽新連接:
func (ln *TCPListener) accept() (*TCPConn, error) {
//
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
//基於新連接的fd封裝成newTCPConn客戶端連接
return newTCPConn(fd, ln.lc.KeepAlive, nil), nil
}
以Linux
爲例,我們可在fd_unix.go
看到ln.fd.accept()
的實現,其本質就是調用 accept 方法獲取就緒的socket
,若存在需要建立連接的socket
則返回,反之調用waitRead
將當前協程掛起,等待系統輪詢得到當前監聽socket
就緒的事件後將其喚醒:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
//......
for {
//調用accept獲取就緒socket信息
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
//若accept沒有得到socket則調用waitRead將當前協程掛起
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
//......
}
return -1, nil, errcall, err
}
}
讀數據
讀寫數據和監聽socket
獲取就緒連接處理差不多,我們以Linux
系統讀爲例,調用read
進行數據讀時就調用底層邏輯進行系統讀,如果有就緒的讀事件則返回處理,反之將當前協程掛起:
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
//調用當前socket的read方法並返回
n, err := c.fd.Read(b)
//......
return n, err
}
查看Read
底層實現即可看到它調用 socket 原生非阻塞讀方法,若沒有就緒的讀數據則將協程掛起,若有數據則返回數據長度 n:
func (fd *FD) Read(p []byte) (int, error) {
//......
for {
//調用socket原生讀方法
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
//若返回EAGAIN 則說明當前非阻塞讀沒有得到就緒的數據,調用waitRead將協程掛起
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
寫數據
有了讀數據的源碼的學習基礎,對於寫數據的邏輯也就可以很直觀的理解了,同樣的調用原生 socket 的非阻塞寫,若發現不可寫 (內核緩衝區已滿) 則將當前協程掛起,反之直接將數據到內核緩衝區等待發送:
func (c *conn) Write(b []byte) (int, error) {
//......
//調用當前socket的寫方法
n, err := c.fd.Write(b)
//......
return n, err
}
最終我們也可以在fd_unix.go
看到Write
的核心邏輯非阻塞寫的邏輯,若可寫則寫入後返回寫入長度若非阻塞寫失敗,則將當前協程掛起,等待可寫時喚醒:
// Write implements io.Writer.
func (fd *FD) Write(p []byte) (int, error) {
//......
var nn int
for {
//......
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
//......
if n > 0 {
nn += n
}
//返回寫入長度nn
if nn == len(p) {
return nn, err
}
//......
//若非阻塞寫失敗,則將當前協程掛起,等待可寫時喚醒
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
//......
}
}
小結
自此我們關於 go 語言網絡層抽象設計與實現的所有篇章都已完成,感謝您的支持。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/q-hzoGvNbRkEGxjzODZYTw