On this page
article
AE_EOLL 多路复用器
本文详细介绍了 Redis 6 的 AE 事件管理机制对 epoll 的封装,即 aeApiCreate()、aeApiAddEvent()、aeApiPoll() 函数的实现。
如无特殊说明,本文涉及的源码均位于 ae_epoll.c 中
上文 我们介绍过,Redis 通过 AE 抽象层适配了多种类 UNIX 平台的 I/O 多路复用实现,接下来我们将以最常用的 Linux 平台为例,重点介绍下 Redis 是如何在 ae_epoll.c 中封装 epoll 的。
数据结构
Redis 通过结构体 aeApiState
存放 epoll 数据:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
epfd
:存放 epoll 文件描述符句柄;events
:epoll 的事件结构体,用于接收已经就绪的事件。
aeApiCreate 初始化
aeApiCreate
函数负责初始化事件循环器,它内部会调用 epoll_create
初始化多路复用器:
static int aeApiCreate(aeEventLoop *eventLoop) {
// [1] 初始化 aeApiState
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// [2] 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
anetCloexec(state->epfd);
// [3] 把 epoll 实例存到 eventLoop
eventLoop->apidata = state;
return 0;
}
[1]
:初始化aeApiState
结构体;[2]
:创建 epoll 实例,并将 epoll 文件描述符存入epfd
;[3]
:将aeApiState
赋值给aeEventLoop.apidata
。
aeApiAddEvent 注册文件事件
aeApiAddEvent
用于向 epoll 实例中注册文件事件,被添加的文件事件将会交给 epoll 监听:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
// [1]
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// [2]
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// [3]
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
[1]
:如果该文件描述符已经存在监听对象,则使用EPOLL_CTL_MOD
修改监听对象,否则使用EPOLL_CTL_ADD
添加监听对象;[2]
:将 AE 抽象层的事件类型转换为 epoll 事件类型,而且这里使用的是默认的边缘触发方式;[3]
:调用epoll_ctl
执行添加或修改监听对象。
aeApiPoll 等待事件就绪
aeApiPoll
会在事件循环器的每次循环中被调用,负责阻塞进程,等待事件就绪或等待给定的时间到期:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// [1] 阻塞等待事件就绪或者超时
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
// [2] 保存所有就绪的事件
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
[1]
:调用epoll_wait
阻塞进程;[2]
:如果存在就绪事件,则将其装载到eventLoop.fired
中:- epoll 中的
EPOLLIN
会转换为 AE 中的AE_READABLE
事件; EPOLLOUT
会转换为 AE 中的AE_WRITABLE
事件;EPOLLERR
和EPOLLHUP
这两个连接异常状态会被转换为AE_WRITABLE|AE_READABLE
事件,当发生异常时,直接交给读写中的任意一个函数处理断开的连接。
- epoll 中的
最终,aeApiPoll
将就绪的事件数量返回给 AE 抽象层,AE 抽象层再遍历所有就绪事件,同步调用 先前注册好的回调函数 完成事件的处理。