赞
踩
今天我们来学习一下管道的应用——进程池。如果有没看过上一篇管道的小伙伴可以点击这里:
我们首先要了解一下池化技术:
池化技术(Pooling)在计算机技术中是一种常见的设计模型,主要用于优化资源使用和提高性能。其核心理念是提前保存并维护大量资源在一个特定的“池子”中,以备不时之需以及重复使用。这样可以显著减少资源创建和销毁的开销,从而提高系统的响应速度和效率。
池化技术的主类型
线程池:线程池类似于操作系统中的缓冲区概念。它预先创建并管理一定数量的线程,这些线程在初始状态下都处于睡眠状态。当有新任务或请求到来时,线程池会唤醒一个睡眠线程来处理该任务,处理完成后线程再次进入睡眠状态。这样可以避免频繁地创建和销毁线程,从而提高性能。
内存池:内存池用于管理内存资源。由于分配和释放内存涉及到系统调用,这会导致程序从用户态切换到内核态,是一个相对耗时的操作。内存池通过预先分配一定大小的内存块并统一管理,可以显著减少内存分配和释放的开销。
数据库连接池:数据库连接池用于管理数据库连接。由于创建和关闭数据库连接是一个相对耗时的操作,数据库连接池通过预先创建并管理一定数量的数据库连接,可以显著提高数据库访问的性能。
对象池:对象池是一种常见的对象缓存手段。它预先创建并管理一定数量的对象,当需要使用对象时,直接从对象池中取出而不是重新创建。这样可以减少对象创建和销毁的开销,提高对象的访问性能。
池化技术的优点主要包括:
提高资源使用效率:通过复用已有的资源,减少了频繁创建和销毁资源的开销。
降低系统资源消耗:通过统一管理资源,可以更好地控制资源的使用,避免资源的浪费。
提高系统性能:通过减少资源创建和销毁的开销,以及优化资源的使用,可以提高系统的响应速度和性能。
然而,池化技术也需要注意一些问题,如资源的管理和维护、资源的复用策略、资源的生命周期管理等。此外,不同的池化技术需要根据具体的应用场景和需求来选择和使用。
简单一点来说,就是“未雨绸缪”,计算的池化技术就是当处理某些事务的时候,先把对应的资源先准备好,到时候可以直接上手处理事务,省下了开销资源的时间。
我们今天要做的,是写一个进程池,就是提前先创建好一批进程,等到有任务来的时候,直接可以处理任务:
我们首先把架子搭好:
#include<iostream> #include<unistd.h> const int num = 5; using namespace std; #include<cassert> int main() { //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { //关闭写端 close(pipefd[1]) } //父进程 //关闭读端 close(pipefd[0]); } }
现在我们创建好了进程,但是有个问题,我们并不知道什么时候该往哪个进程发配任务,现在我们的主进程跟我们创建的进程没有任何的关系,这个时候,我们就要用信道:
通过信道(本质上也是一种管道),我们主进程就知道该往哪个进程发配任务了。我们可以创建一个类对它进行管理:
#include<iostream> #include<unistd.h> #include<cstring> #include<vector> const int num = 5; static int channel_number = 1; //信道起始数量 using namespace std; #include<cassert> class channel { public: channel(int fd,pid_t id) :ctrlfd(fd) ,workid(id) { name = "channel->" + to_string(channel_number++); } int ctrlfd; //读写端的fd pid_t workid; //子进程id string name; //管道名字 }; int main() { vector<channel> channels; //信道 //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { //关闭写端 close(pipefd[1]) exit(0); } //父进程 //关闭读端 close(pipefd[0]); channels.push_back(channel(pipefd[1],id)); //往信道写入 } }
然后我们把创建信道的过程抽象出来形成一个函数:
#include<iostream> #include<unistd.h> #include<cstring> #include<vector> const int num = 5; static int channel_number = 1; //信道起始数量 using namespace std; #include<cassert> class channel { public: channel(int fd,pid_t id) :ctrlfd(fd) ,workid(id) { name = "channel->" + to_string(channel_number++); } int ctrlfd; //读写端的fd pid_t workid; //子进程id string name; //管道名字 }; void CreateChannel( vector<channel> *channels) { //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { //关闭写端 close(pipefd[1]) exit(0); } //父进程 //关闭读端 close(pipefd[0]); channels->push_back(channel(pipefd[1],id)); //往信道写入 } } int main() { vector<channel> channels; //信道 //创建信道 CreateChannel(&channels); }
这里我们规范一下传参方式:
传参形式:
- 输入参数:const &
- 输出参数:*
- 输入输出参数:&
我们创建一个函数来表示子进程的工作:
void Work()
{
while(true)
{
cout<< "I am running "<< getpid() << endl;
sleep(1);
}
}
#include<iostream> #include<unistd.h> #include<cstring> #include<vector> const int num = 5; static int channel_number = 1; //信道起始数量 using namespace std; #include<cassert> class channel { public: channel(int fd,pid_t id) :ctrlfd(fd) ,workid(id) { name = "channel->" + to_string(channel_number++); } int ctrlfd; //读写端的fd pid_t workid; //子进程id string name; //管道名字 }; void Work() { while(true) { cout<< "I am running "<< getpid() << endl; sleep(1); } } void CreateChannel( vector<channel> *channels) //创建信道 { //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { //关闭写端 close(pipefd[1]); //子进程要完成的工作 Work(); exit(0); } //父进程 //关闭读端 close(pipefd[0]); channels->push_back(channel(pipefd[1],id)); //往信道写入 } } //测试 void PrintChannel(const vector<channel> &channels) //输入型参数 { for(auto e: channels) { cout<<e.name<<", "<<e.ctrlfd<<", "<<e.workid<<endl; } } int main() { vector<channel> channels; //信道 //创建信道 CreateChannel(&channels); PrintChannel(channels); sleep(10); return 0; }
我们可以运行一下看看:
此时我们完成了第一步,建立信道。
现在我们建立好了信道,接下来就是接收主进程给我们的任务就可以了,可是子进程如何接收和识别任务呢?我们这里规定:传不同的数字,做不同的任务:
首先,我们这里先重定向,从标准输入读取(省略传参):
if(id == 0) //子进程
{
//关闭写端
close(pipefd[1]);
//子进程要完成的工作
dup2(pipefd[0],0); //重定向,向标准输入读
Work();
exit(0);
}
void Work()
{
while(true)
{
int code = 0; //任务代码
int n = read(0,&code,sizeof(code));
assert(n == sizeof(code));
//要做的任务
}
}
我们可以开一个hpp文件,来模拟我们的任务:
#pragma once #include<iostream> #include<functional> #include<vector> #include <ctime> #include<unistd.h> typedef std::function<void()> task_t; //管理任务 void Download() { std::cout << "I am a Download" << " deal with: " << getpid() << std::endl; } void PrintLog() { std::cout << "I am a log" << " deal with: " << getpid() << std::endl; } void PushVideoStream() { std::cout << "I am a vdieo" << " deal with: " << getpid() << std::endl; } class Init { public: // 任务码,领取相应的任务码,做相应的任务 const static int g_download_code = 0; const static int g_printlog_code = 1; const static int g_push_videostream_code = 2; // 任务集合 std::vector<task_t> tasks; public: Init() { tasks.push_back(Download); tasks.push_back(PrintLog); tasks.push_back(PushVideoStream); srand(time(nullptr) ^ getpid()); } bool CheckSafe(int code) { if (code >= 0 && code < tasks.size()) return true; else return false; } void RunTask(int code) //运行任务 { return tasks[code](); } int SelectTask() //选择任务 { return rand() % tasks.size(); } std::string ToDesc(int code) { switch (code) { case g_download_code: return "Download"; case g_printlog_code: return "PrintLog"; case g_push_videostream_code: return "PushVideoStream"; default: return "Unknow"; } } }; Init init; //创建对象
我们相应文件的变化:
#include<iostream> #include<unistd.h> #include<cstring> #include<vector> const int num = 5; static int channel_number = 1; //信道起始数量 using namespace std; #include<cassert> #include"Task.hpp" class channel { public: channel(int fd,pid_t id) :ctrlfd(fd) ,workid(id) { name = "channel->" + to_string(channel_number++); } int ctrlfd; //读写端的fd pid_t workid; //子进程id string name; //管道名字 }; void Work() { while(true) { int code = 0; //任务代码 int n = read(0,&code,sizeof(code)); assert(n == sizeof(code)); //要做的任务 if(!init.CheckSafe(code)) continue; init.RunTask(code); } } void CreateChannel(vector<channel> *channels) { //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { //关闭写端 close(pipefd[1]); //子进程要完成的工作 dup2(pipefd[0],0); //重定向,向标准输入读 Work(); exit(0); } //父进程 //关闭读端 close(pipefd[0]); channels->push_back(channel(pipefd[1],id)); //往信道写入 } } void PrintChannel(const vector<channel> &channels) //输入型参数 { for(auto e: channels) { cout<<e.name<<", "<<e.ctrlfd<<", "<<e.workid<<endl; } } void SendCommand(const std::vector<channel> &channels, bool flag, int num = -1) { int pos = 0; while (true) { // 1. 选择任务 int command = init.SelectTask(); // 2. 选择信道(进程) const auto &channel = channels[pos++]; pos %= channels.size(); // debug std::cout << "send command " << init.ToDesc(command) << "[" << command << "]" << " in " << channel.name << " worker is : " << channel.workid << std::endl; // 3. 发送任务 write(channel.ctrlfd, &command, sizeof(command)); // 4. 判断是否要退出 if (!flag) { num--; if (num <= 0) break; } sleep(1); } std::cout << "SendCommand done..." << std::endl; } int main() { vector<channel> channels; //信道 //创建信道 CreateChannel(&channels); //PrintChannel(channels); //选择任务,选择信道 const bool g_always_loop = true; SendCommand(channels, !g_always_loop, 10); // sleep(10); return 0; }
我们可以运行一下:
其实,我们想让进程退出,就只需要关闭写端就可以了。(此时会读到0,表示已经读到了文件末尾)
所以,我们之前写的代码,要稍微修改一下:
int main() { vector<channel> channels; //信道 //创建信道 CreateChannel(&channels); //PrintChannel(channels); //选择任务,选择信道 const bool g_always_loop = true; SendCommand(channels, !g_always_loop, 10); //进程退出,关闭写端 for(const auto &channel : channels) //关闭写端 { close(channel.ctrlfd); } //sleep(10); return 0; }
我们可以把这几行代码封装起来(顺便回收子进程):
void ReleaseChannels(vector<channel> channels) { for (const auto &channel : channels) { close(channel.ctrlfd); } //回收子进程 for(const auto &channel : channels) { pid_t rid = waitpid(channel.workid,nullptr,0); if(rid == channel.workid) { cout<<"wait child: "<<channel.workid<<" success"<<endl; } } }
int main() { vector<channel> channels; //信道 //创建信道 CreateChannel(&channels); //PrintChannel(channels); //选择任务,选择信道 const bool g_always_loop = true; SendCommand(channels, !g_always_loop, 10); //进程退出,关闭写端 ReleaseChannels(channels); //sleep(10); return 0; }
我们可以运行一下:
其实我们之前写的创建管道的代码有一点bug:
void CreateChannel(vector<channel> *channels) { //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { //关闭写端 close(pipefd[1]); //子进程要完成的工作 dup2(pipefd[0],0); //重定向,向标准输入读 Work(); exit(0); } //父进程 //关闭读端 close(pipefd[0]); channels->push_back(channel(pipefd[1],id)); //往信道写入 } }
现在我们是结束一个进程,回收一个进程,就会有问题:
void ReleaseChannels(vector<channel> channels) { for (const auto &channel : channels) { close(channel.ctrlfd); waitpid(channel.workid,nullptr,0); //关掉一个收一个 } // //回收子进程 // for(const auto &channel : channels) // { // pid_t rid = waitpid(channel.workid,nullptr,0); // if(rid == channel.workid) // { // cout<<"wait child: "<<channel.workid<<" success"<<endl; // } // } }
这个时候,进程会卡死。这是为什么呢?
其实,第一次创建子进程时,是没有啥问题的:
从第二次开始,每次创建的子进程会继承上一个文件描述符表的写端:
这种情况会一直累积,只有最后一个文件只有一个写端。这样会导致我们的信道不会为空,子进程读不到0,不会退出,发生阻塞。
解决方法也很简单,第一种,我们倒着回收:
第二种,在新的子进程中关闭多余的文件描述符,我们要在创建信道那里做一点小改动:
void CreateChannel(vector<channel> *channels) { vector<int> tmp; //临时记录,用来记录老的fd //创建多个子进程 for(int i = 0; i < num; i++) { //创建管道 int pipefd[2]; int n = pipe(pipefd); //检查是否创建管道成功 assert(n == 0); //创建父子进程 pid_t id = fork(); if(id == 0) //子进程 { if(!tmp.empty()) { for(auto fd : tmp) { close(fd); } PrintFd(tmp); } //关闭写端 close(pipefd[1]); //子进程要完成的工作 dup2(pipefd[0],0); //重定向,向标准输入读 Work(); exit(0); } //父进程 //关闭读端 close(pipefd[0]); channels->push_back(channel(pipefd[1],id)); //往信道写入 tmp.push_back(pipefd[1]); //记录老的文件描述符 } }
void PrintFd(const std::vector<int> &fds) //用来打印看看关闭了哪些fd
{
cout << getpid() << " close fds: ";
for(auto fd : fds)
{
cout << fd << " ";
}
cout << endl;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。