AE 事件处理抽象层
本文详细介绍了 Redis 6 的事件机制抽象层,以及事件数据结构 aeEventLoop、初始化过程和 aeMain 事件循环器的运行机制。
如无特殊说明,本文涉及的源码均位于 ae.c 与 ae.h 中
Redis 事件驱动机制概述
Redis 服务的核心是一个事件驱动器,它主要处理以下两种事件:
- 文件事件 (File Event):利于操作系统的 I/O 多路复用机制,监听 Socket 文件描述符上发生的事件;
- 时间事件 (Time Event):负责完成 Redis 内部生成 RDB 文件、清理过期数据等定时任务。
官方的 Redis Server 仅适配类 UNIX 系统,因此它选择了利用 I/O 多路复用机制来实现网络通信。I/O 多路复用是一种高性能 I/O 模型,几乎所有的类 UNIX 系统都支持它。该模型允许单个线程监视多个客户端连接,并在某个连接的状态发生变化时发送就绪事件,通知相应的线程来处理该连接的数据。
大多数类 UNIX 系统都支持 POSIX(可移植操作系统接口)定义的 select
函数,而且在此基础上,一些系统还提供了更高性能的非跨平台实现,比如 Linux 上的 epoll、Solaris 上的 evport
,以及 FreeBSD 上的 kqueue
等。Redis 为这些提供了高性能多路复用 API 的操作系统编写了特定的 AE 实现:Linux 平台的是 ae_epoll.c
、Solaris 平台的是 ae_evport.c
、FreeBSD 平台的是 ae_kqueue.c
。当操作系统不支持上述 API 时,Redis 也提供了 ae_select.c
,以调用符合 POSIX 标准的多路复用实现作为兜底方案。
不同平台上的高性能 I/O 多路复用 API 并不相同,而且它们与 POSIX 标准的接口也存在差异。因此,Redis 引入了一个事件驱动机制的抽象层,该抽象层会尽可能选择当前平台上性能最出色的多路复用器,并为不同的类 UNIX 系统提供一个统一的多路复用 API,以隐藏各个平台之间的差异。与这一抽象层相关的代码实现位于 ae.c 和 ae.h 文件中:
aeApiCreate
:初始化 I/O 复用机制的上下文环境;aeApiAddEvent
&aeApiDelEvent
:添加或删除一个监听对象;aeApiPoll
:阻塞线程,等待事件绪。
Redis 会在编译期间通过条件预处理指令选择适当的实现代码:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
接下来,我们将详细介绍 Redis 事件机制的抽象层实现。
AE 数据结构
aeEventLoop 结构体
aeEventLoop
是 Redis 事件循环器的核心数据结构,负责管理事件:
typedef struct aeEventLoop {
int maxfd;
int setsize;
long long timeEventNextId;
aeFileEvent *events;
aeFiredEvent *fired;
aeTimeEvent *timeEventHead;
int stop;
void *apidata;
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
maxfd
:当前已经注册的最大文件描述符;setsize
:该事件循环器允许监听的最大文件描述符;timeEventNextId
:下一个时间事件的 ID;events
:已经注册的事件;fired
:已经就绪的事件;timeEventHead
:时间事件的头节点指针;stop
:事件循环器是否停止;apidata
:存放 epoll 等底层多路复用器的句柄等数据;beforesleep
&aftersleep
:进程阻塞前后调用的钩子函数。
aeFileEvent 结构体
aeFileEvent
用于存储文件描述符上已注册的文件事件:
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
mask
:已注册的文件事件类型,可选值有:AE_READABLE
、AE_WRITABLE
、AE_BARRIER
;rfileProc
:AE_READABLE
事件处理函数;wfileProc
:AE_WRITABLE
事件处理函数;clientData
:附加数据。
aeFileEvent
并不记录文件描述符的 fd
句柄,这是因为 aeFileEvent
在 aeEventLoop
中作为 events
数组的一个元素存在,Redis 巧妙地将 aeEventLoop.events
数组中的每个索引值映射为当前文件的文件描述符。例如:aeEventLoop.events[x]
便能表示值为 x
的文件描述符的文件事件。
aeFiredEvent
当事件就绪后,操作系统的 I/O 复用器会将已就绪的事件转换为 aeFiredEvent
并存入 aeFiredEvent *fired
,等待事件循环器的处理:
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
fd
:产生事件的文件描述符;mask
:产生的事件类型。
aeTimeEvent
aeTimeEvent
用于存储时间事件的信息:
typedef struct aeTimeEvent {
long long id;
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount;
} aeTimeEvent;
id
:时间事件的 ID;when
:时间事件的下次执行时间;timeProc
:时间事件处理函数;finalizerProc
:时间事件终结函数;clientData
:客户端传入的附加数据;prev
、next
:链表指针;refcount
:引用计数,用于防止在递归时间事件调用中释放计时器事件。
Redis 事件机制的初始化过程
Redis 在启动时,会在 initServer 函数中完成事件机制的初始化。
事件循环器的创建
initServer
首先会通过 aeCreateEventLoop
函数创建事件循环器,该方法的核心就是创建操作系统层的 I/O 多路复用器句柄,例如 Linux 平台的 epfd
,实现如下:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
monotonicInit(); /* just in case the calling app didn't initialize */
// [1] 初始化 aeEventLoop
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
...
// [2] 创建 I/O 多路复用器
if (aeApiCreate(eventLoop) == -1) goto err;
// [3] 初始化事件类型
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
// [4]
return eventLoop;
err:
... // free
}
[1]
:初始化aeEventLoop
结构体属性;[2]
:调用操作系统层的 I/O 多路复用器的函数,完成具体的多路复用器的创建;[3]
:初始化所有文件描述符的事件类型;[4]
:成功后返回,最终赋值给上层的redisServer server.el
。
定时事件监听器创建
然后再通过 aeCreateTimeEvent
函数创建时间事件,时间事件的处理函数为 serverCron
,负责处理 Redis 中的定时任务:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc) {
// [1] 初始化 aeTimeEvent
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
// [2] 设定定时任务第一次执行的时间
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
// [3] 头插到 timeEventHead 链表
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
[1]
:开始初始化aeTimeEvent
结构体;[2]
:设置定时任务的第一次执行时间,即 1s 后;[3]
:头插到eventLoop->timeEventHead
链表头部。
timeProc
指向的定时任务处理函数 serverCron
非常重要,它负责处理 Redis 中的大部分定时任务,例如定时清除过期数据、定时持久化数据、清除过期客户端等(剩下的部分在 beforeSleep
中触发)。
Socket 事件监听器创建
最后通过 aeCreateFileEvent
创建 TCP Socket、TLS Socket、Pipe 等套接字的可读事件监听器:
/**
* @param fd 需要监听的文件描述符
* @param mask 监听的事件类型
* @param proc 事件处理函数
* @param clientData 附加数据
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData) {
// [1] fd 超了返回错误
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
// [2] 添加事件到多路复用器
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// [3] 注册事件处理的回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
[1]
:超出最大文件描述符限制后,返回错误;[2]
:调用操作系统层的多路复用 API,注册事件监听对象;[3]
:初始化aeFileEvent
结构体,这里的重点是 20 ~ 21 行对文件可读、可写事件处理函数的注册。
Redis 事件循环器的运行细节
在完成上述事件处理器的初始化后,Redis 会调用 aeMain()
启动事件循环器:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// flag:监听所有类型事件,并启用 xxxSleep 回调策略
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
aeMain()
就是个大循环,内部不断调用 aeProcessEvents()
处理 Redis 中已就绪的事件:
/**
* @param flags 指定要处理的事件类型和处理策略,可选值如下:
* AE_ALL_EVENTS:处理所有事件
* AE_FILE_EVENTS:处理文件事件
* AE_TIME_EVENTS:处理时间事件
* AE_DONT_WAIT:是否阻塞进程
* AE_CALL_AFTER_SLEEP:阻塞后是否调用 afterSleep 回调
* AE_CALL_BEFORE_SLEEP:阻塞前是否调用 beforeSleep 回调
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
// [1] 计算下次定时任务的执行时间
struct timeval tv, *tvp;
int64_t usUntilTimer = -1;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// 计算时间间隔
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL; // 表示关闭定时任务
}
}
// [2] 阻塞前,调用 beforesleep
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
// [3] 拉取就绪的事件,第二个参数由定时任务控制
numevents = aeApiPoll(eventLoop, tvp);
// [4] 唤醒后,调用 aftersleep
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
[1]
:计算下次定时任务执行时间,计算方法如下:- 调用
usUntilEarliestTimer
计算eventLoop.timeEventHead
链表上最先需要执行的节点的时间戳,到当前时间点的间隔usUntilTimer
:- 如果链表为 NULL 则返回 -1;
- 如果已经迟到了,则返回 0;
- 否则返回正常的时间间隔。
- 如果
usUntilTimer
> 0,则正常计算超时时间,存入tvp
。 - 如果
usUntilTimer
<= 0,则判断flags
参数:- 如果上层配置了
AE_DONT_WAIT
,则将tvp
设为 0,表示aeApiPoll
不会阻塞; - 否则将
tvp
设为 NULL,表示aeApiPoll
将一直阻塞直到有 I/O 事件产生,这意味着时间事件机制将被关闭。
- 如果上层配置了
- 调用
[2]
:休眠前调用beforesleep
钩子函数;[3]
:调用底层的 I/O 复用器,阻塞等待事件就绪或到达定时任务执行时间;[4]
:唤醒后调用aftersleep
钩子函数。
// [5] 开始处理每个就绪的事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
// [6] 未反转情形下,先处理可读事件
int invert = fe->mask & AE_BARRIER;
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
// [7] 再处理可写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// [8] 反转情况下,后处理可读事件
if (invert) {
fe = &eventLoop->events[fd];
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
// [9] 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
[5]
:处理已经就绪的文件事件;[6]
:如果就绪的是AE_READABLE
事件,且mask
中未设置AE_BARRIER
标志,则调用rfileProc
处理可读事件;[7]
:如果就绪的是AE_WRITABLE
事件,则调用wfileProc
处理可写事件;[8]
:如果就绪事件的mask
中设置了AE_BARRIER
标志,则在这里延后处理可读事件;通常情况下,Redis 都会优先处理可读事件,再处理可写事件,这有助于服务器尽快处理请求并回复结果。但当设置了
AE_BARRIER
时,Redis 会优先处理写请求。[9]
:调用processTimeEvents
处理时间事件。
processTimeEvents
的实现如下:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
// [1] 从链表头开始向后遍历
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;
// [2] 删除已经失效的节点
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
...
zfree(te);
te = next;
continue;
}
...
// [3] 当前节点执行事件到了,执行并更新下次执行时间
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
// [4] 处理下一个节点
te = te->next;
}
return processed;
}
[1]
:从timeEventHead
链表头开始,遍历时间事件;[2]
:如果时间事件已被删除,则将其从链表上移除;[3]
:当前事件已到达执行时间,则执行aeTimeEvent.timeProc
指向的serverCron()
函数。该函数会执行定时任务并返回下次再执行的时间间隔,如果返回值等于AE_NOMORE
代表该事件需要被删除,然后将aeTimeEvent.id
设置为AE_DELETED_EVENT_ID
,以便在下一轮循环中删除;[4]
:处理下一个事件。
由于 Redis 中只有 serverCron
一种时间事件,所以这里直接遍历所有时间事件也不会有性能问题。
serverCron 定时任务细节
serverCron()
在 Redis 中扮演着相当重要的角色,其内部主要承载着以下任务:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
// [1] 启动定时看门狗
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
...
// [2] 如果启用 dynamic_hz,则根据客户端的数量调整 server.hz 的值
server.hz = server.config_hz;
if (server.dynamic_hz) {
...
}
...
// [3] 更新 server 指标信息
run_with_period(100) {
... // Metric
}
// [4] 设置 lruclock
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
// [5] 更新内存指标信息
cronUpdateMemoryStats();
// [6] 判断服务器关闭标识
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
...
server.shutdown_asap = 0;
}
...
// [7] 管理客户端资源
clientsCron();
// [8] 删除过期 key、压缩内存数据库、进行 rehash 操作
databasesCron();
[1]
:启动定时看门狗,防止定时任务执行时间太长;[2]
:如果启用了server.dynamic_hz
,则根据客户端的数量调整server.hz
的值。这意味着当连接的客户端数量增多时,Redis 服务器会相应地提高定时任务的执行频率,上限为CONFIG_MAX_HZ (500)
;[3]
:更新 server 统计指标信息;[4]
:设置 lruclock;[5]
:更新内存指标信息;[6]
:判断服务器关闭标识,如果收到了停机信号,则退出主程序;[7]
:管理客户端资源,释放超时的客户端连接;[8]
:删除过期 key、压缩内存数据库、根据需要进行 rehash 操作;// [9] 存在延迟的 AOF 重写操作,则在这里完成 if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } if (hasActiveChildProcess() || ldbPendingChildren()) { run_with_period(1000) receiveChildInfo(); // [10] 主进程收尾 checkChildrenDone(); } else { // [11] 条件满足时,后台生成 RDB ... rsiptr = rdbPopulateSaveInfo(&rsi); rdbSaveBackground(server.rdb_filename,rsiptr); ... // [12] 条件满足时,后台重写 AOF ... rewriteAppendOnlyFileBackground(); ... } ... // [13] 存在延迟的 AOF 落盘操作,则在这里完成 if (server.aof_state == AOF_ON && server.aof_flush_postponed_start) flushAppendOnlyFile(0); // [14] 由 period 宏控制的正常周期内,也需要落盘 AOF run_with_period(1000) { if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); } ...
[9] - [14]
:RDB 后台生成和 AOF 重写、AOF 落盘相关内容;// [15] 重新连接到主服务器、检测传输失败、启动后台 RDB 传输等 if (server.failover_state != NO_FAILOVER) { run_with_period(100) replicationCron(); } else { run_with_period(1000) replicationCron(); } // [16] cluster 定时任务 run_with_period(100) { if (server.cluster_enabled) clusterCron(); } // [17] sentinel 定时任务 if (server.sentinel_mode) sentinelTimer(); ... // [18] 如果没有足够的任务处理,则停用 I/O 线程 stopThreadedIOIfNeeded(); ... // [19] 补偿被 AOF 重写推迟的 RDB 后台生成任务 if (!hasActiveChildProcess() && server.rdb_bgsave_scheduled && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) server.rdb_bgsave_scheduled = 0; } // [20] 触发模块的 cron 循环事件 RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, 0, &ei); // [21] 返回 serverCron 执行频率 server.cronloops++; return 1000/server.hz; }
[15] - [18]
:cluster、sentinel、I/O 线程相关操作;[19]
:补偿被 AOF 重写推迟的 RDB 后台生成任务;[20]
:触发模块的 cron 循环事件;[21]
:返回 serverCron 执行频率:- 可以通过
server.hz
控制serverCron
的执行频率,默认为 10,表示每秒执行 10 次,每次间隔 100ms; - 如果启用了
server.dynamic_hz
,当客户端数量增多时,server.hz
最多可以增至 500,即每 2ms 执行一次。
- 可以通过