基於 libevent 的 C-- 線程池實現

1 功能簡介

本文利用 libevent,實現一個 C++ 線程池,,可自定義用戶任務類,繼承於任務 task 基類,重寫任務基類的純虛函數實現多態。比如將定義定義處理客戶端的請求任務類,實現對客戶端請求的併發處理。

線程池的初始化

線程池執行流程

2 線程池類的設計

線程類 XThread

線程類的接口功能
Start() ->		管道可讀就激活線程;設置管道屬性;進入事件循環,等待管道可讀激活線程執行任務
Setup() ->		設置管道屬性,將管道讀事件綁定到event_base中,等待觸發,調用回調
Main() ->		此函數只進入事件循環,等待事件循環退出
    
Notify() ->		讀取管道數據,從當前線程對象的任務隊列中取出任務,執行任務
AddTask() ->	將任務對象加入線程對象的任務隊列,將線程的事件處理器base,保存到任務對象中
Activate() ->	通過管道發送啓動標誌,來激活線程,發送一個字符'c'激活相當於加入一個任務對象到當前線程的任務隊列,通過Notify()處理。
    			調用多次Activate表示加入多個任務,任務順序被執行。

XThread.h

#pragma once
#include <vector>

/*線程類聲明*/
class XThread;

/*任務類聲明*/
class XTask;

/*線程池類*/
class XThreadPool
{
public:
    //單例模式創建返回唯一對象
    static XThreadPool* GetInstance();

    //初始化所有線程並啓動線程
    void Init(int threadCount);

    //分發線程
    void Dispatch(XTask* task);

private:
    //將構造函數的訪問屬性設置爲 private
    //將構造函數構造聲明成私有不使用
    //聲明成私有不使用
    XThreadPool(){}                                 //無參構造
    XThreadPool(const XThreadPool&);                //拷貝構造
    XThreadPool& operator= (const XThreadPool&);    //賦值運算符重載
    
    //線程數量
    int threadCount = 0;

    //用來標記下一個使用的線程號
    int lastThread = -1;

    //線程對象數組
    std::vector<XThread *> threads;

    //線程池對象
    static XThreadPool* pInstance;
};

XThread.cpp

#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>

using namespace std;

XThread::XThread()
{

}

XThread::~XThread()
{

}

//sock 文件描述符,which 事件類型 arg傳遞的參數
/*
* 函數名: NotifyCB
* 作用:   管道可讀事件觸發回調函數
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
    XThread *th = (XThread*)arg;
    th->Notify(fd, which);
}

/*
* 函數名: XThread::Start
* 作用:   啓動線程
* 解釋:   管道可讀就激活線程;設置管道屬性;進入事件循環,等待管道可讀激活線程執行任務。
*/
void XThread::Start()
{
    //安裝線程,初始化event_base和管道監聽事件用於激活
    Setup();

    //啓動線程
    thread th(&XThread::Main, this);

    //線程分離
    th.detach();
}

/*
* 函數名: XThread::Main
* 作用:   線程入口函數
* 解釋:   此函數只進入事件循環,等待事件循環退出
*/
void XThread::Main()
{
    cout << id << " XThread::Main() begin" << endl;
    event_base_dispatch(base);  //進入事件循環
    event_base_free(base);
	cout << id << " XThread::Main() end" << endl;
}

/*
* 函數名: XThread::Setup
* 作用:   安裝線程
* 解釋:   設置管道屬性,將管道讀事件綁定到event_base中,等待觸發,調用回調
*/
bool XThread::Setup()
{
	//windows用配對socket linux用管道
    //創建的管道
    int fds[2];
    if(pipe(fds)){
        cerr << "pipe failed!" << endl;
		return false;    
    }

    //讀取綁定到event事件中,寫入要保存
    //保存管道的寫fd
    notify_send_fd = fds[1];

    //創建一個新的事件處理器對象
    this->base = event_base_new();

    //創建一個新的事件對象
    //添加管道監聽事件讀fd,用於激活線程執行任務
    event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);
    //將事件對象(struct event)添加到指定的事件處理器(event_base)中
    event_add(ev, 0);

    return true;
}


