当前位置:   article > 正文

linux下共享内存和消息队列实现多进程间数据收发_通过共享内存实现消息队列

通过共享内存实现消息队列

       linux下进程通信的方式有很多,共享内存,消息队列,管道等。共享内存可以传输大量数据,但是多个进程同时读取共享内存就会出现脏读,可以借助消息队列实现多进程消息发送和接收。这种组合方式在实际开发中应用还是很多的,接下来就看一下。

目录

1.共享内存操作api

 (1)创建共享内存

(2)挂载共享内存到当前进程

(3)取消挂载

(4) 共享内存控制函数-可以删除

2.消息队列操作api

(1)创建消息附列

(2)往消息队列中发送消息

(3)从消息队列取消息

(4)操作消息队列,删除,查看等

3.命令行操作

(1)共享内存查看和删除

(2)消息队列查看和删除

(3)信号量查看和删除

4.生产者进程和消费者进程互相通信实践

4.1 代码

4.2 运行效果

5.execve系统调用


1.共享内存操作api

 (1)创建共享内存

int shmget(key_t key, size_t size, int shmflg);

(2)挂载共享内存到当前进程

void *shmat(int shmid, const void *shmaddr, int shmflg);

(3)取消挂载

int shmdt(const void *shmaddr);

(4) 共享内存控制函数-可以删除

int shmctl(int shmid, int cmd, struct shmid_ds *buf);

2.消息队列操作api

(1)创建消息附列

int msgget(key_t key, int flags);

(2)往消息队列中发送消息

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

(3)从消息队列取消息

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

(4)操作消息队列,删除,查看等

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

3.命令行操作

(1)共享内存查看和删除

ipcs -m / ipcrm -m

(2)消息队列查看和删除

 ipcs -q / ipcrm -q

(3)信号量查看和删除

ipcs -s / ipcrm -s

4.生产者进程和消费者进程互相通信实践

接下来通过一个共享内存和两条消息队列来实现进程间互发数据,本在ubuntu环境下使用vscode开发,程序目录结构如下:

4.1 代码

消息定义:  msgInfo.h

  1. #pragma once
  2. #include <time.h>
  3. #define gAvailableMsgBufPos 100
  4. #define gOccupiedMsgBufPos 101
  5. //共享内存结构
  6. struct TSharedMemoryModule
  7. {
  8. char name[40+1];
  9. int id;
  10. char reloadCommand[80+1];
  11. int index;
  12. long sizeOfUserSpace;
  13. unsigned char *removed_puserSpace;
  14. int users;
  15. int newCreated;
  16. int removed_writingLocks;
  17. int removed_readingLocks;
  18. int resID;
  19. };
  20. typedef TSharedMemoryModule *PTSharedMemoryModule;
  21. //消息队列管理结构
  22. struct TMsgBufHDL
  23. {
  24. long maxSizeOfMsg;
  25. int maxNumOfMsg;
  26. int maxStayTime;
  27. int userID;
  28. int queueIDOfFreePos;
  29. int queueIDOfOccupiedPos;
  30. };
  31. typedef TMsgBufHDL *PTMsgBufHDL;
  32. //消息头
  33. struct TMessageHeader
  34. {
  35. long type;
  36. int provider;
  37. time_t time;
  38. int len;
  39. long msgIndex;
  40. int dealer;
  41. int forward_cnt;
  42. short locked;
  43. };
  44. typedef TMessageHeader *PTMessageHeader;
  45. // 空闲消息队列元素结构
  46. struct TFreePosOfMsgBuf
  47. {
  48. long statusOfPos;
  49. char indexOfPos[4];
  50. };
  51. // 任务消息队列结构
  52. struct TOccupiedPosOfMsgBuf
  53. {
  54. long typeOfMsg;
  55. char indexOfPos[4];
  56. };

