Golang 數據庫連接池技術原理與實現

一、爲什麼需要連接池

如果不用連接池,而是每次請求都創建一個連接是比較昂貴的,因此需要完成 3 次 tcp 握手。同時在高併發場景下,由於沒有連接池的最大連接數限制,可以創建無數個連接,耗盡文件描述符。連接池就是爲了複用些創建好的連接。

二、連接池設計

基本上連接池都會設計以下幾個參數:

初始連接數:在初始化連接池時就會預先創建好的連接數量,如果設置得:

最大空閒連接數 maxIdle:池中最大緩存的連接個數,如果設置得:

最大連接數 maxCap

最大空閒時間 idleTimeout:當發現某連接空閒超過這個時間時,會將其關閉,重新去獲取連接

連接池對外提供兩個方法,Get:獲取一個連接,Put:歸還一個連接。大部分連接池的實現大同小異,基本流程如下:

三、Golang 標準庫 SQL 連接池

Golang 的連接池實現在標準庫 database/sql/sql.go 下。當我們運行:

db, err := sql.Open("mysql", "xxxx")

就會打開一個連接池。我們可以看看返回的 db 的結構體:

type DB struct {
    // Atomic access only. At top of struct to prevent mis-alignment
    // on 32-bit platforms. Of type time.Duration.
    waitDuration int64 // 等待新連接的總時間,用於統計
    connector driver.Connector // 由數據庫驅動實現的連接器
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64 // 關閉的連接數
    mu           sync.Mutex // 鎖
    freeConn     []*driverConn // 可用連接池
    connRequests map[uint64]chan connRequest // 連接請求表,key 是分配的自增鍵
    nextRequest  uint64 // 連接請求的自增鍵
    numOpen      int    // 已經打開 + 即將打開的連接數
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{} // 告知 connectionOpener 需要新的連接
    resetterCh        chan *driverConn // connectionResetter 函數,連接放回連接池的時候會用到
    closed            bool
    dep               map[finalCloser]depSet
    lastPut           map[*driverConn]string // debug 時使用,記錄上一個放回的連接
    maxIdle           int                    // 連接池大小,默認大小爲 2,<= 0 時不使用連接池
    maxOpen           int                    // 最大打開的連接數,<= 0 不限制
    maxLifetime       time.Duration          // 一個連接可以被重用的最大時限,也就是它在連接池中的最大存活時間,0 表示可以一直重用
    cleanerCh         chan struct{} // 告知 connectionCleaner 清理連接
    waitCount         int64 // 等待的連接總數
    maxIdleClosed     int64 // 釋放連接時,因爲連接池已滿而被關閉的連接總數
    maxLifetimeClosed int64 // 因爲超過存活時間而被關閉的連接總數
    stop func() // stop cancels the connection opener and the session resetter.
}

我們可以看的,DB 這個連接池內部存儲連接的結構 freeConn,並不是我們之前使用的 chan,而是 []*driverConn,一個連接切片。

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
    db        *DB // 數據庫句柄
    createdAt time.Time
    sync.Mutex  // 鎖
    ci          driver.Conn // 對應具體的連接
    closed      bool // 是否標記關閉
    finalClosed bool // 是否最終關閉
    openStmt    map[*driverStmt]bool // 在這個連接上打開的狀態
    lastErr     error // connectionResetter 的返回結果
    // guarded by db.mu
    inUse      bool // 連接是否佔用
    onPut      []func() // 連接歸還時要運行的函數,在 noteUnusedDriverStatement 添加
    dbmuClosed bool     // 和 closed 狀態一致,但是由鎖保護,用於 removeClosedStmtLocked
}

繼續查看代碼,通過 query 方法一路往回找,我們可以看到這個函數:func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)。

3.1 獲取連接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判斷db是否已經關閉。
  db.mu.Lock()
  if db.closed {
    db.mu.Unlock()
    return nil, errDBClosed
  }
    // 注意檢測context是否已經被超時等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 這邊如果在freeConn這個切片有空閒連接的話,就left pop一個出列。注意的是,這邊因爲是切片操作,所以需要前面需要加鎖且獲取後進行解鎖操作。同時判斷返回的連接是否已經過期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 這邊就是等候獲取連接的重點了。當空閒的連接爲空的時候,這邊將會新建一個request(的等待連接 的請求)並且開始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的動作相當於往connRequests這個map插入自己的號碼牌。
        // 插入號碼牌之後這邊就不需要阻塞等待繼續往下走邏輯。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的時候,記得從connRequests這個map取走自己的號碼牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 這邊值得注意了,因爲現在已經被context取消了。但是剛剛放了自己的號碼牌進去排隊裏面。意思是說不定已經發了連接了,所以得注意歸還!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已經獲得連接後的操作了。檢測一下獲得連接的狀況。因爲有可能已經過期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面說的限制情況不存在,可以創建先連接時候,要做的創建連接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}

3.2 釋放連接

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
  if db.closed {
    return false
  }
  if db.maxOpen > 0 && db.numOpen > db.maxOpen {
    return false
  }
  // 這邊是重點了,基本來說就是從connRequest這個map裏面隨機抽一個
  // 在排隊等着的請求。取出來後發給他。就不用歸還池子了。
  if c := len(db.connRequests); c > 0 {
    var req chan connRequest
    var reqKey uint64
    for reqKey, req = range db.connRequests {
      break
    }
    delete(db.connRequests, reqKey) // Remove from pending requests.
    if err == nil {
      dc.inUse = true
    }
    // 把連接給這個正在排隊的連接
    req <- connRequest{
      conn: dc,
      err:  err,
    }
    return true
  } else if err == nil && !db.closed {
   // 既然沒人排隊,就看看到了最大連接數目沒有。沒到就歸還給freeConn
    if db.maxIdleConnsLocked() > len(db.freeConn) {
      db.freeConn = append(db.freeConn, dc)
      db.startCleanerLocked()
      return true
    }
    db.maxIdleClosed++
  }
  return false
}

四、總結

資源重用: 數據庫連接得到重用,避免了頻繁創建、釋放連接引起的大量性能開銷。

更快的系統響應速度: 數據庫連接池在初始化過程中,往往已經創建了若干數據庫連接置於池中備用。 對於業務請求處理而言,直接利用現有可用連接,避免了數據庫連接初始化和釋放過程的時間開銷,從而縮減了系統整體響應時間。

新的資源分配手段: 對於多應用共享同一數據庫的系統而言,可在應用層通過數據庫連接的配置,實現數據庫連接池技術。設置某一應用最大可用數據庫連接數的限制,避免某一應用獨佔所有數據庫資源。

統一的連接管理,避免數據庫連接泄漏: 在較爲完備的數據庫連接池實現中,可根據預先的連接佔用超時設定,強制收回被佔用連接。從而避免了常規數據庫連接操作中可能出現的資源泄漏。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cZMvKB5Wu-pdOSZsizpulA