当前位置:   article > 正文

Libevent之 bufferevent 及 bufferevent_sock 详解_bufferevent_socket_new

bufferevent_socket_new

介绍

通常已连接的套接字除了相应事件之外,应用还希望做一定的数据缓冲。比如写入数据的时候,通常的运行模式是:

  • 决定要向连接写入一些数据,把数据放入到缓冲区中。
  • 等待连接可以写入。
  • 写入尽量多的数据。
  • 记住写入了多少数据,如果还有更多数据要写入,等待连接再次可以写入。

这种缓冲IO模式很通用,libevent为此提供了一种通用机制即buffereventbufferevent由一个底层的传输端口(如已连接套接字)、一个读取缓冲区和一个写入缓冲区组成。与通常的事件在底层传输端口已经就绪,可以读取或者写入的时候执行回调不同的是,bufferevent在读取或者写入了足够量的数据之后再调用用户提供的回调。用户通过bufferevent不必处理系统底层IO操作,仅仅从bufferevent中读取相应数据即可。当前bufferevent只能用于像TCP这样的面向流的协议,将来才可能会支持像UDP这样的面向数据报的协议。

前面讲述了evconnlistener是为了简化服务器从创建套接字到accept成功这个繁琐的过程。
这里讲述的bufferevent则是为了简化在accpet成功返回已连接套接字之后,从已连接套接字接收数据和发送数据(这也需要通过epoll处理)需要自建立缓冲区的过程,bufferevent通过链表建立自己的缓冲区,使得用户不必为缓冲数据的处理而烦恼,用户仅仅需要在回调函数里面通过bufferevent_read或者bufferevent_write读出或写入数据即可。

下面需要重点关注一点:
libevent如何处理非阻塞的读和写过程?因为evconnlistener返回的连接套接字默认是非阻塞的。

当套接字读使能时候,bufferevent将从套接字尽可能读取多的数据,然后调用用户先前设定的回调函数。当套接字写使能时,bufferevent将向套接字尽可能写多数据,当达到一定条件时候,调用用户先前设定的回调函数。

bufferevent可以作用与socket、socketpair、filtter这篇博客主要围绕bufferevent作用与 bufferevent_sock讲解,其中涉及的两个文件是bufferevent.c、bufferevent_sock.c

#bufferevent结构体
每个bufferevent都有一个输入缓冲区和一个输出缓冲区,它们的类型都是struct evbuffer。有数据要写入到bufferevent时,添加数据到输出缓冲区;bufferevent中有数据供读取的时候,从输入缓冲区抽取(drain)数据,并调用用户对应的读取回调函数。

bufferevent其实也就是在event_base的基础上再进行一层封装,其本质还是离不开event和event_base,从bufferevent的结构体就可以看到这一点。
bufferevent结构体中有两个event,分别用来监听同一个fd的可读事件和可写事件。为什么不用一个event同时监听可读和可写呢?这是因为监听可写是困难的,下面会说到原因。读者也可以自问一下,自己之前有没有试过用最原始的event监听一个fd的可写。
由于socket 是全双工的,所以在bufferevent结构体中,也有两个evbuffer成员,分别是读缓冲区和写缓冲区。bufferevent结构体定义如下:

//socket、filter、socketpair都可以使用bufferevent,所以新建一个结构体保存处理不同类型的函数
struct bufferevent {
	/** Event base for which this bufferevent was created. */
	struct event_base *ev_base;
	/** Pointer to a table of function pointers to set up how this
	    bufferevent behaves. */
	const struct bufferevent_ops *be_ops;//操作结构体,成员有一些函数指针。类似struct eventop结构体

	/** A read event that triggers when a timeout has happened or a socket
	    is ready to read data.  Only used by some subtypes of
	    bufferevent. */
	struct event ev_read;//读事件event
	/** A write event that triggers when a timeout has happened or a socket
	    is ready to write data.  Only used by some subtypes of
	    bufferevent. */
	struct event ev_write;//写事件event 

	/** An input buffer. Only the bufferevent is allowed to add data to
	    this buffer, though the user is allowed to drain it. */
	struct evbuffer *input;//读缓冲

	/** An input buffer. Only the bufferevent is allowed to drain data
	    from this buffer, though the user is allowed to add it. */
	struct evbuffer *output;//写缓冲

	struct event_watermark wm_read;//读水位
	struct event_watermark wm_write;//写水位

	bufferevent_data_cb readcb;//可读回调函数
	bufferevent_data_cb writecb;//可写回调函数
	/* This should be called 'eventcb', but renaming it would break
	 * backward compatibility */
	bufferevent_event_cb errorcb;//错误发生时候回调函数
	void *cbarg;//回调参数

	struct timeval timeout_read;//读事件超时值
	struct timeval timeout_write;//写事件超时值

	/** Events that are currently enabled: currently EV_READ and EV_WRITE
	    are supported. */
	short enabled;//事件使能标志位
};

//bufferevent-internal.h文件  
//bufferevent可以处理socketpair和pair和socket,三者拥有共同的东西,抽象出来
struct bufferevent_private {
	/** The underlying bufferevent structure. */
	struct bufferevent bev;//最基础的bufferevent结构体,里面存储事件及一些属性

	/** Evbuffer callback to enforce watermarks on input. */
	struct evbuffer_cb_entry *read_watermarks_cb;//设置input evbuffer的高水位时,需要一个evbuffer回调函数配合工作。evbuffer有数据即调用

	/** If set, we should free the lock when we free the bufferevent. */
	unsigned own_lock : 1;//锁是Libevent自动分配的,还是用户分配的

	/** Flag: set if we have deferred callbacks and a read callback is
	 * pending. */
	unsigned readcb_pending : 1;//是否延迟回调函数的调用
	/** Flag: set if we have deferred callbacks and a write callback is
	 * pending. */
	unsigned writecb_pending : 1;//同上
	/** Flag: set if we are currently busy connecting. */
	unsigned connecting : 1;//连接是否处理忙
	/** Flag: set if a connect failed prematurely; this is a hack for
	 * getting around the bufferevent abstraction. */
	unsigned connection_refused : 1;//
	/** Set to the events pending if we have deferred callbacks and
	 * an events callback is pending. */
	short eventcb_pending;//

