一文說透 Go 語言 HTTP 標準庫

本篇文章來分析一下 Go 語言 HTTP 標準庫是如何實現的

本文使用的 go 的源碼 1.15.7

基於 HTTP 構建的服務標準模型包括兩個端,客戶端 (Client) 和服務端 (Server)。HTTP 請求從客戶端發出,服務端接受到請求後進行處理然後將響應返回給客戶端。所以 http 服務器的工作就在於如何接受來自客戶端的請求,並向客戶端返回響應。

一個典型的 HTTP 服務應該如圖所示:

HTTP client

在 Go 中可以直接通過 HTTP 包的 Get 方法來發起相關請求數據,一個簡單例子:

func main() {
 resp, err := http.Get("http://httpbin.org/get?)
 if err != nil {
  fmt.Println(err)
  return
 }
 defer resp.Body.Close()
 body, _ := ioutil.ReadAll(resp.Body)
 fmt.Println(string(body))
}

我們下面通過這個例子來進行分析。

HTTP 的 Get 方法會調用到 DefaultClient 的 Get 方法,DefaultClient 是 Client 的一個空實例,所以最後會調用到 Client 的 Get 方法:

Client 結構體

type Client struct { 
 Transport RoundTripper 
 CheckRedirect func(req *Request, via []*Request) error 
 Jar CookieJar 
 Timeout time.Duration
}

Client 結構體總共由四個字段組成:

Transport:表示 HTTP 事務,用於處理客戶端的請求連接並等待服務端的響應;

CheckRedirect:用於指定處理重定向的策略;

Jar:用於管理和存儲請求中的 cookie;

Timeout:指定客戶端請求的最大超時時間,該超時時間包括連接、任何的重定向以及讀取相應的時間;

初始化請求

func (c *Client) Get(url string) (resp *Response, err error) {
    // 根據方法名、URL 和請求體構建請求
 req, err := NewRequest("GET", url, nil)
 if err != nil {
  return nil, err
 }
    // 執行請求
 return c.Do(req)
}

我們要發起一個請求首先需要根據請求類型構建一個完整的請求頭、請求體、請求參數。然後纔是根據請求的完整結構來執行請求。

NewRequest 初始化請求

NewRequest 會調用到 NewRequestWithContext 函數上。這個函數會根據請求返回一個 Request 結構體,它裏面包含了一個 HTTP 請求所有信息。

Request

Request 結構體有很多字段,我這裏列舉幾個大家比較熟悉的字段:

NewRequestWithContext

func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
 ...
 // parse url
 u, err := urlpkg.Parse(url)
 if err != nil {
  return nil, err
 }
 rc, ok := body.(io.ReadCloser)
 if !ok && body != nil {
  rc = ioutil.NopCloser(body)
 } 
 u.Host = removeEmptyPort(u.Host)
 req := &Request{
  ctx:        ctx,
  Method:     method,
  URL:        u,
  Proto:      "HTTP/1.1",
  ProtoMajor: 1,
  ProtoMinor: 1,
  Header:     make(Header),
  Body:       rc,
  Host:       u.Host,
 } 
 ...
 return req, nil
}

NewRequestWithContext 函數會將請求封裝成一個 Request 結構體並返回。

準備 http 發送請求

如上圖所示,Client 調用 Do 方法處理發送請求最後會調用到 send 函數中。

func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
 resp, didTimeout, err = send(req, c.transport(), deadline)
 if err != nil {
  return nil, didTimeout, err
 }
 ...
 return resp, nil, nil
}

Transport

Client 的 send 方法在調用 send 函數進行下一步的處理前會先調用 transport 方法獲取 DefaultTransport 實例,該實例如下:

var DefaultTransport RoundTripper = &Transport{
    // 定義 HTTP 代理策略
 Proxy: ProxyFromEnvironment,
 DialContext: (&net.Dialer{
  Timeout:   30 * time.Second,
  KeepAlive: 30 * time.Second,
  DualStack: true,
 }).DialContext,
 ForceAttemptHTTP2:     true,
    // 最大空閒連接數
 MaxIdleConns:          100,
    // 空閒連接超時時間
 IdleConnTimeout:       90 * time.Second,
    // TLS 握手超時時間
 TLSHandshakeTimeout:   10 * time.Second,
 ExpectContinueTimeout: 1 * time.Second,
}

Transport 實現 RoundTripper 接口,該結構體會發送 http 請求並等待響應。

type RoundTripper interface { 
 RoundTrip(*Request) (*Response, error)
}

從 RoundTripper 接口我們也可以看出,該接口定義的 RoundTrip 方法會具體的處理請求,處理完畢之後會響應 Response。

