当前位置:   article > 正文

(C语言)多进程实现消费者生产者pv操作,Windows和LInux版本_c实现多生产者消费者问题

c实现多生产者消费者问题

多进程实现消费者生产者问题

一,实验目的

1,了解生产者消费者的互斥与同步问题

2,掌握Windows和Linux的进程通信方法

二,实验要求

完成Windows版本和Linux版本。

一个大小为3的缓冲区,初始为空。

2个生产者

    • 随机等待一段时间,往缓冲区添加数据,
    • 若缓冲区已满,等待消费者取走数据后再添加
    • 重复6次

3个消费者

    • 随机等待一段时间,从缓冲区读取数据
    • 若缓冲区为空,等待生产者添加数据后再读取
    • 重复4次

显示每次添加和读取数据的时间及缓冲区的状态

三,实验环境

Windows版本:Windows 10 64位系统,Dev-cpp编译器

Linux版本:Fedora29版本,gcc环境 vim文本编辑器

四,实验代码结构

1),pv操作伪代码:

array[3]:interger//缓冲区定义,大小为三

int empty=3,full=0;

int mutex=1;

i=0,j=0//缓冲区指针

x,y:item //产品变量

生产者:                                                                                              消费者:

begin:

produce a product to x;

P(empty);

P(mutex);

array[i]=x;

ii=(i+1)%3;

V(full);

V(mutex);

,,,,,, .........,

End

消费者:

end

 

 

2)实验代码分析

Windows版本:

思路分析:Windows创建多进程使用creatprocess()函数调用自己,通过多次创建得到两个生产者进程三个消费者进程,在之中运行相应的生产者函数,消费者函数。在通过传入参数不同,来辨别是第一次主进程还是生产者进程,消费者进程。通过构建共享内存区进行进程间通信。

①多进程创建

 

② 构建共享内存区,再将文件映射到本进程,初始化

③在主创建进程间信号量full empty

分别在生产者消费者进程创建互斥访问量mutex

④同过argv量的不同判断进程归属

 

运行结果:

全部代码见后

Linux版本:

思路分析:Linux使用fork进行多进程创建,分别在进程中运行消费者函数,生产者函数。建立共享主存区很信号量在进程建进行通信和缓存访问

①,创建进程分别运行消费者和生产者函数

②建立共享主存并进行映射

③创建进程间信号量full,empty和互斥量 mutex,并初始化

 

④实验结果:

五, 实验总结

本次实验获得圆满成功。

本次实验通过分别编写Windows和Linux版本的多进程实现消费者和生产者问题,了解生产者消费者的互斥与同步问题,掌握Windows和Linux的进程通信方法,也同时加强自己对多进程操作的理解。