	/** If set, read is suspended until one or more conditions are over.
	 * The actual value here is a bitfield of those conditions; see the
	 * BEV_SUSPEND_* flags above. */
	bufferevent_suspend_flags read_suspended; //标志是什么原因把读挂起来

	/** If set, writing is suspended until one or more conditions are over.
	 * The actual value here is a bitfield of those conditions; see the
	 * BEV_SUSPEND_* flags above. */
	bufferevent_suspend_flags write_suspended; //标志是什么原因把写挂起来

	/** Set to the current socket errno if we have deferred callbacks and
	 * an events callback is pending. */
	int errno_pending;

	/** The DNS error code for bufferevent_socket_connect_hostname */
	int dns_error;

	/** Used to implement deferred callbacks */
	struct deferred_cb deferred;

	/** The options this bufferevent was constructed with */
	enum bufferevent_options options;

	/** Current reference count for this bufferevent. */
	int refcnt;//引用计数

	/** Lock for this bufferevent.  Shared by the inbuf and the outbuf.
	 * If NULL, locking is disabled. */
	void *lock;//所变量

	/** Rate-limiting information for this bufferevent */
	struct bufferevent_rate_limit *rate_limiting;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103

上述是实现bufferevent的基本结构体。struct bufferevent是基本结构体。struct bufferevent_private是重新包装第一结构体,使得实现功能非常简单。

bufferevent_socket_new

创建一个用于socket的bufferevent,用来缓存套接字读或写的数据,然后调用对应的回调函数,是的用户可以仅仅关注生成业务逻辑功能。

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
	struct bufferevent_private *bufev_p;
	struct bufferevent *bufev;

	if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)//分配bufferevent_private结构体
		return NULL;

//因为用于socket,所以通过bufferevent_ops_socket全局结构体初始化

	if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
				    options) < 0) {
		mm_free(bufev_p);
		return NULL;
	}//初始化socket对应的bufferevent_private
	bufev = &bufev_p->bev;
	//设置将evbuffer的数据向利用最高传输方式fd传
	evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

	//将读写事件与fd关联起来。
	event_assign(&bufev->ev_read, bufev->ev_base, fd,
	    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);//将读事件与内部回调函数bufferevent_readcb接合起来
	event_assign(&bufev->ev_write, bufev->ev_base, fd,
	    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);//将写事件与内部回调函数bufferevent_writecb接合起来


    //设置evbuffer的回调函数,使得外界给写缓冲区添加数据时,能触发  
	//写操作,这个回调对于写事件的监听是很重要的 
	evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);


    //冻结读缓冲区的尾部,未解冻之前不能往读缓冲区追加数据  
    //也就是说不能从socket fd中读取数据  
	evbuffer_freeze(bufev->input, 0);

	//冻结写缓冲区的头部,未解冻之前不能把写缓冲区的头部数据删除  
    //也就是说不能把数据写到socket fd  
	evbuffer_freeze(bufev->output, 1);

	return bufev;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

上述函数必须注意三个回调函数的设置:bufferevent_readcbbufferevent_writecbbufferevent_socket_outbuf_cb分别对应于bufferevent读就绪的回调函数,写就绪的回调函数,以及evbuff写数据(向evbuffer写入数据,调用此函数)的回调函数。这三个回调函数都是libevent内部的函数,用户完全不知道系统调用这此函数。后续分析libevent内部将socket缓冲区数据读或写到evbuffer,都依靠这几个函数。


//初始化bufferevent公共的部分。
int
bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
	struct bufferevent *bufev = &bufev_private->bev;

	if (!bufev->input) {//动态创建读evbuffer
		if ((bufev->input = evbuffer_new()) == NULL)
			return -1;
	}

	if (!bufev->output) {//动态创建写evbuffer
		if ((bufev->output = evbuffer_new()) == NULL) {
			evbuffer_free(bufev->input);//失败则释放先前分配的内存
			return -1;
		}
	}

	bufev_private->refcnt = 1;
	bufev->ev_base = base;

	/* Disable timeouts. */
	//默认情况下,读和写event都是不支持超时的
	evutil_timerclear(&bufev->timeout_read);
	evutil_timerclear(&bufev->timeout_write);

	bufev->be_ops = ops;

	/*
	 * Set to EV_WRITE so that using bufferevent_write is going to
	 * trigger a callback.  Reading needs to be explicitly enabled
	 * because otherwise no data will be available.
	 */
	bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	if (options & BEV_OPT_THREADSAFE) {
		if (bufferevent_enable_locking(bufev, NULL) < 0) {
			/* cleanup */
			evbuffer_free(bufev->input);
			evbuffer_free(bufev->output);
			bufev->input = NULL;
			bufev->output = NULL;
			return -1;
		}
	}
#endif
	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
	    == BEV_OPT_UNLOCK_CALLBACKS) {
		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
		return -1;
	}
	if (options & BEV_OPT_DEFER_CALLBACKS) {
		if (options & BEV_OPT_UNLOCK_CALLBACKS)
			event_deferred_cb_init(&bufev_private->deferred,
			    bufferevent_run_deferred_callbacks_unlocked,
			    bufev_private);
		else
			event_deferred_cb_init(&bufev_private->deferred,
			    bufferevent_run_deferred_callbacks_locked,
			    bufev_private);
	}

	bufev_private->options = options;

	//将evbuffer和bufferevent相关联
	evbuffer_set_parent(bufev->input, bufev);
	evbuffer_set_parent(bufev->output, bufev);

	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

这主要初始化bufferevent中的evbuffer,这里动态分配evbuffer操作很牛逼。注意上述bufferevent_ops_socket的选择,这里因为用于socket,所以选择了ops_socket。