回到我們上面的 Client 的 send 方法中,它會調用 send 函數,這個函數主要邏輯都交給 Transport  的 RoundTrip 方法來執行。

RoundTrip 會調用到 roundTrip 方法中:

func (t *Transport) roundTrip(req *Request) (*Response, error) {
 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
 ctx := req.Context()
 trace := httptrace.ContextClientTrace(ctx) 
 ...  
 for {
  select {
  case <-ctx.Done():
   req.closeBody()
   return nil, ctx.Err()
  default:
  }

  // 封裝請求
  treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey} 
  cm, err := t.connectMethodForRequest(treq)
  if err != nil {
   req.closeBody()
   return nil, err
  } 
  // 獲取連接
  pconn, err := t.getConn(treq, cm)
  if err != nil {
   t.setReqCanceler(cancelKey, nil)
   req.closeBody()
   return nil, err
  }
  
  // 等待響應結果
  var resp *Response
  if pconn.alt != nil {
   // HTTP/2 path.
   t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
   resp, err = pconn.alt.RoundTrip(req)
  } else {
   resp, err = pconn.roundTrip(treq)
  }
  if err == nil {
   resp.Request = origReq
   return resp, nil
  } 
  ...
 }
}

roundTrip 方法會做兩件事情:

  1. 調用 Transport 的 getConn 方法獲取連接;

  2. 在獲取到連接後,調用 persistConn 的 roundTrip 方法等待請求響應結果;

獲取連接 getConn

getConn 有兩個階段:

  1. 調用 queueForIdleConn 獲取空閒 connection;

  2. 調用 queueForDial 等待創建新的 connection;

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
 req := treq.Request
 trace := treq.trace
 ctx := req.Context()
 if trace != nil && trace.GetConn != nil {
  trace.GetConn(cm.addr())
 } 
 // 將請求封裝成 wantConn 結構體
 w := &wantConn{
  cm:         cm,
  key:        cm.key(),
  ctx:        ctx,
  ready:      make(chan struct{}, 1),
  beforeDial: testHookPrePendingDial,
  afterDial:  testHookPostPendingDial,
 }
 defer func() {
  if err != nil {
   w.cancel(t, err)
  }
 }()
 
 // 獲取空閒連接
 if delivered := t.queueForIdleConn(w); delivered {
  pc := w.pc
  ...
  t.setReqCanceler(treq.cancelKey, func(error) {})
  return pc, nil
 }
 
 // 創建連接
 t.queueForDial(w)
 
 select {
 // 獲取到連接後進入該分支
 case <-w.ready:
  ...
  return w.pc, w.err
 ...
}

獲取空閒連接 queueForIdleConn

成功獲取到空閒 connection:

成功獲取 connection 分爲如下幾步:

  1. 根據當前的請求的地址去空閒 connection 字典中查看存不存在空閒的 connection 列表;

  2. 如果能獲取到空閒的 connection 列表,那麼獲取到列表的最後一個 connection;

  3. 返回;

獲取不到空閒 connection:

當獲取不到空閒 connection 時:

  1. 根據當前的請求的地址去空閒 connection 字典中查看存不存在空閒的 connection 列表;

  2. 不存在該請求的 connection 列表,那麼將該 wantConn 加入到 等待獲取空閒 connection 字典中;

從上面的圖解應該就很能看出這一步會怎麼操作了,這裏簡要的分析一下代碼,讓大家更清楚裏面的邏輯:

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
 if t.DisableKeepAlives {
  return false
 }

 t.idleMu.Lock()
 defer t.idleMu.Unlock() 
 t.closeIdle = false

 if w == nil { 
  return false
 }
 
 // 計算空閒連接超時時間
 var oldTime time.Time
 if t.IdleConnTimeout > 0 {
  oldTime = time.Now().Add(-t.IdleConnTimeout)
 }
 // Look for most recently-used idle connection.
 // 找到key相同的 connection 列表
 if list, ok := t.idleConn[w.key]; ok {
  stop := false
  delivered := false
  for len(list) > 0 && !stop {
   // 找到connection列表最後一個
   pconn := list[len(list)-1] 
   // 檢查這個 connection 是不是等待太久了
   tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
   if tooOld { 
    go pconn.closeConnIfStillIdle()
   }
   // 該 connection 被標記爲 broken 或 閒置太久 continue
   if pconn.isBroken() || tooOld { 
    list = list[:len(list)-1]
    continue
   }
   // 嘗試將該 connection 寫入到 w 中
   delivered = w.tryDeliver(pconn, nil)
   if delivered {
    // 操作成功,需要將 connection 從空閒列表中移除
    if pconn.alt != nil { 
    } else { 
     t.idleLRU.remove(pconn)
     list = list[:len(list)-1]
    }
   }
   stop = true
  }
  if len(list) > 0 {
   t.idleConn[w.key] = list
  } else {
   // 如果該 key 對應的空閒列表不存在,那麼將該key從字典中移除
   delete(t.idleConn, w.key)
  }
  if stop {
   return delivered
  }
 } 
 // 如果找不到空閒的 connection
 if t.idleConnWait == nil {
  t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
 }
  // 將該 wantConn 加入到 等待獲取空閒 connection 字典中
 q := t.idleConnWait[w.key] 
 q.cleanFront()
 q.pushBack(w)
 t.idleConnWait[w.key] = q
 return false
}

