当前位置:   article > 正文

环形缓冲区(ring buffer),环形队列(ring queue) 原理_环形缓冲队列

环形缓冲队列

   环形缓冲区(ring buffer),环形队列(ring queue) 多用于2个线程之间传递数据,是标准的先入先出(FIFO)模型。一般来说,对于多线程共享数据,需要使用mutex来同步,这样共享数据才不至于发生不可预测的修改/读取,然而,mutex的使用也带来了额外的系统开销,ring buffer/queue 的引入,就是为了有效地解决这个问题,因其特殊的结构及算法,可以用于2个线程中共享数据的同步,而且必须遵循A线程push in,B线程pull out的原则。采用这个机制另外一个用处是A push完数据后可以马上接收另外一个数据,当输入数据快速时不容易造成数据阻塞而丢包.

    当然基于这个模型还可以新增一些功能,如在push前加锁,就可以让多个线程来push数据。

线程 A                       媒介                        线程 B
data in --------> ring buffer/queue -------> data out

===============RingBuffer完整Python测试代码======================= 

  1. class RingBuffer:
  2. def __init__(self, size_max):
  3. self.max = size_max + 1
  4. self.data = [None] * self.max
  5. self.wrPos = 0
  6. self.rdPos = 0
  7. def is_ringbuffer_full(self):
  8. if (self.wrPos + 1) % self.max == self.rdPos:
  9. return 1
  10. else:
  11. return 0
  12. def is_ringbuffer_empty(self):
  13. if self.wrPos == self.rdPos:
  14. return 1
  15. else:
  16. return 0
  17. # pop up data from rb
  18. def get(self):
  19. if self.is_ringbuffer_empty():
  20. return None
  21. ret = self.data[self.rdPos]
  22. self.rdPos = (self.rdPos + 1) % self.max
  23. return ret
  24. # push data to rb
  25. def append(self, item):
  26. if self.is_ringbuffer_full():
  27. return 0
  28. self.data[self.wrPos] = item
  29. self.wrPos = (self.wrPos + 1) % self.max
  30. return 1
  31. if __name__ == "__main__":
  32. rb = RingBuffer(5)
  33. rb.append(1)
  34. rb.append(2)
  35. rb.append(3)
  36. rb.append(4)
  37. rb.append(5)
  38. rb.append(6)
  39. print(rb.get()) # 输出1
  40. print(rb.get()) # 输出2
  41. print(rb.get()) # 输出3
  42. print(rb.get()) # 输出4
  43. print(rb.get()) # 输出5
  44. print(rb.get()) # 输出None,因为buffer已经清空
  45. rb.append("Chinese") # 重新push
  46. rb.append("English")
  47. rb.append("Franch")
  48. rb.append([10, 20, 30])
  49. print(rb.get()) # 输出 Chinse
  50. print(rb.get()) # 输出 English
  51. print(rb.get()) # 输出 Franch
  52. print(rb.get()) # 输出 [10,20,30]
  53. print(rb.get()) # 输出None,因为buffer已经清空