// 存储bufferevent对应操作的函数指针,后续方便调用
const struct bufferevent_ops bufferevent_ops_socket = {
	"socket",                    //类型名称 
	evutil_offsetof(struct bufferevent_private, bev),//成员bev的偏移量
	be_socket_enable, //启动。将event加入到event_base中
	be_socket_disable,
	be_socket_destruct,
	be_socket_adj_timeouts,
	be_socket_flush,
	be_socket_ctrl,
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

bufferevent_ops_socket里面存储操作socket 套接字的选项函数,因为bufferevent可以用于多个模块,所以这里将控制其操作封装成结构体指针数组。上述里面的函数全部都是静态的,意味着全部都有libevent内部调用,用户无须调用这些函数

#define be_socket_add(ev, t)			\
	_bufferevent_add_event((ev), (t))

int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
	if (tv->tv_sec == 0 && tv->tv_usec == 0)
		return event_add(ev, NULL);
	else
		return event_add(ev, tv);
}

//将bufferevent里的读或者写事件加入epoll监听
static int
be_socket_enable(struct bufferevent *bufev, short event)
{
	if (event & EV_READ) {
		if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
			return -1;
	}
	if (event & EV_WRITE) {
		if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
			return -1;
	}
	return 0;
}

//将bufferevent里的读或者写事件从epoll中移除,并删除事件。
static int
be_socket_disable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	if (event & EV_READ) {
		if (event_del(&bufev->ev_read) == -1)
			return -1;
	}
	/* Don't actually disable the write if we are trying to connect. */
	if ((event & EV_WRITE) && ! bufev_p->connecting) {
		if (event_del(&bufev->ev_write) == -1)//删掉这个event
			return -1;
	}
	return 0;
}

//删除先前事件,并将读写事件和新的fd绑定,并加入到epoll监听。
static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
	BEV_LOCK(bufev);
	EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

	event_del(&bufev->ev_read);//删除旧事件
	event_del(&bufev->ev_write);

	event_assign(&bufev->ev_read, bufev->ev_base, fd,
	    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);//设置读事件属性
	event_assign(&bufev->ev_write, bufev->ev_base, fd,
	    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);//设置写事件属性

	if (fd >= 0)
		bufferevent_enable(bufev, bufev->enabled);//使能,加入epoll

	BEV_UNLOCK(bufev);
}

//删除事件,并关闭socket套接字
static void
be_socket_destruct(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	evutil_socket_t fd;
	EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

	fd = event_get_fd(&bufev->ev_read);

	event_del(&bufev->ev_read);
	event_del(&bufev->ev_write);

	if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
		EVUTIL_CLOSESOCKET(fd);
}

//依据选项获取不同的sockcet操作
static int
be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_SET_FD:
		be_socket_setfd(bev, data->fd);
		return 0;
	case BEV_CTRL_GET_FD:
		data->fd = event_get_fd(&bev->ev_read);
		return 0;
	case BEV_CTRL_GET_UNDERLYING:
	case BEV_CTRL_CANCEL_ALL:
	default:
		return -1;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102

上述函数在socket读或者写的时候,经常用到通过bufev->be_ops进行调用。

bufferevent处理可读事件

每个bufferevent有两个数据相关的回调:一个读取回调和一个写入回调。默认情况下,从底层传输端口读取了任意量的数据之后会调用读取回调;输出缓冲区中足够量的数据被清空到底层传输端口后写入回调会被调用。通过调整 bufferevent 的读取和写入水位可以覆盖这些函数的默认行为。函数bufferevent_setwatermark可以设置读和写的水位。

每个bufferevent有四个水位:

  • 读低水位:就是当可读的数据量到达这个低水位后,才会调用用户设置的回调函数。比如用户想每次读取100字节,那么就可以把低水位设置为100。当可读数据的字节数小于100时,即使有数据都不会打扰用户(即不会调用用户设置的回调函数)。可读数据大于等于100字节后,才会调用用户的回调函数。默认值为0,所以每个读取操作都会导致读取回调被调用。
  • 读高水位:把读事件的evbuffer的数据量限制在高水位之下。比如,用户认为读缓冲区不能太大(太大的话,链表会很长)。那么用户就会设置读事件的高水位。当读缓冲区的数据量达到这个高水位后,即使socket fd还有数据没有读,也不会读进这个读缓冲区里面。一句话说,就是控制evbuffer的大小。
  • 写低水位:写入操作使得输出缓冲区的数据量达到或者低于此级别时,写入回调函数将被调用。默认值是0,所以只有输出缓冲区空的时候才会调用此回调。从后面源代码可以看出作者使用这四个水位的用途。完全是为了指示用户Libevent内核操作状态而已。
  • 写高水位:bufferevent没有直接使用这个水位。它在bufferevent用作另外一个bufferevent的底层传输端口时有特殊意义。

##处理高水位问题
因为可以设定高水位,控制evbuffer的大小,但socket fd可能还有数据。有数据就会触发可读事件,但处理可读的时候,又会发现设置了高水位,不能读取数据evbuffer。socket fd的数据没有被读完,又触发……。这个貌似是一个死循环。实际上是不会出现这个死循环的,因为Libevent发现evbuffer的数据量到达高水位后,就会把可读事件给挂起来,让它不能再触发了。Libevent使用函数bufferevent_wm_suspend_read把监听读事件的event挂起来。可以解决数据到达高水位之后,不会将数据从fd缓冲区读到evbuffer中。

//高水位挂起
#define bufferevent_wm_suspend_read(b) \
	bufferevent_suspend_read((b), BEV_SUSPEND_WM)


void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	if (!bufev_private->read_suspended)//不能挂多次 
		bufev->be_ops->disable(bufev, EV_READ);//实际调用be_socket_disable函数,前面已经介绍了这几个函数
	bufev_private->read_suspended |= what;//因何而被挂起
	BEV_UNLOCK(bufev);
}

//接挂起
#define bufferevent_wm_unsuspend_read(b) \
	bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)

void
bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	bufev_private->read_suspended &= ~what;
	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
		bufev->be_ops->enable(bufev, EV_READ);//重新插入事件
	BEV_UNLOCK(bufev);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

