当前位置:   article > 正文

libevent的bufferevent剖析_libbufferevent

libbufferevent

1.什么是bufferevent

libevent为输入输出缓冲区做出了一个抽象,这个抽象就是bufferevent。用来自动管理输入输出缓冲区,我们要输入输出就可以对bufferevent操作了。有数据写入bufferevent时,添加数据到输出缓冲区。bufferevent有数据供读取的时候,从输入缓冲区抽取数据。

2.bufferevent结构体

  1. struct bufferevent {
  2. struct event_base *ev_base; //关联的event_base
  3. struct event ev_read; //读事件
  4. struct event ev_write; //写事件
  5. struct evbuffer *input; //输入buffer
  6. struct evbuffer *output; //输出buffer
  7. struct event_watermark wm_read; //读水位
  8. struct event_watermark wm_write; //写水位
  9. evbuffercb readcb; //buffer读回调函数
  10. evbuffercb writecb; //buffer写回调函数
  11. everrorcb errorcb; //错误回调函数
  12. void *cbarg; //回调函数的参数
  13. int timeout_read; /* in seconds */
  14. int timeout_write; /* in seconds */
  15. short enabled; /* events that are currently enabled */
  16. };
3.关于回调函数和水位

bufferevent一共有三个回调,两个用于读写数据。

readcb:读取任意量的数据后都会触发读取回调,默认0

writecb:数据被消耗一定数量后会触发写入回调,默认0

那么水位是什么呢?

一共有四个水位,读取低水位,读取高水位,写入低水位,写入高水位。程序注释中我们就可以看到这几个的作用了。

  读取低水位:读取操作使得输入缓冲区的数据量在此级别或者更高时,读取回调将被调用。默认值为0,所以每个读取操作都会导致读取回调被调用。

   读取高水位:输入缓冲区中的数据量达到此级别后,bufferevent将停止读取,直到输入缓冲区中足够量的数据被抽取,使得数据量低于此级别。默认值是无限,所以永远不会因为输入缓冲区的大小而停止读取。

  写入低水位:写入操作使得输出缓冲区的数据量达到或者低于此级别时,写入回调将被调用。默认值是0,所以只有输出缓冲区空的时候才会调用写入回调。

  写入高水位:bufferevent没有直接使用这个水位。它在bufferevent用作另外一个bufferevent的底层传输端口时有特殊意义。请看后面关于过滤型

bufferevent也有“错误”或者“事件”回调,用于向应用通知非面向数据的事件,如连接已经关闭或者发生错误:

比如我们关注了EV_TIMEOUT属性,在read_cb和write_cb函数中就会直接goto error,调用error回调函数。


