聊聊 go 語言對於 socket 的抽象

寫在文章開頭

go語言對於網絡抽象做了非常通用且高性能的封裝,所以就從net包源碼入手介紹一下go語言對於socket的抽象。

Hi,我是 sharkChili ,是個不斷在硬核技術上作死的 java coder ,是 CSDN 的博客專家 ,也是開源項目 Java Guide 的維護者之一,熟悉 Java 也會一點 Go ,偶爾也會在 C 源碼 邊緣徘徊。寫過很多有意思的技術博客,也還在研究並輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公衆號: 寫代碼的 SharkChili

詳解 go 語言對 socket 的抽象

服務端 socket 與客戶端的交互流程

在正式介紹源碼之前,我們需要簡單的過一下 socket 通信的流程:

  1. 服務端創建socket

  2. 根據配置的端口號調用bind綁定端口監聽連接。

  3. 調用accept阻塞監聽連接。

  4. 客戶端socket通過connect和服務端建立連接(其底層實際上會經歷一次TCP三次握手)

  5. 雙方進行數據收發。

  6. 完成通信後,客戶端調用close結束通信(這期間會經歷4次揮手)

代碼示例

我們給出下面這樣一段代碼,他通過Listen創建服務端監聽socket,然後通過Accept阻塞接收新連接,一旦收到連接後開啓協程進行數據讀寫:

func main() {
 //綁定8080端口
 listen, err := net.Listen("tcp""localhost:8080")

 if err != nil {
  fmt.Println("Error:", err.Error())
  return
 }

 //設置程序結束後關閉監聽
 defer listen.Close()

 for {
  //阻塞等待連接
  conn, err := listen.Accept()
  if err != nil {
   fmt.Println("Error:", err.Error())
   return
  }

  go func() {
   defer conn.Close()
   //讀取消息到buf並回復客戶端Message received.
   buf := make([]byte, 1024)
   conn.Read(buf)

   fmt.Printf("收到消息:%s \r\n", string(buf))
   conn.Write([]byte("Message received."))

  }()

 }
}

啓動後我們用 telnet 建立連接連接,並隨意出入一個字符串 a,該程序就會輸出如下消息:

收到消息:a

對應我們的終端也會收到程序的回覆,然後連接被斷開:

 Message received.
                  
遺失對主機的連接。

go 語言如何完成 socket 的創建

我們以Linux系統爲例,我們通過 net.Listen方法創建TCP監聽socket並綁定傳入的端口號,其底層會調用內核創建socket並將這個socket文件描述符fd封裝到go語言的netFD對象。

我們從上文的Listen函數爲入口,可以看到其內部調用了ListenConfigListen方法:

func Listen(network, address string) (Listener, error) {
 var lc ListenConfig
 //調用ListenConfig的Listen實現基於配置的TCP連接初始化
 return lc.Listen(context.Background(), network, address)
}

步入Listen就可以看到基於配置初始化監聽對象的核心邏輯:

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    //......
 //封裝監聽對象
 sl := &sysListener{
  ListenConfig: *lc,
  network:      network,
  address:      address,
 }
 var l Listener
 la := addrs.first(isIPv4)
 switch la := la.(type) {
 case *TCPAddr:
  //創建監聽socket
  l, err = sl.listenTCP(ctx, la)
    //......
}

步入其內部邏輯查看,它會基於我們傳入 ip 端口號等配置封裝一個sysListener並調用listenTCP得到一個TCPListener對象,這就是我們的監聽socket對象。

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
 //......
 //封裝監聽對象
 sl := &sysListener{
  ListenConfig: *lc,
  network:      network,
  address:      address,
 }
 var l Listener
 la := addrs.first(isIPv4)
 switch la := la.(type) {
 case *TCPAddr:
  //生成TCP監聽對象
  l, err = sl.listenTCP(ctx, la)
 case *UnixAddr:
  //......
 default:
 //......
 return l, nil
}

最終步入listenTCP查看邏輯,就可以看到它會通過internetSocket創建socket並基於這個socket的文件描述符fd封裝成TCPListener返回:

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
 //......
 //調用internetSocket,其底層會根據操作系統調用不同的函數完成socket創建
 fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrlCtxFn)
 if err != nil {
  return nil, err
 }
 //基於socket的文件描述符fd封裝成TCPListener返回
 return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

接收新連接

