Golang 數據庫連接池技術原理與實現
一、爲什麼需要連接池
如果不用連接池,而是每次請求都創建一個連接是比較昂貴的,因此需要完成 3 次 tcp 握手。同時在高併發場景下,由於沒有連接池的最大連接數限制,可以創建無數個連接,耗盡文件描述符。連接池就是爲了複用些創建好的連接。
二、連接池設計
基本上連接池都會設計以下幾個參數:
初始連接數:在初始化連接池時就會預先創建好的連接數量,如果設置得:
-
過大:可能造成浪費
-
過小:請求到來時需要新建連接
最大空閒連接數 maxIdle:池中最大緩存的連接個數,如果設置得:
-
過大:造成浪費,自己不用還把持着連接。因爲數據庫整體的連接數是有限的,當前進程佔用多了,其他進程能獲取的就少了
-
過小:無法應對突發流量
最大連接數 maxCap:
- 如果已經用了 maxCap 個連接,要申請第 maxCap+1 個連接時,一般會阻塞在那裏,直到超時或者別人歸還一個連接
最大空閒時間 idleTimeout:當發現某連接空閒超過這個時間時,會將其關閉,重新去獲取連接
- 避免連接長時間沒用,自動失效的問題
連接池對外提供兩個方法,Get:獲取一個連接,Put:歸還一個連接。大部分連接池的實現大同小異,基本流程如下:
三、Golang 標準庫 SQL 連接池
Golang 的連接池實現在標準庫 database/sql/sql.go 下。當我們運行:
db, err := sql.Open("mysql", "xxxx")
就會打開一個連接池。我們可以看看返回的 db 的結構體:
type DB struct {
// Atomic access only. At top of struct to prevent mis-alignment
// on 32-bit platforms. Of type time.Duration.
waitDuration int64 // 等待新連接的總時間,用於統計
connector driver.Connector // 由數據庫驅動實現的連接器
// numClosed is an atomic counter which represents a total number of
// closed connections. Stmt.openStmt checks it before cleaning closed
// connections in Stmt.css.
numClosed uint64 // 關閉的連接數
mu sync.Mutex // 鎖
freeConn []*driverConn // 可用連接池
connRequests map[uint64]chan connRequest // 連接請求表,key 是分配的自增鍵
nextRequest uint64 // 連接請求的自增鍵
numOpen int // 已經打開 + 即將打開的連接數
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{} // 告知 connectionOpener 需要新的連接
resetterCh chan *driverConn // connectionResetter 函數,連接放回連接池的時候會用到
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // debug 時使用,記錄上一個放回的連接
maxIdle int // 連接池大小,默認大小爲 2,<= 0 時不使用連接池
maxOpen int // 最大打開的連接數,<= 0 不限制
maxLifetime time.Duration // 一個連接可以被重用的最大時限,也就是它在連接池中的最大存活時間,0 表示可以一直重用
cleanerCh chan struct{} // 告知 connectionCleaner 清理連接
waitCount int64 // 等待的連接總數
maxIdleClosed int64 // 釋放連接時,因爲連接池已滿而被關閉的連接總數
maxLifetimeClosed int64 // 因爲超過存活時間而被關閉的連接總數
stop func() // stop cancels the connection opener and the session resetter.
}
我們可以看的,DB 這個連接池內部存儲連接的結構 freeConn,並不是我們之前使用的 chan,而是 []*driverConn,一個連接切片。
// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
db *DB // 數據庫句柄
createdAt time.Time
sync.Mutex // 鎖
ci driver.Conn // 對應具體的連接
closed bool // 是否標記關閉
finalClosed bool // 是否最終關閉
openStmt map[*driverStmt]bool // 在這個連接上打開的狀態
lastErr error // connectionResetter 的返回結果
// guarded by db.mu
inUse bool // 連接是否佔用
onPut []func() // 連接歸還時要運行的函數,在 noteUnusedDriverStatement 添加
dbmuClosed bool // 和 closed 狀態一致,但是由鎖保護,用於 removeClosedStmtLocked
}
繼續查看代碼,通過 query 方法一路往回找,我們可以看到這個函數:func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)。
3.1 獲取連接
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
// 先判斷db是否已經關閉。
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// 注意檢測context是否已經被超時等原因被取消。
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// 這邊如果在freeConn這個切片有空閒連接的話,就left pop一個出列。注意的是,這邊因爲是切片操作,所以需要前面需要加鎖且獲取後進行解鎖操作。同時判斷返回的連接是否已經過期。
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// 這邊就是等候獲取連接的重點了。當空閒的連接爲空的時候,這邊將會新建一個request(的等待連接 的請求)並且開始等待
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// 下面的動作相當於往connRequests這個map插入自己的號碼牌。
// 插入號碼牌之後這邊就不需要阻塞等待繼續往下走邏輯。
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
case <-ctx.Done():
// context取消操作的時候,記得從connRequests這個map取走自己的號碼牌。
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req:
// 這邊值得注意了,因爲現在已經被context取消了。但是剛剛放了自己的號碼牌進去排隊裏面。意思是說不定已經發了連接了,所以得注意歸還!
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
// 下面是已經獲得連接後的操作了。檢測一下獲得連接的狀況。因爲有可能已經過期了等等。
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
// 下面就是如果上面說的限制情況不存在,可以創建先連接時候,要做的創建連接操作了。
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
3.2 釋放連接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 這邊是重點了,基本來說就是從connRequest這個map裏面隨機抽一個
// 在排隊等着的請求。取出來後發給他。就不用歸還池子了。
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
// 把連接給這個正在排隊的連接
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
// 既然沒人排隊,就看看到了最大連接數目沒有。沒到就歸還給freeConn
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
四、總結
資源重用: 數據庫連接得到重用,避免了頻繁創建、釋放連接引起的大量性能開銷。
更快的系統響應速度: 數據庫連接池在初始化過程中,往往已經創建了若干數據庫連接置於池中備用。 對於業務請求處理而言,直接利用現有可用連接,避免了數據庫連接初始化和釋放過程的時間開銷,從而縮減了系統整體響應時間。
新的資源分配手段: 對於多應用共享同一數據庫的系統而言,可在應用層通過數據庫連接的配置,實現數據庫連接池技術。設置某一應用最大可用數據庫連接數的限制,避免某一應用獨佔所有數據庫資源。
統一的連接管理,避免數據庫連接泄漏: 在較爲完備的數據庫連接池實現中,可根據預先的連接佔用超時設定,強制收回被佔用連接。從而避免了常規數據庫連接操作中可能出現的資源泄漏。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/cZMvKB5Wu-pdOSZsizpulA