右前面可知bufev->be_ops->disable直接将socket读事件删除,不会再去监听。不能随便设置高水位,因为它会暂停读。如果只想设置低水位而不想设置高水位,那么在调用bufferevent_setwatermark函数时,高水位的参数设为0。那么当读事件删除后,什么时候加入epoll继续读取呢?也就是用户从evbuffer读取一定数据之后,已经小于高水位了,如何再次激活读事件?
默认情况libevent都不会设置高水位和低水位,全部将数值设定为0。如果通过bufferevent_setwatermark设定了水位,那么就会设置读缓冲区evbuffer数据添加或者移除的回调函数。通过在回调函数里面比较缓冲区数据的大小,考虑是否将读事件添加到epoll中。这就解决了上述出现的问题。


/*
 * Sets the water marks
 */

void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	if (events & EV_WRITE) {
		bufev->wm_write.low = lowmark;
		bufev->wm_write.high = highmark;
	}

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {//高水位
			/* There is now a new high-water mark for read.
			   enable the callback if needed, and see if we should
			   suspend/bufferevent_wm_unsuspend. */

			//还没设置高水位的回调函数
			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
						    bufferevent_inbuf_wm_cb,//内部回调函数
						    bufev);//添加回调函数
			}
			evbuffer_cb_set_flags(bufev->input,
				      bufev_private->read_watermarks_cb,
				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
			
            //设置(修改)高水位时,evbuffer的数据量已经超过了水位值  
            //可能是把之前的高水位调高或者调低  
            //挂起操作和取消挂起操作都是幂等的(即多次挂起的作用等同于挂起一次) 
			if (evbuffer_get_length(bufev->input) >= highmark)
				bufferevent_wm_suspend_read(bufev);
			else if (evbuffer_get_length(bufev->input) < highmark)//调低了
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			/* There is now no high-water mark for read. */

			//高水位值等于0,那么就要取消挂起 读事件  
            //取消挂起操作是幂等的 	
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

上述函数重点是bufferevent_inbuf_wm_cb这个回调函数,当读缓冲数据变化的时候,会调用这回调函数。函数内部考虑是否激活读事件。



/* Callback to implement watermarks on the input buffer.  Only enabled
 * if the watermark is set. */

//高水位导致读的挂起和之后读的恢复,一切工作都是由Libevent内部完成的,用户不用做任何工作
static void
bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bufev = arg;
	size_t size;

	size = evbuffer_get_length(buf);//获取缓冲区数据大小

	if (size >= bufev->wm_read.high)//大于高水位
		bufferevent_wm_suspend_read(bufev);//挂起读
	else
		bufferevent_wm_unsuspend_read(bufev);//解挂起读
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

高水位导致读的挂起和之后读的恢复,一切工作都是由Libevent内部完成的,用户不用做任何工作。

处理读数据问题

由前面bufferevent_socket_new函数可以知道,当socket fd可读的时候,会调用libevent内部回调函数bufferevent_readcb将尽量多的数据从fd读取到evbuffer缓冲区中,当数据到达低水位时候,在调用用户设定bufferevent_setcb函数中设置的读事件回调函数。所以,当用户的读事件回调函数被调用时,数据已经在evbuffer中了,用户拿来就用,无需再次调用read这类会阻塞的函数。

static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	struct evbuffer *input;
	int res = 0;
	short what = BEV_EVENT_READING;
	ev_ssize_t howmuch = -1, readmax=-1;

	_bufferevent_incref_and_lock(bufev);

	if (event == EV_TIMEOUT) {
		/* Note that we only check for event==EV_TIMEOUT. If
		 * event==EV_TIMEOUT|EV_READ, we can safely ignore the
		 * timeout, since a read has occurred */
		what |= BEV_EVENT_TIMEOUT;
		goto error;
	}

	input = bufev->input;

	/*
	 * If we have a high watermark configured then we don't want to
	 * read more data than would make us reach the watermark.
	 */
	 //用户设置了高水位,则到达高水位,挂起读事件 
	if (bufev->wm_read.high != 0) {
		howmuch = bufev->wm_read.high - evbuffer_get_length(input);
		/* we somehow lowered the watermark, stop reading */
		if (howmuch <= 0) {
			bufferevent_wm_suspend_read(bufev);//到达高水位直接挂起事件
			goto done;
		}
	}

	//因为用户可以限速,所以这么要检测最大的可读大小。  
    //如果没有限速的话,那么将返回16384字节,即16K  
    //默认情况下是没有限速的。 
	readmax = _bufferevent_get_read_max(bufev_p);
	if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
					       * uglifies this code. XXXX */
		howmuch = readmax;
	
	//一些原因导致读被挂起,比如加锁了。 
	if (bufev_p->read_suspended)
		goto done;

	//解冻,使得可以在input的后面追加数据
	evbuffer_unfreeze(input, 0);
	//从socket fd中读取数据
	res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
	evbuffer_freeze(input, 0);//冻结,防止外部修改数据

	if (res == -1) {
		int err = evutil_socket_geterror(fd);
		if (EVUTIL_ERR_RW_RETRIABLE(err))
			goto reschedule;

	   //因为非阻塞读,不是 EINTER or EAGAIN 这两个可以重试的错误,那么就应该是其他致命的错误  
	   //此时,应该报告给用户  	
		/* error case */
		what |= BEV_EVENT_ERROR;
	} else if (res == 0) {//断开了连接
		/* eof case */
		what |= BEV_EVENT_EOF;
	}

	if (res <= 0)
		goto error;

	//速率相关的操作
	_bufferevent_decrement_read_buckets(bufev_p, res);

	/* Invoke the user callback - must always be called last */
	//evbuffer的数据量大于低水位值。
	if (evbuffer_get_length(input) >= bufev->wm_read.low)
		_bufferevent_run_readcb(bufev);//激活用户设定的回调函数

	goto done;

 reschedule:
	goto done;

 error:
 	//把监听可读事件的event从event_base的事件队列中删除掉.event_del
	bufferevent_disable(bufev, EV_READ);
	_bufferevent_run_eventcb(bufev, what);//会调用用户设置的错误处理函数
	//到处使用goto跳转操作,这就是牛人写的代码。

 done:
	_bufferevent_decref_and_unlock(bufev);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94

对用户的读事件回调函数的触发是边缘触发的。这也就要求,在回调函数中,用户应该尽可能地把evbuffer的所有数据都读出来。如果想等到下一次回调时再读,那么需要等到下一次socket fd接收到数据才会触发用户的回调函数。如果之后socket fd一直收不到任何数据,那么即使evbuffer还有数据,用户的回调函数也不会被调用了。

bufferevent处理可写事件

可写需要解决的问题有两个:

1、libevent内部socket fd本身是非阻塞的,对于客户端,connect之后立即返回,需要判断连接是否成功?

2、不可能通过epoll一直监听可写事件,可写就是fd的写缓冲区(这个缓冲区在内核)还没满,可以往里面放数据。这就有一个问题,如果写缓冲区没有满,那么就一直是可写状态。如果一个event监听了可写事件,那么这个event就会一直被触发(死循环)。因为一般情况下,如果不是发大量的数据这个写缓冲区是不会满的。也就是说,不能一直监听可写事件。

Libevent的解决方案是:

对于问题1:因为这个socket fd是非阻塞的,所以它调用connect时,可能还没连接上就返回了。对于非阻塞的socket fd,一般是通过判断这个socket是否可写,从而得知这个socket是否已经连接上服务器。如果可写,那么它就已经成功连接上服务器了。

对于问题2:当我们确实要写入数据时,才监听可写事件。也就是说我们调用bufferevent_write写入数据时,Libevent才会把监听可写事件的那个event注册到event_base中。当Libevent把数据都写入到fd的缓冲区后,Libevent又会把这个event从event_base中删除。

以上两点是可写需要需要处理的问题,后续源代码处理了这两种情况。注意下面几个回调函数。bufferevent_socket_outbuf_cbbufferevent_writecb

##bufferevent_socket_new
对于sokcet传入参数是正数,那么表示已连接的套接字,通过在evlistener回调函数里面将fd通过此函数绑定到bufferevent上面。传入负数,则表示后面需要调用bufferevent_socket_connect连接服务器。所以这里主要有这两个用途。


struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
	struct bufferevent_private *bufev_p;
	struct bufferevent *bufev;


	if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)//分配bufferevent_private结构体
		return NULL;