/*
* 函數名: XThread::Notify
* 作用:   線程激活執行任務
* 解釋:   讀取管道數據,從當前線程對象的任務隊列中取出任務,執行任務
*/
void XThread::Notify(evutil_socket_t fd, short which)
{
    //水平觸發 只要沒有接受完成,會再次進來
    char buf[2] = {0};

	int len = read(fd, buf, 1);
    if (len <= 0)
		return;
	cout << id << " thread " << buf << endl;

    //獲取任務,並初始化任務
    XTask* task = NULL;
    tasks_mutex.lock();
    if(tasks.empty()){  //隊列爲空
        tasks_mutex.unlock();
        return;
    }

    task = tasks.front();   //先進先出
    tasks.pop_front();
    tasks_mutex.unlock();
    task->Init();
}

/*
* 函數名: XThread::Activate
* 作用:   激活線程
* 解釋:   通過管道發送啓動標誌,來激活線程,發送一個字符'c'激活相當於加入一個任務對象到當前線程的任務隊列,通過Notify()處理。
*         調用多次Activate表示加入多個任務,任務順序被執行。
*/
void XThread::Activate()
{
    char act[10] = {0};

    int len = write(this->notify_send_fd, "c", 1);

    if (len <= 0)
	{
		cerr << "XThread::Activate() failed!" << endl;
	}
    cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}


/*
* 函數名: XThread::AddTask
* 作用:   將任務對象加入線程對象的任務隊列,將線程的事件處理器base,保存到任務對象中
*/
void XThread::AddTask(XTask* task)
{
    if(!task)return;

    task->base = this->base;

    tasks_mutex.lock();
    tasks.push_back(task);
    tasks_mutex.unlock();
}

線程池類 XThreadPool

線程類的接口功能
GetInstance() ->	單例模式創建返回唯一對象
Init() ->			創建指定數量線程對象,啓動線程,並把線程對象加入到線程池的線程對象數組 
Dispatch() ->		從線程對象數組取出線程對象,並把任務加入線程對象的任務隊列中,激活該線程執行任務

XThreadPool.h

#pragma once
#include <vector>

/*線程類聲明*/
class XThread;

/*任務類聲明*/
class XTask;

/*線程池類*/
class XThreadPool
{
public:
    //單例模式創建返回唯一對象
    static XThreadPool* GetInstance();

    //初始化所有線程並啓動線程
    void Init(int threadCount);

    //分發線程
    void Dispatch(XTask* task);

private:
    //將構造函數的訪問屬性設置爲 private
    //將構造函數構造聲明成私有不使用
    //聲明成私有不使用
    XThreadPool(){}                                 //無參構造
    XThreadPool(const XThreadPool&);                //拷貝構造
    XThreadPool& operator= (const XThreadPool&);    //賦值運算符重載
    
    //線程數量
    int threadCount = 0;

    //用來標記下一個使用的線程號
    int lastThread = -1;

    //線程對象數組
    std::vector<XThread *> threads;

    //線程池對象
    static XThreadPool* pInstance;
};

XThreadPool.cpp

#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>

using namespace std;

//靜態成員變量類外初始化
XThreadPool* XThreadPool::pInstance = NULL;

/*
* 函數名: XThreadPool::GetInstance
* 作用:   單例模式創建返回唯一對象
*/
XThreadPool* XThreadPool::GetInstance()
{
    //當需要使用對象時,訪問instance 的值
    //空值:創建對象,並用instance 標記
    //非空值: 返回instance 標記的對象
    if( pInstance == NULL )
    {
        pInstance = new XThreadPool();
    }
    
    return pInstance;
}