上面的註釋已經很清楚了,我這裏就不再解釋了。

建立連接  queueForDial

在獲取不到空閒連接之後,會嘗試去建立連接,從上面的圖大致可以看到,總共分爲以下幾個步驟:

  1. 在調用 queueForDial 方法的時候會校驗 MaxConnsPerHost 是否未設置或已達上限;

  2. 檢驗不通過則將當前的請求放入到 connsPerHostWait 等待字典中;

  3. 如果校驗通過那麼會異步的調用 dialConnFor 方法創建連接;

  4. dialConnFor 方法首先會調用 dialConn 方法創建 TCP 連接,然後啓動兩個異步線程來處理讀寫數據,然後調用 tryDeliver 將連接綁定到  wantConn 上面。

下面進行代碼分析:

func (t *Transport) queueForDial(w *wantConn) {
 w.beforeDial()
 // 小於零說明無限制,異步建立連接
 if t.MaxConnsPerHost <= 0 {
  go t.dialConnFor(w)
  return
 }

 t.connsPerHostMu.Lock()
 defer t.connsPerHostMu.Unlock()
 // 每個 host 建立的連接數沒達到上限,異步建立連接
 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
  if t.connsPerHost == nil {
   t.connsPerHost = make(map[connectMethodKey]int)
  }
  t.connsPerHost[w.key] = n + 1
  go t.dialConnFor(w)
  return
 }
 //每個 host 建立的連接數已達到上限,需要進入等待隊列
 if t.connsPerHostWait == nil {
  t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
 }
 q := t.connsPerHostWait[w.key]
 q.cleanFront()
 q.pushBack(w)
 t.connsPerHostWait[w.key] = q
}

這裏主要進行參數校驗,如果最大連接數限制爲零,亦或是每個 host 建立的連接數沒達到上限,那麼直接異步建立連接。

dialConnFor

func (t *Transport) dialConnFor(w *wantConn) {
 defer w.afterDial()
 // 建立連接
 pc, err := t.dialConn(w.ctx, w.cm)
 // 連接綁定 wantConn
 delivered := w.tryDeliver(pc, err)
 // 建立連接成功,但是綁定 wantConn 失敗
 // 那麼將該連接放置到空閒連接字典或調用 等待獲取空閒 connection 字典 中的元素執行
 if err == nil && (!delivered || pc.alt != nil) { 
  t.putOrCloseIdleConn(pc)
 }
 if err != nil {
  t.decConnsPerHost(w.key)
 }
}

dialConnFor 會調用 dialConn 進行 TCP 連接創建,創建完畢之後調用 tryDeliver 方法和 wantConn 進行綁定。

dialConn

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
 // 創建連接結構體
 pconn = &persistConn{
  t:             t,
  cacheKey:      cm.key(),
  reqch:         make(chan requestAndChan, 1),
  writech:       make(chan writeRequest, 1),
  closech:       make(chan struct{}),
  writeErrCh:    make(chan error, 1),
  writeLoopDone: make(chan struct{}),
 }
 ...
 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
  ...
 } else {
  // 建立 tcp 連接
  conn, err := t.dial(ctx, "tcp", cm.addr())
  if err != nil {
   return nil, wrapErr(err)
  }
  pconn.conn = conn 
 } 
 ...

 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
  if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
   alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
   if e, ok := alt.(http2erringRoundTripper); ok {
    // pconn.conn was closed by next (http2configureTransport.upgradeFn).
    return nil, e.err
   }
   return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
  }
 }

 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
 //爲每個連接異步處理讀寫數據
 go pconn.readLoop()
 go pconn.writeLoop()
 return pconn, nil
}

這裏會根據 schema 的不同設置不同的連接配置,我上面顯示的是我們常用的 HTTP 連接的創建過程。對於 HTTP 來說會建立 tcp 連接,然後爲連接異步處理讀寫數據,最後將創建好的連接返回。