代码:

  1. WIndows版本:
  2. //实验三生产者消费者
  3. #include <windows.h>
  4. #include <stdio.h>
  5. #include <time.h>
  6. HANDLE handleOfProcess[5];
  7. struct buf
  8. {
  9. char buffer[3];
  10. int write;
  11. int read;
  12. };
  13. int rand_1()
  14. {
  15. return rand()%100+1000;
  16. }
  17. char rand_char()
  18. {
  19. return rand()%26+'A';
  20. }
  21. void StartClone(int nCloneID)
  22. {
  23. TCHAR szFilename[MAX_PATH];
  24. GetModuleFileName(NULL,szFilename,MAX_PATH);
  25. TCHAR szCmdLine[MAX_PATH];
  26. sprintf(szCmdLine,"\"%s\" %d",szFilename,nCloneID);
  27. //printf("%s\n",szCmdLine);
  28. STARTUPINFO si;
  29. ZeroMemory(reinterpret_cast<void*>(&si),sizeof(si));
  30. si.cb=sizeof(si);
  31. PROCESS_INFORMATION pi;
  32. BOOL bCreateOK=CreateProcess(
  33. szFilename,
  34. szCmdLine,
  35. NULL,
  36. NULL,
  37. FALSE,
  38. CREATE_DEFAULT_ERROR_MODE,
  39. NULL,
  40. NULL,
  41. &si,
  42. &pi);
  43. if(bCreateOK)
  44. handleOfProcess[nCloneID]=pi.hProcess;
  45. else
  46. {
  47. printf("Error in create process!\n");
  48. exit(0);
  49. }
  50. }
  51. void pro()//生产者
  52. {
  53. HANDLE mutex = CreateMutex(NULL,FALSE,"MYMUTEX");
  54. HANDLE empty = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"MYEMPTY");
  55. HANDLE full = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"MYFULL");
  56. HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS,FALSE,"myfilemap");
  57. LPVOID Data=MapViewOfFile(//文件映射
  58. hMap,
  59. FILE_MAP_ALL_ACCESS,
  60. 0,
  61. 0,
  62. 0);
  63. struct buf *pint=reinterpret_cast<struct buf *>(Data);
  64. for (int i = 0; i < 6; i++)
  65. {
  66. WaitForSingleObject(empty, INFINITE);
  67. //sleep
  68. srand((unsigned)time(0));
  69. int tim=rand_1();
  70. Sleep(tim);
  71. WaitForSingleObject(mutex, INFINITE);
  72. //code
  73. pint->buffer[pint->write]=rand_char();
  74. pint->write=(pint->write+1)%3;
  75. ReleaseMutex(mutex);
  76. ReleaseSemaphore(full,1,NULL);
  77. SYSTEMTIME syst;
  78. time_t t=time(0);
  79. GetSystemTime(&syst);
  80. char tmpBuf[10];
  81. strftime(tmpBuf,10,"%H:%M:%S",localtime(&t));
  82. printf("生产者%d向缓冲区写入数据:\t%c\t%c\t%c\t@%s.%d\n",
  83. (int)GetCurrentProcessId(),pint->buffer[0],pint->buffer[1],
  84. pint->buffer[2],tmpBuf,syst.wMilliseconds);
  85. fflush(stdout);
  86. }
  87. UnmapViewOfFile(Data);//解除映射
  88. Data=NULL;
  89. CloseHandle(mutex);
  90. CloseHandle(empty);
  91. CloseHandle(full);
  92. }
  93. void con()//消费者
  94. {
  95. HANDLE mutex = CreateMutex(NULL,FALSE,"MYMUTEX");
  96. HANDLE empty = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"MYEMPTY");
  97. HANDLE full = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"MYFULL");
  98. HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS,FALSE,"myfilemap");
  99. LPVOID Data=MapViewOfFile(//文件映射
  100. hMap,
  101. FILE_MAP_ALL_ACCESS,
  102. 0,
  103. 0,
  104. 0);
  105. struct buf *pint=reinterpret_cast<struct buf *>(Data);
  106. for (int i = 0; i < 4; i++)
  107. {
  108. WaitForSingleObject(full,INFINITE);
  109. //sleep
  110. srand((unsigned)time(0));
  111. int tim=rand_1();
  112. Sleep(tim);
  113. WaitForSingleObject(mutex,INFINITE);
  114. pint->buffer[pint->read]=' ';
  115. pint->read=(pint->read+1)%3;
  116. ReleaseMutex(mutex);
  117. ReleaseSemaphore(empty,1,NULL);
  118. //code
  119. time_t t=time(0);
  120. char tmpBuf[10];
  121. SYSTEMTIME syst;
  122. GetSystemTime(&syst);
  123. strftime(tmpBuf,10,"%H:%M:%S",localtime(&t));
  124. printf("消费者%d从缓冲区读取数据:\t%c\t%c\t%c\t@%s.%d\n",
  125. (int)GetCurrentProcessId(),pint->buffer[0],pint->buffer[1],
  126. pint->buffer[2],tmpBuf,syst.wMilliseconds);
  127. fflush(stdout);
  128. }
  129. UnmapViewOfFile(Data);//解除映射
  130. Data=NULL;
  131. CloseHandle(mutex);
  132. CloseHandle(empty);
  133. CloseHandle(full);
  134. }
  135. int main(int argc,char * argv[])
  136. {
  137. int nCloneID=20;
  138. if(argc>1)
  139. {
  140. sscanf(argv[1],"%d",&nCloneID);
  141. }
  142. if(nCloneID<2)//生产者进程
  143. {
  144. pro();
  145. }
  146. else if(nCloneID<5)//消费者进程
  147. {
  148. con();
  149. }
  150. else//主进程
  151. {
  152. HANDLE hMap=CreateFileMapping(
  153. NULL,
  154. NULL,
  155. PAGE_READWRITE,
  156. 0,
  157. sizeof(struct buf),
  158. "myfilemap");
  159. if (hMap!=INVALID_HANDLE_VALUE)
  160. {
  161. LPVOID Data=MapViewOfFile(//文件映射
  162. hMap,
  163. FILE_MAP_ALL_ACCESS,
  164. 0,
  165. 0,
  166. 0);
  167. if (Data!=NULL)
  168. {
  169. ZeroMemory(Data,sizeof(struct buf));
  170. }
  171. struct buf *pnData=reinterpret_cast<struct buf *>(Data);
  172. pnData->read=0;
  173. pnData->write=0;
  174. memset(pnData->buffer,0,sizeof(pnData->buffer));
  175. UnmapViewOfFile(Data);//解除映射
  176. Data=NULL;
  177. }
  178. HANDLE empty = CreateSemaphore(NULL,3,3,"MYEMPTY");
  179. HANDLE full = CreateSemaphore(NULL,0,3,"MYFULL");
  180. for(int i=0;i<5;i++)//创建子进程
  181. StartClone(i);
  182. WaitForMultipleObjects(5,handleOfProcess,TRUE,INFINITE);
  183. CloseHandle(empty);
  184. CloseHandle(full);
  185. }
  186. }

