赞
踩
进程池是一种并发编程模式,用于管理和重用多个处理任务的进程。它通常用于需要频繁创建和销毁进程的情况,以避免因此产生的开销。
进程池的优点包括:
- 减少进程创建销毁的开销:避免频繁创建和销毁进程所带来的系统资源开销。
- 提高系统响应速度:由于进程已经初始化并且一直保持在内存中,可以立即分配执行任务,减少了任务等待时间。
- 控制资源使用:通过限制进程池中的进程数量,可以控制系统资源的使用情况,避免资源过度消耗。
预先创建一些空闲进程,管理进程会把工作分发到空闲进程来处理,空闲进程处理结束后,通知管理进程。
管理进程需要将任务发给空闲的工作进程,这里就涉及到进程之间的通信,进程间的通信有以下三种方式,我们之前都做了详细的解释
管道:https://blog.csdn.net/weixin_43903639/article/details/138155634?spm=1001.2014.3001.5501
消息队列:https://blog.csdn.net/weixin_43903639/article/details/138155723?spm=1001.2014.3001.5501
共享内存:https://blog.csdn.net/weixin_43903639/article/details/138189200?spm=1001.2014.3001.5501
对于一个进程池,我们需要维护一个进程队列,如果进程在忙就等待,如果进程空闲,那么就给空闲进程发任务让进程去处理。
我们这里采用了有名管道的方式。实际上使用匿名管道是一样的。为了方便管理,我们这里创建了一个Fifo的类,通过类将管道视为一个个的对象。
// 权限 #define Mode 0666 // 文件地址 #define Path "./default" class Fifo { public: Fifo(string path = Path, int mode = Mode) : _path(path), _mode(mode) { int return_mkfifo_val = mkfifo(_path.c_str(), _mode); if (return_mkfifo_val < 0) { cout << "mkfifo error:" << errno << " reason :" << strerror(errno) << endl; exit(1); } cout << "mkfifo success" << endl; } ~Fifo() { int return_unlink_val = unlink(_path.c_str()); if (return_unlink_val < 0) { cout << "unlink error:" << errno << " reason :" << strerror(errno) << endl; } cout << "unlink namepipe success" << endl; } private: string _path; int _mode; };
管道的默认权限是 0666
,管道的默认文件地址是 ./default
,这样管理的优势是便于管道的创建与销毁。
我们将一个进程也视为一个对象,那么一个进程就需要以下的元素
fd0
: 管道,通过这个管道接收主进程的数据fd1
:管道,通过这个管道给主进程发数据pid
:子进程的pidisbusy
:此子进程是否在忙class Process { public: Process(int fd0, int fd1, pid_t process_pid) : _fd0(fd0), _fd1(fd1), _process_pid(process_pid), _isbusy(false) {} ~Process() {} // 获取父进程要写入的管道 int get_fd0() { return _fd0; } // 获取子进程要写入的管道 int get_fd1() { return _fd1; } // 获取子进程pid pid_t get_process_pid() { return _process_pid; } // 获取是否忙碌标志位 bool get_isbusy() { return _isbusy; } // 修改标志为 void set_isbusy(bool flag) { _isbusy = flag; } private: int _fd0; // 父进程要写入的管道 int _fd1; // 父进程要读入的管道 pid_t _process_pid; // 子进程pid bool _isbusy; // 是否忙碌标志位 };
进程池就是同时管理管道和进程的。其中包含多个进程对象,每个进程对象又要包含两个管道。
vector<string> pipe0
主进程写入,子进程读的管道名vector<string> pipe1
子进程写入,主进程读的管道名vector<Fifo *> fifo0
主进程写入,子进程读的管道vector<Fifo *> fifo1
子进程写入,主进程读的管道vector<Process> _processpool
进程池管理的进程int _processnum
进程池管理的进程数量// 进程池 class ProcessPool { public: ProcessPool(int processnum) : _processnum(processnum) {} ~ProcessPool() { for (int i = 0; i < _processnum; i++) { delete fifo0[i]; delete fifo1[i]; } // 要释放所有的子进程 for (int i = 0; i<_processnum; i++) { // 通知子进程结束,通知失败的话直接杀死子进程 if (kill(_processpool[i].get_process_pid(), SIGTERM) != 0) { // 杀死子进程 kill(_processpool[i].get_process_pid(), SIGUSR1); } } } // 生成管道的名字 void makePipeName() { pipe0.clear(); pipe1.clear(); for (int i = 0; i < _processnum; i++) { string s0; s0 += "pipe0_" + to_string(i + 1); pipe0.push_back(s0); string s1; s1 += "pipe1_" + to_string(i + 1); pipe1.push_back(s1); } } // 创建进程池,接收一个参数的函数指针 void CreateProcessPool(work_t work = worker) { for (int i = 0; i < _processnum; i++) { // 创建命名管道 fifo0.push_back(new Fifo(pipe0[i])); fifo1.push_back(new Fifo(pipe1[i])); int id = fork(); if (id == 0) { // 子进程 int fd0 = open(pipe0[i].c_str(), O_RDONLY); int fd1 = open(pipe1[i].c_str(), O_WRONLY); work(fd0, fd1); exit(0); } // 父进程打开管道,未发送任务 int fd0 = open(pipe0[i].c_str(), O_WRONLY); int fd1 = open(pipe1[i].c_str(), O_RDONLY | O_NONBLOCK); // 设置为非阻塞模式 fcntl(fd1, F_SETFL, O_NONBLOCK); _processpool.push_back({fd0, fd1, id}); } } // 找到空闲进程返回进程在_processpool中的序号,没找到返回-1 int getAChannal() { for (int i = 0; i < _processnum; i++) { if (_processpool[i].get_isbusy() == false) { _processpool[i].set_isbusy(true); return i; } } return -1; } // 发送任务,任务就是数据 void SendTask(char *data, int datasize) { // 随机选中管道 int luck_process = getAChannal(); while (luck_process == -1) { // 看一下哪些进程是空闲的。每个进程结束后都会发送自己的pid。 for (int i=0; i<_processnum; i++) { pid_t pid = 0; ssize_t bytes_read = read(_processpool[i].get_fd1(), &pid, sizeof(pid_t)); if (bytes_read == -1) continue; // 哪个进程返回了数据,就认为Ta结束了。 else { _processpool[i].set_isbusy(false); } } luck_process = getAChannal(); } // 父进程向管道发送任务,发送任务就是向相应的管道写入数据 write(_processpool[luck_process].get_fd0(), data, datasize); } static void worker(int fd0, int fd1) { char buf[BUFSIZ]; while (1) { int read_return_value = read(fd0, buf, BUFSIZ); // 处理读取到的数据 if (read_return_value > 0) { cout << "my data is : " << buf << " my pid is : " << getpid() << endl; // 模拟处理读取到的数据 sleep(1); } // 向消息队列传递自己的pid,表示已经完成任务 pid_t pid = getpid(); write(fd1, &pid, sizeof(pid_t)); } } private: int _processnum; vector<string> pipe0; // 父进程写入,子进程读 vector<string> pipe1; // 子进程写入,父进程读 vector<Fifo *> fifo0; vector<Fifo *> fifo1; vector<Process> _processpool; };
这里我们的工作函数是默认的工作函数,也就是打印传入的数据。
#include "ProcessPool.h" #include <iostream> int main(int argc, char* argv[]) { ProcessPool* processpool = new ProcessPool(5); processpool->makePipeName(); processpool->CreateProcessPool(); char buf[3]; for (int i=0;i<20;i++) { sprintf(buf,"%d",i+100); processpool->SendTask(buf,3); } delete processpool; return 0; }
看一下仿真结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。