等待響應

這一部分的內容會稍微複雜一些,但確實非常的有趣。

在創建連接的時候會初始化兩個 channel :writech 負責寫入請求數據,reqch 負責讀取響應數據。我們在上面創建連接的時候,也提到了會爲連接創建兩個異步循環 readLoop 和 writeLoop 來負責處理讀寫數據。

在獲取到連接之後,會調用連接的 roundTrip 方法,它首先會將請求數據寫入到 writech 管道中,writeLoop 接收到數據之後就會處理請求。

然後 roundTrip 會將 requestAndChan 結構體寫入到 reqch 管道中,然後 roundTrip 會循環等待。readLoop 讀取到響應數據之後就會通過  requestAndChan 結構體中保存的管道將數據封裝成 responseAndError 結構體回寫,這樣 roundTrip 就可以接受到響應數據結束循環等待並返回。

roundTrip

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
 ...
 writeErrCh := make(chan error, 1)
 // 將請求數據寫入到 writech 管道中
 pc.writech <- writeRequest{req, writeErrCh, continueCh}

 // 用於接收響應的管道
 resc := make(chan responseAndError)
 // 將用於接收響應的管道封裝成 requestAndChan 寫入到 reqch 管道中
 pc.reqch <- requestAndChan{
  req:        req.Request,
  cancelKey:  req.cancelKey,
  ch:         resc,
  ...
 }
 ...
 for {
  testHookWaitResLoop()
  select { 
  // 接收到響應數據
  case re := <-resc:
   if (re.res == nil) == (re.err == nil) {
    panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
   }
   if debugRoundTrip {
    req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
   }
   if re.err != nil {
    return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
   }
   // 返回響應數據
   return re.res, nil
  ...
 }
}

這裏會封裝好 writeRequest 作爲發送請求的數據,並將用於接收響應的管道封裝成 requestAndChan 寫入到 reqch 管道中,然後循環等待接受響應。

然後 writeLoop 會進行請求數據 writeRequest :

func (pc *persistConn) writeLoop() {
 defer close(pc.writeLoopDone)
 for {
  select {
  case wr := <-pc.writech:
   startBytesWritten := pc.nwrite
   // 向 TCP 連接中寫入數據,併發送至目標服務器
   err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
   ...
  case <-pc.closech:
   return
  }
 }
}

這裏會將從 writech 管道中獲取到的數據寫入到 TCP 連接中,併發送至目標服務器。

readLoop

func (pc *persistConn) readLoop() {
 closeErr := errReadLoopExiting // default value, if not changed below
 defer func() {
  pc.close(closeErr)
  pc.t.removeIdleConn(pc)
 }()
 ... 
 alive := true
 for alive {
  pc.readLimit = pc.maxHeaderResponseSize()
  // 獲取 roundTrip 發送的結構體
  rc := <-pc.reqch
  trace := httptrace.ContextClientTrace(rc.req.Context())

  var resp *Response
  if err == nil {
   // 讀取數據
   resp, err = pc.readResponse(rc, trace)
  } else {
   err = transportReadFromServerError{err}
   closeErr = err
  }

  ...  
  // 將響應數據寫回到管道中
  select {
  case rc.ch <- responseAndError{res: resp}:
  case <-rc.callerGone:
   return
  }
  ...
 }
}

這裏是從 TCP 連接中讀取到對應的請求響應數據,通過 roundTrip 傳入的管道再回寫,然後 roundTrip 就會接受到數據並獲取的響應數據返回。

http server

我這裏繼續以一個簡單的例子作爲開頭:

func HelloHandler(w http.ResponseWriter, r *http.Request) {
 fmt.Fprintf(w, "Hello World")
}

func main () {
 http.HandleFunc("/", HelloHandler)
 http.ListenAndServe(":8000", nil)
}

在實現上面我先用一張圖進行簡要的介紹一下:

其實我們從上面例子的方法名就可以知道一些大致的步驟:

  1. 註冊處理器到一個 hash 表中,可以通過鍵值路由匹配;

  2. 註冊完之後就是開啓循環監聽,每監聽到一個連接就會創建一個 Goroutine;

  3. 在創建好的 Goroutine 裏面會循環的等待接收請求數據,然後根據請求的地址去處理器路由表中匹配對應的處理器,然後將請求交給處理器處理;

註冊處理器

處理器的註冊如上面的例子所示,是通過調用 HandleFunc 函數來實現的。

HandleFunc 函數會一直調用到 ServeMux 的 Handle 方法中。

