赞
踩
环形缓冲区(ring buffer)、环形队列(ring queue)多用于2个线程之间传递数据,是标准的先入先出(FIFO)模型。一般来说,多线程共享数据需要使用mutex来同步,这样共享数据才不至于发生不可预测的修改/读取;然而,mutex的使用也带来了额外的系统开销。ring buffer/queue 的引入,就是为了有效地解决这个问题:写位置只由A线程修改,读位置只由B线程修改,两个线程不会同时写同一个变量,因此可以用于2个线程之间共享数据的同步,但必须遵循A线程push in、B线程pull out的原则。采用这个机制的另外一个用处是:A push完数据后可以马上接收下一个数据,当输入数据很快时,不容易因数据阻塞而丢包。
当然基于这个模型还可以新增一些功能,如在push前加锁,就可以让多个线程来push数据。
线程 A                  媒介                  线程 B
data in --------> ring buffer/queue -------> data out
===============RingBuffer完整Python测试代码=======================
class RingBuffer:
    """Fixed-capacity first-in/first-out ring buffer.

    The backing list has ``size_max + 1`` slots and one slot is always left
    unused, so ``wrPos == rdPos`` means "empty" while
    ``(wrPos + 1) % max == rdPos`` means "full".
    """

    def __init__(self, size_max):
        """Create a buffer able to hold ``size_max`` items.

        Raises:
            ValueError: if ``size_max`` is negative (the modulus used for the
                index arithmetic would be zero or negative).
        """
        if size_max < 0:
            raise ValueError("size_max must be >= 0")
        self.max = size_max + 1
        self.data = [None] * self.max
        self.wrPos = 0  # index of the next slot to write
        self.rdPos = 0  # index of the next slot to read

    def is_ringbuffer_full(self):
        """Return 1 when no further item can be appended, otherwise 0."""
        if (self.wrPos + 1) % self.max == self.rdPos:
            return 1
        return 0

    def is_ringbuffer_empty(self):
        """Return 1 when there is nothing to read, otherwise 0."""
        if self.wrPos == self.rdPos:
            return 1
        return 0

    def get(self):
        """Pop and return the oldest item, or None when the buffer is empty."""
        if self.is_ringbuffer_empty():
            return None
        ret = self.data[self.rdPos]
        # Drop the buffer's reference so the popped object is not kept alive
        # until the slot happens to be overwritten.
        self.data[self.rdPos] = None
        self.rdPos = (self.rdPos + 1) % self.max
        return ret

    def append(self, item):
        """Push ``item``; return 1 on success or 0 when the buffer is full."""
        if self.is_ringbuffer_full():
            return 0
        # Store the item before publishing the new write index.
        self.data[self.wrPos] = item
        self.wrPos = (self.wrPos + 1) % self.max
        return 1
-
-
if __name__ == "__main__":
    rb = RingBuffer(5)

    # Offer six values to a buffer that holds five; the sixth is rejected.
    for value in range(1, 7):
        rb.append(value)

    # Six reads: prints 1, 2, 3, 4, 5 and then None because the buffer
    # has been drained.
    for _ in range(6):
        print(rb.get())

    # Push again after the buffer was emptied.
    for item in ("Chinese", "English", "Franch", [10, 20, 30]):
        rb.append(item)

    # Five reads: prints the four items in insertion order, then None.
    for _ in range(5):
        print(rb.get())
=============RingBuffer完整C测试代码[单元素]==========
- #include <stdio.h>
- #include <stdlib.h>
-
/*
 * State of a byte-oriented ring buffer.
 * One slot of `buffer` is always left unused so that "full" and "empty"
 * can be told apart from the two indices alone.
 */
struct TAG_RING_BUFFER {
    int wrPos;      /* index of the next slot to write */
    int rdPos;      /* index of the next slot to read */
    int maxCount;   /* number of slots in `buffer` */
    int remainCnt;  /* number of bytes currently stored */
    char *buffer;   /* heap-allocated storage */
};