//因为用于socket,所以通过bufferevent_ops_socket全局结构体初始化

	if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
				    options) < 0) {
		mm_free(bufev_p);
		return NULL;
	}//初始化socket对应的bufferevent_private
	bufev = &bufev_p->bev;
	//设置将evbuffer的数据向利用最高传输方式fd传
	evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

	//将读写事件与fd关联起来。
	event_assign(&bufev->ev_read, bufev->ev_base, fd,
	    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);//将读事件与内部回调函数bufferevent_readcb接合起来
	event_assign(&bufev->ev_write, bufev->ev_base, fd,
	    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);//将写事件与内部回调函数bufferevent_writecb接合起来


    //设置evbuffer的回调函数,使得外界给写缓冲区添加数据时,能触发  
	//写操作,这个回调对于写事件的监听是很重要的 
	evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);


    //冻结读缓冲区的尾部,未解冻之前不能往读缓冲区追加数据  
    //也就是说不能从socket fd中读取数据  
	evbuffer_freeze(bufev->input, 0);

	//冻结写缓冲区的头部,未解冻之前不能把写缓冲区的头部数据删除  
    //也就是说不能把数据写到socket fd  
	evbuffer_freeze(bufev->output, 1);

	return bufev;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

##bufferevent_socket_connect
用户可以在调用bufferevent_socket_new函数时,传一个-1作为socket的文件描述符,然后调用bufferevent_socket_connect函数连接服务器,无需自己写代码调用connect函数连接服务器。
bufferevent_socket_connect函数会调用socket函数申请一个套接字fd,然后把这个fd设置成非阻塞。接着就connect服务器,因为该socket fd是非阻塞的,所以不会等待,而是马上返回,连接这工作交给内核来完成。所以,返回后这个socket还没有真正连接上服务器。那么什么时候连接上呢?内核又是怎么通知通知用户呢?
一般来说,当可以往socket fd可写,那就说明已经连接上了。也就是说这个socket fd变成可写状态,就连接上了。
所以,对于“非阻塞connect”比较流行的做法是:用select或者poll这类多路IO复用函数监听该socket的可写事件。当这个socket触发了可写事件,然后再对这个socket调用getsockopt函数,做进一步的判断即可。


/*
直接连接
*/
int
bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *sa, int socklen)
{
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

	evutil_socket_t fd;
	int r = 0;
	int result=-1;
	int ownfd = 0;

	_bufferevent_incref_and_lock(bev);

	if (!bufev_p)
		goto done;

	fd = bufferevent_getfd(bev);//初始化设定fd小于0
	if (fd < 0) {//该bufferevent还没有设置fd
		if (!sa)
			goto done;
		fd = socket(sa->sa_family, SOCK_STREAM, 0);//创建套接字,并将其设定为非阻塞
		if (fd < 0)
			goto done;
		if (evutil_make_socket_nonblocking(fd)<0)//设置为非阻塞
			goto done;
		ownfd = 1;
	}
	if (sa) {
		r = evutil_socket_connect(&fd, sa, socklen);//非阻塞调用connect连接服务器,不会等待,而是马上返回,连接工作交给内核来完成。
		if (r < 0)//小于则错误
		/*
			0-EINPROGRESS或EINTR 正在连接
			1-都在本机,连接成功
			2-refuse拒绝连接
			<0 错误		
		*/
			goto freesock;
	}
	//删除旧事件,并将事件和新的fd对应,并加入到epoll监听可写。
	bufferevent_setfd(bev, fd);
	if (r == 0) {//返回值等于0,则握手开始,但是还没有连接上,必须监听可写

	    //此时需要监听可写事件,当可写了,并且没有错误的话,就成功连接上了
		if (! be_socket_enable(bev, EV_WRITE)) {//将可写加入epoll监听
			bufev_p->connecting = 1;//epoll返回即可写了
			result = 0;
			goto done;
		}
	} else if (r == 1) {//当服务器和客户机处于同一个主机,connect直接返回可能发生。
		/* The connect succeeded already. How very BSD of it. */
		result = 0;
		bufev_p->connecting = 1;
		event_active(&bev->ev_write, EV_WRITE, 1);
	} else {
		/* The connect failed already.  How very BSD of it. */
		bufev_p->connection_refused = 1;
		bufev_p->connecting = 1;
		result = 0;
		event_active(&bev->ev_write, EV_WRITE, 1);
	}

	goto done;

freesock:
	_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
	if (ownfd)
		evutil_closesocket(fd);
	/* do something about the error? */
done:
	_bufferevent_decref_and_unlock(bev);
	return result;
}

