如无特殊说明,本文涉及的源码均位于 ae_epoll.c 中


上文 我们介绍过,Redis 通过 AE 抽象层适配了多种类 UNIX 平台的 I/O 多路复用实现,接下来我们将以最常用的 Linux 平台为例,重点介绍下 Redis 是如何在 ae_epoll.c 中封装 epoll 的。

数据结构

Redis 通过结构体 aeApiState 存放 epoll 数据:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;
  • epfd:存放 epoll 文件描述符句柄;
  • events:epoll 的事件结构体,用于接收已经就绪的事件。

aeApiCreate 初始化

aeApiCreate 函数负责初始化事件循环器,它内部会调用 epoll_create 初始化多路复用器:

static int aeApiCreate(aeEventLoop *eventLoop) {
    // [1] 初始化 aeApiState
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // [2] 创建 epoll 实例
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    anetCloexec(state->epfd);
    // [3] 把 epoll 实例存到 eventLoop
    eventLoop->apidata = state;
    return 0;
}
  • [1]:初始化 aeApiState 结构体;
  • [2]:创建 epoll 实例,并将 epoll 文件描述符存入 epfd
  • [3]:将 aeApiState 赋值给 aeEventLoop.apidata

aeApiAddEvent 注册文件事件

aeApiAddEvent 用于向 epoll 实例中注册文件事件,被添加的文件事件将会交给 epoll 监听:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    // [1]
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    // [2]
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // [3]
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
  • [1]:如果该文件描述符已经存在监听对象,则使用 EPOLL_CTL_MOD 修改监听对象,否则使用 EPOLL_CTL_ADD 添加监听对象;
  • [2]:将 AE 抽象层的事件类型转换为 epoll 事件类型,而且这里使用的是默认的边缘触发方式;
  • [3]:调用 epoll_ctl 执行添加或修改监听对象。

aeApiPoll 等待事件就绪

aeApiPoll 会在事件循环器的每次循环中被调用,负责阻塞进程,等待事件就绪或等待给定的时间到期:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // [1] 阻塞等待事件就绪或者超时
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        // [2] 保存所有就绪的事件
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
  • [1]:调用 epoll_wait 阻塞进程;
  • [2]:如果存在就绪事件,则将其装载到 eventLoop.fired 中:
    • epoll 中的 EPOLLIN 会转换为 AE 中的 AE_READABLE 事件;
    • EPOLLOUT 会转换为 AE 中的 AE_WRITABLE 事件;
    • EPOLLERREPOLLHUP 这两个连接异常状态会被转换为 AE_WRITABLE|AE_READABLE 事件,当发生异常时,直接交给读写中的任意一个函数处理断开的连接。

最终,aeApiPoll 将就绪的事件数量返回给 AE 抽象层,AE 抽象层再遍历所有就绪事件,同步调用 先前注册好的回调函数 完成事件的处理。