收据收发头文件:msgExchange.h

  1. #pragma once
  2. #include "msgInfo.h"
  3. /*
  4. ================ 消息队列 ====================
  5. // 创建共享内存
  6. int shmget(key_t key, size_t size, int shmflg);
  7. // 挂载
  8. void *shmat(int shmid, const void *shmaddr, int shmflg);
  9. // 取消挂载
  10. int shmdt(const void *shmaddr);
  11. // 共享内存控制函数-可以删除
  12. int shmctl(int shmid, int cmd, struct shmid_ds *buf);
  13. */
  14. /*
  15. ================ 消息队列 ====================
  16. // 创建和获取 ipc 内核对象
  17. int msgget(key_t key, int flags);
  18. // 将消息发送到消息队列
  19. int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  20. // 从消息队列获取消息
  21. ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  22. // 查看、设置、删除 ipc 内核对象(用法和 shmctl 一样)
  23. int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  24. 消息数据格式
  25. struct Msg{
  26. long type; // 消息类型。这个是必须的,而且值必须 > 0,这个值被系统使用
  27. // 消息正文,多少字节随你而定
  28. };
  29. */
  30. // 初始化共享内存和消息队列
  31. int InitSharedMemoryMsgBuf();
  32. // 销毁共享内存和消息队列
  33. void UnInitSharedMemoryMsgBuf();
  34. //创建消息队列 - 调用创建共享内存
  35. int CreateMsgBuf();
  36. //创建共享内存
  37. PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace);
  38. //初始化消息队列
  39. int InitAllMsgIndex();
  40. //从空闲消息队列取位置
  41. int GetAvailableMsgBufPos();
  42. // 空闲消息队列放入消息
  43. int FreeMsgBufPos(int index);
  44. // 任务消息队列放入消息
  45. int OccupyMsgBufPos(int index, long typeOfMsg);
  46. // 任务消息队列取消息
  47. int GetOccupiedMsgBufPos(long *typeOfMsg);
  48. // 从任务消息队列获取指定类型的消息
  49. int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg);
  50. // 发送消息
  51. int WriteBufferMessage(const unsigned char * msg,
  52. int lenOfMsg,
  53. long typeOfMsg,
  54. const TMessageHeader* poriHeader,
  55. PTMessageHeader pNewHeader);
  56. // 用指定位置发消息
  57. int WriteBufferMessageUsingIndex(const unsigned char * msg,
  58. int lenOfMsg, long typeOfMsg,
  59. const TMessageHeader* poriHeader,
  60. PTMessageHeader pNewHeader,
  61. const int *piIndexOfMsg);
  62. //接收消息 - 释放索引
  63. int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
  64. long typeOfMsg,
  65. PTMessageHeader pOutputHeader);
  66. //接收消息 - 不释放索引
  67. int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf,
  68. long typeOfMsg,
  69. PTMessageHeader pOutputHeader,
  70. int *indexOfMsg, int iSigRetryFlag);