Linux版本:

  1. //实验三:生产者消费者
  2. #include<string.h>
  3. #include <sys/time.h>
  4. #include<stdio.h>
  5. #include<sys/types.h>
  6. #include<unistd.h>
  7. #include<stdlib.h>
  8. #include<sys/sem.h>
  9. #include<sys/select.h>
  10. #include<sys/wait.h>
  11. #include<sys/ipc.h>
  12. #include<sys/shm.h>
  13. #include<time.h>
  14. #define SEM_ID1 225
  15. #define SEM_ID2 97
  16. #define SEM_ID3 234
  17. #define SHMKEY 75
  18. struct buf{
  19. char buffer[3];
  20. int read;
  21. int write;
  22. };
  23. int rand_1()
  24. {
  25. return rand()%300;
  26. }
  27. void sleep_ms(int s)
  28. {
  29. usleep(s*10000);
  30. }
  31. char* cur_time()
  32. {
  33. time_t timep;
  34. time(&timep);
  35. return ctime(&timep);
  36. }
  37. char rand_char()
  38. {
  39. return rand()%26+'A';
  40. }
  41. void P (int s)//p操作
  42. {
  43. struct sembuf sem_op;
  44. sem_op.sem_num=0;
  45. sem_op.sem_op=-1;
  46. sem_op.sem_flg=0;
  47. semop(s,&sem_op,1);
  48. }
  49. void V(int s)//v操作
  50. {
  51. struct sembuf sem_op;
  52. sem_op.sem_num=0;
  53. sem_op.sem_op=1;
  54. sem_op.sem_flg=0;
  55. semop(s,&sem_op,1);
  56. }
  57. void pro()//生产者
  58. {
  59. int tim,shmid,i=6;
  60. int sem_mutex,sem_empty,sem_full;
  61. void * addr;
  62. struct buf *pint;
  63. struct sembuf sem_op;
  64. struct timeval tv;
  65. sem_mutex=semget(SEM_ID1,1,0600);
  66. sem_empty=semget(SEM_ID2,1,0600);
  67. sem_full=semget(SEM_ID3,1,0600);
  68. shmid=shmget(SHMKEY,sizeof(struct buf),0777);
  69. addr=shmat(shmid,0,0);
  70. while(i--)
  71. {
  72. gettimeofday(&tv,NULL);
  73. srand((unsigned)tv.tv_usec);
  74. tim=rand_1();
  75. sleep_ms(tim);
  76. //P(empty)
  77. P(sem_empty);
  78. //P(mutex)
  79. P(sem_mutex);
  80. pint=(struct buf *)addr;
  81. // pint[semctl(sem_full,0,GETVAL)]=time;
  82. pint->buffer[pint->write]=rand_char();
  83. pint->write=(pint->write+1)%3;
  84. printf("当前生产者进程:%d 写入数据:\t%c\t%c\t%c\t@%lds%ldus\n",
  85. getpid(),pint->buffer[0],pint->buffer[1],
  86. pint->buffer[2],tv.tv_sec,tv.tv_usec);
  87. //V(mutex)
  88. V(sem_mutex);
  89. //V(full)
  90. V(sem_full);
  91. }
  92. shmdt(addr);
  93. }
  94. void con()//消费者
  95. {
  96. int tim,shmid,i=4;
  97. int sem_mutex,sem_empty,sem_full;
  98. void * addr;
  99. struct buf *pint;
  100. struct sembuf sem_op;
  101. struct timeval tv;
  102. sem_mutex=semget(SEM_ID1,1,0600);
  103. sem_empty=semget(SEM_ID2,1,0600);
  104. sem_full=semget(SEM_ID3,1,0600);
  105. shmid=shmget(SHMKEY,sizeof(struct buf),0777);
  106. addr=shmat(shmid,0,0);
  107. while(i--)
  108. {
  109. gettimeofday(&tv,NULL);
  110. srand((unsigned)tv.tv_usec);
  111. tim=rand_1();
  112. sleep_ms(tim);
  113. //P(full)
  114. P(sem_full);
  115. //P(mutex)
  116. P(sem_mutex);
  117. pint=(struct buf *)addr;
  118. pint->buffer[pint->read]=' ';
  119. pint->read=(pint->read+1)%3;
  120. printf("当前消费者进程:%d 读取数据:\t%c\t%c\t%c\t@%ldst%ldms\n",
  121. getpid(),pint->buffer[0],pint->buffer[1],
  122. pint->buffer[2],tv.tv_sec,tv.tv_usec);
  123. //V(mutex)
  124. V(sem_mutex);
  125. //V(empty)
  126. V(sem_empty);
  127. }
  128. shmdt(addr);
  129. }
  130. int main()
  131. {
  132. int sem_mutex,sem_empty,sem_full,shmid;
  133. void * addr;
  134. union semun {
  135. int val;
  136. }empty,full,mutex;
  137. //建立信号量
  138. sem_mutex=semget(SEM_ID1,1,IPC_CREAT|0600);
  139. sem_empty=semget(SEM_ID2,1,IPC_CREAT|0600);
  140. sem_full=semget(SEM_ID3,1,IPC_CREAT|0600);
  141. full.val=0;
  142. empty.val=3;
  143. mutex.val=1;
  144. semctl(sem_mutex,0,SETVAL,mutex);
  145. semctl(sem_empty,0,SETVAL,empty);
  146. semctl(sem_full,0,SETVAL,full);
  147. //建立共享内存并进行映射
  148. shmid=shmget(SHMKEY,sizeof(struct buf),0777|IPC_CREAT);
  149. if(-1==shmid)
  150. {
  151. printf("建立共享内存失败\n");
  152. exit(0);
  153. }
  154. addr=shmat(shmid,0,0);
  155. memset(addr,0,sizeof(struct buf));
  156. //执行生产者进程
  157. for(int i=0;i<2;i++)
  158. if(fork()==0)
  159. {
  160. pro();
  161. exit(0);
  162. }
  163. //执行消费者进程
  164. for(int i=0;i<3;i++)
  165. if(fork()==0)
  166. {
  167. con();
  168. exit(0);
  169. }
  170. while(-1 != wait(0));
  171. semctl(sem_mutex,0,IPC_RMID);
  172. semctl(sem_empty,0,IPC_RMID);
  173. semctl(sem_full,0,IPC_RMID);
  174. shmdt(addr);
  175. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/267924
推荐阅读
相关标签
  

闽ICP备14008679号