在libevent 2.x.x中,新增了bufferevent来封装了socket操作,用来简化编码操作。
当前项目有个点实在是有点儿绕,我自己在libevent基础上做了一个网络引擎的wrapper,未来打算替换,在实现的过程中,发现了一些问题,于是这些日子在空闲的时候阅读了下libevent的实现以及从前没有接触过的epoll模型,心里也算是有点数了。
在bufferevent中,封装了两个event,分别是read和write,同时也有对应的read buffer和write buffer,这也是常用的设计。然而bufferevent对于read和write事件的处理却有很大的差异。
首先我们谈谈比较容易理解的read事件,通常的步骤就是新建一个bufferevent
bufferevent_socket_new
然后我们设置对应的写回调函数,同时要记住必须调用
bufferevent_enable(pEv, EV_READ)
这个函数是将对应的event通过event_add添加到eventbase中的,我们知道在libevent的设计中,所有的事件必须加入eventbase中才会获得事件通知。
在 bufferevent_enable
函数中,会设置enabled变量的对应的标志位,并且将对应的event添加到eventbase中。
不然我们的读事件都会被libevent忽略,然后我们只要静静的等待着读回调即可。在libevent层面,它属于proactor模式,我们只需要从缓冲区中把socket数据取出来即可,不用调用recv/read。
从libevent的内部实现来讲,在调用bufferevent_socket_new的时候已经初始化了读事件,并且设置了读回调,这个回调是属于libevent内部的回调,用于实现proactor的机制。
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
struct evbuffer *input;
int res = 0;
short what = BEV_EVENT_READING;
ev_ssize_t howmuch = -1, readmax=-1;
_bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
/* Note that we only check for event==EV_TIMEOUT. If
* event==EV_TIMEOUT|EV_READ, we can safely ignore the
* timeout, since a read has occurred */
what |= BEV_EVENT_TIMEOUT;
goto error;
}
input = bufev->input;
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
howmuch = bufev->wm_read.high - evbuffer_get_length(input);
/* we somehow lowered the watermark, stop reading */
if (howmuch <= 0) {
bufferevent_wm_suspend_read(bufev);
goto done;
}
}
readmax = _bufferevent_get_read_max(bufev_p);
if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
* uglifies this code. XXXX */
howmuch = readmax;
if (bufev_p->read_suspended)
goto done;
evbuffer_unfreeze(input, 0);
res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
evbuffer_freeze(input, 0);
if (res == -1) {
int err = evutil_socket_geterror(fd);
if (EVUTIL_ERR_RW_RETRIABLE(err))
goto reschedule;
/* error case */
what |= BEV_EVENT_ERROR;
} else if (res == 0) {
/* eof case */
what |= BEV_EVENT_EOF;
}
if (res <= 0)
goto error;
_bufferevent_decrement_read_buckets(bufev_p, res);
/* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low)
_bufferevent_run_readcb(bufev);
goto done;
reschedule:
goto done;
error:
bufferevent_disable(bufev, EV_READ);
_bufferevent_run_eventcb(bufev, what);
done:
_bufferevent_decref_and_unlock(bufev);
}
可以看到最终会调用用户的回调函数,而对于这些函数来说,libevent已经封装了read/recv等细节,只需要从evbuffer中获取数据就可以了。
读事件非常好理解,就是有数据可读了,而读事件有点儿不一样。读事件是属于缓冲区可写事件,在socket的缓冲区未写满的时候会一直通知,所以bufferevent的写处理有点儿不一样。
在初始化bufferevent的时候,bufferevent会默认自动设置enabled的EV_WRITE标志位,这是很重要的,而EV_READ的标志位必须自己设置。
为何只是设置EV_READ标志位而不是调用 bufferevent_enable
呢?上面已经谈到了这个问题,即当socket缓冲区可写的时候,会不断的回调,而这是不需要的,我们的目的只是有用户数据还未写入socket缓冲区的时候,我们才需要这个通知,于是在正常的情况下, bufferevent_enable(pEv, EV_WRITE)
是不会设置的,这是有悖于bufferevent的设计思路的。
为了解决这个问题,我们就不处理socket的可写事件,当用户调用了 bufferevent_write
的时候才进行处理。当用户请求发送数据的时候,libevent是写入写缓冲的,大概的设计逻辑如下:
// 添加到写缓冲 ...省略
evbuffer_invoke_callbacks(buf);
查看了下libevent的实现,仅仅只是把数据append到写缓冲内,没有任何的发送机制,可实际上这段数据还是被发送出去了,这是怎么实现的呢?所以这个工作只能是 evbuffer_invoke_callbacks(buf);
这个来实现的,这个是注册到evbuffer的一个事件回调,当有数据写入的时候会被调用。于是我们继续追溯这个调用链,我们从头追溯,我们在初始化bufferevent对象的时候,有这么一句调用:
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
我们对写缓冲设置了这么一个回调,那么在我们把数据写入了写缓冲,这个回调就会被调用,我们来看下这个的实现:
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
!bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
/* Should we log this? */
}
}
}
没错,我们又看到了 be_socket_add
这个熟悉的身影,而这个函数就是添加对应的事件到eventbase中,我们来看下上述的判断条件,看到了enabled这个变量了,没错在 bufferevent_socket_new
中已经默认设置了EV_WRITE的标志位,所以当我们使用bufferevent_write写入写缓冲的时候,其实libevent会将可写事件监听,当有可写事件来临的时候,就执行到了之前在写event上注册的回调 bufferevent_writecb
,在这个函数中,会尝试将写缓冲的数据写入socket缓冲区,假设写入完毕,则会删除读事件的监听,假设缓冲区已满,则等待下次缓冲区可写的通知。
这样写事件的逻辑也整理清楚了,写流程是属于有点儿复杂的,读相对而言会简单很多。
我们目前项目中对于libevent的发送数据包的用法,有点儿绕,不知道是有意为之还是怎么的,我们现在的做法是主动的 bufferevent_enable
EV_WRITE事件,而当缓冲区可用的时候,我们还没有把我们的数据包写入写缓冲,于是直接调用用户的回调函数,于是在用户的回调函数中再把数据给发送出去,当然还是调用了 bufferevent_write
,而在这个函数中,当缓冲区发完再把写事件从eventbase中删除,这个逻辑其实是多走了一点儿东西,其实不必等缓冲区可写才发送数据包,直接调用 bufferevent_write
就可以了,上述的逻辑会把发包的逻辑打乱,在回调的时候恢复当前的现场也是多写了很多代码。