收据收发实现:msgExchange.cpp

  1. #include <sys/types.h>
  2. #include <sys/ipc.h>
  3. #include <sys/msg.h>
  4. #include <sys/shm.h>
  5. #include <string.h>
  6. #include <errno.h>
  7. #include <sys/types.h>
  8. #include <unistd.h>
  9. #include <iostream>
  10. #include "msgExchange.h"
  11. // ipcs -m / ipcrm -m
  12. // ipcs -q / ipcrm -q
  13. // ipcs -s / ipcrm -s
  14. // 以下三部分都是从共享内存申请
  15. static PTSharedMemoryModule s_pSharedMemMDL = NULL;
  16. static PTMsgBufHDL s_pMsgBufHDL = NULL;
  17. static unsigned char *s_pMsgBuf = NULL;
  18. //共享内存,消息队列初始化信息
  19. static TSharedMemoryModule s_sharedMemoryModule;
  20. static TMsgBufHDL s_msgBufHDL;
  21. static unsigned long gMsgIndex = 1;
  22. #if defined(__GNUC__)
  23. static unsigned long addAndFetchUlongAtomic(volatile unsigned long *p_var, unsigned long value) {
  24. return __sync_add_and_fetch(p_var, value);
  25. }
  26. #endif
  27. // 初始化共享内存和消息队列
  28. int InitSharedMemoryMsgBuf()
  29. {
  30. s_sharedMemoryModule.id = 1234567;
  31. s_sharedMemoryModule.newCreated = 1;
  32. s_sharedMemoryModule.users = 0;
  33. s_msgBufHDL.maxSizeOfMsg = 8192;
  34. s_msgBufHDL.maxNumOfMsg = 1024;
  35. CreateMsgBuf();
  36. return 0;
  37. }
  38. void UnInitSharedMemoryMsgBuf() {
  39. struct shmid_ds buf;
  40. shmdt(s_pSharedMemMDL);
  41. shmctl(s_sharedMemoryModule.resID, IPC_RMID, &buf);
  42. msgctl(s_msgBufHDL.queueIDOfOccupiedPos, IPC_RMID, NULL);
  43. msgctl(s_msgBufHDL.queueIDOfFreePos, IPC_RMID, NULL);
  44. }
  45. //创建共享内存
  46. PTSharedMemoryModule CreateSharedMemoryModule(long sizeOfUserSpace)
  47. {
  48. PTSharedMemoryModule pSharedMem = nullptr;
  49. int ret;
  50. int resID;
  51. unsigned char *pmUserSpace = nullptr;
  52. if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, 0660)) == -1) {
  53. if ((resID = shmget(s_sharedMemoryModule.id, sizeof(TSharedMemoryModule) + sizeOfUserSpace, IPC_CREAT|0660)) == -1) {
  54. return nullptr;
  55. }
  56. s_sharedMemoryModule.newCreated = 1;
  57. }
  58. else {
  59. s_sharedMemoryModule.newCreated = 0;
  60. }
  61. std::cout << "shared memory id is :: " << resID << std::endl;
  62. s_sharedMemoryModule.resID = resID;
  63. if (((pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND)) == NULL) || (pSharedMem == (void*)(-1))) {
  64. if (errno == 24) {
  65. pSharedMem = (PTSharedMemoryModule)shmat(resID, 0, SHM_RND);
  66. }
  67. if ((pSharedMem == NULL) || (pSharedMem == (void*)(-1))) {
  68. return nullptr;
  69. }
  70. }
  71. if (!s_sharedMemoryModule.newCreated) {
  72. ++(s_sharedMemoryModule.users);
  73. s_sharedMemoryModule.newCreated = 0;
  74. pSharedMem->newCreated = s_sharedMemoryModule.newCreated;
  75. }
  76. else {
  77. memcpy(pSharedMem, &s_sharedMemoryModule, sizeof(s_sharedMemoryModule));
  78. pSharedMem->users = 1;
  79. pSharedMem->removed_readingLocks = 0;
  80. pSharedMem->removed_writingLocks = 0;
  81. pSharedMem->sizeOfUserSpace = sizeOfUserSpace;
  82. }
  83. if (sizeOfUserSpace == 0) {
  84. pmUserSpace = nullptr;
  85. }
  86. else {
  87. pmUserSpace = (unsigned char *)((unsigned char *)pSharedMem + sizeof(*pSharedMem));
  88. }
  89. // 清空消息内容
  90. if ((pSharedMem->newCreated) && (pmUserSpace)) {
  91. memset(pmUserSpace, 0, sizeOfUserSpace);
  92. }
  93. return pSharedMem;
  94. }
  95. //创建消息队列 - 调用创建共享内存
  96. int CreateMsgBuf() {
  97. TMsgBufHDL msgBufHDL;
  98. int ret;
  99. int newCreatedQueue;
  100. memset(&msgBufHDL,0,sizeof(msgBufHDL));
  101. msgBufHDL = s_msgBufHDL;
  102. if ((s_pSharedMemMDL = CreateSharedMemoryModule(
  103. sizeof(*s_pMsgBufHDL) +
  104. ((sizeof(TMessageHeader) + sizeof(unsigned char) * msgBufHDL.maxSizeOfMsg) * (msgBufHDL.maxNumOfMsg + 1)))) == NULL)
  105. {
  106. return -1;
  107. }
  108. if ((s_pMsgBufHDL = (PTMsgBufHDL)((unsigned char *)s_pSharedMemMDL + sizeof(TSharedMemoryModule))) == NULL)
  109. {
  110. return -1;
  111. }
  112. s_pMsgBuf = (unsigned char *)s_pMsgBufHDL + sizeof(*s_pMsgBufHDL);
  113. if (s_pSharedMemMDL->newCreated) {
  114. memcpy(s_pMsgBufHDL, &msgBufHDL, sizeof(*s_pMsgBufHDL));
  115. }
  116. s_pMsgBufHDL->userID = s_pSharedMemMDL->id;
  117. if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660)) == -1)
  118. {
  119. if ((s_pMsgBufHDL->queueIDOfFreePos = msgget(s_pMsgBufHDL->userID, 0660 | IPC_CREAT)) == -1)
  120. {
  121. return -1;
  122. }
  123. else {
  124. newCreatedQueue = 1;
  125. }
  126. }
  127. std::cout << "free msg queue id is :: " << s_pMsgBufHDL->queueIDOfFreePos << std::endl;
  128. s_msgBufHDL.queueIDOfFreePos = s_pMsgBufHDL->queueIDOfFreePos;
  129. if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660)) == -1)
  130. {
  131. if ((s_pMsgBufHDL->queueIDOfOccupiedPos = msgget(s_pMsgBufHDL->userID + 1,0660 | IPC_CREAT)) == -1)
  132. {
  133. return -1;
  134. }
  135. else {
  136. newCreatedQueue = 1;
  137. }
  138. }
  139. std::cout << "occupied msg queue id is :: " << s_pMsgBufHDL->queueIDOfOccupiedPos << std::endl;
  140. s_msgBufHDL.queueIDOfOccupiedPos = s_pMsgBufHDL->queueIDOfOccupiedPos;
  141. if (newCreatedQueue) {
  142. InitAllMsgIndex();
  143. }
  144. return 0;
  145. }
  146. //初始化消息队列
  147. int InitAllMsgIndex() {
  148. TFreePosOfMsgBuf freePos;
  149. TOccupiedPosOfMsgBuf occupiedPos;
  150. PTMessageHeader pheader;
  151. int ret;
  152. int index;
  153. while(true) {
  154. if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &freePos, sizeof(freePos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
  155. break;
  156. }
  157. }
  158. while(true) {
  159. if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &occupiedPos, sizeof(occupiedPos.indexOfPos), 0, IPC_NOWAIT)) < 0) {
  160. break;
  161. }
  162. }
  163. // 位置消息队列中放入共享内存索引编号
  164. for (index = 0; index < s_pMsgBufHDL->maxNumOfMsg; index++)
  165. {
  166. FreeMsgBufPos(index);
  167. }
  168. return(0);
  169. }
  170. //从空闲消息队列取位置
  171. int GetAvailableMsgBufPos()
  172. {
  173. TFreePosOfMsgBuf posOfMsgBuf;
  174. int ret;
  175. int index;
  176. memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
  177. if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
  178. return -1;
  179. }
  180. memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
  181. return index;
  182. }
  183. // 空闲消息队列放入消息
  184. int FreeMsgBufPos(int index)
  185. {
  186. TFreePosOfMsgBuf posOfMsgBuf;
  187. int ret;
  188. PTMessageHeader pheader;
  189. if ((index < 0) || (index >= s_pMsgBufHDL->maxNumOfMsg)) {
  190. return -1;
  191. }
  192. pheader = (PTMessageHeader)(s_pMsgBuf + index * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg));
  193. pheader->type = 0;
  194. posOfMsgBuf.statusOfPos = gAvailableMsgBufPos;
  195. memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
  196. if (msgsnd(s_pMsgBufHDL->queueIDOfFreePos, &posOfMsgBuf, sizeof(int), 0) < 0) {
  197. return -1;
  198. }
  199. return 0;
  200. }
  201. // 任务消息队列放入消息
  202. int OccupyMsgBufPos(int index, long typeOfMsg)
  203. {
  204. TOccupiedPosOfMsgBuf posOfMsgBuf;
  205. int ret;
  206. posOfMsgBuf.typeOfMsg = typeOfMsg;
  207. memcpy(posOfMsgBuf.indexOfPos, &index, sizeof(int));
  208. if (ret = msgsnd(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(posOfMsgBuf.indexOfPos), 0)) {
  209. return -1;
  210. }
  211. return 0;
  212. }
  213. // 任务消息队列取消息
  214. int GetOccupiedMsgBufPos(long *typeOfMsg)
  215. {
  216. TOccupiedPosOfMsgBuf posOfMsgBuf;
  217. int ret;
  218. int index;
  219. memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
  220. if ((ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), 0, 0)) < 0) {
  221. return -1;
  222. }
  223. memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
  224. *typeOfMsg = posOfMsgBuf.typeOfMsg;
  225. return index;
  226. }
  227. // 从任务消息队列获取指定类型的消息
  228. int GetOccupiedMsgBufPosOfMsgType(long typeOfMsg)
  229. {
  230. TOccupiedPosOfMsgBuf posOfMsgBuf;
  231. int ret;
  232. int index;
  233. int iRetryTime = 0;
  234. memset(posOfMsgBuf.indexOfPos, 0, sizeof(posOfMsgBuf.indexOfPos));
  235. posOfMsgBuf.typeOfMsg = typeOfMsg;
  236. do {
  237. ret = msgrcv(s_pMsgBufHDL->queueIDOfOccupiedPos, &posOfMsgBuf, sizeof(int), typeOfMsg, 0);
  238. iRetryTime ++;
  239. }while(iRetryTime < 10 && 1 == ret && EINTR == errno);
  240. if (-1 != ret) {
  241. memcpy(&index, posOfMsgBuf.indexOfPos, sizeof(int));
  242. return index;
  243. }
  244. return -1;
  245. }
  246. // 发送消息
  247. int WriteBufferMessage(const unsigned char * msg,
  248. int lenOfMsg,
  249. long typeOfMsg,
  250. const TMessageHeader* poriHeader,
  251. PTMessageHeader pNewHeader)
  252. {
  253. int indexOfMsg;
  254. unsigned char *pAddr;
  255. int ret;
  256. PTMessageHeader pheader;
  257. if ((indexOfMsg = GetAvailableMsgBufPos()) < 0) {
  258. return indexOfMsg;
  259. }
  260. if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
  261. FreeMsgBufPos(indexOfMsg);
  262. return(-1);
  263. }
  264. // 得到存放消息的首地址
  265. pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
  266. pheader = (PTMessageHeader)pAddr;
  267. pheader->len = lenOfMsg;
  268. if (NULL == poriHeader) {
  269. time(&(pheader->time));
  270. pheader->provider = getpid();
  271. pheader->dealer = 0;
  272. pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
  273. pheader->forward_cnt = 0;
  274. }
  275. else
  276. {
  277. pheader->time = poriHeader->time;
  278. pheader->provider = poriHeader->provider;
  279. pheader->dealer = getpid();
  280. pheader->msgIndex = poriHeader->msgIndex;
  281. pheader->forward_cnt = poriHeader->forward_cnt;
  282. }
  283. pheader->type = typeOfMsg;
  284. if(pNewHeader) {
  285. memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
  286. }
  287. memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
  288. if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0) {
  289. FreeMsgBufPos(indexOfMsg);
  290. return ret;
  291. }
  292. std::cout << "WriteBufferMessage -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" << pheader->len << std::endl;
  293. return 0;
  294. }
  295. // 用指定位置发消息
  296. int WriteBufferMessageUsingIndex(const unsigned char * msg,
  297. int lenOfMsg, long typeOfMsg,
  298. const TMessageHeader* poriHeader,
  299. PTMessageHeader pNewHeader,
  300. const int *piIndexOfMsg)
  301. {
  302. int indexOfMsg = *piIndexOfMsg;
  303. int ret;
  304. unsigned char *pAddr;
  305. PTMessageHeader pheader;
  306. if ((indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg)) {
  307. return -1;
  308. }
  309. pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
  310. pheader = (PTMessageHeader)pAddr;
  311. pheader->len = lenOfMsg;
  312. if (poriHeader == NULL)
  313. {
  314. time(&(pheader->time));
  315. pheader->provider = getpid();
  316. pheader->dealer = 0;
  317. pheader->msgIndex = addAndFetchUlongAtomic(&gMsgIndex, 1);
  318. pheader->forward_cnt = 0;
  319. }
  320. else
  321. {
  322. pheader->time = poriHeader->time;
  323. pheader->provider = poriHeader->provider;
  324. pheader->dealer = getpid();
  325. pheader->msgIndex = poriHeader->msgIndex;
  326. pheader->forward_cnt = poriHeader->forward_cnt;
  327. }
  328. pheader->type = typeOfMsg;
  329. if(pNewHeader) {
  330. memcpy(pNewHeader, pheader, sizeof(*pNewHeader));
  331. }
  332. memcpy(pAddr + sizeof(TMessageHeader), msg, lenOfMsg);
  333. if ((ret = OccupyMsgBufPos(indexOfMsg, typeOfMsg)) < 0)
  334. {
  335. return ret;
  336. }
  337. return(0);
  338. }
  339. //接收消息 - 释放索引
  340. int ReadBufferMessagReleaseIndex(unsigned char * msg, int lenOfMsgBuf,
  341. long typeOfMsg,
  342. PTMessageHeader pOutputHeader)
  343. {
  344. int indexOfMsg;
  345. unsigned char *pAddr;
  346. int lenOfMsg;
  347. PTMessageHeader pheader;
  348. if (((indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0) || (indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
  349. {
  350. return -1;
  351. }
  352. pAddr = s_pMsgBuf + indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
  353. pheader = (PTMessageHeader)pAddr;
  354. if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
  355. {
  356. return -1;
  357. }
  358. memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
  359. if (pOutputHeader != NULL) {
  360. memcpy(pOutputHeader, pheader, sizeof(*pheader));
  361. }
  362. FreeMsgBufPos(indexOfMsg);
  363. std::cout << "ReadBufferMessagReleaseIndex -> indexOfMsg:: " << indexOfMsg << " typeOfMsg::" << typeOfMsg << " pheader->len::" << pheader->len << std::endl;
  364. return(lenOfMsg);
  365. }
  366. //接收消息 - 不释放索引
  367. int ReadBufferMessageNotReleaseIndex(unsigned char *msg, int lenOfMsgBuf,
  368. long typeOfMsg,
  369. PTMessageHeader pOutputHeader,
  370. int *indexOfMsg, int iSigRetryFlag)
  371. {
  372. unsigned char *pAddr;
  373. int lenOfMsg;
  374. PTMessageHeader pheader;
  375. if (((*indexOfMsg = GetOccupiedMsgBufPosOfMsgType(typeOfMsg)) < 0) || (*indexOfMsg >= s_pMsgBufHDL->maxNumOfMsg))
  376. {
  377. return -1;
  378. }
  379. pAddr = s_pMsgBuf + *indexOfMsg * (sizeof(TMessageHeader) + s_pMsgBufHDL->maxSizeOfMsg);
  380. pheader = (PTMessageHeader)pAddr;
  381. if (((lenOfMsg = pheader->len) < 0) || (lenOfMsg >= lenOfMsgBuf) || (lenOfMsg > s_pMsgBufHDL->maxSizeOfMsg))
  382. {
  383. FreeMsgBufPos(*indexOfMsg);
  384. return -1;
  385. }
  386. memcpy(msg, pAddr + sizeof(TMessageHeader), lenOfMsg);
  387. if (pOutputHeader != NULL) {
  388. memcpy(pOutputHeader, pheader, sizeof(*pheader));
  389. }
  390. return lenOfMsg;
  391. }

生产者进程:productionProcess.cpp

  1. #include <stdio.h>
  2. #include <sys/shm.h>
  3. #include <string.h>
  4. #include <iostream>
  5. #include "msgExchange.h"
  6. using namespace std;
  7. int SimpleTestWrite() {
  8. // 1. 创建共享内存
  9. int shmid = shmget(1000, 4096, IPC_CREAT|0660);
  10. if(shmid == -1)
  11. {
  12. perror("shmget error");
  13. return -1;
  14. }
  15. // 2. 进程和共享内存关联
  16. void* ptr = shmat(shmid, NULL, 0);
  17. if(ptr == (void *) -1)
  18. {
  19. perror("shmat error");
  20. return -1;
  21. }
  22. // 3. 写共享内存
  23. const char* p = "hello, nice to meet you !";
  24. memcpy(ptr, p, strlen(p)+1);
  25. getchar();
  26. // 4. 解除内存和共享内存关联
  27. shmdt(ptr);
  28. // 5. 删除共享内存
  29. shmctl(shmid, IPC_RMID, NULL);
  30. std::cout << "delete share memory !" << std::endl;
  31. }
  32. int main(int argc, char *argv[])
  33. {
  34. InitSharedMemoryMsgBuf();
  35. // 发送消息
  36. TMessageHeader newHeader;
  37. std::string sendMsg("Hello, I am producer !");
  38. WriteBufferMessage((unsigned char *)sendMsg.data(),
  39. sendMsg.size(),
  40. 10101010,
  41. nullptr,
  42. &newHeader);
  43. // 读消息
  44. unsigned char recvMsg[8192] = {0};
  45. ReadBufferMessagReleaseIndex(recvMsg, 8192,
  46. newHeader.provider,
  47. &newHeader);
  48. std::cout << "recvMsg:: " << recvMsg << std::endl;
  49. UnInitSharedMemoryMsgBuf();
  50. return 0;
  51. }

消费者进程:consumptionProcess.cpp

  1. #include <sys/shm.h>
  2. #include <string.h>
  3. #include <stdio.h>
  4. #include <iostream>
  5. #include "msgExchange.h"
  6. using namespace std;
  7. int SimpleTestRead() {
  8. // 1. 创建共享内存
  9. int shmid = shmget(1000, 0, 0660);
  10. if(shmid == -1)
  11. {
  12. perror("shmget error");
  13. return -1;
  14. }
  15. // 2. 获取共享内存地址
  16. void* ptr = shmat(shmid, NULL, 0);
  17. if(ptr == (void *) -1)
  18. {
  19. perror("shmat error");
  20. return -1;
  21. }
  22. // 3. 读共享内存
  23. std::cout << "read memory:: " << (char*)ptr << std::endl;
  24. // 4. 解除关联
  25. shmdt(ptr);
  26. // 5. 删除
  27. shmctl(shmid, IPC_RMID, NULL);
  28. std::cout << "delete share memory !" << std::endl;
  29. }
  30. int main(int argc, char *argv[])
  31. {
  32. InitSharedMemoryMsgBuf();
  33. // 读消息
  34. TMessageHeader recvHeader;
  35. unsigned char recvMsg[8192] = {0};
  36. ReadBufferMessagReleaseIndex(recvMsg, 8192,
  37. 10101010,
  38. &recvHeader);
  39. std::cout << "recvMsg:: " << recvMsg << std::endl;
  40. // 发送消息
  41. TMessageHeader newHeader;
  42. std::string sendMsg("Hello, I am consumer, I have receiving your msg, thank you!");
  43. WriteBufferMessage((unsigned char *)sendMsg.data(),
  44. sendMsg.size(),
  45. recvHeader.provider,
  46. &recvHeader,
  47. &newHeader);
  48. UnInitSharedMemoryMsgBuf();
  49. return 0;
  50. }

编译脚本:Makefile

  1. app: productionProcess consumptionProcess
  2. #说明:$^代表依赖项
  3. productionProcess: productionProcess.cpp msgExchange.cpp
  4. g++ -g $^ -o productionProcess
  5. consumptionProcess: consumptionProcess.cpp msgExchange.cpp
  6. g++ -g $^ -o consumptionProcess
  7. clean:
  8. -rm productionProcess consumptionProcess -f

调试配置:launch.json

  1. {
  2. "version": "0.2.0",
  3. "configurations": [
  4. {
  5. "name": "C/C++ Runner: Debug Session",
  6. "type": "cppdbg",
  7. "request": "launch",
  8. "args": [],
  9. "stopAtEntry": false,
  10. "externalConsole": false,
  11. "cwd": "/home/tiger/ipc",
  12. "program": "/home/tiger/ipc/productionProcess",
  13. "MIMode": "gdb",
  14. "miDebuggerPath": "gdb",
  15. "setupCommands": [
  16. {
  17. "description": "Enable pretty-printing for gdb",
  18. "text": "-enable-pretty-printing",
  19. "ignoreFailures": true
  20. }
  21. ]
  22. }
  23. ]
  24. }

4.2 运行效果

从运行效果看,程序创建了共享内存5308433,空闲消息队列12,任务队列13,生产者进程从空闲队列去取出位置0,按照位置在共享内存写入信息,然后发送消息到任务队列,然后等待消费者进程答复,消费着进程启动后从任务队列中取出位置,按照位置从共享内存读取数据,并打印,然后从空闲队列取出1号位置,按照位置在共享内存中写入要发送的信息。然后将位置信息放入任务队列,生产者进程从任务队列取出消息位置,按照位置从共享内存中取出消费者进程的消息并打印出来,同时归还位置信息到空闲队列,下次再继续使用。

5.execve系统调用

进程调度之 4:系统调用execve - 我叫半桶水的个人空间 - OSCHINA - 中文开源技术交流社区

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/601603
推荐阅读
相关标签
  

闽ICP备14008679号