/* XXX we should use an enum here. */
/* 2 for connection refused, 1 for connected, 0 for not yet, -1 for error. */
int
evutil_socket_connect(evutil_socket_t *fd_ptr, struct sockaddr *sa, int socklen)
{
	int made_fd = 0;

	if (*fd_ptr < 0) {
		if ((*fd_ptr = socket(sa->sa_family, SOCK_STREAM, 0)) < 0)
			goto err;
		made_fd = 1;
		if (evutil_make_socket_nonblocking(*fd_ptr) < 0) {
			goto err;
		}
	}

	if (connect(*fd_ptr, sa, socklen) < 0) {//调用系统connect函数
		int e = evutil_socket_geterror(*fd_ptr);//获取socket错误
		if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//EINTR或EINPROGRESS则正在握手
			return 0;
		if (EVUTIL_ERR_CONNECT_REFUSED(e))
			return 2;
		goto err;
	} else {
		return 1;//返回 >=0 则表示连接成功,UNPv1有详细说明。客户和服务器位于本机则可能立即返回成功。
	}

err:
	if (made_fd) {
		evutil_closesocket(*fd_ptr);
		*fd_ptr = -1;
	}
	return -1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

上述函数将bufev_p->connecting设为1,表示正在连接,暂时还没有连接上,那么在bufferevent_writecb函数里面就会通过这个标志位判断是否成功连接上了。

当connect之后,就会监听ev_write是否可写,当可写则会调用libevent内部的可写回调函数bufferevent_writecb


static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int res = 0;
	short what = BEV_EVENT_WRITING;
	int connected = 0;
	ev_ssize_t atmost = -1;

	_bufferevent_incref_and_lock(bufev);

	if (event == EV_TIMEOUT) {
		/* Note that we only check for event==EV_TIMEOUT. If
		 * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
		 * timeout, since a read has occurred */
		what |= BEV_EVENT_TIMEOUT;
		goto error;
	}

	/*
	判断这个非阻塞的socket是否已经连接上服务器。非阻塞的connect操作,参考UNPv1
	*/
	if (bufev_p->connecting) {
		int c = evutil_socket_finished_connecting(fd);//判断socket的状态
		/* we need to fake the error if the connection was refused
		 * immediately - usually connection to localhost on BSD */
		if (bufev_p->connection_refused) {
		  bufev_p->connection_refused = 0;
		  c = -1;
		}

		if (c == 0)
			goto done;

		bufev_p->connecting = 0;
		if (c < 0) {
			event_del(&bufev->ev_write);
			event_del(&bufev->ev_read);
			_bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
			goto done;
		} else {
			connected = 1;
			_bufferevent_run_eventcb(bufev,
					BEV_EVENT_CONNECTED);
			if (!(bufev->enabled & EV_WRITE) ||
			    bufev_p->write_suspended) {
				event_del(&bufev->ev_write);
				goto done;
			}
		}
	}

	//用户可能设置了限速,如果没有限速,那么atmost将返回16384(16K)
	atmost = _bufferevent_get_write_max(bufev_p);


	//一些原因导致写被挂起来了 
	if (bufev_p->write_suspended)
		goto done;

	//如果evbuffer有数据可以写到sockfd中
	if (evbuffer_get_length(bufev->output)) {
		//解冻链表头,防止加入数据
		evbuffer_unfreeze(bufev->output, 1);
		//将output这个evbuffer的数据写到socket fd 的缓冲区中  
        //会把已经写到socket fd缓冲区的数据,从evbuffer中删除
		res = evbuffer_write_atmost(bufev->output, fd, atmost);
		evbuffer_freeze(bufev->output, 1);
		if (res == -1) {
			int err = evutil_socket_geterror(fd);
			if (EVUTIL_ERR_RW_RETRIABLE(err))//可以恢复的错误。一般是EINTR或者EAGAIN
				goto reschedule;
			what |= BEV_EVENT_ERROR;
		} else if (res == 0) {//该socket已经断开连接了
			/* eof case
			   XXXX Actually, a 0 on write doesn't indicate
			   an EOF. An ECONNRESET might be more typical.
			 */
			what |= BEV_EVENT_EOF;
		}
		if (res <= 0)
			goto error;

		_bufferevent_decrement_write_buckets(bufev_p, res);
	}

	if (evbuffer_get_length(bufev->output) == 0) {
		event_del(&bufev->ev_write);
	}

	/*
	 * Invoke the user callback if our buffer is drained or below the
	 * low watermark.
	 */
	 
	//如果evbuffer里面的数据量已经写得七七八八了,小于设置的低水位值,那么  
    //就会调用用户设置的写事件回调函数 
	if ((res || !connected) &&
	    evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
		_bufferevent_run_writecb(bufev);
	}

	goto done;

 reschedule:
 	
    //如果把写缓冲区的数据都写完成了。为了防止event_base不断地触发可写  
    //事件,此时要把这个监听可写的event删除。  
    //前面的atmost限制了一次最大的可写数据。如果还没写所有的数据  
    //那么就不能delete这个event,而是要继续监听可写事情,知道把所有的  
    //数据都写到socket fd中。	
	if (evbuffer_get_length(bufev->output) == 0) {
		event_del(&bufev->ev_write);
	}
	goto done;

 error:
	bufferevent_disable(bufev, EV_WRITE);
	_bufferevent_run_eventcb(bufev, what);//有错误。把这个写event删除

 done:
	_bufferevent_decref_and_unlock(bufev);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126

bufferevent_writecb函数过程也很简单,首先判断是否需要判断socket连上服务器,然后调用evbuffer_write_atmost函数把数据从evbuffer中写到evbuffer缓冲区中,此时要注意函数的返回值,因为可能写的时候发生错误。如果发生了错误,就要调用用户设置的event回调函数。
之后,还要判断evbuffer的数据是否已经全部写到socket 的缓冲区了。如果已经全部写了,那么就要把监听写事件的event从event_base的插入队列中删除。并判断是否低于低水位需要调用回调函数,如果还没写完,那么就不能删除,因为还要继续监听可写事件,下次接着写。
当上次用户写完数据结束后,libevent将结束写事件监听,那么何时再次监听写事件呢?那么当然是用户往evbuffer写入数据的时候,将写事件监听。用户写调用会bufferevent_write,其工作很简单就是将数据从用户区域,复制到evbuffer的写缓冲区。有因为先前设置了写缓冲区数据变化,evbuffer就会调用回调函数bufferevent_socket_outbuf_cb,该函数会将写事件加入epoll。至此libevent简单的解决了问题1。

int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
	if (evbuffer_add(bufev->output, data, size) == -1)//复制数据到evbuffer
		return (-1);

	return 0;
}

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	if (cbinfo->n_added &&//evbuffer添加了数据
	    (bufev->enabled & EV_WRITE) &&//默认情况下是enable EV_WRITE的
	    !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&//这个event已经被踢出event_base了 
	    !bufev_p->write_suspended) {//这个bufferevent的写并没有被挂起
		/* Somebody added data to the buffer, and we would like to
		 * write, and we were not writing.  So, start writing. */
		//把这个event添加到event_base中
		if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
		    /* Should we log this? */
		}
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