func (mux *ServeMux) Handle(pattern string, handler Handler) {
 mux.mu.Lock()
 defer mux.mu.Unlock()
 ...
 e := muxEntry{h: handler, pattern: pattern}
 mux.m[pattern] = e
 if pattern[len(pattern)-1] == '/' {
  mux.es = appendSorted(mux.es, e)
 }

 if pattern[0] != '/' {
  mux.hosts = true
 }
}

Handle 會根據路由作爲 hash 表的鍵來保存 muxEntry 對象,muxEntry封裝了 pattern 和 handler。如果路由表達式以'/'結尾,則將對應的muxEntry對象加入到[]muxEntry中。

hash 表是用於路由精確匹配,[]muxEntry用於部分匹配。

監聽

監聽是通過調用 ListenAndServe 函數,裏面會調用 server 的 ListenAndServe 方法:

func (srv *Server) ListenAndServe() error {
 if srv.shuttingDown() {
  return ErrServerClosed
 }
 addr := srv.Addr
 if addr == "" {
  addr = ":http"
 }
    // 監聽端口
 ln, err := net.Listen("tcp", addr)
 if err != nil {
  return err
 }
    // 循環接收監聽到的網絡請求
 return srv.Serve(ln)
}

Serve

func (srv *Server) Serve(l net.Listener) error { 
 ...
 baseCtx := context.Background()  
 ctx := context.WithValue(baseCtx, ServerContextKey, srv)
 for {
  // 接收 listener 過來的網絡連接
  rw, err := l.Accept()
  ... 
  tempDelay = 0
  c := srv.newConn(rw)
  c.setState(c.rwc, StateNew) 
  // 創建協程處理連接
  go c.serve(connCtx)
 }
}

Serve 這個方法裏面會用一個循環去接收監聽到的網絡連接,然後創建協程處理連接。所以難免就會有一個問題,如果併發很高的話,可能會一次性創建太多協程,導致處理不過來的情況。

處理請求

處理請求是通過爲每個連接創建 goroutine 來處理對應的請求:

func (c *conn) serve(ctx context.Context) {
 c.remoteAddr = c.rwc.RemoteAddr().String()
 ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) 
 ... 
 ctx, cancelCtx := context.WithCancel(ctx)
 c.cancelCtx = cancelCtx
 defer cancelCtx() 
 c.r = &connReader{conn: c}
 c.bufr = newBufioReader(c.r)
 c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)  
 for {
  // 讀取請求
  w, err := c.readRequest(ctx) 
  ... 
  // 根據請求路由調用處理器處理請求
  serverHandler{c.server}.ServeHTTP(w, w.req)
  w.cancelCtx()
  if c.hijacked() {
   return
  }
  w.finishRequest() 
  ...
 }
}

當一個連接建立之後,該連接中所有的請求都將在這個協程中進行處理,直到連接被關閉。在 for 循環裏面會循環調用 readRequest 讀取請求進行處理。

請求處理是通過調用 ServeHTTP 進行的:

type serverHandler struct {
   srv *Server
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
 handler := sh.srv.Handler
 if handler == nil {
  handler = DefaultServeMux
 }
 if req.RequestURI == "*" && req.Method == "OPTIONS" {
  handler = globalOptionsHandler{}
 }
 handler.ServeHTTP(rw, req)
}

serverHandler 其實就是 Server 包裝了一層。這裏的 sh.srv.Handler參數實際上是傳入的 ServeMux 實例,所以這裏最後會調用到 ServeMux 的 ServeHTTP 方法。

最終會通過 handler 調用到 match 方法進行路由匹配:

func (mux *ServeMux) match(path string) (h Handler, pattern string) {
 v, ok := mux.m[path]
 if ok {
  return v.h, v.pattern
 }
 
 for _, e := range mux.es {
  if strings.HasPrefix(path, e.pattern) {
   return e.h, e.pattern
  }
 }
 return nil, ""
}

這個方法裏首先會利用進行精確匹配,如果匹配成功那麼直接返回;匹配不成功,那麼會根據 []muxEntry中保存的和當前路由最接近的已註冊的父節點路由進行匹配,否則繼續匹配下一個父節點路由,直到根路由/。最後會調用對應的處理器進行處理。

Reference

https://cloud.tencent.com/developer/article/1515297

https://duyanghao.github.io/http-transport/

https://draveness.me/golang/docs/part4-advanced/ch09-stdlib/golang-net-http

https://laravelacademy.org/post/21003

https://segmentfault.com/a/1190000021653550

關注 luozhiyun,和他一起學習👆

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3JhRuTUxRc6gIcLIqHJ5Tw