当前位置:   article > 正文

openmp 生产者 消费者 实现_openmp程序实现生产和消费的完整代码

openmp程序实现生产和消费的完整代码
  1. #include <iostream>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <math.h>
  5. #include <omp.h>
  6. #include <time.h>
  7. using namespace std;
  8. int thread_count;
  9. int send_max;
  10. int done_sending;
  11. struct MesgQueue {
  12. int *mesg;
  13. int enqueued, dequeued;
  14. omp_lock_t front_mutex, back_mutex;
  15. };
  16. struct MesgQueue* Msg;
  17. void Enqueue(int dest, int mesg) {
  18. int cur_p = omp_get_thread_num();
  19. printf("Thread %d send message %d to %d success!\n",cur_p, mesg, dest);
  20. Msg[dest].mesg[Msg[dest].enqueued] = mesg;
  21. Msg[dest].enqueued++;
  22. }
  23. void init(MesgQueue* MQ) {
  24. MQ->mesg = new int[send_max];
  25. MQ->dequeued = 0;
  26. MQ->enqueued = 0;
  27. omp_init_lock(&(MQ->front_mutex));
  28. omp_init_lock(&(MQ->back_mutex));
  29. }
  30. void Dequeue(int dest) {
  31. printf("Thread %d receive message %d success!\n", dest, Msg[dest].mesg[Msg[dest].dequeued]);
  32. Msg[dest].dequeued++;
  33. }
  34. void Send_msg() {
  35. int mesg = rand();
  36. int dest = rand() % thread_count;
  37. //#pragma omp critical
  38. omp_set_lock(&Msg[omp_get_thread_num()].back_mutex);
  39. Enqueue(dest, mesg);
  40. omp_unset_lock(&Msg[omp_get_thread_num()].back_mutex);
  41. }
  42. void Try_receive() {
  43. int cur_p = omp_get_thread_num();
  44. int queue_size = Msg[cur_p].enqueued - Msg[cur_p].dequeued;
  45. if (queue_size == 0) return;
  46. else if (queue_size == 1) {
  47. //#pragma omp critical
  48. Dequeue(cur_p);
  49. }
  50. else
  51. {
  52. Dequeue(cur_p);
  53. }
  54. }
  55. int Done() {
  56. int cur_p = omp_get_thread_num();
  57. int queue_size = Msg[cur_p].enqueued - Msg[cur_p].dequeued;
  58. if (queue_size == 0 && done_sending == thread_count) return 1;
  59. else return 0;
  60. }
  61. void destroy() {
  62. delete[] Msg;
  63. }
  64. int main(int argc, char* argv[]) {
  65. if (argc != 2) printf("Error Command!\n"), exit(0);
  66. thread_count = strtol(argv[1], NULL, 10);
  67. printf("thread_count = %d, Input the number of message:\n", thread_count);
  68. cin >> send_max;
  69. Msg = new MesgQueue[thread_count];
  70. srand((unsigned)time(NULL));
  71. int sent_msgs, i;
  72. clock_t s = clock();
  73. #pragma omp parallel num_threads(thread_count)
  74. {
  75. #pragma omp for
  76. for (i = 0; i < thread_count; ++i)
  77. init(&Msg[omp_get_thread_num()]);
  78. #pragma omp barrier
  79. #pragma omp for private(sent_msgs)
  80. for (i = 0; i < thread_count; ++i) {
  81. for (sent_msgs = 0; sent_msgs < send_max; ++sent_msgs) {
  82. Send_msg();
  83. Try_receive();
  84. }
  85. // printf("thread %d send message done!\n", omp_get_thread_num());
  86. #pragma omp atomic
  87. done_sending++;
  88. while (!Done())
  89. {
  90. Try_receive();
  91. }
  92. }
  93. }
  94. destroy();
  95. clock_t e = clock();
  96. printf("Running time is: %dms\n", e - s);
  97. system("pause");
  98. return 0;
  99. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/267883
推荐阅读
相关标签
  

闽ICP备14008679号