概述
队列作为经典的数据结构,应用非常广泛,比如:消息队列FIFO进行相应处理。在实现过程中,可以采用链表、数组等存储结构。而RingBuffer环形队列,通过固定长度循环存储数据,在使用中无需再进行内存分配,不容易形成内存碎片,且进、出队列的时间为O(1),因此有着非常广泛的应用。相关内容可以参见:
http://en.wikipedia.org/wiki/Circular_buffer
在工作中,在证券行情接入、处理时,可以采用RingBuffer存放的数据对象。
实现代码如下:
#ifndef __RINGBUFFER_H
#define __RINGBUFFER_H
#include "stockquote.h"
#define RING_BUFFER_SIZE 500
typedef struct T_RING_BUFFER{
int size; //the ringbuffer size
int start; //the start pointer
int count; //elements count
T_STOCKQUOTE *elements;
} T_RING_BUFFER, *PT_RING_BUFFER;
void
InitRingBuffer(T_RING_BUFFER *rb, int size);
int
IsRbFull(T_RING_BUFFER *rb);
int
IsRbEmpty(T_RING_BUFFER *rb);
void
RbWrite(T_RING_BUFFER *rb, T_STOCKQUOTE *e);
void
RbRead(T_RING_BUFFER *rb, T_STOCKQUOTE *e);
void
DelRingBuffer(T_RING_BUFFER *rb);
#endif
以上为ringbuffer结构的定义,如下为具体实现:
void
InitRingBuffer(T_RING_BUFFER *rb, int size)
{
int idx;
rb->size = size;
rb->start = 0;
rb->count = 0;
rb->elements = (T_STOCKQUOTE *) malloc(rb->size * sizeof(T_STOCKQUOTE));
memset(rb->elements, 0 , size * sizeof(T_STOCKQUOTE));
}
int
IsRbFull(T_RING_BUFFER *rb)
{
return rb->count == rb->size;
}
int
IsRbEmpty(T_RING_BUFFER *rb)
{
return rb->count == 0;
}
void
RbWrite(T_RING_BUFFER *rb, T_STOCKQUOTE *e)
{
int end = (rb->start + rb->count) % rb->size;
rb->elements[end] = *e;
if (rb->count == rb->size)
rb->start = (rb->start + 1) % rb->size;
else
++rb->count;
}
void
RbRead(T_RING_BUFFER *rb, T_STOCKQUOTE *e)
{
*e = rb->elements[rb->start];
rb->start = (rb->start + 1) % rb->size;
--rb->count;
}
void
DelRingBuffer(T_RING_BUFFER *rb)
{
free(rb->elements);
free(rb);
}
在实际使用过程中,采用了多线程共享访问一个Ringbuffer的机制,并且不能进行覆盖。通过BSD pthread_cond_wait,pthread_mutex_lock方式对于队列的读写进行控制。在测试程序中,共有N个producer,每个producer发送10w个消息,1个consumer,对于各消息进行读取,读取共10w*N之后,程序推出。代码如下:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "ringbuffer.h"
#include "stockquote.h"
#define PRONUM 5
void *producer(void *args);
void *consumer(void *args);
pthread_mutex_t *mutex;
pthread_cond_t *notFull, *notEmpty;
int
main(int argc, char *argv[])
{
pthread_t proc[PRONUM];
pthread_t cons;
T_RING_BUFFER *rb;
int count = 0;
mutex = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(mutex, NULL);
notFull = (pthread_cond_t *) malloc(sizeof(pthread_cond_t));
pthread_cond_init(notFull, NULL);
notEmpty = (pthread_cond_t *) malloc(sizeof(pthread_cond_t));
pthread_cond_init(notEmpty, NULL);
rb = (T_RING_BUFFER *) malloc(sizeof(T_RING_BUFFER));
InitRingBuffer(rb, RING_BUFFER_SIZE);
for (count = 0; count < PRONUM; count++)
{
pthread_create(&proc[count], NULL, producer, rb);
}
pthread_create(&cons, NULL, consumer, rb);
for (count = 0; count < PRONUM; count++)
{
printf("join return :%dn", pthread_join(proc[count], NULL));
}
printf("wait for procn");
pthread_join(cons, NULL);
printf("wait for consn");
printf("==========wait for thread finished.n");
DelRingBuffer(rb);
pthread_mutex_destroy(mutex);
pthread_cond_destroy(notFull);
pthread_cond_destroy(notEmpty);
return 0;
}
void *
producer(void* q)
{
int NUM = 100000;
printf("in producern");
T_RING_BUFFER *rb;
int i;
rb = (T_RING_BUFFER *)q;
while(NUM--)
{
pthread_mutex_lock(mutex);
while(IsRbFull(rb))
{
pthread_cond_wait(notFull, mutex);
}
T_STOCKQUOTE is;
is.number = 10;
RbWrite(rb, &is);
pthread_mutex_unlock(mutex);
pthread_cond_signal(notEmpty);
}
return NULL;
}
void *
consumer(void *q)
{
long long get_num = 0;
printf("in consumern");
T_RING_BUFFER *rb;
int i;
rb = (T_RING_BUFFER *)q;
while(get_num < (100000 * PRONUM) )
{
pthread_mutex_lock(mutex);
while(IsRbEmpty(rb))
{
pthread_cond_wait(notEmpty, mutex);
}
T_STOCKQUOTE s;
RbRead(rb, &s);
get_num++;
pthread_mutex_unlock(mutex);
pthread_cond_signal(notFull);
}
return NULL;
}
转帖:http://www.financecomputing.net/wordpress/?p=151
最后
以上就是精明鞋垫为你收集整理的RingBuffer队列多线程应用的全部内容,希望文章能够帮你解决RingBuffer队列多线程应用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复