=============RingBuffer完整C测试代码[单元素]==========

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. typedef struct TAG_RING_BUFFER
  4. {
  5. int wrPos;
  6. int rdPos;
  7. int maxCount;
  8. int remainCnt;
  9. char *buffer;
  10. } TagRingBuffer;
  11. TagRingBuffer* ringBuffer_init(int size)
  12. {
  13. //Note: (size-1) is full!!
  14. TagRingBuffer * rb;
  15. rb = (TagRingBuffer*)malloc(sizeof(TagRingBuffer));
  16. if( rb == NULL)
  17. {
  18. return NULL;
  19. }
  20. rb->buffer = (char*)malloc( size );
  21. if( rb->buffer == NULL )
  22. {
  23. return 0;
  24. }
  25. rb->maxCount = size;
  26. rb->wrPos = 0;
  27. rb->rdPos = 0;
  28. rb->remainCnt = 0;
  29. return rb;
  30. }
  31. void ringBuffer_free(TagRingBuffer* rb)
  32. {
  33. if( rb )
  34. {
  35. if(rb-> buffer)
  36. {
  37. free( rb-> buffer );
  38. }
  39. free( rb );
  40. }
  41. }
  42. int is_ringBuffer_full(TagRingBuffer* rb)
  43. {
  44. //Note:(MAXCOUNT-1) is full!!
  45. if( ( (rb->wrPos)+1)%(rb->maxCount) == rb->rdPos )
  46. return 1;
  47. return 0;
  48. }
  49. int is_ringBuffer_empty(TagRingBuffer* rb)
  50. {
  51. if( (rb->wrPos) == rb->rdPos ){
  52. return 1;
  53. }
  54. return 0;
  55. }
  56. int ringBuffer_pop(TagRingBuffer* rb,char *ch)
  57. { //read
  58. if( is_ringBuffer_empty(rb))
  59. {
  60. return 0;
  61. }
  62. *ch = rb->buffer[ rb->rdPos ];
  63. rb->rdPos = (rb->rdPos +1)%rb->maxCount;
  64. rb->remainCnt --;
  65. return 1;
  66. }
  67. int ringBuffer_push(TagRingBuffer* rb,char ch)
  68. { //write
  69. if( is_ringBuffer_full(rb))
  70. {
  71. return 0;
  72. }
  73. rb->buffer[ rb->wrPos ] = ch;
  74. rb->wrPos = (rb->wrPos +1)%(rb->maxCount);
  75. rb->remainCnt++;
  76. return 1;
  77. }
  78. int ringBuffer_getRemainCnt(TagRingBuffer* rb)
  79. {
  80. return rb->remainCnt;
  81. }
  82. //RingBuffer 测试[单元素]
  83. void main()
  84. {
  85. printf("----C_BuildTime: %s----\r\n", __TIME__ );
  86. TagRingBuffer *rb ;
  87. char ch;
  88. int retn = 1;
  89. int remain = 0;
  90. rb = ringBuffer_init(5);
  91. ringBuffer_push(rb,1);
  92. ringBuffer_push(rb,3);
  93. ringBuffer_push(rb,5);
  94. ringBuffer_push(rb,7);//effect data
  95. ringBuffer_push(rb,9); //buffer is full
  96. ringBuffer_push(rb,11);
  97. ringBuffer_push(rb,13);
  98. ringBuffer_push(rb,15);
  99. for(int i=0;i<10;i++)
  100. { //测试超出ringbuffer范围的数据
  101. ch = 0;
  102. remain = ringBuffer_getRemainCnt(rb);
  103. retn = ringBuffer_pop(rb,&ch);
  104. printf("[%d] out_data=(%d),remain=%d,ret=%d.\n",i,ch,remain,retn );
  105. }
  106. }