下面这段是为了解决问题2,通过标志位bufev_p->connecting,判断是否成功连接上的操作。这段代码不复杂,逻辑也很清楚,具体过程可以参考UNPv1中非阻塞IO的讲解。不管连接成功还是失败,都会调用bufferevent_setcb里面的bufferevent_event_cb,并传入成功或失败的标志,所以在这个函数里面可以判断连接是否成功或失败。这个回调函数如何编写很重要。

	/*
	判断这个非阻塞的socket是否已经连接上服务器。非阻塞的connect操作,参考UNPv1
	*/
	if (bufev_p->connecting) {
		int c = evutil_socket_finished_connecting(fd);//判断socket的状态
		/* we need to fake the error if the connection was refused
		 * immediately - usually connection to localhost on BSD */
		if (bufev_p->connection_refused) {
		  bufev_p->connection_refused = 0;
		  c = -1;
		}

		if (c == 0)//还没有连接好,继续监听
			goto done;

		bufev_p->connecting = 0;
		if (c < 0) {//小于0,则发生错误,删除事件
			event_del(&bufev->ev_write);
			event_del(&bufev->ev_read);
			_bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);//并调用errorcb,传入BEV_EVENT_ERROR参数
			goto done;
		} else {//大于0,则连接成功
			connected = 1;
			_bufferevent_run_eventcb(bufev,
					BEV_EVENT_CONNECTED);//并调用errorcb,传入BEV_EVENT_CONNECTED参数
			if (!(bufev->enabled & EV_WRITE) ||
			    bufev_p->write_suspended) {
				event_del(&bufev->ev_write);//删除读监听
				goto done;
			}
		}
	}


/* Check whether a socket on which we called connect() is done
   connecting. Return 1 for connected, 0 for not yet, -1 for error.  In the error case, set the current socket errno to the error that happened during the connect operation. */
int
evutil_socket_finished_connecting(evutil_socket_t fd)
{
	int e;
	ev_socklen_t elen = sizeof(e);

	if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&e, &elen) < 0)
		return -1;

	if (e) {
		if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//错误是EINTR或EINPROGRESS,没有连接上
			return 0;
		EVUTIL_SET_SOCKET_ERROR(e);//错误则返回-1
		return -1;
	}

	return 1;//大于0,则表示连接成功。
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

最后注意一点bufferevent_setcb中设置的bufev->errorcb函数,其函数原型为typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short what, void *ctx);。形参中包含了what,用来指明bufferevent事件状态,其包括如下几个选项:

/** @name Bufferevent event codes
    These flags are passed as arguments to a bufferevent's event callback.

    @{
*/
#define BEV_EVENT_READING	0x01	/**< error encountered while reading */
#define BEV_EVENT_WRITING	0x02	/**< error encountered while writing */
#define BEV_EVENT_EOF		0x10	/**< eof file reached */
#define BEV_EVENT_ERROR		0x20	/**< unrecoverable error encountered */
#define BEV_EVENT_TIMEOUT	0x40	/**< user-specified timeout reached */
#define BEV_EVENT_CONNECTED	0x80	/**< connect operation finished. */

void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
	BEV_LOCK(bufev);

	bufev->readcb = readcb;//设定对应的回调函数,很简单,就是简单的赋值操作。
	bufev->writecb = writecb;//设定对应的回调函数,很简单,就是简单的赋值操作。
	bufev->errorcb = eventcb;//设定对应的回调函数,很简单,就是简单的赋值操作。

	bufev->cbarg = cbarg;
	BEV_UNLOCK(bufev);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

所以在写bufferevent事件回调函数的时候,一定要判断what是什么,然后做相应的处理,这点很重要。下面就判断了这些条件,然后做了不同的处理操作。

使用bufferevent_socket_connect的例子

