对libevent中bufferevent的深入理解

在libevent 2.x.x中,新增了bufferevent来封装了socket操作,用来简化编码操作。

当前项目有个点实在是有点儿绕,我自己在libevent基础上做了一个网络引擎的wrapper,未来打算替换,在实现的过程中,发现了一些问题,于是这些日子在空闲的时候阅读了下libevent的实现以及从前没有接触过的epoll模型,心里也算是有点数了。

在bufferevent中,封装了两个event,分别是read和write,同时也有对应的read buffer和write buffer,这也是常用的设计。然而bufferevent对于read和write事件的处理却有很大的差异。

读事件

首先我们谈谈比较容易理解的read事件,通常的步骤就是新建一个bufferevent

bufferevent_socket_new

然后我们设置对应的写回调函数,同时要记住必须调用

bufferevent_enable(pEv, EV_READ)

这个函数是将对应的event通过event_add添加到eventbase中的,我们知道在libevent的设计中,所有的事件必须加入eventbase中才会获得事件通知。

bufferevent_enable 函数中,会设置enabled变量的对应的标志位,并且将对应的event添加到eventbase中。

不然我们的读事件都会被libevent忽略,然后我们只要静静的等待着读回调即可。在libevent层面,它属于proactor模式,我们只需要从缓冲区中把socket数据取出来即可,不用调用recv/read。

从libevent的内部实现来讲,在调用bufferevent_socket_new的时候已经初始化了读事件,并且设置了读回调,这个回调是属于libevent内部的回调,用于实现proactor的机制。

static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    struct evbuffer *input;
    int res = 0;
    short what = BEV_EVENT_READING;
    ev_ssize_t howmuch = -1, readmax=-1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_READ, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }

    input = bufev->input;

    /*
     * If we have a high watermark configured then we don't want to
     * read more data than would make us reach the watermark.
     */
    if (bufev->wm_read.high != 0) {
        howmuch = bufev->wm_read.high - evbuffer_get_length(input);
        /* we somehow lowered the watermark, stop reading */
        if (howmuch <= 0) {
            bufferevent_wm_suspend_read(bufev);
            goto done;
        }
    }
    readmax = _bufferevent_get_read_max(bufev_p);
    if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                           * uglifies this code. XXXX */
        howmuch = readmax;
    if (bufev_p->read_suspended)
        goto done;

    evbuffer_unfreeze(input, 0);
    res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
    evbuffer_freeze(input, 0);

    if (res == -1) {
        int err = evutil_socket_geterror(fd);
        if (EVUTIL_ERR_RW_RETRIABLE(err))
            goto reschedule;
        /* error case */
        what |= BEV_EVENT_ERROR;
    } else if (res == 0) {
        /* eof case */
        what |= BEV_EVENT_EOF;
    }

    if (res <= 0)
        goto error;

    _bufferevent_decrement_read_buckets(bufev_p, res);

    /* Invoke the user callback - must always be called last */
    if (evbuffer_get_length(input) >= bufev->wm_read.low)
        _bufferevent_run_readcb(bufev);

    goto done;

 reschedule:
    goto done;

 error:
    bufferevent_disable(bufev, EV_READ);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

可以看到最终会调用用户的回调函数,而对于这些函数来说,libevent已经封装了read/recv等细节,只需要从evbuffer中获取数据就可以了。

写事件

读事件非常好理解,就是有数据可读了,而读事件有点儿不一样。读事件是属于缓冲区可写事件,在socket的缓冲区未写满的时候会一直通知,所以bufferevent的写处理有点儿不一样。

在初始化bufferevent的时候,bufferevent会默认自动设置enabled的EV_WRITE标志位,这是很重要的,而EV_READ的标志位必须自己设置。

为何只是设置EV_READ标志位而不是调用 bufferevent_enable 呢?上面已经谈到了这个问题,即当socket缓冲区可写的时候,会不断的回调,而这是不需要的,我们的目的只是有用户数据还未写入socket缓冲区的时候,我们才需要这个通知,于是在正常的情况下, bufferevent_enable(pEv, EV_WRITE) 是不会设置的,这是有悖于bufferevent的设计思路的。

为了解决这个问题,我们就不处理socket的可写事件,当用户调用了 bufferevent_write 的时候才进行处理。当用户请求发送数据的时候,libevent是写入写缓冲的,大概的设计逻辑如下:

// 添加到写缓冲 ...省略

evbuffer_invoke_callbacks(buf);

查看了下libevent的实现,仅仅只是把数据append到写缓冲内,没有任何的发送机制,可实际上这段数据还是被发送出去了,这是怎么实现的呢?所以这个工作只能是 evbuffer_invoke_callbacks(buf); 这个来实现的,这个是注册到evbuffer的一个事件回调,当有数据写入的时候会被调用。于是我们继续追溯这个调用链,我们从头追溯,我们在初始化bufferevent对象的时候,有这么一句调用:

evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

我们对写缓冲设置了这么一个回调,那么在我们把数据写入了写缓冲,这个回调就会被调用,我们来看下这个的实现:

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing.  So, start writing. */
        if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}

没错,我们又看到了 be_socket_add 这个熟悉的身影,而这个函数就是添加对应的事件到eventbase中,我们来看下上述的判断条件,看到了enabled这个变量了,没错在 bufferevent_socket_new 中已经默认设置了EV_WRITE的标志位,所以当我们使用bufferevent_write写入写缓冲的时候,其实libevent会将可写事件监听,当有可写事件来临的时候,就执行到了之前在写event上注册的回调 bufferevent_writecb,在这个函数中,会尝试将写缓冲的数据写入socket缓冲区,假设写入完毕,则会删除读事件的监听,假设缓冲区已满,则等待下次缓冲区可写的通知。

这样写事件的逻辑也整理清楚了,写流程是属于有点儿复杂的,读相对而言会简单很多。

有感

我们目前项目中对于libevent的发送数据包的用法,有点儿绕,不知道是有意为之还是怎么的,我们现在的做法是主动的 bufferevent_enable EV_WRITE事件,而当缓冲区可用的时候,我们还没有把我们的数据包写入写缓冲,于是直接调用用户的回调函数,于是在用户的回调函数中再把数据给发送出去,当然还是调用了 bufferevent_write ,而在这个函数中,当缓冲区发完再把写事件从eventbase中删除,这个逻辑其实是多走了一点儿东西,其实不必等缓冲区可写才发送数据包,直接调用 bufferevent_write 就可以了,上述的逻辑会把发包的逻辑打乱,在回调的时候恢复当前的现场也是多写了很多代码。

共 0 条回复
暂时没有人回复哦,赶紧抢沙发
发表新回复

作者

sryan
today is a good day