下面是bufferevent的代码及注释:

  1. void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
  2. static int
  3. bufferevent_add(struct event *ev, int timeout)
  4. {
  5. struct timeval tv, *ptv = NULL;
  6. if (timeout) {
  7. evutil_timerclear(&tv); //时间清空
  8. tv.tv_sec = timeout;
  9. ptv = &tv;
  10. }
  11. return (event_add(ev, ptv)); //加入反应堆相应队列,如果没有设置timeout,不会注册timeout事件,返回0
  12. }
  13. /*
  14. * This callback is executed when the size of the input buffer changes.
  15. * We use it to apply back pressure on the reading side.
  16. */ //apply back means 申请回来
  17. void
  18. bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, //加read压力
  19. void *arg) {
  20. struct bufferevent *bufev = arg;
  21. /*
  22. * If we are below the watermark then reschedule reading if it's
  23. * still enabled.
  24. */
  25. if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { //如果高水位线为0,或者当前小于高水位线,不回调
  26. evbuffer_setcb(buf, NULL, NULL); //为什么要调用这个函数?
  27. if (bufev->enabled & EV_READ)
  28. bufferevent_add(&bufev->ev_read, bufev->timeout_read); //让这个时间过一定时间再读
  29. } //timeout_read在bufferevent_settimeout函数里面会设置,
  30. }
  31. static void
  32. bufferevent_readcb(int fd, short event, void *arg) //缓冲区可读会触发该回调
  33. {
  34. struct bufferevent *bufev = arg;
  35. int res = 0;
  36. short what = EVBUFFER_READ;
  37. size_t len;
  38. int howmuch = -1;
  39. /* Note that we only check for event==EV_TIMEOUT. If
  40. * event==EV_TIMEOUT|EV_READ, we can safely ignore the
  41. * timeout, since a read has occurred */
  42. if (event == EV_TIMEOUT) { //如果是超时事件,忽略
  43. what |= EVBUFFER_TIMEOUT;
  44. goto error;
  45. }
  46. /*
  47. * If we have a high watermark configured then we don't want to // 是否设置了输入缓冲区的最大大小
  48. * read more data than would make us reach the watermark.
  49. */ //先检测input缓冲区已有的数据
  50. if (bufev->wm_read.high != 0) { //如果读取高水位不为0
  51. howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); //高水位长度-实际数据长度
  52. /* we might have lowered the watermark, stop reading */ //我们可能已经降低了水位,停止阅读,水位改动
  53. if (howmuch <= 0) { //如果为<=0,说明输入缓冲区的数据量达到了一定级别,bufferevent停止读取
  54. struct evbuffer *buf = bufev->input;
  55. event_del(&bufev->ev_read); //不能读了,删除可读事件
  56. evbuffer_setcb(buf,
  57. bufferevent_read_pressure_cb, bufev);
  58. return; //直接返回,不再读取
  59. }
  60. }
  61. res = evbuffer_read(bufev->input, fd, howmuch); //从fd读取数据到bufev->input
  62. if (res == -1) {
  63. if (errno == EAGAIN || errno == EINTR) //EAGAIN字面意思是再试一次,比如打开以nonblock打开一个socket?
  64. //连续read但无数据可读,就会返回EAGAIN,意思让你再试一次。EINTR
  65. goto reschedule; //跳到重新计划
  66. /* error case */
  67. what |= EVBUFFER_ERROR; //否则加上EVBUFFER_ERROR
  68. } else if (res == 0) {
  69. /* eof case */
  70. what |= EVBUFFER_EOF; //缓冲区结尾了
  71. }
  72. if (res <= 0)
  73. goto error;
  74. bufferevent_add(&bufev->ev_read, bufev->timeout_read); //添加超时
  75. /* See if this callbacks meets the water marks */
  76. len = EVBUFFER_LENGTH(bufev->input);
  77. if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) //如果最低水位线不等于0,且实际字节小于最低水位线,不处理,继续读取fd
  78. return;
  79. if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {//最高水位线不为0,且长度大于最高水位线 //这是从fd读取数据之后的情况
  80. struct evbuffer *buf = bufev->input;
  81. event_del(&bufev->ev_read); //从注册链表中删除
  82. /* Now schedule a callback for us when the buffer changes */
  83. evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); //设置读压回调函数
  84. }
  85. /* Invoke the user callback - must always be called last */
  86. if (bufev->readcb != NULL)
  87. (*bufev->readcb)(bufev, bufev->cbarg);
  88. return;
  89. reschedule:
  90. bufferevent_add(&bufev->ev_read, bufev->timeout_read); //一定时间后再读
  91. return;
  92. error:
  93. (*bufev->errorcb)(bufev, what, bufev->cbarg);
  94. }
  95. static void
  96. bufferevent_writecb(int fd, short event, void *arg)
  97. {
  98. struct bufferevent *bufev = arg;
  99. int res = 0;
  100. short what = EVBUFFER_WRITE;
  101. if (event == EV_TIMEOUT) {
  102. what |= EVBUFFER_TIMEOUT; //同样ignore
  103. goto error;
  104. }
  105. if (EVBUFFER_LENGTH(bufev->output)) { //如果有数据
  106. res = evbuffer_write(bufev->output, fd); //有数据就直接写到fd
  107. if (res == -1) {
  108. #ifndef WIN32
  109. /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
  110. *set errno. thus this error checking is not portable*/
  111. if (errno == EAGAIN ||
  112. errno == EINTR ||
  113. errno == EINPROGRESS)
  114. goto reschedule;
  115. /* error case */
  116. what |= EVBUFFER_ERROR;
  117. #else
  118. goto reschedule;
  119. #endif
  120. } else if (res == 0) {
  121. /* eof case */
  122. what |= EVBUFFER_EOF;
  123. }
  124. if (res <= 0)
  125. goto error;
  126. }
  127. if (EVBUFFER_LENGTH(bufev->output) != 0) //如果输出缓冲区还有数据,那就过段时间再输出
  128. bufferevent_add(&bufev->ev_write, bufev->timeout_write);
  129. /*
  130. * Invoke the user callback if our buffer is drained or below the
  131. * low watermark.
  132. */
  133. if (bufev->writecb != NULL &&
  134. EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)//到达低水位的回调
  135. (*bufev->writecb)(bufev, bufev->cbarg);
  136. return;
  137. reschedule:
  138. if (EVBUFFER_LENGTH(bufev->output) != 0) //如果数据不为0,一段时间后重写
  139. bufferevent_add(&bufev->ev_write, bufev->timeout_write);
  140. return;
  141. error:
  142. (*bufev->errorcb)(bufev, what, bufev->cbarg);
  143. }
  144. /*
  145. * Create a new buffered event object.
  146. *
  147. * The read callback is invoked whenever we read new data.
  148. * The write callback is invoked whenever the output buffer is drained.
  149. * The error callback is invoked on a write/read error or on EOF.
  150. *
  151. * Both read and write callbacks maybe NULL. The error callback is not
  152. * allowed to be NULL and have to be provided always.
  153. */
  154. //分配bufferevent结构体
  155. struct bufferevent *
  156. bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
  157. everrorcb errorcb, void *cbarg)
  158. {
  159. struct bufferevent *bufev;
  160. if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
  161. return (NULL);
  162. if ((bufev->input = evbuffer_new()) == NULL) { //分配evbuffer
  163. free(bufev);
  164. return (NULL);
  165. }
  166. if ((bufev->output = evbuffer_new()) == NULL) {
  167. evbuffer_free(bufev->input); //如果失败,析构掉之前的,避免内存泄漏
  168. free(bufev);
  169. return (NULL);
  170. }
  171. event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); //设置bufferevent内部读写事件
  172. event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
  173. bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
  174. /*
  175. * Set to EV_WRITE so that using bufferevent_write is going to
  176. * trigger a callback. Reading needs to be explicitly enabled
  177. * because otherwise no data will be available.
  178. */ //自动开启EV_WRITE,EV_READ需要显示声明
  179. bufev->enabled = EV_WRITE; // /** Events that are currently enabled: currently EV_READ and EV_WRITE are supported. */
  180. return (bufev);
  181. }
  182. void
  183. bufferevent_setcb(struct bufferevent *bufev,
  184. evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
  185. {
  186. bufev->readcb = readcb;
  187. bufev->writecb = writecb;
  188. bufev->errorcb = errorcb;
  189. bufev->cbarg = cbarg;
  190. }
  191. void
  192. bufferevent_setfd(struct bufferevent *bufev, int fd) //
  193. {
  194. event_del(&bufev->ev_read); //先解注册之前的事件
  195. event_del(&bufev->ev_write);
  196. event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
  197. event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
  198. if (bufev->ev_base != NULL) {
  199. event_base_set(bufev->ev_base, &bufev->ev_read);
  200. event_base_set(bufev->ev_base, &bufev->ev_write);
  201. }
  202. /* might have to manually trigger event registration */
  203. }
  204. int
  205. bufferevent_priority_set(struct bufferevent *bufev, int priority)
  206. {
  207. if (event_priority_set(&bufev->ev_read, priority) == -1) //设置优先级
  208. return (-1);
  209. if (event_priority_set(&bufev->ev_write, priority) == -1)
  210. return (-1);
  211. return (0);
  212. }
  213. /* Closing the file descriptor is the responsibility of the caller */
  214. void
  215. bufferevent_free(struct bufferevent *bufev)
  216. {
  217. event_del(&bufev->ev_read);
  218. event_del(&bufev->ev_write);
  219. evbuffer_free(bufev->input);
  220. evbuffer_free(bufev->output);
  221. free(bufev);
  222. }
  223. /*
  224. * Returns 0 on success;
  225. * -1 on failure.
  226. */
  227. //bufferevent写事件
  228. int
  229. bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
  230. {
  231. int res;
  232. res = evbuffer_add(bufev->output, data, size); //调用追加data函数,即evbuffer_add函数
  233. if (res == -1)
  234. return (res);
  235. /* If everything is okay, we need to schedule a write */
  236. if (size > 0 && (bufev->enabled & EV_WRITE)) //上面data追加完毕,现在将bufev->ev_write事件加入反应堆,等候
  237. bufferevent_add(&bufev->ev_write, bufev->timeout_write);
  238. return (res);
  239. }
  240. int
  241. bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
  242. {
  243. int res;
  244. res = bufferevent_write(bufev, buf->buffer, buf->off);
  245. if (res != -1)
  246. evbuffer_drain(buf, buf->off); //为什么发送失败要drain
  247. return (res);
  248. }
  249. size_t
  250. bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
  251. {
  252. struct evbuffer *buf = bufev->input;
  253. if (buf->off < size) //如果小于要读的字节数
  254. size = buf->off; //就读实际数据
  255. /* Copy the available data to the user buffer */
  256. memcpy(data, buf->buffer, size);
  257. if (size)
  258. evbuffer_drain(buf, size); //读完size字节后调整一下
  259. return (size);
  260. }
  261. //将某个event添加到event_base中,bufferevent开关
  262. int
  263. bufferevent_enable(struct bufferevent *bufev, short event)
  264. {
  265. if (event & EV_READ) {
  266. if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
  267. return (-1);
  268. }
  269. if (event & EV_WRITE) {
  270. if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
  271. return (-1);
  272. }
  273. bufev->enabled |= event;
  274. return (0);
  275. }
  276. //从对应队列删除
  277. int
  278. bufferevent_disable(struct bufferevent *bufev, short event)
  279. {
  280. if (event & EV_READ) {
  281. if (event_del(&bufev->ev_read) == -1)
  282. return (-1);
  283. }
  284. if (event & EV_WRITE) {
  285. if (event_del(&bufev->ev_write) == -1)
  286. return (-1);
  287. }
  288. bufev->enabled &= ~event;
  289. return (0);
  290. }
  291. /*
  292. * Sets the read and write timeout for a buffered event.
  293. */
  294. //设置超时
  295. void
  296. bufferevent_settimeout(struct bufferevent *bufev,
  297. int timeout_read, int timeout_write) {
  298. bufev->timeout_read = timeout_read;
  299. bufev->timeout_write = timeout_write;
  300. if (event_pending(&bufev->ev_read, EV_READ, NULL))//检测时间是否处于未决状态
  301. bufferevent_add(&bufev->ev_read, timeout_read);
  302. if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
  303. bufferevent_add(&bufev->ev_write, timeout_write);
  304. }
  305. /*
  306. * Sets the water marks
  307. */
  308. //设置水位线啊
  309. void
  310. bufferevent_setwatermark(struct bufferevent *bufev, short events,
  311. size_t lowmark, size_t highmark)
  312. {
  313. if (events & EV_READ) {
  314. bufev->wm_read.low = lowmark;
  315. bufev->wm_read.high = highmark;
  316. }
  317. if (events & EV_WRITE) {
  318. bufev->wm_write.low = lowmark;
  319. bufev->wm_write.high = highmark;
  320. }
  321. /* If the watermarks changed then see if we should call read again */
  322. bufferevent_read_pressure_cb(bufev->input,
  323. 0, EVBUFFER_LENGTH(bufev->input), bufev);
  324. }
  325. //和event_base关联
  326. int
  327. bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
  328. {
  329. int res;
  330. bufev->ev_base = base;
  331. res = event_base_set(base, &bufev->ev_read);
  332. if (res == -1)
  333. return (res);
  334. res = event_base_set(base, &bufev->ev_write);
  335. return (res);
  336. }

下面上一个测试例子:

  1. include <iostream>
  2. #include <event.h>
  3. #include <assert.h>
  4. #include <socket.h>
  5. using namespace std;
  6. struct event_base* base = NULL;
  7. struct bufferevent* buf_ev = NULL;
  8. void read_cb(struct bufferevent* buf, void* arg)
  9. {
  10. ///read thing from bufferevent///
  11. cout<<"read"<<endl;
  12. char data[1024] = {0};
  13. assert(buf_ev->input->off >= 0);
  14. cout<<buf_ev->input->off<<endl;
  15. bufferevent_read(buf_ev, data, sizeof(data));
  16. assert(data != NULL);
  17. cout<<data;
  18. /
  19. int connfd = *(int *)arg;
  20. bufferevent_write(buf_ev, data, strlen(data)+1);
  21. }
  22. void write_cb(struct bufferevent* buf, void* arg)
  23. {
  24. cout<<"write"<<endl;
  25. }
  26. void error_cb(struct bufferevent* buf, short what, void* arg)
  27. {
  28. cout<<"error"<<endl;
  29. ///read thing from bufferevent///
  30. char data[1024] = {0};
  31. assert(buf_ev->input->off > 0);
  32. cout<<buf_ev->input->off<<endl;
  33. bufferevent_read(buf_ev, data, sizeof(data));
  34. assert(data != NULL);
  35. cout<<data;
  36. /
  37. int connfd = *(int *)arg;
  38. bufferevent_write(buf_ev, data, strlen(data)+1);
  39. }
  40. int main()
  41. {
  42. base = event_init();
  43. assert(base != NULL);
  44. int sockfd = socket_create("127.0.0.1", 6666);
  45. assert(sockfd != -1);
  46. int connfd = accept(sockfd, NULL, NULL);
  47. assert(connfd != -1);
  48. buf_ev = bufferevent_new(connfd, read_cb, write_cb, error_cb, &connfd);
  49. assert(buf_ev != NULL);
  50. int ret;
  51. ret = bufferevent_base_set(base, buf_ev);
  52. assert(ret != -1);
  53. ret = bufferevent_enable(buf_ev, EV_READ);
  54. assert(ret != -1);
  55. //ret = bufferevent_disable(buf_ev, EV_READ); //if the bufferevent was disabled, ex: shut the EV_READ, reading will not excute.
  56. //assert(ret != -1);
  57. ///
  58. bufferevent_setwatermark(buf_ev, EV_READ, 3, 10);
  59. bufferevent_settimeout(buf_ev, 4, 100);
  60. event_base_dispatch(base);
  61. return 0;
  62. }
(FreeeLinux的博客:http://blog.csdn.net/freeelinux/article/details/52862170)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/518120
推荐阅读
相关标签
  

闽ICP备14008679号