当前位置:   article > 正文

操作系统实验——利用Linux的消息队列通信机制实现两个线程间的通信_linux线程间消息队列 开源

linux线程间消息队列 开源

目录

一. 题目描述

二.实验思路

 三.代码及实验结果

四.遇到问题及解决方法

五.参考文献


一. 题目描述

编写程序创建三个线程:sender1线程、sender2线程和receive线程,三个线程的功能描述如下:
①sender1线程:运行函数sender1(),它创建一个消息队列,然后等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end1”,并且等待receiver的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,结束线程的运行。
②sender2线程:运行函数sender2(),共享sender1创建的消息队列,等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end2”,并且等待receiver的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,结束线程的运行。
③Receiver线程:运行函数receive(),它通过消息队列接收来自sender1和sender2两个线程的消息,将消息显示在终端屏幕上,当收到内容为“end1”的消息时,就向sender1发送一个应答消息“over1”;当收到内容为“end2”的消息时,就向sender2发送一个应答消息“over2”;消息接收完成后删除消息队列,结束线程的运行。选择合适的信号量机制实现三个线程之间的同步与互斥。

二.实验思路

首先明确线程的功能,实验要求要有两个sender线程发送和一个receive线程进行接收

Receive:接收sender线程发送的信息并显示在终端,当接收到sender线程发送的end后,回应一个over并释放over信号。

Sender:发送消息,在用户输入exit后,sender发送end,接收receive线程的over信号。

信号量:

      //互斥信号量mutex,互斥使用消息队列
      sem_init(&mutex, 0, 1);
      //同步信号量full,empty
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, 5);
      //用over信号判断某个sender线程的结束
      sem_init(&over, 0, 0);
      //display信号量实际作用只是为了能让sender线程和receive线程实现一发一收的格式
      //不使用这两个信号量可能会无法保证线程的执行顺序
      sem_init(&s_display, 0, 1);
      sem_init(&r_display, 0, 0);

