赞
踩
本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。
工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。
任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。
线程类的接口功能
Start() -> 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() -> 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() -> 此函数只进入事件循环,等待事件循环退出
Notify() -> 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() -> 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() -> 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
调用多次Activate表示加入多个任务,任务顺序被执行。
#pragma once #include <vector> /*线程类声明*/ class XThread; /*任务类声明*/ class XTask; /*线程池类*/ class XThreadPool { public: //单例模式创建返回唯一对象 static XThreadPool* GetInstance(); //初始化所有线程并启动线程 void Init(int threadCount); //分发线程 void Dispatch(XTask* task); private: //将构造函数的访问属性设置为 private //将构造函数构造声明成私有不使用 //声明成私有不使用 XThreadPool(){} //无参构造 XThreadPool(const XThreadPool&); //拷贝构造 XThreadPool& operator= (const XThreadPool&); //赋值运算符重载 //线程数量 int threadCount = 0; //用来标记下一个使用的线程号 int lastThread = -1; //线程对象数组 std::vector<XThread *> threads; //线程池对象 static XThreadPool* pInstance; };
#include "XThread.h" #include "XTask.h" #include <thread> #include <iostream> #include <event2/event.h> #include <unistd.h> using namespace std; XThread::XThread() { } XThread::~XThread() { } //sock 文件描述符,which 事件类型 arg传递的参数 /* * 函数名: NotifyCB * 作用: 管道可读事件触发回调函数 */ static void NotifyCB(evutil_socket_t fd, short which, void *arg) { XThread *th = (XThread*)arg; th->Notify(fd, which); } /* * 函数名: XThread::Start * 作用: 启动线程 * 解释: 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。 */ void XThread::Start() { //安装线程,初始化event_base和管道监听事件用于激活 Setup(); //启动线程 thread th(&XThread::Main, this); //线程分离 th.detach(); } /* * 函数名: XThread::Main * 作用: 线程入口函数 * 解释: 此函数只进入事件循环,等待事件循环退出 */ void XThread::Main() { cout << id << " XThread::Main() begin" << endl; event_base_dispatch(base); //进入事件循环 event_base_free(base); cout << id << " XThread::Main() end" << endl; } /* * 函数名: XThread::Setup * 作用: 安装线程 * 解释: 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调 */ bool XThread::Setup() { //windows用配对socket linux用管道 //创建的管道 int fds[2]; if(pipe(fds)){ cerr << "pipe failed!" << endl; return false; } //读取绑定到event事件中,写入要保存 //保存管道的写fd notify_send_fd = fds[1]; //创建一个新的事件处理器对象 this->base = event_base_new(); //创建一个新的事件对象 //添加管道监听事件读fd,用于激活线程执行任务 event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this); //将事件对象(struct event)添加到指定的事件处理器(event_base)中 event_add(ev, 0); return true; } /* * 函数名: XThread::Notify * 作用: 线程激活执行任务 * 解释: 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务 */ void XThread::Notify(evutil_socket_t fd, short which) { //水平触发 只要没有接受完成,会再次进来 char buf[2] = {0}; int len = read(fd, buf, 1); if (len <= 0) return; cout << id << " thread " << buf << endl; //获取任务,并初始化任务 XTask* task = NULL; tasks_mutex.lock(); if(tasks.empty()){ //队列为空 tasks_mutex.unlock(); return; } task = tasks.front(); //先进先出 tasks.pop_front(); tasks_mutex.unlock(); task->Init(); } /* * 函数名: XThread::Activate * 作用: 激活线程 * 解释: 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。 * 调用多次Activate表示加入多个任务,任务顺序被执行。 */ void XThread::Activate() { char act[10] = {0}; int len = write(this->notify_send_fd, "c", 1); if (len <= 0) { cerr << "XThread::Activate() failed!" << endl; } cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl; } /* * 函数名: XThread::AddTask * 作用: 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中 */ void XThread::AddTask(XTask* task) { if(!task)return; task->base = this->base; tasks_mutex.lock(); tasks.push_back(task); tasks_mutex.unlock(); }
线程类的接口功能
GetInstance() -> 单例模式创建返回唯一对象
Init() -> 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
Dispatch() -> 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务
#pragma once #include <vector> /*线程类声明*/ class XThread; /*任务类声明*/ class XTask; /*线程池类*/ class XThreadPool { public: //单例模式创建返回唯一对象 static XThreadPool* GetInstance(); //初始化所有线程并启动线程 void Init(int threadCount); //分发线程 void Dispatch(XTask* task); private: //将构造函数的访问属性设置为 private //将构造函数构造声明成私有不使用 //声明成私有不使用 XThreadPool(){} //无参构造 XThreadPool(const XThreadPool&); //拷贝构造 XThreadPool& operator= (const XThreadPool&); //赋值运算符重载 //线程数量 int threadCount = 0; //用来标记下一个使用的线程号 int lastThread = -1; //线程对象数组 std::vector<XThread *> threads; //线程池对象 static XThreadPool* pInstance; };
#include "XThreadPool.h" #include "XThread.h" #include <thread> #include <iostream> //#include <chrono> using namespace std; //静态成员变量类外初始化 XThreadPool* XThreadPool::pInstance = NULL; /* * 函数名: XThreadPool::GetInstance * 作用: 单例模式创建返回唯一对象 */ XThreadPool* XThreadPool::GetInstance() { //当需要使用对象时,访问instance 的值 //空值:创建对象,并用instance 标记 //非空值: 返回instance 标记的对象 if( pInstance == NULL ) { pInstance = new XThreadPool(); } return pInstance; } /* * 函数名: XThreadPool::Init * 作用: 初始化所有线程并启动线程 * 解释: 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组 */ void XThreadPool::Init(int threadCount) { this->threadCount = threadCount; this->lastThread = -1; for (int i = 0; i < threadCount; i++) { XThread *t = new XThread(); t->id = i + 1; cout << "Create thread " << i << endl; //启动线程 t->Start(); threads.push_back(t); this_thread::sleep_for(std::chrono::microseconds(10)); } } /* * 函数名: XThreadPool::Dispatch * 作用: 分发线程 * 解释: 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。 */ void XThreadPool::Dispatch(XTask* task) { //轮询 if(!task)return; int tid = (lastThread + 1) % threadCount; lastThread = tid; cout << "lastThread:" << lastThread << endl; XThread *XTh = threads[tid]; //添加任务 XTh->AddTask(task); //线程激活 XTh->Activate(); }
#pragma once
#include <iostream>
class XTask
{
public:
//事件处理器对象
struct event_base* base = NULL;
//客户端连接的socket
int sock = 0;
//初始化任务 纯虚函数
virtual bool Init() = 0;
};
线程类的接口功能
Init() -> 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() -> 读事件回调函数
EventCB() -> 客户端超时未发请求,断开连接退出任务
#pragma once
#include "XTask.h"
class XFtpServerCMD : public XTask
{
public:
//初始化任务
virtual bool Init();
XFtpServerCMD();
~XFtpServerCMD();
};
#include "XFtpServerCMD.h" #include <event2/event.h> #include <event2/bufferevent.h> #include <iostream> #include <string.h> using namespace std; /* * 函数名: EventCB * 作用: 超时事件回调函数 * 解释: 客户端超时未发请求,断开连接退出任务 */ void EventCB(struct bufferevent *bev, short what, void *arg) { XFtpServerCMD* cmd = (XFtpServerCMD*)arg; //如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据 if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl; bufferevent_free(bev); delete cmd; } } /* * 函数名: ReadCB * 作用: 读事件回调函数 */ void ReadCB(struct bufferevent *bev, void *arg) { XFtpServerCMD* cmd = (XFtpServerCMD*)arg; char data[1024] = {0}; for (;;) { int len = bufferevent_read(bev, data, sizeof(data)-1); if(len <= 0)break; data[len] = '\0'; cout << data << endl << flush; //测试代码,要清理掉 if(strstr(data, "quit")) { bufferevent_free(bev); delete cmd; break; } } } /* * 函数名: XFtpServerCMD::Init * 作用: 初始化任务 * 解释: 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。 */ bool XFtpServerCMD::Init() { cout << "XFtpServerCMD::Init() sock:" << sock << endl; //监听socket bufferevent // base socket bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this); bufferevent_enable(bev, EV_READ | EV_WRITE); //添加超时 timeval rt = {10, 0}; //10秒 bufferevent_set_timeouts(bev, &rt, 0); //设置读超时回调函数 return true; } XFtpServerCMD::XFtpServerCMD() { } XFtpServerCMD::~XFtpServerCMD() { }
#include <event2/event.h> #include <event2/listener.h> #include <string.h> #include "XThreadPool.h" #include <signal.h> #include <iostream> #include "XFtpServerCMD.h" using namespace std; #define SPORT 5001 /* * 函数名: listen_cb * 作用: 接收到连接的回调函数 * 解释: 通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行 */ void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg) { cout << "listen_cb" << endl; XTask* task = new XFtpServerCMD(); task->sock = s; XThreadPool::GetInstance()->Dispatch(task); } int main() { //忽略管道信号,发送数据给已关闭的socket if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) return 1; //1 初始化线程池 XThreadPool::GetInstance()->Init(5); std::cout << "test thread pool!\n"; //创建libevent的上下文 event_base* base = event_base_new(); if (base) { cout << "event_base_new success!" << endl; } //监听端口 //socket ,bind,listen 绑定事件 sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(SPORT); evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文 listen_cb, //接收到连接的回调函数 base, //回调函数获取的参数 arg LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket 10, //连接队列大小,对应listen函数 (sockaddr*)&sin, //绑定的地址和端口 sizeof(sin) ); //事件分发处理 if(base) event_base_dispatch(base); if(ev) evconnlistener_free(ev); if(base) event_base_free(base); return 0; }
初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。