当前位置:   article > 正文

libevent高并发网络编程 - 06_基于libevent的C++线程池实现_windows c++ 开发 客户端 libevent

windows c++ 开发 客户端 libevent


链接: C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂

1 功能简介

本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。

  • 工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。

  • 任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。

线程池的初始化

请添加图片描述

线程池执行流程

请添加图片描述

2 线程池类的设计

线程类XThread

线程类的接口功能
Start() ->		管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() ->		设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() ->		此函数只进入事件循环,等待事件循环退出
    
Notify() ->		读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() ->	将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() ->	通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
    			调用多次Activate表示加入多个任务,任务顺序被执行。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
XThread.h
#pragma once
#include <vector>

/*线程类声明*/
class XThread;

/*任务类声明*/
class XTask;

/*线程池类*/
class XThreadPool
{
public:
    //单例模式创建返回唯一对象
    static XThreadPool* GetInstance();

    //初始化所有线程并启动线程
    void Init(int threadCount);

    //分发线程
    void Dispatch(XTask* task);

private:
    //将构造函数的访问属性设置为 private
    //将构造函数构造声明成私有不使用
    //声明成私有不使用
    XThreadPool(){}                                 //无参构造
    XThreadPool(const XThreadPool&);                //拷贝构造
    XThreadPool& operator= (const XThreadPool&);    //赋值运算符重载
    
    //线程数量
    int threadCount = 0;

    //用来标记下一个使用的线程号
    int lastThread = -1;

    //线程对象数组
    std::vector<XThread *> threads;

    //线程池对象
    static XThreadPool* pInstance;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>

using namespace std;

XThread::XThread()
{

}

XThread::~XThread()
{

}

//sock 文件描述符,which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用:   管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
    XThread *th = (XThread*)arg;
    th->Notify(fd, which);
}

/*
* 函数名: XThread::Start
* 作用:   启动线程
* 解释:   管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。
*/
void XThread::Start()
{
    //安装线程,初始化event_base和管道监听事件用于激活
    Setup();

    //启动线程
    thread th(&XThread::Main, this);

    //线程分离
    th.detach();
}

/*
* 函数名: XThread::Main
* 作用:   线程入口函数
* 解释:   此函数只进入事件循环,等待事件循环退出
*/
void XThread::Main()
{
    cout << id << " XThread::Main() begin" << endl;
    event_base_dispatch(base);  //进入事件循环
    event_base_free(base);
	cout << id << " XThread::Main() end" << endl;
}

/*
* 函数名: XThread::Setup
* 作用:   安装线程
* 解释:   设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
*/
bool XThread::Setup()
{
	//windows用配对socket linux用管道
    //创建的管道
    int fds[2];
    if(pipe(fds)){
        cerr << "pipe failed!" << endl;
		return false;    
    }

    //读取绑定到event事件中,写入要保存
    //保存管道的写fd
    notify_send_fd = fds[1];

    //创建一个新的事件处理器对象
    this->base = event_base_new();

    //创建一个新的事件对象
    //添加管道监听事件读fd,用于激活线程执行任务
    event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);
    //将事件对象(struct event)添加到指定的事件处理器(event_base)中
    event_add(ev, 0);

    return true;
}


/*
* 函数名: XThread::Notify
* 作用:   线程激活执行任务
* 解释:   读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{
    //水平触发 只要没有接受完成,会再次进来
    char buf[2] = {0};

	int len = read(fd, buf, 1);
    if (len <= 0)
		return;
	cout << id << " thread " << buf << endl;

    //获取任务,并初始化任务
    XTask* task = NULL;
    tasks_mutex.lock();
    if(tasks.empty()){  //队列为空
        tasks_mutex.unlock();
        return;
    }

    task = tasks.front();   //先进先出
    tasks.pop_front();
    tasks_mutex.unlock();
    task->Init();
}

/*
* 函数名: XThread::Activate
* 作用:   激活线程
* 解释:   通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
*         调用多次Activate表示加入多个任务,任务顺序被执行。
*/
void XThread::Activate()
{
    char act[10] = {0};

    int len = write(this->notify_send_fd, "c", 1);

    if (len <= 0)
	{
		cerr << "XThread::Activate() failed!" << endl;
	}
    cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}


/*
* 函数名: XThread::AddTask
* 作用:   将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{
    if(!task)return;

    task->base = this->base;

    tasks_mutex.lock();
    tasks.push_back(task);
    tasks_mutex.unlock();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155

线程池类XThreadPool

线程类的接口功能
GetInstance() ->	单例模式创建返回唯一对象
Init() ->			创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组 
Dispatch() ->		从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务
  • 1
  • 2
  • 3
  • 4
XThreadPool.h
#pragma once
#include <vector>

/*线程类声明*/
class XThread;

/*任务类声明*/
class XTask;

/*线程池类*/
class XThreadPool
{
public:
    //单例模式创建返回唯一对象
    static XThreadPool* GetInstance();

    //初始化所有线程并启动线程
    void Init(int threadCount);

    //分发线程
    void Dispatch(XTask* task);

private:
    //将构造函数的访问属性设置为 private
    //将构造函数构造声明成私有不使用
    //声明成私有不使用
    XThreadPool(){}                                 //无参构造
    XThreadPool(const XThreadPool&);                //拷贝构造
    XThreadPool& operator= (const XThreadPool&);    //赋值运算符重载
    
