Go channel 的妙用

昨天在內網上看到一篇講數據庫連接的文章,列出了一些 sql 包的一些源碼,我注意到其中取用、歸還連接的方式非常有意思——通過臨時創建的 channel 來傳遞連接。

在 sql.DB 結構體裏,使用 freeConn 字段來表示當前所有的連接,也就是一個連接池。

type DB struct {
    freeConn     []*driverConn
}

當需要拿連接的時候,從 freeConn 中取出第一個元素:

conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true

取 slice 切片的第一個元素,然後將 slice 後面的元素往前挪,最後通過截斷來 “釋放” 最後一個元素。

當然,能進行上述操作的前提是切片 db.freeConn 長度大於 0,即有空閒連接存在。如果當前沒有空閒連接,那如何處理呢?接下來就是 channel 的妙用的地方。

sql.DB 結構體裏還有另一個字段 connRequests,它用來存儲當前有哪些 “協程” 在申請連接:

type DB struct {
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
}

connRequests 的 key 是一個 uint64 類型,其實就是一個遞增加 1 的 key;而 connRequest 表示申請一個新連接的請求:

type connRequest struct {
 conn *driverConn
 err  error
}

這裏的 conn 正是需要的連接。

當連接池中沒有空閒連接的時候:

req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req

先是構建了一個 chan connRequest,同時拿到了一個 reqKey,將它和 req 綁定到 connRequests 中。

接下來,在 select 中等待超時或者從 req 這個 channel 中拿到空閒連接:

select {
 case <-ctx.Done():
  
 case ret, ok := <-req:
  if !ok {
   return nil, errDBClosed
  }
  
  return ret.conn, ret.err
}

可以看到,select 有兩個 case,第一個是通過 context 控制的 <-Done;第二個則是前面構造的 <-req,如果從 req 中讀出了元素,那就相當於獲得了連接:ret.conn。

那什麼時候會向 req 中發送連接呢?答案是在向連接池歸還連接的時候。

前面提到,空閒連接是一個切片,歸還的時候直接 append 到這個切片就可以了:

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    db.freeConn = append(db.freeConn, dc)
}

但其實在 append 之前,還會去檢查當前 connRequests 中是否有申請空閒連接的請求:

if c := len(db.connRequests); c > 0 {
 var req chan connRequest
 var reqKey uint64
 for reqKey, req = range db.connRequests {
  break
 }
 delete(db.connRequests, reqKey) // Remove from pending requests.
 if err == nil {
  dc.inUse = true
 }
 req <- connRequest{
  conn: dc,
  err:  err,
 }
 return true
}

如果有請求的話,直接將當前連接 “塞到” req channel 裏去了。另一邊,申請連接的 goroutine 就可以從 req channel 中讀出 conn。

於是,通過 channel 就實現了一次 “連接傳輸” 的功能。

這讓我想到不久之前芮神寫的一篇《高併發服務遇 redis 瓶頸引發 time-wait 事故》,文中提到了將多個 redis command 組裝爲一個 pipeline:

調用方把 redis command 和接收結果的 chan 推送到任務隊列中,然後由一個 worker 去消費,worker 組裝多個 redis cmd 爲 pipeline,向 redis 發起請求並拿回結果,拆解結果集後,給每個命令對應的結果 chan 推送結果。調用方在推送任務到隊列後,就一直監聽傳輸結果的 chan。

redis commnd 組裝成 pipeline

這裏的用法就和本文描述的 channel 用法一致。

細想一下,以上提到的 channel 用法很神奇嗎?我們平時沒有接觸過嗎?

我用過最多的是 “生產者 - 消費者” 模式,先啓動 N 個 goroutine 消費者,讀某個 channel,之後,生產者再在某個時候向 channel 中發送元素:

for i := 0; i < engine.workerNum; i++ {
    go func() {
        for {
            work = <-engine.workChan
        }
    }
}

另外,我還會用 channel 充當一個 “ready” 的信號,用來指示某個 “過程” 準備好了,可以接收結果了:

func (j *Job) Finished() <-chan bool {
 return j.finish
}

前面提到的 “生產者 - 消費者” 和 “ready” 信號這兩種 channel 用法和本文的 channel 用法並沒有什麼本質區別。唯一不同的點是前者的 channel 是事先創建好的,並且是 “公用” 的;而本文中用到的 channel 實際上是 “臨時” 創建的,並且只有這一個請求使用。

最後,用曹大最近在讀者羣裏說的話結尾:

  1. 抄代碼是很好的學習方式。

  2. 選一兩個感興趣的方向,自己嘗試實現相應的 feature list,實現完和標準實現做對比。

  3. 先積累再創造,別一上來就想着造輪子,看的多了碰上很多東西就有新思路了。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DKZNmBoUqUqG7DZ0v3bR9w