图1.线程关系

 三.代码及实验结果

  1. /*
  2. 2022/11/11
  3. author:chenchen4396
  4. */
  5. //API参考手册:https://www.bookstack.cn/read/linuxapi/SUMMARY.md
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #include <unistd.h>
  10. #include <pthread.h>
  11. #include <sys/types.h>
  12. #include <sys/ipc.h>
  13. #include <sys/msg.h>
  14. #include <semaphore.h>
  15. #include <fcntl.h>
  16. pthread_t s1, s2, r;
  17. sem_t mutex;
  18. sem_t full, empty;
  19. sem_t over;
  20. sem_t s_display, r_display;
  21. struct msgbuff
  22. {
  23. long msg_type; //消息类型,可以自定义,在使用msgrcv函数的时候会使用
  24. char msg[100]; //消息数组大小
  25. };
  26. void receive(void *arg)
  27. {
  28. struct msgbuff buf;
  29. int flag1 = 1;
  30. int flag2 = 1;
  31. int ret = -1;
  32. while ((flag1 + flag2) != 0)
  33. {
  34. //初始化消息队列
  35. memset(&buf, 0, sizeof(buf));
  36. //互斥信号量mutex,资源信号量full
  37. // mutex和full的wait顺序不能调换,否则会发生死锁,参考生存者与消费者模型
  38. sem_wait(&r_display);
  39. sem_wait(&full);
  40. sem_wait(&mutex);
  41. // msgrcv接受消息
  42. // arg是传入的msgid
  43. // 1代表msgtyp,指定一个接受的消息类型
  44. // IPC_NOWAIT 如果需要等待,则不发送消息并且调用进程立即返回,避免阻塞
  45. ret = msgrcv(*(int *)arg, &buf, sizeof(buf.msg), 1, IPC_NOWAIT);
  46. if (strlen(buf.msg) != 0)
  47. {
  48. printf("receive: %s \n", buf.msg); //打印接收到的内容
  49. printf("-----------------------------------------\n");
  50. }
  51. //判断sender1和sender2是否发送end
  52. if (flag1 && strncmp(buf.msg, "end1", 4) == 0)
  53. {
  54. memset(&buf, 0, sizeof(buf));
  55. strcpy(buf.msg, "over1");
  56. //使over消息的msgtyp为2,方便分开接受
  57. buf.msg_type = 2;
  58. ret = msgsnd(*(int *)arg, &buf, sizeof(buf.msg), 0);
  59. if (ret == -1)
  60. {
  61. printf("sender1 msgsnd error\n");
  62. sem_post(&mutex);
  63. exit(1);
  64. }
  65. sem_post(&over);
  66. flag1 = 0;
  67. }
  68. else if (flag2 && strncmp(buf.msg, "end2", 4) == 0)
  69. {
  70. memset(&buf, 0, sizeof(buf));
  71. strcpy(buf.msg, "over2");
  72. buf.msg_type = 2;
  73. ret = msgsnd(*(int *)arg, &buf, sizeof(buf.msg), 0);
  74. if (ret == -1)
  75. {
  76. printf("sender2 msgsnd error\n");
  77. sem_post(&mutex);
  78. exit(1);
  79. }
  80. sem_post(&over);
  81. flag2 = 0;
  82. }
  83. sem_post(&mutex);
  84. sem_post(&empty);
  85. sem_post(&s_display);
  86. sleep(1);
  87. }
  88. printf("sender1 and sender2 all over!\n");
  89. }
  90. void sender1(void *arg)
  91. {
  92. struct msgbuff buf;
  93. int flag = 1;
  94. int ret = -1;
  95. while (flag)
  96. {
  97. memset(&buf, 0, sizeof(buf));
  98. sem_wait(&s_display);
  99. sem_wait(&empty);
  100. sem_wait(&mutex);
  101. printf("sender1> ");
  102. scanf("%s", buf.msg);
  103. buf.msg_type = 1;
  104. if (!strncmp(buf.msg, "exit", 4))
  105. {
  106. strcpy(buf.msg, "end1");
  107. flag = 0;
  108. }
  109. else
  110. {
  111. strcat(buf.msg, " ——s1");
  112. }
  113. ret = msgsnd(*(int *)arg, &buf, sizeof(buf.msg), IPC_NOWAIT);
  114. if (ret == -1)
  115. {
  116. printf("sender1 msgsnd error\n");
  117. sem_post(&mutex);
  118. exit(1);
  119. }
  120. sem_post(&r_display);
  121. sem_post(&full);
  122. sem_post(&mutex);
  123. sleep(1);
  124. }
  125. sem_wait(&over);
  126. sem_wait(&empty);
  127. sem_wait(&mutex);
  128. ret = msgrcv(*(int *)arg, &buf, sizeof(buf.msg), 2, IPC_NOWAIT);
  129. if (strncmp(buf.msg, "over1", 5) == 0)
  130. {
  131. sem_post(&full);
  132. sem_post(&mutex);
  133. pthread_exit(NULL);
  134. }
  135. }
  136. void sender2(void *arg)
  137. {
  138. struct msgbuff buf;
  139. int flag = 1;
  140. int ret = -1;
  141. while (flag)
  142. {
  143. //初始化buf
  144. memset(&buf, 0, sizeof(buf));
  145. //等待信号量,申请顺序有些不能反!参考生产者消费者死锁问题
  146. sem_wait(&s_display);
  147. sem_wait(&empty);
  148. sem_wait(&mutex);
  149. printf("sender2> ");
  150. scanf("%s", buf.msg);
  151. //设置msg_type为1,与msgrcv第四个参数有关
  152. buf.msg_type = 1;
  153. if (!strncmp(buf.msg, "exit", 4)){
  154. //当sender1输入exit,则对receive发送end1
  155. strcpy(buf.msg, "end2");
  156. flag = 0;
  157. }
  158. else{
  159. strcat(buf.msg, " ——s2");
  160. }
  161. // msgsnd添加消息到队列
  162. ret = msgsnd(*(int *)arg, &buf, sizeof(buf.msg), IPC_NOWAIT);
  163. if (ret == -1){
  164. printf("sender2 msgsnd error\n");
  165. sem_post(&mutex);
  166. exit(1);
  167. }
  168. //释放信号量
  169. sem_post(&r_display);
  170. sem_post(&full);
  171. sem_post(&mutex);
  172. sleep(1);
  173. }
  174. //接受over信号
  175. sem_wait(&over);
  176. sem_wait(&empty);
  177. sem_wait(&mutex);
  178. ret = msgrcv(*(int *)arg, &buf, sizeof(buf.msg), 2, IPC_NOWAIT);
  179. if (strncmp(buf.msg, "over2", 5) == 0)
  180. {
  181. sem_post(&full);
  182. sem_post(&mutex);
  183. pthread_exit(NULL);
  184. }
  185. }
  186. /*
  187. 初始化信号量
  188. sem为初始化的信号量名,pshare为0表示只在该进程的所有线程间共享,非0则可以在进程之间共享,value为信号量的大小
  189. kernel源码中有static inline void sema_init(struct semaphore *sem, int val)
  190. sema_init是Linux内核的计数信号量实现初始化函数。
  191. sem_init是Posix线程库的初始化程序(因此被用户空间代码使用)
  192. */
  193. int main()
  194. {
  195. //互斥信号量mutex,互斥使用消息队列
  196. sem_init(&mutex, 0, 1);
  197. //同步信号量full,empty
  198. sem_init(&full, 0, 0);
  199. sem_init(&empty, 0, 5);
  200. //用over信号判断某个sender线程的结束
  201. sem_init(&over, 0, 0);
  202. //display信号量实际作用只是为了能让sender线程和receive线程实现一发一收的格式
  203. //不使用这两个信号量可能会无法保证线程的执行顺序
  204. sem_init(&s_display, 0, 1);
  205. sem_init(&r_display, 0, 0);
  206. // 可以用判断一下信号量是否初始化成功,sem_getvalue(&mutex,&x);
  207. int ret = 1; //返回值
  208. key_t key = 100;//消息队列的key值
  209. //IPC_CREAT表示key值不存在就创建,0666是设置权限
  210. int msgid = msgget(key, IPC_CREAT | 0666);
  211. if (msgid == -1)
  212. {
  213. printf("msgget error\n");
  214. exit(1);
  215. }
  216. // pthread_create创建三个线程
  217. //第二个参数用来 设置线程的属性,设为NULL
  218. //第四个参数为传入执行函数的实参
  219. ret = pthread_create(&s1, NULL, sender1, (void *)&msgid);
  220. if (ret != 0)
  221. {
  222. printf("create sender1 error!\n");
  223. exit(1);
  224. }
  225. ret = pthread_create(&r, NULL, receive, (void *)&msgid);
  226. if (ret != 0)
  227. {
  228. printf("create receiver error!\n");
  229. exit(1);
  230. }
  231. ret = pthread_create(&s2, NULL, sender2, (void *)&msgid);
  232. if (ret != 0)
  233. {
  234. printf("create sender2 error!\n");
  235. exit(1);
  236. }
  237. // pthread_join使三个进程阻塞
  238. //为了回收资源,主线程会等待子线程结束。该函数就是用来等待线程终止的
  239. pthread_join(s1, NULL);
  240. pthread_join(s2, NULL);
  241. pthread_join(r, NULL);
  242. //删除消息队列
  243. msgctl(msgid, IPC_RMID, 0);
  244. return 0;
  245. }

