赞
踩
目录
进程间通信的目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信的必要性本质以及技术背景
进程间通信的必要性,单进程的,那么也就无法使用并发能力,更加无法实现多进程协同。传输数据,同步执行流,消息通知等,所以进程通信不是目的,而是手段。
进程间通信的技术背景
1.进程是具有独立性的。虚拟地址空间+页表保证进程运行的独立性(进程内核数据结构+进程的代码和数据)。
2.通信成本会比较高。
进程间通信的本质理解
1.进程间通信的前提,首先需要让不同的进程看到同一块"内存"(特定的结构组织的)
2.所以你所谓的进程看到同一块"内存",属于哪一个进程呢?不能隶属于任何一个进程,而应该更强调共享。
管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
管道又分为两种,匿名管道和命名管道。
管道的原理:
这个就叫做管道,分别以读写的方式打开一个文件,fork()创建子进程,双方关闭自己不需要的文件描述符,管道就是文件,两个进程通过文件的方式进行通信。
匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
其实从上面这张都就可以看出管道是怎们样的,接下来就用代码来实现一下。
- #include <iostream>
- #include <string>
- #include <cstdio>
- #include <cstring>
- #include <assert.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/wait.h>
-
- using namespace std;
-
- // 为什么不定义全局buffer来进行通信呢?? 因为有写时拷贝的存在,无法更改通信!
-
- int main()
- {
- // 1. 创建管道
- int pipefd[2] = {0};
- int n = pipe(pipefd);
- assert(n == 0);
- (void)n;
- #ifdef DEBUG
- cout << "pipefd[0]: " << pipefd[0] << endl; // 3
- cout << "pipefd[1]: " << pipefd[1] << endl; // 4
- #endif
- // 2. 创建子进程
- pid_t id = fork();
- assert(id != -1);
- if (id == 0)
- {
- // 子进程 - 读
- // 3. 构建单向通信的信道,父进程写入,子进程读取
- // 3.1 关闭子进程不需要的fd
- close(pipefd[1]);
- char buffer[1024 * 8];
- while (true)
- {
- // sleep(20);
- // 写入的一方,fd没有关闭,如果有数据,就读,没有数据就等
- // 写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾!
- ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
- if (s > 0)
- {
- buffer[s] = '\0';
- cout << "child get a message[" << getpid() << "] Father# " << buffer << endl;
- }
- else if (s == 0)
- {
- cout << "writer quit(father), me quit!!!" << endl;
- break;
- }
- }
- exit(1);
- }
- // 父进程 - 写
- // 3. 构建单向通信的信道
- // 3.1 关闭父进程不需要的fd
- close(pipefd[0]);
- string message = "我是父进程,我正在给你发消息";
- char buffer[1024 * 8];
- int count = 0;
- char send_buffer[1024 * 8];
- while (true)
- {
- // 3.2 构建一个变化的字符串
- snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d",
- message.c_str(), getpid(), count++);
- // 3.3 写入
- write(pipefd[1], send_buffer, strlen(send_buffer));
- // 3.4 故意sleep
- sleep(1);
- cout << count << endl;
- if (count == 5)
- {
- cout << "writer quit(father)" << endl;
- break;
- }
- }
- close(pipefd[1]);
- pid_t ret = waitpid(id, nullptr, 0);
- cout << "id : " << id << " ret: " << ret << endl;
- assert(ret > 0);
- (void)ret;
-
- return 0;
- }
来讲一下这个代码的大致方向,第一部肯定是创建管道,然后就是创建子进程,这里我们是让父进程写子进程读,所以关闭父子进程不需要的fd,写入的一方,fd没有关闭,如果有数据,就读,没有数据就等,写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾,最后关闭父子进程fd,回收子进程。
其从上面的代码我们就可以总结一下管道的特点。总结管道的特点,理解以前的管道 | ,管道是一个文件–读取–具有访问控制,显示器也是一个文件,父子同时往显示器写入的时候,有没有说一个会等另一个的情况呢,缺乏访问控制。
1.管道是用来进行具有血缘关系的进程进性进程间通信-- 常用于父子通信
2.管道具有通过让进程间协同,提供了访问控制
3.管道提供的是面向流式的通信服务--面向字节流--_协议
4.管道是基于文件的,文件的生命周期是随进程的,管道的生命周期是随进程的
5.管道是单向通信的,就是半双工通信的一种特殊情况
6.写快,读慢,写满不能在写了
7.写慢,读快,管道没有数据的时候,读必须等待
8.写关,读0,标识读到了文件结尾
9.读关,写继续写,oS终止写进程
扩展
分发多个任务的管理器
代码:
- //.hpp
- //#pragma once
-
- #include <iostream>
- #include <string>
- #include <vector>
- #include <unordered_map>
- #include <unistd.h>
- #include <functional>
-
- typedef std::function<void()> func;
-
- std::vector<func> callbacks;
- std::unordered_map<int, std::string> desc;
-
- void readMySQL()
- {
- std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
- }
-
- void execuleUrl()
- {
- std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
- }
-
- void cal()
- {
- std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
- }
-
- void save()
- {
- std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
- }
-
- void load()
- {
- desc.insert({callbacks.size(), "readMySQL: 读取数据库"});
- callbacks.push_back(readMySQL);
-
- desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});
- callbacks.push_back(execuleUrl);
-
- desc.insert({callbacks.size(), "cal: 进行加密计算"});
- callbacks.push_back(cal);
-
- desc.insert({callbacks.size(), "save: 进行数据的文件保存"});
- callbacks.push_back(save);
- }
-
- void showHandler()
- {
- for(const auto &iter : desc )
- {
- std::cout << iter.first << "\t" << iter.second << std::endl;
- }
- }
-
- int handlerSize()
- {
- return callbacks.size();
- }
- //.cc
- #include <iostream>
- #include <vector>
- #include <cstdlib>
- #include <ctime>
- #include <cassert>
- #include <unistd.h>
- #include <sys/wait.h>
- #include <sys/types.h>
- #include "Task.hpp"
-
- #define PROCESS_NUM 5
-
- using namespace std;
-
- int waitCommand(int waitFd, bool &quit) // 如果对方不发,我们就阻塞
- {
- uint32_t command = 0;
- ssize_t s = read(waitFd, &command, sizeof(command));
- if (s == 0)
- {
- quit = true;
- return -1;
- }
- assert(s == sizeof(uint32_t));
- return command;
- }
-
- void sendAndWakeup(pid_t who, int fd, uint32_t command)
- {
- write(fd, &command, sizeof(command));
- cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
- }
-
- int main()
- {
- // 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??
- load();
- // pid: pipefd
- vector<pair<pid_t, int>> slots;
- // 先创建多个进程
- for (int i = 0; i < PROCESS_NUM; i++)
- {
- int pipefd[2] = {0};
- int n = pipe(pipefd);
- assert(n == 0);
- (void)n;
- pid_t id = fork();
- assert(id != -1);
- if (id == 0)
- {
- close(pipefd[1]);
- while (true)
- {
- // pipefd[0]
- // 等命令
- bool quit = false;
- int command = waitCommand(pipefd[0], quit); // 如果对方不发,我们就阻塞
- if (quit)
- break;
- // 执行对应的命令
- if (command >= 0 && command < handlerSize())
- {
- callbacks[command]();
- }
- else
- {
- cout << "非法command: " << command << endl;
- }
- }
- exit(0);
- }
- // father,进行写入,关闭读端
- close(pipefd[0]); // pipefd[1]
- slots.push_back(pair<pid_t, int>(id, pipefd[1]));
- }
- // 父进程派发任务
- srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机
- while (true)
- {
- // 选择一个任务, 如果任务是从网络里面来的?
- int command = rand() % handlerSize();
- // 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
- int choice = rand() % slots.size();
- // 把任务给指定的进程
- sendAndWakeup(slots[choice].first, slots[choice].second, command);
- sleep(1);
- // int select;
- // int command;
- // cout << "############################################" << endl;
- // cout << "# 1. show funcitons 2.send command #" << endl;
- // cout << "############################################" << endl;
- // cout << "Please Select> ";
- // cin >> select;
- // if (select == 1)
- // showHandler();
- // else if (select == 2)
- // {
- // cout << "Enter Your Command> ";
- // // 选择任务
- // cin >> command;
- // // 选择进程
- // int choice = rand() % slots.size();
- // // 把任务给指定的进程
- // sendAndWakeup(slots[choice].first, slots[choice].second, command);
- // }
- // else
- // {
- // }
- }
- // 关闭fd, 所有的子进程都会退出
- for (const auto &slot : slots)
- {
- close(slot.second);
- }
-
- // 回收所有的子进程信息
- for (const auto &slot : slots)
- {
- waitpid(slot.first, nullptr, 0);
- }
- }
命名管道
命名管道可以从命令行上创建,命令行方法是使用下面这个命令 mkfifo filename 删除管道文件:unlink filename
命名管道也可以从程序里创建,相关函数是:
匿名管道与命名管道的区别
匿名管道由pipe函数创建并打开。
命名管道由mkfifo函数创建,打开用open
FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义
接下来同样用一段代码来演示一下:
- //Makefile
- .PHONY:all
- all:client mutiServer
-
- client:client.cxx
- g++ -o $@ $^ -std=c++11
- mutiServer:server.cxx
- g++ -o $@ $^ -std=c++11
-
- .PHONY:clean
- clean:
- rm -f client mutiServer
- //client.cxx
- #include "comm.hpp"
-
- int main()
- {
- // 1. 获取管道文件
- int fd = open(ipcPath.c_str(), O_WRONLY);
- if(fd < 0)
- {
- perror("open");
- exit(1);
- }
-
- // 2. ipc过程
- string buffer;
- while(true)
- {
- cout << "Please Enter Message Line :> ";
- std::getline(std::cin, buffer);
- write(fd, buffer.c_str(), buffer.size());
- }
-
- // 3. 关闭
- close(fd);
- return 0;
- }
- //log.hpp
- #ifndef _LOG_H_
- #define _LOG_H_
-
- #include <iostream>
- #include <ctime>
-
- #define Debug 0
- #define Notice 1
- #define Warning 2
- #define Error 3
-
-
- const std::string msg[] = {
- "Debug",
- "Notice",
- "Warning",
- "Error"
- };
-
- std::ostream &Log(std::string message, int level)
- {
- std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message;
- return std::cout;
- }
-
-
- #endif
- //comm.hpp
- #ifndef _COMM_H_
- #define _COMM_H_
-
- #include <iostream>
- #include <string>
- #include <cstdio>
- #include <cstring>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <fcntl.h>
- #include "Log.hpp"
-
- using namespace std;
-
- #define MODE 0666
- #define SIZE 128
-
- string ipcPath = "./fifo.ipc";
-
-
- #endif
- //server.cxx
- #include "comm.hpp"
- #include <sys/wait.h>
-
- static void getMessage(int fd)
- {
- char buffer[SIZE];
- while (true)
- {
- memset(buffer, '\0', sizeof(buffer));
- ssize_t s = read(fd, buffer, sizeof(buffer) - 1);
- if (s > 0)
- {
- cout <<"[" << getpid() << "] "<< "client say> " << buffer << endl;
- }
- else if (s == 0)
- {
- // end of file
- cerr <<"[" << getpid() << "] " << "read end of file, clien quit, server quit too!" << endl;
- break;
- }
- else
- {
- // read error
- perror("read");
- break;
- }
- }
- }
-
- int main()
- {
- // 1. 创建管道文件
- if (mkfifo(ipcPath.c_str(), MODE) < 0)
- {
- perror("mkfifo");
- exit(1);
- }
-
- Log("创建管道文件成功", Debug) << " step 1" << endl;
-
- // 2. 正常的文件操作
- int fd = open(ipcPath.c_str(), O_RDONLY);
- if (fd < 0)
- {
- perror("open");
- exit(2);
- }
- Log("打开管道文件成功", Debug) << " step 2" << endl;
-
- int nums = 3;
- for (int i = 0; i < nums; i++)
- {
- pid_t id = fork();
- if (id == 0)
- {
- // 3. 编写正常的通信代码了
- getMessage(fd);
- exit(1);
- }
- }
- for(int i = 0; i < nums; i++)
- {
- waitpid(-1, nullptr, 0);
- }
- // 4. 关闭文件
- close(fd);
- Log("关闭管道文件成功", Debug) << " step 3" << endl;
-
- unlink(ipcPath.c_str()); // 通信完毕,就删除文件
- Log("删除管道文件成功", Debug) << " step 4" << endl;
-
- return 0;
- }
上面的代码其实就是实现server进程和client进程间的通信,其实从这些代码也可以看出来,匿名管道适用于父子进程间的通信,命名管道用于两个毫不相关的进程间通信。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。