赞
踩
linux下进程通信的方式有很多,共享内存,消息队列,管道等。共享内存可以传输大量数据,但是多个进程同时读取共享内存就会出现脏读,可以借助消息队列实现多进程消息发送和接收。这种组合方式在实际开发中应用还是很多的,接下来就看一下。
目录
int shmget(key_t key, size_t size, int shmflg);
void *shmat(int shmid, const void *shmaddr, int shmflg);
int shmdt(const void *shmaddr);
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
int msgget(key_t key, int flags);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
ipcs -m / ipcrm -m
ipcs -q / ipcrm -q
ipcs -s / ipcrm -s
接下来通过一个共享内存和两条消息队列来实现进程间互发数据,本在ubuntu环境下使用vscode开发,程序目录结构如下:
消息定义: msgInfo.h
- #pragma once
- #include <time.h>
-
- #define gAvailableMsgBufPos 100
- #define gOccupiedMsgBufPos 101
-
- //共享内存结构
- struct TSharedMemoryModule
- {
- char name[40+1];
- int id;
- char reloadCommand[80+1];
- int index;
- long sizeOfUserSpace;
- unsigned char *removed_puserSpace;
- int users;
- int newCreated;
- int removed_writingLocks;
- int removed_readingLocks;
- int resID;
- };
- typedef TSharedMemoryModule *PTSharedMemoryModule;
-
- //消息队列管理结构
- struct TMsgBufHDL
- {
- long maxSizeOfMsg;
- int maxNumOfMsg;
- int maxStayTime;
- int userID;
- int queueIDOfFreePos;
- int queueIDOfOccupiedPos;
- };
- typedef TMsgBufHDL *PTMsgBufHDL;
-
- //消息头
- struct TMessageHeader
- {
- long type;
- int provider;
- time_t time;
- int len;
- long msgIndex;
- int dealer;
- int forward_cnt;
- short locked;
- };
- typedef TMessageHeader *PTMessageHeader;
-
- // 空闲消息队列元素结构
- struct TFreePosOfMsgBuf
- {
- long statusOfPos;
- char indexOfPos[4];
- };
-
- // 任务消息队列结构
- struct TOccupiedPosOfMsgBuf
- {
- long typeOfMsg;
- char indexOfPos[4];
- };
收据收发头文件:msgExchange.h
- #pragma once
-
- #include "msgInfo.h"
-
- /*
- ================ 消息队列 ====================
- // 创建共享内存
- int shmget(key_t key, size_t size, int shmflg);
- // 挂载
- void *shmat(int shmid, const void *shmaddr, int shmflg);
- // 取消挂载
- int shmdt(const void *shmaddr);
- // 共享内存控制函数-可以删除
- int shmctl(int shmid, int cmd, struct shmid_ds *buf);
- */
-
- /*
- ================ 消息队列 ====================
- // 创建和获取 ipc 内核对象
- int msgget(key_t key, int flags);
- // 将消息发送到消息队列
- int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
- // 从消息队列获取消息
- ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
- // 查看、设置、删除 ipc 内核对象(用法和 shmctl 一样)
- int msgctl(int msqid, int cmd, struct msqid_ds *buf);
- 消息数据格式
- struct Msg{
- long type; // 消息类型。这个是必须的,而且值必须 > 0,这个值被系统使用
- // 消息正文,多少字节随你而定
- };
- */
-
- // 初始化共享内存和消息队列
- int InitSharedMemoryMsgBuf();
-
- // 销毁共享内存和消息队列
- void UnInitSharedMemoryMsgBuf();
-
- //创建消息队列 - 调用创建共享内存
- int CreateMsgBuf();
-
- //创建共享内存
- PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace);
-
- //初始化消息队列
- int InitAllMsgIndex();
-
- //从空闲消息队列取位置
- int GetAvailableMsgBufPos();
-
- // 空闲消息队列放入消息
- int FreeMsgBufPos(int index);
-
- // 任务消息队列放入消息
- int OccupyMsgBufPos(int index, long typeOfMsg);
-
- // 任务消息队列取消息
- int GetOccupiedMsgBufPos(long *typeOfMsg);
- // 从任务消息队列获取指定类型的消息
- int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg);
-
- // 发送消息
- int WriteBufferMessage(const unsigned char * msg,
- int lenOfMsg,
- long typeOfMsg,
- const TMessageHeader* poriHeader,
- PTMessageHeader pNewHeader);
-
- // 用指定位置发消息
- int WriteBufferMessageUsingIndex(const unsigned char * msg,
- int lenOfMsg, long typeOfMsg,
- const TMessageHeader* poriHeader,
- PTMessageHeader pNewHeader,
- const int *piIndexOfMsg);
-
- //接收消息 - 释放索引
- int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
- long typeOfMsg,
- PTMessageHeader pOutputHeader);
-
- //接收消息 - 不释放索引
- int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf,
- long typeOfMsg,
- PTMessageHeader pOutputHeader,
- int *indexOfMsg, int iSigRetryFlag);
收据收发实现:msgExchange.cpp
- #include <sys/types.h>
- #include <sys/ipc.h>
- #include <sys/msg.h>
- #include <sys/shm.h>
- #include <string.h>
- #include <errno.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <iostream>
-
- #include "msgExchange.h"
-
- // ipcs -m / ipcrm -m
- // ipcs -q / ipcrm -q
- // ipcs -s / ipcrm -s
-
- // 以下三部分都是从共享内存申请
- static PTSharedMemoryModule s_pSharedMemMDL = NULL;
- static PTMsgBufHDL s_pMsgBufHDL = NULL;
- static unsigned char *s_pMsgBuf = NULL;
-
- //共享内存,消息队列初始化信息
- static TSharedMemoryModule s_sharedMemoryModule;
- static TMsgBufHDL s_msgBufHDL;
-
- static unsigned long gMsgIndex = 1;
-
- #if defined(__GNUC__)
- static unsigned long addAndFetchUlongAtomic(volatile unsigned long *p_var, unsigned long value) {
- return __sync_add_and_fetch(p_var, value);
- }
- #endif
-
- // 初始化共享内存和消息队列
- int InitSharedMemoryMsgBuf()
- {
- s_sharedMemoryModule.id = 1234567;
- s_sharedMemoryModule.newCreated = 1;
- s_sharedMemoryModule.users = 0;
-
- s_msgBufHDL.maxSizeOfMsg = 8192;
- s_msgBufHDL.maxNumOfMsg = 1024;
-
- CreateMsgBuf();
-
- return 0;
- }
-
- void UnInitSharedMemoryMsgBuf() {
- struct shmid_ds buf;
- shmdt(s_pSharedMemMDL);
- shmctl(s_sharedMemoryModule.resID, IPC_RMID, &buf);
- msgctl(s_msgBufHDL.queueIDOfOccupiedPos, IPC_RMID, NULL);
- msgctl(s_msgBufHDL.queueIDOfFreePos, IPC_RMID, NULL);
- }
-
- //创建共享内存
- PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace)
- {
- PTSharedMemoryModule pSharedMem = nullptr;
- int ret;
- int resID;
- unsigned char *pmUserSpace = nullptr;
-
- if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, 0660)) == -1) {
- if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, IPC_CREAT|0660)) == -1) {
- return nullptr;
- }
- s_sharedMemoryModule.newCreated = 1;
- }
- else {
- s_sharedMemoryModule.newCreated = 0;
- }
-
- std::cout << "shared memory id is :: " << resID << std::endl;
- s_sharedMemoryModule.resID = resID;
-
- if (((pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND)) == NULL) || (pSharedMem == (void*)(-1))) {
- if (errno == 24) {
- pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND);
- }
- if ((pSharedMem == NULL) || (pSharedMem == (void*)(-1))) {
- return nullptr;
- }
- }
-
- if (!s_sharedMemoryModule.newCreated) {
- ++(s_sharedMemoryModule.users);
- s_sharedMemoryModule.newCreated = 0;
- pSharedMem->newCreated = s_sharedMemoryModule.newCreated;
- }
- else {
- memcpy(pSharedMem, &s_sharedMemoryModule, sizeof(s_sharedMemoryModule));
- pSharedMem->users = 1;
- pSharedMem->removed_readingLocks = 0;
- pSharedMem->removed_writingLocks = 0;
- pSharedMem->sizeOfUserSpace = sizeOfUserSpace;
- }
- if (sizeOfUserSpace == 0) {
- pmUserSpace = nullptr;
- }
- else {
- pmUserSpace = (unsigned char *)((unsigned char *)pSharedMem + sizeof(*pSharedMem));
- }
-
- // 清空消息内容
- if ((pSharedMem->newCreated) && (pmUserSpace)) {
- memset(pmUserSpace, 0, sizeOfUserSpace);
- }
-
- return pSharedMem;
- }
-
- //创建消息队列 - 调用创建共享内存
- int CreateMsgBuf() {
- TMsgBufHDL msgBufHDL;
- int ret;
- int newCreatedQueue;
-
- memset(&msgBufHDL,0,sizeof(msgBufHDL));
- msgBufHDL = s_msgBufHDL;
-
- if ((s_pSharedMemMDL = CreateSharedMemoryModule(
- sizeof(*s_pMsgBufHDL) +
- ((sizeof(TMessageHeader) + sizeof(unsigned char) * msgBufHDL.maxSizeOfMsg) * (msgBufHDL.maxNumOfMsg + 1)))) == NULL)
- {
- return -1;
- }
-
- if ((s_pMsgBufHDL = (PTMsgBufHDL)((unsigned char *)s_pSharedMemMDL + sizeof(TSharedMemoryModule))) == NULL)
- {
- return -1;
- }
-
- s_pMsgBuf = (unsigned char *)s_pMsgBufHDL + sizeof(*s_pMsgBufHDL);
-
- if (s_pSharedMemMDL->newCreated) {
- memcpy(s_pMsgBufHDL, &msgBufHDL, sizeof(*s_pMsgBufHDL));
- }
-
- s_pMsgBufHDL->userID = s_pSharedMemMDL->id;
-
- if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660)) == -1)
- {
- if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660 | IPC_CREAT)) == -1)
- {
- return -1;
- }
- else {
- newCreatedQueue = 1;
- }
- }
-
- std::cout << "free msg queue id is :: " << s_pMsgBufHDL->queueIDOfFreePos << std::endl;
- s_msgBufHDL.queueIDOfFreePos = s_pMsgBufHDL->queueIDOfFreePos;
-
- if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660)) == -1)
- {
- if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660 | IPC_CREAT)) == -1)
- {
- return -1;
- }
- else {
- newCreatedQueue = 1;
- }
-
- }
-
- std::cout << "occupied msg queue id is :: " << s_pMsgBufHDL->queueIDOfOccupiedPos << std::endl;
- s_msgBufHDL.queueIDOfOccupiedPos = s_pMsgBufHDL->queueIDOfOccupiedPos;
-
- if (newCreatedQueue) {
- InitAllMsgIndex();
- }
-
- return 0;
- }
-
-
- //初始化消息队列
- int InitAllMsgIndex() {
- TFreePosOfMsgBuf freePos;
- TOccupiedPosOfMsgBuf occupiedPos;
- PTMessageHeader pheader;
- int ret;
- int index;
-
- while(true) {
- if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &freePos, sizeof(freePos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
- break;
- }
-
- }
-
- while(true) {
- if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &occupiedPos, sizeof(occupiedPos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
- break;
- }
- }
-
- // 位置消息队列中放入共享内存索引编号
- for (index = 0; index < s_pMsgBufHDL->maxNumOfMsg; index++)
- {
- FreeMsgBufPos(index);
- }
-
- return(0);
- }
-
- //从空闲消息队列取位置
- int GetAvailableMsgBufPos()
- {
- TFreePosOfMsgBuf posOfMsgBuf;
- int ret;
- int index;
-
- memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
- if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
- return -1;
- }
- memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
-
- return index;
- }
-
- // 空闲消息队列放入消息
- int FreeMsgBufPos(int index)
- {
- TFreePosOfMsgBuf posOfMsgBuf;
- int ret;
- PTMessageHeader pheader;
-
- if ((index < 0) || (index >= s_pMsgBufHDL->maxNumOfMsg)) {
- return -1;
- }
-
- pheader = (PTMessageHeader)(s_pMsgBuf + index * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg));
- pheader->type = 0;
-
- posOfMsgBuf.statusOfPos = gAvailableMsgBufPos;
- memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
-
- if (msgsnd(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0) < 0) {
- return -1;
- }
- return 0;
- }
-
- // 任务消息队列放入消息
- int OccupyMsgBufPos(int index, long typeOfMsg)
- {
- TOccupiedPosOfMsgBuf posOfMsgBuf;
- int ret;
-
- posOfMsgBuf.typeOfMsg = typeOfMsg;
- memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
-
- if (ret = msgsnd(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(posOfMsgBuf.indexOfPos), 0)) {
- return -1;
- }
-
- return 0;
- }
-
- // 任务消息队列取消息
- int GetOccupiedMsgBufPos(long *typeOfMsg)
- {
- TOccupiedPosOfMsgBuf posOfMsgBuf;
- int ret;
- int index;
-
- memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
- if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
- return -1;
- }
- memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
- *typeOfMsg = posOfMsgBuf.typeOfMsg;
-
- return index;
- }
-
- // 从任务消息队列获取指定类型的消息
- int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg)
- {
- TOccupiedPosOfMsgBuf posOfMsgBuf;
- int ret;
- int index;
- int iRetryTime = 0;
-
- memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
- posOfMsgBuf.typeOfMsg = typeOfMsg;
-
- do {
- ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), typeOfMsg, 0);
- iRetryTime ++;
- }while(iRetryTime < 10 && 1 == ret && EINTR == errno);
-
- if (-1 != ret) {
- memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
- return index;
- }
-
- return -1;
- }
-
- // 发送消息
- int WriteBufferMessage(const unsigned char * msg,
- int lenOfMsg,
- long typeOfMsg,
- const TMessageHeader* poriHeader,
- PTMessageHeader pNewHeader)
- {
- int indexOfMsg;
- unsigned char *pAddr;
- int ret;
- PTMessageHeader pheader;
-
-
- if ((indexOfMsg = GetAvailableMsgBufPos()) < 0) {
- return indexOfMsg;
- }
- if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
- FreeMsgBufPos(indexOfMsg);
- return(-1);
- }
-
- // 得到存放消息的首地址
- pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
- pheader = (PTMessageHeader)pAddr;
- pheader->len = lenOfMsg;
- if (NULL == poriHeader) {
- time(&(pheader->time));
- pheader->provider = getpid();
- pheader->dealer = 0;
- pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
- pheader->forward_cnt = 0;
- }
- else
- {
- pheader->time = poriHeader->time;
- pheader->provider = poriHeader->provider;
- pheader->dealer = getpid();
- pheader->msgIndex = poriHeader->msgIndex;
- pheader->forward_cnt = poriHeader->forward_cnt;
- }
- pheader->type = typeOfMsg;
- if(pNewHeader) {
- memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
- }
- memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
- if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0) {
- FreeMsgBufPos(indexOfMsg);
- return ret;
- }
-
- std::cout << "WriteBufferMessage -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" << pheader->len << std::endl;
- return 0;
- }
-
- // 用指定位置发消息
- int WriteBufferMessageUsingIndex(const unsigned char * msg,
- int lenOfMsg, long typeOfMsg,
- const TMessageHeader* poriHeader,
- PTMessageHeader pNewHeader,
- const int *piIndexOfMsg)
- {
- int indexOfMsg = *piIndexOfMsg;
- int ret;
- unsigned char *pAddr;
- PTMessageHeader pheader;
-
- if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
- return -1;
- }
-
- pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
- pheader = (PTMessageHeader)pAddr;
- pheader->len = lenOfMsg;
- if (poriHeader == NULL)
- {
- time(&(pheader->time));
- pheader->provider = getpid();
- pheader->dealer = 0;
- pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
- pheader->forward_cnt = 0;
- }
- else
- {
- pheader->time = poriHeader->time;
- pheader->provider = poriHeader->provider;
- pheader->dealer = getpid();
- pheader->msgIndex = poriHeader->msgIndex;
- pheader->forward_cnt = poriHeader->forward_cnt;
- }
- pheader->type = typeOfMsg;
- if(pNewHeader) {
- memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
- }
- memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
- if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0)
- {
- return ret;
- }
- return(0);
- }
-
- //接收消息 - 释放索引
- int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
- long typeOfMsg,
- PTMessageHeader pOutputHeader)
- {
- int indexOfMsg;
- unsigned char *pAddr;
- int lenOfMsg;
- PTMessageHeader pheader;
-
- if (((indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0) || (indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
- {
- return -1;
- }
- pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
- pheader = (PTMessageHeader)pAddr;
- if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
- {
- return -1;
- }
- memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
- if (pOutputHeader != NULL) {
- memcpy(pOutputHeader, pheader, sizeof(*pheader));
- }
-
- FreeMsgBufPos(indexOfMsg);
-
- std::cout << "ReadBufferMessagReleaseIndex -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" << pheader->len << std::endl;
-
- return(lenOfMsg);
- }
-
-
- //接收消息 - 不释放索引
- int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf,
- long typeOfMsg,
- PTMessageHeader pOutputHeader,
- int *indexOfMsg, int iSigRetryFlag)
- {
- unsigned char *pAddr;
- int lenOfMsg;
- PTMessageHeader pheader;
-
- if (((*indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0) || (*indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
- {
- return -1;
- }
- pAddr = s_pMsgBuf + *indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
- pheader = (PTMessageHeader)pAddr;
- if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
- {
- FreeMsgBufPos(*indexOfMsg);
- return -1;
- }
- memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
- if (pOutputHeader != NULL) {
- memcpy(pOutputHeader, pheader, sizeof(*pheader));
- }
-
- return lenOfMsg;
- }
-
-
生产者进程:productionProcess.cpp
- #include <stdio.h>
- #include <sys/shm.h>
- #include <string.h>
- #include <iostream>
- #include "msgExchange.h"
-
- using namespace std;
-
- int SimpleTestWrite() {
- // 1. 创建共享内存
- int shmid = shmget(1000, 4096, IPC_CREAT|0660);
- if(shmid == -1)
- {
- perror("shmget error");
- return -1;
- }
-
- // 2. 进程和共享内存关联
- void* ptr = shmat(shmid, NULL, 0);
- if(ptr == (void *) -1)
- {
- perror("shmat error");
- return -1;
- }
-
- // 3. 写共享内存
- const char* p = "hello, nice to meet you !";
- memcpy(ptr, p, strlen(p)+1);
-
- getchar();
-
- // 4. 解除内存和共享内存关联
- shmdt(ptr);
-
- // 5. 删除共享内存
- shmctl(shmid, IPC_RMID, NULL);
- std::cout << "delete share memory !" << std::endl;
- }
-
- int main(int argc, char *argv[])
- {
- InitSharedMemoryMsgBuf();
-
- // 发送消息
- TMessageHeader newHeader;
- std::string sendMsg("Hello, I am producer !");
- WriteBufferMessage((unsigned char *)sendMsg.data(),
- sendMsg.size(),
- 10101010,
- nullptr,
- &newHeader);
- // 读消息
- unsigned char recvMsg[8192] = {0};
- ReadBufferMessagReleaseIndex(recvMsg, 8192,
- newHeader.provider,
- &newHeader);
- std::cout << "recvMsg:: " << recvMsg << std::endl;
-
- UnInitSharedMemoryMsgBuf();
-
- return 0;
- }
消费者进程:consumptionProcess.cpp
- #include <sys/shm.h>
- #include <string.h>
- #include <stdio.h>
- #include <iostream>
-
- #include "msgExchange.h"
-
- using namespace std;
-
- int SimpleTestRead() {
- // 1. 创建共享内存
- int shmid = shmget(1000, 0, 0660);
- if(shmid == -1)
- {
- perror("shmget error");
- return -1;
- }
-
- // 2. 获取共享内存地址
- void* ptr = shmat(shmid, NULL, 0);
- if(ptr == (void *) -1)
- {
- perror("shmat error");
- return -1;
- }
-
- // 3. 读共享内存
- std::cout << "read memory:: " << (char*)ptr << std::endl;
-
- // 4. 解除关联
- shmdt(ptr);
-
- // 5. 删除
- shmctl(shmid, IPC_RMID, NULL);
- std::cout << "delete share memory !" << std::endl;
- }
-
- int main(int argc, char *argv[])
- {
- InitSharedMemoryMsgBuf();
-
- // 读消息
- TMessageHeader recvHeader;
- unsigned char recvMsg[8192] = {0};
- ReadBufferMessagReleaseIndex(recvMsg, 8192,
- 10101010,
- &recvHeader);
- std::cout << "recvMsg:: " << recvMsg << std::endl;
-
- // 发送消息
- TMessageHeader newHeader;
- std::string sendMsg("Hello, I am consumer, I have receiving your msg, thank you!");
- WriteBufferMessage((unsigned char *)sendMsg.data(),
- sendMsg.size(),
- recvHeader.provider,
- &recvHeader,
- &newHeader);
-
- UnInitSharedMemoryMsgBuf();
-
- return 0;
- }
编译脚本:Makefile
- app: productionProcess consumptionProcess
-
- #说明:$^代表依赖项
- productionProcess: productionProcess.cpp msgExchange.cpp
- g++ -g $^ -o productionProcess
-
- consumptionProcess: consumptionProcess.cpp msgExchange.cpp
- g++ -g $^ -o consumptionProcess
-
- clean:
- -rm productionProcess consumptionProcess -f
调试配置:launch.json
- {
- "version": "0.2.0",
- "configurations": [
- {
- "name": "C/C++ Runner: Debug Session",
- "type": "cppdbg",
- "request": "launch",
- "args": [],
- "stopAtEntry": false,
- "externalConsole": false,
- "cwd": "/home/tiger/ipc",
- "program": "/home/tiger/ipc/productionProcess",
- "MIMode": "gdb",
- "miDebuggerPath": "gdb",
- "setupCommands": [
- {
- "description": "Enable pretty-printing for gdb",
- "text": "-enable-pretty-printing",
- "ignoreFailures": true
- }
- ]
- }
- ]
- }
从运行效果看,程序创建了共享内存5308433,空闲消息队列12,任务队列13,生产者进程从空闲队列去取出位置0,按照位置在共享内存写入信息,然后发送消息到任务队列,然后等待消费者进程答复,消费着进程启动后从任务队列中取出位置,按照位置从共享内存读取数据,并打印,然后从空闲队列取出1号位置,按照位置在共享内存中写入要发送的信息。然后将位置信息放入任务队列,生产者进程从任务队列取出消息位置,按照位置从共享内存中取出消费者进程的消息并打印出来,同时归还位置信息到空闲队列,下次再继续使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。