完成監聽socket創建之後就可以進行監聽並處理接入的連接,Accept本上就調用socketaccept方法獲取socket對象,如果沒有連接則直接將當前服務端監聽socket的對應協程掛起,反之若收到新連接則基於內核函數封裝成一個establishsocket並將其封裝成TCPConn對象返回:

我們查看Accept函數內部,即可看到核心調用accept,其底層本質就是調用當前TCPListener對應socketaccept方法從而得到一個已建立連接且封裝establish socket的對象TCPConn

func (l *TCPListener) Accept() (Conn, error) {
 if !l.ok() {
  return nil, syscall.EINVAL
 }
 //調用當前socket的accept方法得到一個新連接TCPConn
 c, err := l.accept()
 //......
 return c, nil
}

查看accept內部邏輯,如上文所說基於當前連接的socket的文件描述符定位到socket調用accept阻塞監聽新連接:

func (ln *TCPListener) accept() (*TCPConn, error) {
 //
 fd, err := ln.fd.accept()
 if err != nil {
  return nil, err
 }
 //基於新連接的fd封裝成newTCPConn客戶端連接
 return newTCPConn(fd, ln.lc.KeepAlive, nil), nil
}

Linux爲例,我們可在fd_unix.go看到ln.fd.accept()的實現,其本質就是調用 accept 方法獲取就緒的socket,若存在需要建立連接的socket則返回,反之調用waitRead將當前協程掛起,等待系統輪詢得到當前監聽socket就緒的事件後將其喚醒:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
 //......

 for {
  //調用accept獲取就緒socket信息
  s, rsa, errcall, err := accept(fd.Sysfd)
  if err == nil {
   return s, rsa, "", err
  }
  switch err {
  case syscall.EINTR:
   continue
  //若accept沒有得到socket則調用waitRead將當前協程掛起
  case syscall.EAGAIN:
   if fd.pd.pollable() {
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  //......
  }
  return -1, nil, errcall, err
 }
}

讀數據

讀寫數據和監聽socket獲取就緒連接處理差不多,我們以Linux系統讀爲例,調用read進行數據讀時就調用底層邏輯進行系統讀,如果有就緒的讀事件則返回處理,反之將當前協程掛起:

func (c *conn) Read([]byte) (int, error) {
 if !c.ok() {
  return 0, syscall.EINVAL
 }
 //調用當前socket的read方法並返回
 n, err := c.fd.Read(b)
 //......
 return n, err
}

查看Read底層實現即可看到它調用 socket 原生非阻塞讀方法,若沒有就緒的讀數據則將協程掛起,若有數據則返回數據長度 n:

func (fd *FD) Read([]byte) (int, error) {
 //......
 
 for {
  //調用socket原生讀方法
  n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
  if err != nil {
   n = 0
   //若返回EAGAIN 則說明當前非阻塞讀沒有得到就緒的數據,調用waitRead將協程掛起
   if err == syscall.EAGAIN && fd.pd.pollable() {
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  }
  err = fd.eofError(n, err)
  return n, err
 }
}

寫數據

有了讀數據的源碼的學習基礎,對於寫數據的邏輯也就可以很直觀的理解了,同樣的調用原生 socket 的非阻塞寫,若發現不可寫 (內核緩衝區已滿) 則將當前協程掛起,反之直接將數據到內核緩衝區等待發送:

func (c *conn) Write([]byte) (int, error) {
 //......
 //調用當前socket的寫方法
 n, err := c.fd.Write(b)
 //......
 return n, err
}

最終我們也可以在fd_unix.go看到Write的核心邏輯非阻塞寫的邏輯,若可寫則寫入後返回寫入長度若非阻塞寫失敗,則將當前協程掛起,等待可寫時喚醒:

// Write implements io.Writer.
func (fd *FD) Write([]byte) (int, error) {
 //......
 var nn int
 for {
  //......
  n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
  //......
  if n > 0 {
   nn += n
  }
  //返回寫入長度nn
  if nn == len(p) {
   return nn, err
  }
  //......
  //若非阻塞寫失敗,則將當前協程掛起,等待可寫時喚醒
  if err == syscall.EAGAIN && fd.pd.pollable() {
   if err = fd.pd.waitWrite(fd.isFile); err == nil {
    continue
   }
  }
  //......
 }
}

小結

自此我們關於 go 語言網絡層抽象設計與實現的所有篇章都已完成,感謝您的支持。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/q-hzoGvNbRkEGxjzODZYTw