C-- 實現數據庫連接池

前言

在學習 Mysql 的時候,我們都有這個常識:對於 DB 的操作,其實本質上是對於磁盤的操作,如果對於 DB 的訪問次數過多,其實就是涉及了大量的磁盤 IO,這就會導致 MYsql 出現性能上的瓶頸。

項目背景

爲了提高 Mysql 數據庫的訪問瓶頸,常用的方法有如下兩個:

注:常見的 MySQL、Oracle、SQLServer 等數據庫都是基於 C/S 架構設計的。

市面上主流的 Mysql 數據庫連接池,對於短時間內的大量增刪改查操作的性能提升很明顯,但大多數都是 Java 實現的,該項目的開展就是爲了提高 Mysql Server 的訪問效率,實現基於 C++ 代碼的數據庫連接池模塊。

針對於系統啓動時就創建一定數量的連接,用戶一旦執行 CURD 操作,直接拿出一條連接即可,不需要 TCP 的連接過程和資源回收過程,使用完該連接後歸還給連接池的連接隊列,供之後使用。

功能點介紹

連接池一般包含了數據庫連接所用的 ip 地址、port 端口號、username 用戶名、password 密碼以及其他一些性能參數:比如初始連接量、最大連接量、最大空閒時間、連接超時時間等,本項目重點實現上述通用功能:

1、初始連接量(initSize):

初始連接量表示連接池事先會和 MySQL Server 創建的 initSize 數量的 Connection 連接。在完成初始連接量之後,當應用發起 MySQL 訪問時,不用創建新的 MySQLServer 連接,而是從連接池中直接獲取一個連接,當使用完成後,再把連接歸還到連接池中。

2、最大連接量(maxSize)

當併發訪問 MySQL Server 的請求增加,初始連接量不夠用了,此時會增加連接量,但是增加的連接量的上限就是 maxSIze。因爲每一個連接都會佔用一個 socket 資源,一般連接池和服務器都是部署在一臺主機上,如果連接池的連接數量過多,那麼服務器就不能響應太多的客戶端請求了。

3、最大空閒時間(maxIdleTime)

當高併發過去,因爲高併發而新創建的連接在很長時間(maxIdleTime)內沒有得到使用,那麼這些新創建的連接處於空閒,並且佔用着一定的資源,這個時候就需要將其釋放掉,最終只用保存 iniSize 個連接就行。

4、連接超時時間(connectionTimeOut)

當 MySQL 的併發訪問請求量過大,連接池中的連接數量已經達到了 maxSize,並且此時連接池中沒有可以使用的連接,那麼此時應用阻塞 connectionTimeOut 的時間,如果此時間內有使用完的連接歸還到連接池,那麼他就可以使用,如果超過這個時間還是沒有連接,那麼它獲取數據庫連接池失敗,無法訪問數據庫。

