赞
踩
1.什么是bufferevent
libevent为输入输出缓冲区做出了一个抽象,这个抽象就是bufferevent。用来自动管理输入输出缓冲区,我们要输入输出就可以对bufferevent操作了。有数据写入bufferevent时,添加数据到输出缓冲区。bufferevent有数据供读取的时候,从输入缓冲区抽取数据。
2.bufferevent结构体
- struct bufferevent {
- struct event_base *ev_base; //关联的event_base
-
- struct event ev_read; //读事件
- struct event ev_write; //写事件
-
- struct evbuffer *input; //输入buffer
- struct evbuffer *output; //输出buffer
-
- struct event_watermark wm_read; //读水位
- struct event_watermark wm_write; //写水位
-
- evbuffercb readcb; //buffer读回调函数
- evbuffercb writecb; //buffer写回调函数
- everrorcb errorcb; //错误回调函数
- void *cbarg; //回调函数的参数
-
- int timeout_read; /* in seconds */
- int timeout_write; /* in seconds */
-
- short enabled; /* events that are currently enabled */
- };
3.关于回调函数和水位
bufferevent一共有三个回调,两个用于读写数据。
readcb:读取任意量的数据后都会触发读取回调,默认0
writecb:数据被消耗一定数量后会触发写入回调,默认0
那么水位是什么呢?
一共有四个水位,读取低水位,读取高水位,写入低水位,写入高水位。程序注释中我们就可以看到这几个的作用了。
读取低水位:读取操作使得输入缓冲区的数据量在此级别或者更高时,读取回调将被调用。默认值为0,所以每个读取操作都会导致读取回调被调用。
读取高水位:输入缓冲区中的数据量达到此级别后,bufferevent将停止读取,直到输入缓冲区中足够量的数据被抽取,使得数据量低于此级别。默认值是无限,所以永远不会因为输入缓冲区的大小而停止读取。
写入低水位:写入操作使得输出缓冲区的数据量达到或者低于此级别时,写入回调将被调用。默认值是0,所以只有输出缓冲区空的时候才会调用写入回调。
写入高水位:bufferevent没有直接使用这个水位。它在bufferevent用作另外一个bufferevent的底层传输端口时有特殊意义。请看后面关于过滤型
bufferevent也有“错误”或者“事件”回调,用于向应用通知非面向数据的事件,如连接已经关闭或者发生错误:
比如我们关注了EV_TIMEOUT属性,在read_cb和write_cb函数中就会直接goto error,调用error回调函数。
下面是bufferevent的代码及注释:
- void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
-
- static int
- bufferevent_add(struct event *ev, int timeout)
- {
- struct timeval tv, *ptv = NULL;
-
- if (timeout) {
- evutil_timerclear(&tv); //时间清空
- tv.tv_sec = timeout;
- ptv = &tv;
- }
-
- return (event_add(ev, ptv)); //加入反应堆相应队列,如果没有设置timeout,不会注册timeout事件,返回0
- }
-
- /*
- * This callback is executed when the size of the input buffer changes.
- * We use it to apply back pressure on the reading side.
- */ //apply back means 申请回来
-
- void
- bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, //加read压力
- void *arg) {
- struct bufferevent *bufev = arg;
- /*
- * If we are below the watermark then reschedule reading if it's
- * still enabled.
- */
- if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { //如果高水位线为0,或者当前小于高水位线,不回调
- evbuffer_setcb(buf, NULL, NULL); //为什么要调用这个函数?
-
- if (bufev->enabled & EV_READ)
- bufferevent_add(&bufev->ev_read, bufev->timeout_read); //让这个时间过一定时间再读
- } //timeout_read在bufferevent_settimeout函数里面会设置,
- }
-
- static void
- bufferevent_readcb(int fd, short event, void *arg) //缓冲区可读会触发该回调
- {
- struct bufferevent *bufev = arg;
- int res = 0;
- short what = EVBUFFER_READ;
- size_t len;
- int howmuch = -1;
-
- /* Note that we only check for event==EV_TIMEOUT. If
- * event==EV_TIMEOUT|EV_READ, we can safely ignore the
- * timeout, since a read has occurred */
- if (event == EV_TIMEOUT) { //如果是超时事件,忽略
- what |= EVBUFFER_TIMEOUT;
- goto error;
- }
-
- /*
- * If we have a high watermark configured then we don't want to // 是否设置了输入缓冲区的最大大小
- * read more data than would make us reach the watermark.
- */ //先检测input缓冲区已有的数据
- if (bufev->wm_read.high != 0) { //如果读取高水位不为0
- howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); //高水位长度-实际数据长度
- /* we might have lowered the watermark, stop reading */ //我们可能已经降低了水位,停止阅读,水位改动
- if (howmuch <= 0) { //如果为<=0,说明输入缓冲区的数据量达到了一定级别,bufferevent停止读取
- struct evbuffer *buf = bufev->input;
- event_del(&bufev->ev_read); //不能读了,删除可读事件
- evbuffer_setcb(buf,
- bufferevent_read_pressure_cb, bufev);
- return; //直接返回,不再读取
- }
- }
-
- res = evbuffer_read(bufev->input, fd, howmuch); //从fd读取数据到bufev->input
- if (res == -1) {
- if (errno == EAGAIN || errno == EINTR) //EAGAIN字面意思是再试一次,比如打开以nonblock打开一个socket?
- //连续read但无数据可读,就会返回EAGAIN,意思让你再试一次。EINTR
- goto reschedule; //跳到重新计划
- /* error case */
- what |= EVBUFFER_ERROR; //否则加上EVBUFFER_ERROR
- } else if (res == 0) {
- /* eof case */
- what |= EVBUFFER_EOF; //缓冲区结尾了
- }
-
- if (res <= 0)
- goto error;
-
- bufferevent_add(&bufev->ev_read, bufev->timeout_read); //添加超时
-
- /* See if this callbacks meets the water marks */
- len = EVBUFFER_LENGTH(bufev->input);
- if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) //如果最低水位线不等于0,且实际字节小于最低水位线,不处理,继续读取fd
- return;
- if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {//最高水位线不为0,且长度大于最高水位线 //这是从fd读取数据之后的情况
- struct evbuffer *buf = bufev->input;
- event_del(&bufev->ev_read); //从注册链表中删除
-
- /* Now schedule a callback for us when the buffer changes */
- evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); //设置读压回调函数
- }
-
- /* Invoke the user callback - must always be called last */
- if (bufev->readcb != NULL)
- (*bufev->readcb)(bufev, bufev->cbarg);
- return;
-
- reschedule:
- bufferevent_add(&bufev->ev_read, bufev->timeout_read); //一定时间后再读
- return;
-
- error:
- (*bufev->errorcb)(bufev, what, bufev->cbarg);
- }
-
- static void
- bufferevent_writecb(int fd, short event, void *arg)
- {
- struct bufferevent *bufev = arg;
- int res = 0;
- short what = EVBUFFER_WRITE;
-
- if (event == EV_TIMEOUT) {
- what |= EVBUFFER_TIMEOUT; //同样ignore
- goto error;
- }
-
- if (EVBUFFER_LENGTH(bufev->output)) { //如果有数据
- res = evbuffer_write(bufev->output, fd); //有数据就直接写到fd
- if (res == -1) {
- #ifndef WIN32
- /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
- *set errno. thus this error checking is not portable*/
- if (errno == EAGAIN ||
- errno == EINTR ||
- errno == EINPROGRESS)
- goto reschedule;
- /* error case */
- what |= EVBUFFER_ERROR;
-
- #else
- goto reschedule;
- #endif
-
- } else if (res == 0) {
- /* eof case */
- what |= EVBUFFER_EOF;
- }
- if (res <= 0)
- goto error;
- }
-
- if (EVBUFFER_LENGTH(bufev->output) != 0) //如果输出缓冲区还有数据,那就过段时间再输出
- bufferevent_add(&bufev->ev_write, bufev->timeout_write);
-
- /*
- * Invoke the user callback if our buffer is drained or below the
- * low watermark.
- */
- if (bufev->writecb != NULL &&
- EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)//到达低水位的回调
- (*bufev->writecb)(bufev, bufev->cbarg);
-
- return;
-
- reschedule:
- if (EVBUFFER_LENGTH(bufev->output) != 0) //如果数据不为0,一段时间后重写
- bufferevent_add(&bufev->ev_write, bufev->timeout_write);
- return;
-
- error:
- (*bufev->errorcb)(bufev, what, bufev->cbarg);
- }
-
- /*
- * Create a new buffered event object.
- *
- * The read callback is invoked whenever we read new data.
- * The write callback is invoked whenever the output buffer is drained.
- * The error callback is invoked on a write/read error or on EOF.
- *
- * Both read and write callbacks maybe NULL. The error callback is not
- * allowed to be NULL and have to be provided always.
- */
-
- //分配bufferevent结构体
- struct bufferevent *
- bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
- everrorcb errorcb, void *cbarg)
- {
- struct bufferevent *bufev;
-
- if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
- return (NULL);
-
- if ((bufev->input = evbuffer_new()) == NULL) { //分配evbuffer
- free(bufev);
- return (NULL);
- }
-
- if ((bufev->output = evbuffer_new()) == NULL) {
- evbuffer_free(bufev->input); //如果失败,析构掉之前的,避免内存泄漏
- free(bufev);
- return (NULL);
- }
-
- event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); //设置bufferevent内部读写事件
- event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
-
- bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
-
- /*
- * Set to EV_WRITE so that using bufferevent_write is going to
- * trigger a callback. Reading needs to be explicitly enabled
- * because otherwise no data will be available.
- */ //自动开启EV_WRITE,EV_READ需要显示声明
- bufev->enabled = EV_WRITE; // /** Events that are currently enabled: currently EV_READ and EV_WRITE are supported. */
-
- return (bufev);
- }
-
- void
- bufferevent_setcb(struct bufferevent *bufev,
- evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
- {
- bufev->readcb = readcb;
- bufev->writecb = writecb;
- bufev->errorcb = errorcb;
-
- bufev->cbarg = cbarg;
- }
-
- void
- bufferevent_setfd(struct bufferevent *bufev, int fd) //
- {
- event_del(&bufev->ev_read); //先解注册之前的事件
- event_del(&bufev->ev_write);
-
- event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
- event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
- if (bufev->ev_base != NULL) {
- event_base_set(bufev->ev_base, &bufev->ev_read);
- event_base_set(bufev->ev_base, &bufev->ev_write);
- }
-
- /* might have to manually trigger event registration */
- }
-
- int
- bufferevent_priority_set(struct bufferevent *bufev, int priority)
- {
- if (event_priority_set(&bufev->ev_read, priority) == -1) //设置优先级
- return (-1);
- if (event_priority_set(&bufev->ev_write, priority) == -1)
- return (-1);
-
- return (0);
- }
-
- /* Closing the file descriptor is the responsibility of the caller */
-
- void
- bufferevent_free(struct bufferevent *bufev)
- {
- event_del(&bufev->ev_read);
- event_del(&bufev->ev_write);
-
- evbuffer_free(bufev->input);
- evbuffer_free(bufev->output);
-
- free(bufev);
- }
-
- /*
- * Returns 0 on success;
- * -1 on failure.
- */
-
- //bufferevent写事件
- int
- bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
- {
- int res;
-
- res = evbuffer_add(bufev->output, data, size); //调用追加data函数,即evbuffer_add函数
-
- if (res == -1)
- return (res);
-
- /* If everything is okay, we need to schedule a write */
- if (size > 0 && (bufev->enabled & EV_WRITE)) //上面data追加完毕,现在将bufev->ev_write事件加入反应堆,等候
- bufferevent_add(&bufev->ev_write, bufev->timeout_write);
-
- return (res);
- }
-
- int
- bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
- {
- int res;
-
- res = bufferevent_write(bufev, buf->buffer, buf->off);
- if (res != -1)
- evbuffer_drain(buf, buf->off); //为什么发送失败要drain
-
- return (res);
- }
-
- size_t
- bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
- {
- struct evbuffer *buf = bufev->input;
-
- if (buf->off < size) //如果小于要读的字节数
- size = buf->off; //就读实际数据
-
- /* Copy the available data to the user buffer */
- memcpy(data, buf->buffer, size);
-
- if (size)
- evbuffer_drain(buf, size); //读完size字节后调整一下
-
- return (size);
- }
-
- //将某个event添加到event_base中,bufferevent开关
- int
- bufferevent_enable(struct bufferevent *bufev, short event)
- {
- if (event & EV_READ) {
- if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
- return (-1);
- }
- if (event & EV_WRITE) {
- if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
- return (-1);
- }
-
- bufev->enabled |= event;
- return (0);
- }
-
- //从对应队列删除
- int
- bufferevent_disable(struct bufferevent *bufev, short event)
- {
- if (event & EV_READ) {
- if (event_del(&bufev->ev_read) == -1)
- return (-1);
- }
- if (event & EV_WRITE) {
- if (event_del(&bufev->ev_write) == -1)
- return (-1);
- }
-
- bufev->enabled &= ~event;
- return (0);
- }
-
- /*
- * Sets the read and write timeout for a buffered event.
- */
-
- //设置超时
- void
- bufferevent_settimeout(struct bufferevent *bufev,
- int timeout_read, int timeout_write) {
- bufev->timeout_read = timeout_read;
- bufev->timeout_write = timeout_write;
-
- if (event_pending(&bufev->ev_read, EV_READ, NULL))//检测时间是否处于未决状态
- bufferevent_add(&bufev->ev_read, timeout_read);
- if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
- bufferevent_add(&bufev->ev_write, timeout_write);
- }
-
- /*
- * Sets the water marks
- */
- //设置水位线啊
- void
- bufferevent_setwatermark(struct bufferevent *bufev, short events,
- size_t lowmark, size_t highmark)
- {
- if (events & EV_READ) {
- bufev->wm_read.low = lowmark;
- bufev->wm_read.high = highmark;
- }
-
- if (events & EV_WRITE) {
- bufev->wm_write.low = lowmark;
- bufev->wm_write.high = highmark;
- }
-
- /* If the watermarks changed then see if we should call read again */
- bufferevent_read_pressure_cb(bufev->input,
- 0, EVBUFFER_LENGTH(bufev->input), bufev);
- }
- //和event_base关联
- int
- bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
- {
- int res;
-
- bufev->ev_base = base;
-
- res = event_base_set(base, &bufev->ev_read);
- if (res == -1)
- return (res);
-
- res = event_base_set(base, &bufev->ev_write);
- return (res);
- }
下面上一个测试例子:
- include <iostream>
- #include <event.h>
- #include <assert.h>
- #include <socket.h>
- using namespace std;
-
- struct event_base* base = NULL;
- struct bufferevent* buf_ev = NULL;
-
- void read_cb(struct bufferevent* buf, void* arg)
- {
- ///read thing from bufferevent///
- cout<<"read"<<endl;
- char data[1024] = {0};
- assert(buf_ev->input->off >= 0);
- cout<<buf_ev->input->off<<endl;
- bufferevent_read(buf_ev, data, sizeof(data));
- assert(data != NULL);
- cout<<data;
- /
- int connfd = *(int *)arg;
- bufferevent_write(buf_ev, data, strlen(data)+1);
- }
-
- void write_cb(struct bufferevent* buf, void* arg)
- {
- cout<<"write"<<endl;
- }
-
- void error_cb(struct bufferevent* buf, short what, void* arg)
- {
- cout<<"error"<<endl;
- ///read thing from bufferevent///
- char data[1024] = {0};
- assert(buf_ev->input->off > 0);
- cout<<buf_ev->input->off<<endl;
- bufferevent_read(buf_ev, data, sizeof(data));
- assert(data != NULL);
- cout<<data;
- /
- int connfd = *(int *)arg;
- bufferevent_write(buf_ev, data, strlen(data)+1);
- }
-
- int main()
- {
- base = event_init();
- assert(base != NULL);
-
- int sockfd = socket_create("127.0.0.1", 6666);
- assert(sockfd != -1);
-
- int connfd = accept(sockfd, NULL, NULL);
- assert(connfd != -1);
-
- buf_ev = bufferevent_new(connfd, read_cb, write_cb, error_cb, &connfd);
- assert(buf_ev != NULL);
-
- int ret;
- ret = bufferevent_base_set(base, buf_ev);
- assert(ret != -1);
-
- ret = bufferevent_enable(buf_ev, EV_READ);
- assert(ret != -1);
-
-
- //ret = bufferevent_disable(buf_ev, EV_READ); //if the bufferevent was disabled, ex: shut the EV_READ, reading will not excute.
- //assert(ret != -1);
- ///
- bufferevent_setwatermark(buf_ev, EV_READ, 3, 10);
-
- bufferevent_settimeout(buf_ev, 4, 100);
-
-
- event_base_dispatch(base);
-
- return 0;
- }
(FreeeLinux的博客:http://blog.csdn.net/freeelinux/article/details/52862170)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。