图2.实验结果

四.遇到问题及解决方法

1.信号量的申请顺序不对,导致程序发生死锁,full信号和互斥型号量mutex申请顺序不对,造成了死锁,类似生产者消费者的经典死锁问题。

2.在未使用s_display和r_display信号量对线程执行顺序进行约束的时候,线程的执行顺序得不到保证,增加这两个信号量后,可以有效保证在sender线程后会执行receive线程。

3.信号量的使用和初始化要很注意,该实验的很多bug是来源于信号量的使用不恰当。

4.msgrcv 此类函数中的参数IPC_NOWAIT要注意,如果不设置此参数,会导致进程被挂起直到有消息进入队列。

ps:Sleep()是多线程任务常用的一个函数,通过sleep()可以让线程直接告诉操作系统结束时间片进入休眠,在部分情况下可能可以提升很大的CPU利用率。两个display同步信号量可以让sender线程执行完成后就是receive线程,达成一发一收的效果。

五.参考文献

1 linux 多线程之信号量[EB/OL]. [].

【Linux】多线程 之 信号量_am brother的博客-CSDN博客.

2 . linux 多线程信号量[EB/OL]. [].

linux 多线程之信号量 sem_init_岁月斑驳7的博客-CSDN博客.

3 . linux API速查手册[EB/OL]. [].

https://www.bookstack.cn/read/linuxapi/SUMMARY.md.

4. . Linux源码[EB/OL]. []. 

https://elixir.bootlin.com/linux/v4.16.3/source.

var code = "771885a0-7704-4c6b-bb9b-490157eeb898"
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/601596
推荐阅读
相关标签
  

闽ICP备14008679号