功能點實現的相關原理綜述

  1. 連接池只需要一個實例,所以 ConnectionPool 以單例 ` 模式設計;

  2. 從 ConnectionPool 中可以獲取和 Mysql 的連接 Connection;

  3. 空閒連接 Connection 全部維護在一個線程安全的 Connection 隊列中,使用線程互斥鎖保證隊列的線程安;

  4. 如果 Connection 隊列爲空,還需要再獲取連接,此時需要動態創建連接,上限數量是 maxSize;

  5. 隊列中空閒連接時間超過 maxIdleTime 的就會被釋放掉,只保留初始的 initSize 個連接就可以了,這個功能點肯定要放在獨立的線程中去做;

  6. 如果 Connection 隊列爲空,而此時連接的數量已達上限 maxSize,那麼等待 ConnectionTimeout 時間還獲取不到空閒的連接,那麼獲取連接失敗,此處從 Connection 隊列獲取空閒連接,可以使用帶超時時間的 mutex 互斥鎖來實現連接超時時間;

  7. 用戶獲取的連接用 shared_ptr 智能指針來管理,用 lambda 表達式定製連接釋放的功能(不真正釋放連接,而是把連接歸還到連接池中);

  8. 連接的生產和連接的消費採用生產者 - 消費者線程模型來設計,使用了線程間的同步通信機制條件變量和互斥鎖。

圖示如下:

關鍵技術點

1、MySql 數據庫編程

目的:在 C++ 下輸入 Sql 語句對數據庫進行操作的代碼封裝

說明:這裏的 MYSQL 的數據庫編程直接採用 oracle 公司提供的 C++ 客戶端開發包 , 讀者可以自己查閱資料或搜索官方文檔自行學習相關 API 的使用方法。

Connection.h:

class Connection
{
public:
	// 初始化數據庫連接
	Connection();
	// 釋放數據庫連接資源
	~Connection();
	// 連接數據庫
	bool connect(string ip,
		unsigned short port,
		string user,
		string password,
		string dbname);
	// 更新操作 insert、delete、update
	bool update(string sql);
	// 查詢操作 select
	MYSQL_RES* query(string sql);
	// 刷新一下連接的起始的空閒時間點
	void refreshAliveTime() { _alivetime = clock(); }
	// 返回存活的時間
	clock_t getAliveeTime()const { return clock() - _alivetime; }
private:
	MYSQL* _conn; // 表示和MySQL Server的一條連接
	clock_t _alivetime; // 記錄進入空閒狀態後的起始存活時間
};

Connection.cpp:

Connection::Connection()
{
	// 初始化數據庫連接
	_conn = mysql_init(nullptr);
}

Connection::~Connection()
{
	// 釋放數據庫連接資源
	if (_conn != nullptr)
		mysql_close(_conn);
}

bool Connection::connect(string ip, unsigned short port,
	string username, string password, string dbname)
{
	// 連接數據庫
	MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(),
		password.c_str(), dbname.c_str(), port, nullptr, 0);
	return p != nullptr;
}

bool Connection::update(string sql)
{
	// 更新操作 insert、delete、update
	if (mysql_query(_conn, sql.c_str()))
	{
		LOG("更新失敗:" + sql);
		return false;
	}
	return true;
}

MYSQL_RES* Connection::query(string sql)
{
	// 查詢操作 select
	if (mysql_query(_conn, sql.c_str()))
	{
		LOG("查詢失敗:" + sql);
		return nullptr;
	}
	return mysql_use_result(_conn);
}

這裏需要說明的是:在 Windows 上使用數據庫需要進行相關配置,大致配置內容如下:

2、數據庫連接池單例代碼

連接池僅需要一個實例,同時服務器肯定是多線程的,必須保證線程安全,所以採用懶漢式線程安全的單例:

CommonConnectionPool.h: 部分代碼

class ConnectionPool
{
public:
	// 獲取連接池對象實例
	static ConnectionPool* getConnectionPool();
	// 給外部提供接口,從連接池中獲取一個可用的空閒連接
	shared_ptr<Connection> getConnection();
private:
	// 單例#1 構造函數私有化
	ConnectionPool();
};

CommonConnectionPool.cpp: 部分代碼

// 線程安全的懶漢單例函數接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
	static ConnectionPool pool; //靜態對象初始化由編譯器自動進行lock和unlock
	return &pool;
}

3、queue 隊列容器

連接池的數據結構是 queue 隊列,最早生成的連接 connection 放在隊頭,此時記錄一個起始時間,這一點在後面最大空閒時間時會發揮作用:如果隊頭都沒有超過最大空閒時間,那麼其他的一定沒有

CommonConnectionPool.cpp 的連接池構造函數:

// 連接池的構造
ConnectionPool::ConnectionPool()
{
	// 加載配置項了
	if (!loadConfigFile())
	{
		return;
	}

	// 創建初始數量的連接
	for (int i = 0; i < _initSize; ++i)
	{
		Connection* p = new Connection();//創建一個新的連接
		p->connect(_ip, _port, _username, _password, _dbname);
		p->refreshAliveTime(); // 刷新一下開始空閒的起始時間
		_connectionQue.push(p);
		_connectionCnt++;
	}
}

連接數量沒有到達上限,繼續創建新的連接

if (_connectionCnt < _maxSize)
{
	Connection* p = new Connection();
	p->connect(_ip, _port, _username, _password, _dbname);
	p->refreshAliveTime(); // 刷新一下開始空閒的起始時間
	_connectionQue.push(p);
	_connectionCnt++;
}

掃描整個隊列,釋放多餘的連接(高併發過後,新建的連接超過最大超時時間時)

unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
{
	Connection* p = _connectionQue.front();
	if (p->getAliveTime() >= (_maxIdleTime * 1000))
	{
		_connectionQue.pop();
		_connectionCnt--;
		// 調用~Connection()釋放連接
		delete p;
	}
	else
	{
		// 如果隊頭的連接沒有超過_maxIdleTime,其他連接肯定沒有
		break;
	}
}

4、多線程編程

爲了將多線程編程的相關操作應用到實際,也爲了進行壓力測試,用結果證明使用連接池之後對數據庫的訪問效率確實比不使用連接池的時候高很多,使用了多線程來進行數據庫的訪問操作,並且觀察多線程下連接池對於性能的提升。

代碼如下:

int main()
{
	thread t1([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "991205", "chat");
			conn.update(sql);
		}
		});
	thread t2([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "991205", "chat");
			conn.update(sql);
		}
		});
	thread t3([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "991205", "chat");
			conn.update(sql);
		}
		});
	thread t4([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "male");
			conn.connect("127.0.0.1", 3306, "root", "991205", "chat");
			conn.update(sql);
		}
		});

	t1.join();
	t2.join();
	t3.join();
	t4.join();

	return 0;
}

五、線程互斥、線程同步通信(生產者 - 消費者模型)、unique_lock

連接池中連接隊列的連接的生產和消費需要保證其線程安全,於是我們需要引入互斥鎖 mutex,線程同步通信確保執行順序,以及唯一鎖。

代碼如下:

class ConnectionPool
{
private:
	// 設置條件變量,用於連接生產線程和連接消費線程的通信
	condition_variable cv;				
	// 維護連接隊列的線程安全互斥鎖
	mutex _queueMutex;
};

for (;;)
{
	unique_lock<mutex> lock(_queueMutex);
	while (!_connectionQue.empty())
	{
		// 隊列不爲空,此處生產線程進入等待狀態
		cv.wait(lock);
	}

	// 連接數量沒有達到上限,繼續創建新的連接
	if (_connectionCnt < _maxSize)
	{
		Connection* p = new Connection();
		p->connect(_ip, _port, _username, _password, _dbname);
		// 刷新一下開始空閒的起始時間
		p->refreshAliveTime();
		_connectionQue.push(p);
		_connectionCnt++;
	}

	// 通知消費者線程,可以消費連接了
	cv.notify_all();
}
// 啓動一個新的線程,作爲連接的生產者 linux thread => pthread_create
thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
produce.detach();

// 啓動一個新的定時線程,掃描超過maxIdleTime時間的空閒連接,進行對於的連接回收
thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));
scanner.detach();

六、CAS 原子操作

對於連接池內的連接數量,生產者和消費者線程都會去改變其值,那麼這個變量的修改就必須保證其原子性,於是使用 C++11 中提供的原子類:atomic_int

atomic_int _connectionCnt; // 記錄連接所創建的connection連接的總數量 

// 生產新連接時:
_connectionCnt++;
// 當新連接超過最大超時時間後被銷燬時
_connectionCnt--;

七、shared_ptr 及 lambda 表達式

對於使用完成的連接,不能直接銷燬該連接,而是需要將該連接歸還給連接池的隊列,供之後的其他消費者使用,於是我們使用智能指針,自定義其析構函數,完成放回的操作:

shared_ptr<Connection> sp(_connectionQue.front(),
	[&](Connection* pcon) {
		// 這裏是在服務器應用線程中調用的,所以一定要考慮隊列的線程安全操作
		unique_lock<mutex> lock(_queueMutex);
		pcon->refreshAliveTime();
		_connectionQue.push(pcon);
	});

八、壓力測試

測試添加連接池後效率是否提升:

未使用連接池

  1. 單線程
int main()
{
	clock_t begin = clock();
	for (int i = 0; i < 1000; ++i)
	{
		Connection conn;
		char sql[1024] = { 0 };
		sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
			"zhang san", 20, "M");
		conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
		conn.update(sql);
	}
	clock_t end = clock();
	cout << (end - begin) << "ms" << endl;
	return 0;
}

運行時間如下:

添加圖片註釋,不超過 140 字(可選)

  1. 多線程
int main()
{
	Connection conn;
	conn.connect("127.0.0.1", 3306, "root", "991205", "chat");
	clock_t begin = clock();

	thread t1([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
			conn.update(sql);
		}
		});
	thread t2([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
			conn.update(sql);
		}
		});
	thread t3([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
			conn.update(sql);
		}
});
	thread t4([]() {
		for (int i = 0; i < 250; ++i)
		{
			Connection conn;
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
			conn.update(sql);
		}
		});

	t1.join();
	t2.join();
	t3.join();
	t4.join();

	clock_t end = clock();
	cout << (end - begin) << "ms" << endl;
	return 0;
}

運行時間如下:

使用連接池

  1. 單線程
int main()
{
	clock_t begin = clock();
	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	for (int i = 0; i < 1000; ++i)
	{
		shared_ptr<Connection> sp = cp->getConnection();
		char sql[1024] = { 0 };
		sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
			"zhang san", 20, "M");
		sp->update(sql);
	}

	clock_t end = clock();
	cout << (end - begin) << "ms" << endl;
	return 0;
}

運行時間如下:

  1. 多線程
int main()
{
	clock_t begin = clock();

	thread t1([]() {
		ConnectionPool* cp = ConnectionPool::getConnectionPool();
		for (int i = 0; i < 250; ++i)
		{
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			shared_ptr<Connection> sp = cp->getConnection();
			sp->update(sql);
		}
		});
	thread t2([]() {
		ConnectionPool* cp = ConnectionPool::getConnectionPool();
		for (int i = 0; i < 250; ++i)
		{
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			shared_ptr<Connection> sp = cp->getConnection();
			sp->update(sql);
		}
		});
	thread t3([]() {
		ConnectionPool* cp = ConnectionPool::getConnectionPool();
		for (int i = 0; i < 250; ++i)
		{
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			shared_ptr<Connection> sp = cp->getConnection();
			sp->update(sql);
		}
		});
	thread t4([]() {
		ConnectionPool* cp = ConnectionPool::getConnectionPool();
		for (int i = 0; i < 250; ++i)
		{
			char sql[1024] = { 0 };
			sprintf(sql, "insert into user(name,age,sex) values('%s',%d,'%s')",
				"zhang san", 20, "M");
			shared_ptr<Connection> sp = cp->getConnection();
			sp->update(sql);
		}
		});

	t1.join();
	t2.join();
	t3.join();
	t4.join();

	clock_t end = clock();
	cout << (end - begin) << "ms" << endl;
	return 0;
		}

比較

在使用了連接池之後,性能確實提升了不少

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/jctLG5xMg-xmMUSeYAN-ZA