typedef struct TAG_RING_BUFFER TagRingBuffer;
-
- TagRingBuffer* ringBuffer_init(int size)
- {
- //Note: (size-1) is full!!
- TagRingBuffer * rb;
- rb = (TagRingBuffer*)malloc(sizeof(TagRingBuffer));
- if( rb == NULL)
- {
- return NULL;
- }
- rb->buffer = (char*)malloc( size );
- if( rb->buffer == NULL )
- {
- return 0;
- }
- rb->maxCount = size;
- rb->wrPos = 0;
- rb->rdPos = 0;
- rb->remainCnt = 0;
- return rb;
- }
-
- void ringBuffer_free(TagRingBuffer* rb)
- {
- if( rb )
- {
- if(rb-> buffer)
- {
- free( rb-> buffer );
- }
- free( rb );
- }
- }
-
- int is_ringBuffer_full(TagRingBuffer* rb)
- {
- //Note:(MAXCOUNT-1) is full!!
- if( ( (rb->wrPos)+1)%(rb->maxCount) == rb->rdPos )
- return 1;
-
- return 0;
- }
-
- int is_ringBuffer_empty(TagRingBuffer* rb)
- {
- if( (rb->wrPos) == rb->rdPos ){
- return 1;
- }
-
- return 0;
- }
-
- int ringBuffer_pop(TagRingBuffer* rb,char *ch)
- { //read
- if( is_ringBuffer_empty(rb))
- {
- return 0;
- }
- *ch = rb->buffer[ rb->rdPos ];
- rb->rdPos = (rb->rdPos +1)%rb->maxCount;
- rb->remainCnt --;
- return 1;
- }
-
- int ringBuffer_push(TagRingBuffer* rb,char ch)
- { //write
- if( is_ringBuffer_full(rb))
- {
- return 0;
- }
- rb->buffer[ rb->wrPos ] = ch;
- rb->wrPos = (rb->wrPos +1)%(rb->maxCount);
- rb->remainCnt++;
- return 1;
- }
-
- int ringBuffer_getRemainCnt(TagRingBuffer* rb)
- {
- return rb->remainCnt;
- }
-
- //RingBuffer 测试[单元素]
- void main()
- {
-
- printf("----C_BuildTime: %s----\r\n", __TIME__ );
-
- TagRingBuffer *rb ;
- char ch;
- int retn = 1;
- int remain = 0;
-
- rb = ringBuffer_init(5);
-
- ringBuffer_push(rb,1);
- ringBuffer_push(rb,3);
- ringBuffer_push(rb,5);
- ringBuffer_push(rb,7);//effect data
-
- ringBuffer_push(rb,9); //buffer is full
- ringBuffer_push(rb,11);
- ringBuffer_push(rb,13);
- ringBuffer_push(rb,15);
-
-
- for(int i=0;i<10;i++)
- { //测试超出ringbuffer范围的数据
- ch = 0;
- remain = ringBuffer_getRemainCnt(rb);
- retn = ringBuffer_pop(rb,&ch);
- printf("[%d] out_data=(%d),remain=%d,ret=%d.\n",i,ch,remain,retn );
- }
- }
=============RingQueue完整C测试代码[多元素]==========
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
-
/*
 * State of a ring queue whose slots hold pointers to heap-allocated
 * records. One slot of `buffer` is always left unused so that "full" and
 * "empty" can be told apart from the two indices alone.
 */
struct TAG_RING_QUEUE {
    int wrPos;      /* index of the next slot to write */
    int rdPos;      /* index of the next slot to read */
    int maxCount;   /* number of slots in `buffer` */
    int remainCnt;  /* number of records currently queued */
    char **buffer;  /* array of record addresses */
};

typedef struct TAG_RING_QUEUE TagRingQueue;
-
- TagRingQueue* ringQueue_init(int size)
- {
- //Note: (size-1) is full!!
- TagRingQueue * rb;
- rb = (TagRingQueue*)malloc(sizeof(TagRingQueue));
- if( rb == NULL)
- {
- return NULL;
- }
- rb->buffer = (char**)malloc( size* sizeof(char**) );
- if( rb->buffer == NULL )
- {
- return 0;
- }
- rb->maxCount = size;
- rb->wrPos = 0;
- rb->rdPos = 0;
- rb->remainCnt = 0;
- return rb;
- }
-
- void ringQueue_free(TagRingQueue* rb)
- {
- if( rb )
- {
- if(rb-> buffer)
- {
- free( rb-> buffer );
- }
- free( rb );
- }
- }
-
/*
 * Free a payload pointer obtained from ringQueue_pop().
 *
 * The allocation starts one byte before `datPoint`: that first byte holds
 * the payload length. Passing NULL is allowed and does nothing.
 */