客户端的简单例子:

#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>

#include<stdio.h>
#include<string.h>
#include<stdlib.h>

#include<event.h>
#include<event2/bufferevent.h>


void cmd_msg_cb(int fd, short events, void* arg);
void server_msg_cb(struct bufferevent* bev, void* arg);
void socket_error_cb(struct bufferevent *bev, short event, void *arg);

int main(int argc, char** argv)
{
    //新建base发动机
    struct event_base* base = event_base_new();

    //新建一个bufferevent,并设定fd = -1则表示客户端需要用bufferevent_socket_connect连接服务器
    struct bufferevent *bev = bufferevent_socket_new(base, -1,
                                                     BEV_OPT_CLOSE_ON_FREE);
    //监听终端输入事件,客户端需要从终端输入。
    struct event *ev_cmd = event_new(base, STDIN_FILENO,
                                      EV_READ | EV_PERSIST, cmd_msg_cb,
                                      (void*)bev);
    event_add(ev_cmd, NULL);//将终端输入添加epoll


    //设定服务器IP地址信息
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr) );

    server_addr.sin_family = AF_INET;
    inet_pton(AF_INET , "192.168.24.128" , &server_addr.sin_addr);
    server_addr.sin_port = htons(9999);
    //调用socket_connect连接,此时设定了fd为非阻塞
    bufferevent_socket_connect(bev , (struct sockaddr *)&server_addr , sizeof(struct sockaddr_in));//连接

    //设定bufferevent接收数据回调函数
    bufferevent_setcb(bev, server_msg_cb, NULL, socket_error_cb, (void*)ev_cmd);
    //将bev时间添加到epoll
    bufferevent_enable(bev, EV_READ | EV_PERSIST);

    //发动机启动,开始监听
    event_base_dispatch(base);

    //清理缓存情况
    event_base_free(base);

    printf("finished \n");

    return 0;
}


void cmd_msg_cb(int fd, short events, void* arg)
{
    char msg[1024];

    int ret = read(fd, msg, sizeof(msg));
    if( ret < 0 )
    {
        perror("read fail ");
        exit(1);
    }

    struct bufferevent* bev = (struct bufferevent*)arg;

    //把终端的消息发送给服务器端
    bufferevent_write(bev, msg, ret);
}


void server_msg_cb(struct bufferevent* bev, void* arg)
{
    char msg[1024];

    size_t len = bufferevent_read(bev, msg, sizeof(msg));
    msg[len] = '\0';

    printf("recv %s from server\n", msg);
}

//这个函数超级重要
void socket_error_cb(struct bufferevent *bev, short event, void *arg)
{

    if (event & BEV_EVENT_EOF)//连接关闭
        printf("connection closed\n");
    else if (event & BEV_EVENT_ERROR)//连接错误
        printf("some other error\n");
    else if(event & BEV_EVENT_CONNECTED){//连接成功,回调直接退出
        printf("the client has connected to server\n");
        return ;
    }
    //运行到这里已经可以确定连接关闭或者错误,则清楚内存。
    bufferevent_free(bev);

    struct event *ev = (struct event*)arg;
    //因为socket已经没有,所以这个event也没有存在的必要了
    event_free(ev);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

以下调试信息和分析得到一模一样:


[debug] event_add: event: 0x163c7e0 (fd 0), EV_READ   call 0x400eca
[debug] Epoll ADD(1) on fd 0 okay. [old events were 0; read change was 1; write change was 0]
[debug] event_del: 0x163c4b0 (fd -1), callback 0x7f1a0334294f
[debug] event_del: 0x163c538 (fd -1), callback 0x7f1a03342b44
[debug] event_add: event: 0x163c538 (fd 6),  EV_WRITE  call 0x7f1a03342b44
[debug] Epoll ADD(4) on fd 6 okay. [old events were 0; read change was 0; write change was 1]
[debug] event_add: event: 0x163c538 (fd 6),  EV_WRITE  call 0x7f1a03342b44
[debug] event_add: event: 0x163c4b0 (fd 6), EV_READ   call 0x7f1a0334294f
[debug] Epoll MOD(5) on fd 6 okay. [old events were 4; read change was 1; write change was 0]
[debug] epoll_dispatch: epoll_wait reports 1
[debug] event_active: 0x163c538 (fd 6), res 4, callback 0x7f1a03342b44
[debug] event_process_active: event: 0x163c538,  EV_WRITE call 0x7f1a03342b44
the client has connected to server
[debug] event_del: 0x163c538 (fd 6), callback 0x7f1a03342b44
[debug] Epoll MOD(1) on fd 6 okay. [old events were 6; read change was 0; write change was 2]
wangjun
[debug] epoll_dispatch: epoll_wait reports 1
[debug] event_active: 0x163c7e0 (fd 0), res 2, callback 0x400eca
[debug] event_process_active: event: 0x163c7e0, EV_READ  call 0x400eca
[debug] event_add: event: 0x163c538 (fd 6),  EV_WRITE  call 0x7f1a03342b44
[debug] Epoll MOD(5) on fd 6 okay. [old events were 2; read change was 0; write change was 1]
[debug] epoll_dispatch: epoll_wait reports 1
[debug] event_active: 0x163c538 (fd 6), res 4, callback 0x7f1a03342b44
[debug] event_process_active: event: 0x163c538,  EV_WRITE call 0x7f1a03342b44
[debug] event_del: 0x163c538 (fd 6), callback 0x7f1a03342b44
[debug] Epoll MOD(1) on fd 6 okay. [old events were 6; read change was 0; write change was 2]
[debug] epoll_dispatch: epoll_wait reports 1
[debug] event_active: 0x163c4b0 (fd 6), res 2, callback 0x7f1a0334294f
[debug] event_process_active: event: 0x163c4b0, EV_READ  call 0x7f1a0334294f
recv I have recvieced the msg: wangjun
 from server

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

参考

https://blog.csdn.net/luotuo44/article/details/39344743#t4

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/458451
推荐阅读
相关标签
  

闽ICP备14008679号