=============RingQueue完整C测试代码[多元素]==========

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. typedef struct TAG_RING_QUEUE
  5. {
  6. int wrPos;
  7. int rdPos;
  8. int maxCount;
  9. int remainCnt;
  10. char**buffer;//storage address
  11. } TagRingQueue;
  12. TagRingQueue* ringQueue_init(int size)
  13. {
  14. //Note: (size-1) is full!!
  15. TagRingQueue * rb;
  16. rb = (TagRingQueue*)malloc(sizeof(TagRingQueue));
  17. if( rb == NULL)
  18. {
  19. return NULL;
  20. }
  21. rb->buffer = (char**)malloc( size* sizeof(char**) );
  22. if( rb->buffer == NULL )
  23. {
  24. return 0;
  25. }
  26. rb->maxCount = size;
  27. rb->wrPos = 0;
  28. rb->rdPos = 0;
  29. rb->remainCnt = 0;
  30. return rb;
  31. }
  32. void ringQueue_free(TagRingQueue* rb)
  33. {
  34. if( rb )
  35. {
  36. if(rb-> buffer)
  37. {
  38. free( rb-> buffer );
  39. }
  40. free( rb );
  41. }
  42. }
  43. //首字节为长度数据
  44. void ringQueue_free_pt(char*datPoint)
  45. {
  46. if( (datPoint!=NULL) && ((datPoint-1)!=NULL))
  47. {
  48. free(datPoint-1);
  49. }
  50. }
  51. int is_ringQueue_full(TagRingQueue* rb)
  52. {
  53. //Note:(MAXCOUNT-1) is full!!
  54. if( ( (rb->wrPos)+1)%(rb->maxCount) == rb->rdPos )
  55. return 1;
  56. return 0;
  57. }
  58. int is_ringQueue_empty(TagRingQueue* rb)
  59. {
  60. if( (rb->wrPos) == rb->rdPos ){
  61. return 1;
  62. }
  63. return 0;
  64. }
  65. //注意:data前1节存数据长度,后面的数据才是本体
  66. //注意使用完后必须释放指针.
  67. char* ringQueue_pop(TagRingQueue* rb,short *lenOut)
  68. { //read
  69. char *buff_pt = NULL;
  70. *lenOut = 0;
  71. if( is_ringQueue_empty(rb))
  72. {
  73. return NULL;
  74. }
  75. buff_pt = (char*)rb->buffer[ rb->rdPos ];
  76. rb->rdPos = (rb->rdPos +1)%rb->maxCount;
  77. rb->remainCnt --;
  78. if( buff_pt != NULL)
  79. {
  80. *lenOut = *buff_pt;
  81. buff_pt++;
  82. }
  83. return buff_pt;//must free after use! NOTE: free(buff_pt-1)
  84. }
  85. //注意:data前1字节存数据长度,后面的数据才是本体
  86. int ringQueue_push(TagRingQueue* rb,char *data,short len)
  87. { //write
  88. char *buff_pt = NULL;
  89. if( is_ringQueue_full(rb))
  90. {
  91. return 0;
  92. }
  93. buff_pt = (char*)malloc(len+1);
  94. if( buff_pt == NULL )
  95. {
  96. return -1;//no mem
  97. }
  98. buff_pt[0] = len; //data第一字节存数据长度
  99. memcpy(buff_pt+1,data,len);
  100. rb->buffer[ rb->wrPos ] = buff_pt;
  101. rb->wrPos = (rb->wrPos +1)%(rb->maxCount);
  102. rb->remainCnt++;
  103. return 1;
  104. }
  105. int ringQueue_getRemainCnt(TagRingQueue* rb)
  106. {
  107. return rb->remainCnt;
  108. }
  109. void dump_hex(char *infoStr,char *buf,int len)
  110. {
  111. printf("%s(%d)",infoStr,len);
  112. for(int i=0;i<len;i++)
  113. {
  114. printf("%02X ",(unsigned char)buf[i]);
  115. }
  116. printf("\n");
  117. }
  118. //RingQueue 测试 [多元素]
  119. void main()
  120. {
  121. printf("----C_BuildTime: %s----\r\n", __TIME__ );
  122. TagRingQueue *rb ;
  123. char data[50];
  124. rb = ringQueue_init(5);//MAXCOUNT-1=4
  125. char rawDat[]={0x39,0x38,0x03,0x00,0x35,0x00,0x00};
  126. ringQueue_push(rb,rawDat,sizeof(rawDat)); //test raw data
  127. sprintf(data,"English");
  128. ringQueue_push(rb,data,strlen(data)+1);
  129. sprintf(data,"USA");
  130. ringQueue_push(rb,data,strlen(data)+1);
  131. sprintf(data,"Japan");
  132. ringQueue_push(rb,data,strlen(data)+1);
  133. sprintf(data,"India");
  134. ringQueue_push(rb,data,strlen(data)+1);
  135. sprintf(data,"Yemen");
  136. ringQueue_push(rb,data,strlen(data)+1);
  137. char* datOut = NULL;
  138. short lenOut = 0;
  139. for(int i=0;i<10;i++)
  140. { //测试超出ringQueue范围的数据
  141. datOut = ringQueue_pop(rb,&lenOut);
  142. printf("\n[%d] len_out=%d,data=[%s].",i,lenOut,datOut);
  143. dump_hex("HEX",datOut,lenOut);
  144. ringQueue_free_pt(datOut);//must free after use!!
  145. }
  146. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/601711
推荐阅读
相关标签
  

闽ICP备14008679号