    //线程数量
    int threadCount = 0;

    //用来标记下一个使用的线程号
    int lastThread = -1;

    //线程对象数组
    std::vector<XThread *> threads;

    //线程池对象
    static XThreadPool* pInstance;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>

using namespace std;

//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance = NULL;

/*
* 函数名: XThreadPool::GetInstance
* 作用:   单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{
    //当需要使用对象时,访问instance 的值
    //空值:创建对象,并用instance 标记
    //非空值: 返回instance 标记的对象
    if( pInstance == NULL )
    {
        pInstance = new XThreadPool();
    }
    
    return pInstance;
}

/*
* 函数名: XThreadPool::Init
* 作用:   初始化所有线程并启动线程
* 解释:   创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组       
*/
void XThreadPool::Init(int threadCount)
{
    this->threadCount = threadCount;
    this->lastThread = -1;
    for (int i = 0; i < threadCount; i++)
    {
        XThread *t = new XThread();
        t->id = i + 1;
        cout << "Create thread " << i << endl;

        //启动线程
        t->Start();
        threads.push_back(t);
        this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

/*
* 函数名: XThreadPool::Dispatch
* 作用:   分发线程
* 解释:   从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。     
*/
void XThreadPool::Dispatch(XTask* task)
{
    //轮询
    if(!task)return;
    int tid = (lastThread + 1) % threadCount;
    lastThread = tid;
    cout << "lastThread:" << lastThread << endl;

    XThread *XTh = threads[tid];

    //添加任务
    XTh->AddTask(task);

    //线程激活
    XTh->Activate();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

任务基类task

XTask.h
#pragma once
#include <iostream>

class XTask
{
public:
    //事件处理器对象
    struct event_base* base = NULL;

    //客户端连接的socket
    int sock = 0;

    //初始化任务 纯虚函数
    virtual bool Init() = 0;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3 自定义任务的例子

自定义任务类ServerCMD

线程类的接口功能
Init() ->		初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() ->		读事件回调函数 
EventCB() ->	客户端超时未发请求,断开连接退出任务
  • 1
  • 2
  • 3
  • 4
ServerCMD.h
#pragma once

#include "XTask.h"

class XFtpServerCMD : public XTask
{
public:
    //初始化任务
    virtual bool Init();

    XFtpServerCMD();
    ~XFtpServerCMD();
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
ServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>

using namespace std;


/*
* 函数名: EventCB
* 作用:   超时事件回调函数
* 解释:   客户端超时未发请求,断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{
    XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
    //如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据
    if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
    {
        cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;
		bufferevent_free(bev);
		delete cmd;
    }
}


/*
* 函数名: ReadCB
* 作用:   读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{
    XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
    char data[1024] = {0};

    for (;;)
    {
        int len = bufferevent_read(bev, data, sizeof(data)-1);
        if(len <= 0)break;
        data[len] = '\0';
        cout << data << endl << flush;

        //测试代码,要清理掉
        if(strstr(data, "quit"))
        {
            bufferevent_free(bev);
            delete cmd;
            break;
        }
    }
}


/*
* 函数名: XFtpServerCMD::Init
* 作用:   初始化任务
* 解释:   初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。
*/
bool XFtpServerCMD::Init()
{
    cout << "XFtpServerCMD::Init() sock:" << sock << endl;
	//监听socket bufferevent
	// base socket
    bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);
    bufferevent_enable(bev, EV_READ | EV_WRITE);

    //添加超时
    timeval rt = {10, 0};       //10秒
    bufferevent_set_timeouts(bev, &rt, 0);  //设置读超时回调函数
	return true;
}

XFtpServerCMD::XFtpServerCMD()
{

}

XFtpServerCMD::~XFtpServerCMD()
{

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

测试程序

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>

#include "XFtpServerCMD.h"

using namespace std;
#define SPORT 5001

/*
* 函数名: listen_cb
* 作用:   接收到连接的回调函数
* 解释:   通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
	cout << "listen_cb" << endl;
	XTask* task = new XFtpServerCMD();
	task->sock = s;
	XThreadPool::GetInstance()->Dispatch(task);
}

int main()
{
	//忽略管道信号,发送数据给已关闭的socket
	if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
		return 1;

	//1 初始化线程池
    XThreadPool::GetInstance()->Init(5);

	std::cout << "test thread pool!\n"; 
	//创建libevent的上下文
	event_base* base = event_base_new();
	if (base)
	{
		cout << "event_base_new success!" << endl;
	}

	//监听端口
	//socket ,bind,listen 绑定事件
	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SPORT);

	evconnlistener* ev = evconnlistener_new_bind(base,	// libevent的上下文
		listen_cb,										//接收到连接的回调函数
		base,											//回调函数获取的参数 arg
		LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,		//地址重用,evconnlistener关闭同时关闭socket
		10,												//连接队列大小,对应listen函数
		(sockaddr*)&sin,								//绑定的地址和端口
		sizeof(sin)
		);

	//事件分发处理
	if(base)
		event_base_dispatch(base);
	if(ev)
		evconnlistener_free(ev);
	if(base)
		event_base_free(base);


    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

运行效果

初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。

请添加图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/554109
推荐阅读
相关标签
  

闽ICP备14008679号