void ringQueue_free_pt(char*datPoint)
{
    if (datPoint == NULL) {
        return;
    }
    free(datPoint - 1);
}
-
- int is_ringQueue_full(TagRingQueue* rb)
- {
- //Note:(MAXCOUNT-1) is full!!
- if( ( (rb->wrPos)+1)%(rb->maxCount) == rb->rdPos )
- return 1;
-
- return 0;
- }
-
- int is_ringQueue_empty(TagRingQueue* rb)
- {
- if( (rb->wrPos) == rb->rdPos ){
- return 1;
- }
-
- return 0;
- }
-
- //注意:data前1节存数据长度,后面的数据才是本体
- //注意使用完后必须释放指针.
- char* ringQueue_pop(TagRingQueue* rb,short *lenOut)
- { //read
- char *buff_pt = NULL;
- *lenOut = 0;
- if( is_ringQueue_empty(rb))
- {
- return NULL;
- }
- buff_pt = (char*)rb->buffer[ rb->rdPos ];
- rb->rdPos = (rb->rdPos +1)%rb->maxCount;
- rb->remainCnt --;
- if( buff_pt != NULL)
- {
- *lenOut = *buff_pt;
- buff_pt++;
- }
- return buff_pt;//must free after use! NOTE: free(buff_pt-1)
- }
-
- //注意:data前1字节存数据长度,后面的数据才是本体
- int ringQueue_push(TagRingQueue* rb,char *data,short len)
- { //write
- char *buff_pt = NULL;
- if( is_ringQueue_full(rb))
- {
- return 0;
- }
- buff_pt = (char*)malloc(len+1);
- if( buff_pt == NULL )
- {
- return -1;//no mem
- }
- buff_pt[0] = len; //data第一字节存数据长度
- memcpy(buff_pt+1,data,len);
- rb->buffer[ rb->wrPos ] = buff_pt;
- rb->wrPos = (rb->wrPos +1)%(rb->maxCount);
- rb->remainCnt++;
- return 1;
- }
-
- int ringQueue_getRemainCnt(TagRingQueue* rb)
- {
- return rb->remainCnt;
- }
-
-
/*
 * Print `infoStr`, the length in parentheses, and then each of the `len`
 * bytes of `buf` as two upper-case hex digits followed by a space, ending
 * with a newline. Nothing is read from `buf` when `len` is not positive.
 */
void dump_hex(char *infoStr,char *buf,int len)
{
    const unsigned char *cursor = (const unsigned char *)buf;
    int remaining;

    printf("%s(%d)", infoStr, len);
    for (remaining = len; remaining > 0; remaining--) {
        printf("%02X ", *cursor);
        cursor++;
    }
    printf("\n");
}
-
- //RingQueue 测试 [多元素]
- void main()
- {
- printf("----C_BuildTime: %s----\r\n", __TIME__ );
-
- TagRingQueue *rb ;
- char data[50];
-
- rb = ringQueue_init(5);//MAXCOUNT-1=4
-
- char rawDat[]={0x39,0x38,0x03,0x00,0x35,0x00,0x00};
- ringQueue_push(rb,rawDat,sizeof(rawDat)); //test raw data
-
- sprintf(data,"English");
- ringQueue_push(rb,data,strlen(data)+1);
-
- sprintf(data,"USA");
- ringQueue_push(rb,data,strlen(data)+1);
-
- sprintf(data,"Japan");
- ringQueue_push(rb,data,strlen(data)+1);
-
- sprintf(data,"India");
- ringQueue_push(rb,data,strlen(data)+1);
-
- sprintf(data,"Yemen");
- ringQueue_push(rb,data,strlen(data)+1);
-
-
- char* datOut = NULL;
- short lenOut = 0;
-
-
- for(int i=0;i<10;i++)
- { //测试超出ringQueue范围的数据
- datOut = ringQueue_pop(rb,&lenOut);
- printf("\n[%d] len_out=%d,data=[%s].",i,lenOut,datOut);
- dump_hex("HEX",datOut,lenOut);
- ringQueue_free_pt(datOut);//must free after use!!
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。