基於 libevent 的 C-- 線程池實現
1 功能簡介
本文利用 libevent,實現一個 C++ 線程池,,可自定義用戶任務類,繼承於任務 task 基類,重寫任務基類的純虛函數實現多態。比如將定義定義處理客戶端的請求任務類,實現對客戶端請求的併發處理。
-
工作隊列:可以理解爲線程的隊列,一個線程同時可以處理一個任務,空閒的線程回從任務隊列取出任務執行。當工作隊列空時,線程會睡眠。
-
任務隊列:用戶將任務加入任務隊列,然後通知工作隊列,取出一個任務到線程中執行。
線程池的初始化
線程池執行流程
2 線程池類的設計
線程類 XThread
線程類的接口功能
Start() -> 管道可讀就激活線程;設置管道屬性;進入事件循環,等待管道可讀激活線程執行任務
Setup() -> 設置管道屬性,將管道讀事件綁定到event_base中,等待觸發,調用回調
Main() -> 此函數只進入事件循環,等待事件循環退出
Notify() -> 讀取管道數據,從當前線程對象的任務隊列中取出任務,執行任務
AddTask() -> 將任務對象加入線程對象的任務隊列,將線程的事件處理器base,保存到任務對象中
Activate() -> 通過管道發送啓動標誌,來激活線程,發送一個字符'c'激活相當於加入一個任務對象到當前線程的任務隊列,通過Notify()處理。
調用多次Activate表示加入多個任務,任務順序被執行。
XThread.h
#pragma once
#include <vector>
/*線程類聲明*/
class XThread;
/*任務類聲明*/
class XTask;
/*線程池類*/
class XThreadPool
{
public:
//單例模式創建返回唯一對象
static XThreadPool* GetInstance();
//初始化所有線程並啓動線程
void Init(int threadCount);
//分發線程
void Dispatch(XTask* task);
private:
//將構造函數的訪問屬性設置爲 private
//將構造函數構造聲明成私有不使用
//聲明成私有不使用
XThreadPool(){} //無參構造
XThreadPool(const XThreadPool&); //拷貝構造
XThreadPool& operator= (const XThreadPool&); //賦值運算符重載
//線程數量
int threadCount = 0;
//用來標記下一個使用的線程號
int lastThread = -1;
//線程對象數組
std::vector<XThread *> threads;
//線程池對象
static XThreadPool* pInstance;
};
XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>
using namespace std;
XThread::XThread()
{
}
XThread::~XThread()
{
}
//sock 文件描述符,which 事件類型 arg傳遞的參數
/*
* 函數名: NotifyCB
* 作用: 管道可讀事件觸發回調函數
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
XThread *th = (XThread*)arg;
th->Notify(fd, which);
}
/*
* 函數名: XThread::Start
* 作用: 啓動線程
* 解釋: 管道可讀就激活線程;設置管道屬性;進入事件循環,等待管道可讀激活線程執行任務。
*/
void XThread::Start()
{
//安裝線程,初始化event_base和管道監聽事件用於激活
Setup();
//啓動線程
thread th(&XThread::Main, this);
//線程分離
th.detach();
}
/*
* 函數名: XThread::Main
* 作用: 線程入口函數
* 解釋: 此函數只進入事件循環,等待事件循環退出
*/
void XThread::Main()
{
cout << id << " XThread::Main() begin" << endl;
event_base_dispatch(base); //進入事件循環
event_base_free(base);
cout << id << " XThread::Main() end" << endl;
}
/*
* 函數名: XThread::Setup
* 作用: 安裝線程
* 解釋: 設置管道屬性,將管道讀事件綁定到event_base中,等待觸發,調用回調
*/
bool XThread::Setup()
{
//windows用配對socket linux用管道
//創建的管道
int fds[2];
if(pipe(fds)){
cerr << "pipe failed!" << endl;
return false;
}
//讀取綁定到event事件中,寫入要保存
//保存管道的寫fd
notify_send_fd = fds[1];
//創建一個新的事件處理器對象
this->base = event_base_new();
//創建一個新的事件對象
//添加管道監聽事件讀fd,用於激活線程執行任務
event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);
//將事件對象(struct event)添加到指定的事件處理器(event_base)中
event_add(ev, 0);
return true;
}
/*
* 函數名: XThread::Notify
* 作用: 線程激活執行任務
* 解釋: 讀取管道數據,從當前線程對象的任務隊列中取出任務,執行任務
*/
void XThread::Notify(evutil_socket_t fd, short which)
{
//水平觸發 只要沒有接受完成,會再次進來
char buf[2] = {0};
int len = read(fd, buf, 1);
if (len <= 0)
return;
cout << id << " thread " << buf << endl;
//獲取任務,並初始化任務
XTask* task = NULL;
tasks_mutex.lock();
if(tasks.empty()){ //隊列爲空
tasks_mutex.unlock();
return;
}
task = tasks.front(); //先進先出
tasks.pop_front();
tasks_mutex.unlock();
task->Init();
}
/*
* 函數名: XThread::Activate
* 作用: 激活線程
* 解釋: 通過管道發送啓動標誌,來激活線程,發送一個字符'c'激活相當於加入一個任務對象到當前線程的任務隊列,通過Notify()處理。
* 調用多次Activate表示加入多個任務,任務順序被執行。
*/
void XThread::Activate()
{
char act[10] = {0};
int len = write(this->notify_send_fd, "c", 1);
if (len <= 0)
{
cerr << "XThread::Activate() failed!" << endl;
}
cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}
/*
* 函數名: XThread::AddTask
* 作用: 將任務對象加入線程對象的任務隊列,將線程的事件處理器base,保存到任務對象中
*/
void XThread::AddTask(XTask* task)
{
if(!task)return;
task->base = this->base;
tasks_mutex.lock();
tasks.push_back(task);
tasks_mutex.unlock();
}
線程池類 XThreadPool
線程類的接口功能
GetInstance() -> 單例模式創建返回唯一對象
Init() -> 創建指定數量線程對象,啓動線程,並把線程對象加入到線程池的線程對象數組
Dispatch() -> 從線程對象數組取出線程對象,並把任務加入線程對象的任務隊列中,激活該線程執行任務
XThreadPool.h
#pragma once
#include <vector>
/*線程類聲明*/
class XThread;
/*任務類聲明*/
class XTask;
/*線程池類*/
class XThreadPool
{
public:
//單例模式創建返回唯一對象
static XThreadPool* GetInstance();
//初始化所有線程並啓動線程
void Init(int threadCount);
//分發線程
void Dispatch(XTask* task);
private:
//將構造函數的訪問屬性設置爲 private
//將構造函數構造聲明成私有不使用
//聲明成私有不使用
XThreadPool(){} //無參構造
XThreadPool(const XThreadPool&); //拷貝構造
XThreadPool& operator= (const XThreadPool&); //賦值運算符重載
//線程數量
int threadCount = 0;
//用來標記下一個使用的線程號
int lastThread = -1;
//線程對象數組
std::vector<XThread *> threads;
//線程池對象
static XThreadPool* pInstance;
};
XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>
using namespace std;
//靜態成員變量類外初始化
XThreadPool* XThreadPool::pInstance = NULL;
/*
* 函數名: XThreadPool::GetInstance
* 作用: 單例模式創建返回唯一對象
*/
XThreadPool* XThreadPool::GetInstance()
{
//當需要使用對象時,訪問instance 的值
//空值:創建對象,並用instance 標記
//非空值: 返回instance 標記的對象
if( pInstance == NULL )
{
pInstance = new XThreadPool();
}
return pInstance;
}
/*
* 函數名: XThreadPool::Init
* 作用: 初始化所有線程並啓動線程
* 解釋: 創建指定數量線程對象,啓動線程,並把線程對象加入到線程池的線程對象數組
*/
void XThreadPool::Init(int threadCount)
{
this->threadCount = threadCount;
this->lastThread = -1;
for (int i = 0; i < threadCount; i++)
{
XThread *t = new XThread();
t->id = i + 1;
cout << "Create thread " << i << endl;
//啓動線程
t->Start();
threads.push_back(t);
this_thread::sleep_for(std::chrono::microseconds(10));
}
}
/*
* 函數名: XThreadPool::Dispatch
* 作用: 分發線程
* 解釋: 從線程對象數組取出線程對象,並把任務加入線程對象的任務隊列中,激活該線程執行任務。
*/
void XThreadPool::Dispatch(XTask* task)
{
//輪詢
if(!task)return;
int tid = (lastThread + 1) % threadCount;
lastThread = tid;
cout << "lastThread:" << lastThread << endl;
XThread *XTh = threads[tid];
//添加任務
XTh->AddTask(task);
//線程激活
XTh->Activate();
}
任務基類 task
XTask.h
#pragma once
#include <iostream>
class XTask
{
public:
//事件處理器對象
struct event_base* base = NULL;
//客戶端連接的socket
int sock = 0;
//初始化任務 純虛函數
virtual bool Init() = 0;
};
3 自定義任務的例子
自定義任務類 ServerCMD
線程類的接口功能
Init() -> 初始化任務,註冊當前socket的讀事件和超時事件,綁定回調函數
ReadCB() -> 讀事件回調函數
EventCB() -> 客戶端超時未發請求,斷開連接退出任務
ServerCMD.h
#pragma once
#include "XTask.h"
class XFtpServerCMD : public XTask
{
public:
//初始化任務
virtual bool Init();
XFtpServerCMD();
~XFtpServerCMD();
};
ServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>
using namespace std;
/*
* 函數名: EventCB
* 作用: 超時事件回調函數
* 解釋: 客戶端超時未發請求,斷開連接退出任務
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{
XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
//如果對方網絡斷掉,或者機器死機有可能收不到BEV_EVENT_EOF數據
if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
{
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;
bufferevent_free(bev);
delete cmd;
}
}
/*
* 函數名: ReadCB
* 作用: 讀事件回調函數
*/
void ReadCB(struct bufferevent *bev, void *arg)
{
XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
char data[1024] = {0};
for (;;)
{
int len = bufferevent_read(bev, data, sizeof(data)-1);
if(len <= 0)break;
data[len] = '\0';
cout << data << endl << flush;
//測試代碼,要清理掉
if(strstr(data, "quit"))
{
bufferevent_free(bev);
delete cmd;
break;
}
}
}
/*
* 函數名: XFtpServerCMD::Init
* 作用: 初始化任務
* 解釋: 初始化任務,註冊當前socket的讀事件和超時事件,綁定回調函數。
*/
bool XFtpServerCMD::Init()
{
cout << "XFtpServerCMD::Init() sock:" << sock << endl;
//監聽socket bufferevent
// base socket
bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);
bufferevent_enable(bev, EV_READ | EV_WRITE);
//添加超時
timeval rt = {10, 0}; //10秒
bufferevent_set_timeouts(bev, &rt, 0); //設置讀超時回調函數
return true;
}
XFtpServerCMD::XFtpServerCMD()
{
}
XFtpServerCMD::~XFtpServerCMD()
{
}
測試程序
#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>
#include "XFtpServerCMD.h"
using namespace std;
#define SPORT 5001
/*
* 函數名: listen_cb
* 作用: 接收到連接的回調函數
* 解釋: 通過多態來創建任務對象,將當前socket保存到任務對象中,分發任務執行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
cout << "listen_cb" << endl;
XTask* task = new XFtpServerCMD();
task->sock = s;
XThreadPool::GetInstance()->Dispatch(task);
}
int main()
{
//忽略管道信號,發送數據給已關閉的socket
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
//1 初始化線程池
XThreadPool::GetInstance()->Init(5);
std::cout << "test thread pool!\n";
//創建libevent的上下文
event_base* base = event_base_new();
if (base)
{
cout << "event_base_new success!" << endl;
}
//監聽端口
//socket ,bind,listen 綁定事件
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文
listen_cb, //接收到連接的回調函數
base, //回調函數獲取的參數 arg
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener關閉同時關閉socket
10, //連接隊列大小,對應listen函數
(sockaddr*)&sin, //綁定的地址和端口
sizeof(sin)
);
//事件分發處理
if(base)
event_base_dispatch(base);
if(ev)
evconnlistener_free(ev);
if(base)
event_base_free(base);
return 0;
}
運行效果
初始化線程池,創建 5 個線程,通過 telnet 和網絡調試軟件模擬客戶端的接入,客戶端發送信息服務器打印出來,當客戶端超時未發請求,斷開連接退出任務。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/k2ibCBg28MB7_6X45ekvHQ