/*
* 函數名: XThreadPool::Init
* 作用:   初始化所有線程並啓動線程
* 解釋:   創建指定數量線程對象,啓動線程,並把線程對象加入到線程池的線程對象數組       
*/
void XThreadPool::Init(int threadCount)
{
    this->threadCount = threadCount;
    this->lastThread = -1;
    for (int i = 0; i < threadCount; i++)
    {
        XThread *t = new XThread();
        t->id = i + 1;
        cout << "Create thread " << i << endl;

        //啓動線程
        t->Start();
        threads.push_back(t);
        this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

/*
* 函數名: XThreadPool::Dispatch
* 作用:   分發線程
* 解釋:   從線程對象數組取出線程對象,並把任務加入線程對象的任務隊列中,激活該線程執行任務。
*/
void XThreadPool::Dispatch(XTask* task)
{
    //輪詢
    if(!task)return;
    int tid = (lastThread + 1) % threadCount;
    lastThread = tid;
    cout << "lastThread:" << lastThread << endl;

    XThread *XTh = threads[tid];

    //添加任務
    XTh->AddTask(task);

    //線程激活
    XTh->Activate();
}

任務基類 task

XTask.h

#pragma once
#include <iostream>

class XTask
{
public:
    //事件處理器對象
    struct event_base* base = NULL;

    //客戶端連接的socket
    int sock = 0;

    //初始化任務 純虛函數
    virtual bool Init() = 0;
};

3 自定義任務的例子

自定義任務類 ServerCMD

線程類的接口功能
Init() ->		初始化任務,註冊當前socket的讀事件和超時事件,綁定回調函數
ReadCB() ->		讀事件回調函數 
EventCB() ->	客戶端超時未發請求,斷開連接退出任務

ServerCMD.h

#pragma once

#include "XTask.h"

class XFtpServerCMD : public XTask
{
public:
    //初始化任務
    virtual bool Init();

    XFtpServerCMD();
    ~XFtpServerCMD();
};

ServerCMD.cpp

#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>

using namespace std;


/*
* 函數名: EventCB
* 作用:   超時事件回調函數
* 解釋:   客戶端超時未發請求,斷開連接退出任務
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{
    XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
    //如果對方網絡斷掉,或者機器死機有可能收不到BEV_EVENT_EOF數據
    if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
    {
        cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;
		bufferevent_free(bev);
		delete cmd;
    }
}


/*
* 函數名: ReadCB
* 作用:   讀事件回調函數
*/
void ReadCB(struct bufferevent *bev, void *arg)
{
    XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
    char data[1024] = {0};

    for (;;)
    {
        int len = bufferevent_read(bev, data, sizeof(data)-1);
        if(len <= 0)break;
        data[len] = '\0';
        cout << data << endl << flush;

        //測試代碼,要清理掉
        if(strstr(data, "quit"))
        {
            bufferevent_free(bev);
            delete cmd;
            break;
        }
    }
}


/*
* 函數名: XFtpServerCMD::Init
* 作用:   初始化任務
* 解釋:   初始化任務,註冊當前socket的讀事件和超時事件,綁定回調函數。
*/
bool XFtpServerCMD::Init()
{
    cout << "XFtpServerCMD::Init() sock:" << sock << endl;
	//監聽socket bufferevent
	// base socket
    bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);
    bufferevent_enable(bev, EV_READ | EV_WRITE);

    //添加超時
    timeval rt = {10, 0};       //10秒
    bufferevent_set_timeouts(bev, &rt, 0);  //設置讀超時回調函數
	return true;
}

XFtpServerCMD::XFtpServerCMD()
{

}

XFtpServerCMD::~XFtpServerCMD()
{

}

測試程序

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>

#include "XFtpServerCMD.h"

using namespace std;
#define SPORT 5001

/*
* 函數名: listen_cb
* 作用:   接收到連接的回調函數
* 解釋:   通過多態來創建任務對象,將當前socket保存到任務對象中,分發任務執行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
	cout << "listen_cb" << endl;
	XTask* task = new XFtpServerCMD();
	task->sock = s;
	XThreadPool::GetInstance()->Dispatch(task);
}

int main()
{
	//忽略管道信號,發送數據給已關閉的socket
	if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
		return 1;

	//1 初始化線程池
    XThreadPool::GetInstance()->Init(5);

	std::cout << "test thread pool!\n"; 
	//創建libevent的上下文
	event_base* base = event_base_new();
	if (base)
	{
		cout << "event_base_new success!" << endl;
	}

	//監聽端口
	//socket ,bind,listen 綁定事件
	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SPORT);

	evconnlistener* ev = evconnlistener_new_bind(base,	// libevent的上下文
		listen_cb,										//接收到連接的回調函數
		base,											//回調函數獲取的參數 arg
		LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,		//地址重用,evconnlistener關閉同時關閉socket
		10,												//連接隊列大小,對應listen函數
		(sockaddr*)&sin,								//綁定的地址和端口
		sizeof(sin)
		);

	//事件分發處理
	if(base)
		event_base_dispatch(base);
	if(ev)
		evconnlistener_free(ev);
	if(base)
		event_base_free(base);


    return 0;
}

運行效果

初始化線程池,創建 5 個線程,通過 telnet 和網絡調試軟件模擬客戶端的接入,客戶端發送信息服務器打印出來,當客戶端超時未發請求,斷開連接退出任務。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/k2ibCBg28MB7_6X45ekvHQ