如无特殊说明,本文涉及的源码均位于 ae.c 与 ae.h 中


Redis 事件驱动机制概述

Redis 服务的核心是一个事件驱动器,它主要处理以下两种事件:

  • 文件事件 (File Event):利于操作系统的 I/O 多路复用机制,监听 Socket 文件描述符上发生的事件;
  • 时间事件 (Time Event):负责完成 Redis 内部生成 RDB 文件、清理过期数据等定时任务。

官方的 Redis Server 仅适配类 UNIX 系统,因此它选择了利用 I/O 多路复用机制来实现网络通信。I/O 多路复用是一种高性能 I/O 模型,几乎所有的类 UNIX 系统都支持它。该模型允许单个线程监视多个客户端连接,并在某个连接的状态发生变化时发送就绪事件,通知相应的线程来处理该连接的数据。

大多数类 UNIX 系统都支持 POSIX(可移植操作系统接口)定义的 select 函数,而且在此基础上,一些系统还提供了更高性能的非跨平台实现,比如 Linux 上的 epoll、Solaris 上的 evport,以及 FreeBSD 上的 kqueue 等。Redis 为这些提供了高性能多路复用 API 的操作系统编写了特定的 AE 实现:Linux 平台的是 ae_epoll.c、Solaris 平台的是 ae_evport.c、FreeBSD 平台的是 ae_kqueue.c。当操作系统不支持上述 API 时,Redis 也提供了 ae_select.c,以调用符合 POSIX 标准的多路复用实现作为兜底方案。

不同平台上的高性能 I/O 多路复用 API 并不相同,而且它们与 POSIX 标准的接口也存在差异。因此,Redis 引入了一个事件驱动机制的抽象层,该抽象层会尽可能选择当前平台上性能最出色的多路复用器,并为不同的类 UNIX 系统提供一个统一的多路复用 API,以隐藏各个平台之间的差异。与这一抽象层相关的代码实现位于 ae.c 和 ae.h 文件中:

  • aeApiCreate:初始化 I/O 复用机制的上下文环境;
  • aeApiAddEvent & aeApiDelEvent:添加或删除一个监听对象;
  • aeApiPoll:阻塞线程,等待事件绪。

Redis 会在编译期间通过条件预处理指令选择适当的实现代码:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

接下来,我们将详细介绍 Redis 事件机制的抽象层实现。

AE 数据结构

aeEventLoop 结构体

aeEventLoop 是 Redis 事件循环器的核心数据结构,负责管理事件:

typedef struct aeEventLoop {
    int maxfd;
    int setsize;
    long long timeEventNextId;
    aeFileEvent *events;
    aeFiredEvent *fired;
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata;
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
  • maxfd:当前已经注册的最大文件描述符;
  • setsize:该事件循环器允许监听的最大文件描述符;
  • timeEventNextId:下一个时间事件的 ID;
  • events:已经注册的事件;
  • fired:已经就绪的事件;
  • timeEventHead:时间事件的头节点指针;
  • stop:事件循环器是否停止;
  • apidata:存放 epoll 等底层多路复用器的句柄等数据;
  • beforesleep & aftersleep:进程阻塞前后调用的钩子函数。

aeFileEvent 结构体

aeFileEvent 用于存储文件描述符上已注册的文件事件:

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
  • mask:已注册的文件事件类型,可选值有:AE_READABLEAE_WRITABLEAE_BARRIER
  • rfileProcAE_READABLE 事件处理函数;
  • wfileProcAE_WRITABLE 事件处理函数;
  • clientData:附加数据。

aeFileEvent 并不记录文件描述符的 fd 句柄,这是因为 aeFileEventaeEventLoop 中作为 events 数组的一个元素存在,Redis 巧妙地将 aeEventLoop.events 数组中的每个索引值映射为当前文件的文件描述符。例如:aeEventLoop.events[x] 便能表示值为 x 的文件描述符的文件事件。

aeFiredEvent

当事件就绪后,操作系统的 I/O 复用器会将已就绪的事件转换为 aeFiredEvent 并存入 aeFiredEvent *fired,等待事件循环器的处理:

typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
  • fd:产生事件的文件描述符;
  • mask:产生的事件类型。

aeTimeEvent

aeTimeEvent 用于存储时间事件的信息:

typedef struct aeTimeEvent {
    long long id;
    monotime when;
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount;
} aeTimeEvent;
  • id:时间事件的 ID;
  • when:时间事件的下次执行时间;
  • timeProc:时间事件处理函数;
  • finalizerProc:时间事件终结函数;
  • clientData:客户端传入的附加数据;
  • prevnext:链表指针;
  • refcount:引用计数,用于防止在递归时间事件调用中释放计时器事件。

Redis 事件机制的初始化过程

Redis 在启动时,会在 initServer 函数中完成事件机制的初始化。

事件循环器的创建

initServer 首先会通过 aeCreateEventLoop 函数创建事件循环器,该方法的核心就是创建操作系统层的 I/O 多路复用器句柄,例如 Linux 平台的 epfd,实现如下:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    monotonicInit(); /* just in case the calling app didn't initialize */
    // [1] 初始化 aeEventLoop
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    ...
    // [2] 创建 I/O 多路复用器
    if (aeApiCreate(eventLoop) == -1) goto err;
    // [3] 初始化事件类型
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    // [4]
    return eventLoop;

err:
    ... // free
}
  • [1]:初始化 aeEventLoop 结构体属性;
  • [2]:调用操作系统层的 I/O 多路复用器的函数,完成具体的多路复用器的创建;
  • [3]:初始化所有文件描述符的事件类型;
  • [4]:成功后返回,最终赋值给上层的 redisServer server.el

定时事件监听器创建

然后再通过 aeCreateTimeEvent 函数创建时间事件,时间事件的处理函数为 serverCron,负责处理 Redis 中的定时任务:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc) {
    // [1] 初始化 aeTimeEvent
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    // [2] 设定定时任务第一次执行的时间
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    // [3] 头插到 timeEventHead 链表
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}
  • [1]:开始初始化 aeTimeEvent 结构体;
  • [2]:设置定时任务的第一次执行时间,即 1s 后;
  • [3]:头插到 eventLoop->timeEventHead 链表头部。

timeProc 指向的定时任务处理函数 serverCron 非常重要,它负责处理 Redis 中的大部分定时任务,例如定时清除过期数据、定时持久化数据、清除过期客户端等(剩下的部分在 beforeSleep 中触发)。

Socket 事件监听器创建

最后通过 aeCreateFileEvent 创建 TCP Socket、TLS Socket、Pipe 等套接字的可读事件监听器:

/**
 * @param fd 需要监听的文件描述符
 * @param mask 监听的事件类型
 * @param proc 事件处理函数
 * @param clientData 附加数据
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData) {
    // [1] fd 超了返回错误
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
    // [2] 添加事件到多路复用器
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    // [3] 注册事件处理的回调函数
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
  • [1]:超出最大文件描述符限制后,返回错误;
  • [2]:调用操作系统层的多路复用 API,注册事件监听对象;
  • [3]:初始化 aeFileEvent 结构体,这里的重点是 20 ~ 21 行对文件可读、可写事件处理函数的注册。

Redis 事件循环器的运行细节

在完成上述事件处理器的初始化后,Redis 会调用 aeMain() 启动事件循环器:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // flag:监听所有类型事件,并启用 xxxSleep 回调策略
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
            AE_CALL_BEFORE_SLEEP|
            AE_CALL_AFTER_SLEEP);
    }
}

aeMain() 就是个大循环,内部不断调用 aeProcessEvents() 处理 Redis 中已就绪的事件:

/**
 * @param flags 指定要处理的事件类型和处理策略,可选值如下:
 *    AE_ALL_EVENTS:处理所有事件
 *    AE_FILE_EVENTS:处理文件事件
 *    AE_TIME_EVENTS:处理时间事件
 *    AE_DONT_WAIT:是否阻塞进程
 *    AE_CALL_AFTER_SLEEP:阻塞后是否调用 afterSleep 回调
 *    AE_CALL_BEFORE_SLEEP:阻塞前是否调用 beforeSleep 回调
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        // [1] 计算下次定时任务的执行时间
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 计算时间间隔
            usUntilTimer = usUntilEarliestTimer(eventLoop);
        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                tvp = NULL; // 表示关闭定时任务
            }
        }
        // [2] 阻塞前,调用 beforesleep
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        // [3] 拉取就绪的事件,第二个参数由定时任务控制
        numevents = aeApiPoll(eventLoop, tvp);

        // [4] 唤醒后,调用 aftersleep
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
  • [1]:计算下次定时任务执行时间,计算方法如下:
    • 调用 usUntilEarliestTimer 计算 eventLoop.timeEventHead 链表上最先需要执行的节点的时间戳,到当前时间点的间隔 usUntilTimer
      • 如果链表为 NULL 则返回 -1;
      • 如果已经迟到了,则返回 0;
      • 否则返回正常的时间间隔。
    • 如果 usUntilTimer > 0,则正常计算超时时间,存入 tvp
    • 如果 usUntilTimer <= 0,则判断 flags 参数:
      • 如果上层配置了 AE_DONT_WAIT,则将 tvp 设为 0,表示 aeApiPoll 不会阻塞;
      • 否则将 tvp 设为 NULL,表示 aeApiPoll 将一直阻塞直到有 I/O 事件产生,这意味着时间事件机制将被关闭。
  • [2]:休眠前调用 beforesleep 钩子函数;
  • [3]:调用底层的 I/O 复用器,阻塞等待事件就绪或到达定时任务执行时间;
  • [4]:唤醒后调用 aftersleep 钩子函数。
        // [5] 开始处理每个就绪的事件
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;
            // [6] 未反转情形下,先处理可读事件
            int invert = fe->mask & AE_BARRIER;
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            // [7] 再处理可写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            // [8] 反转情况下,后处理可读事件
            if (invert) {
                fe = &eventLoop->events[fd];
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    // [9] 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed;
}
  • [5]:处理已经就绪的文件事件;
  • [6]:如果就绪的是 AE_READABLE 事件,且 mask 中未设置 AE_BARRIER 标志,则调用 rfileProc 处理可读事件;
  • [7]:如果就绪的是 AE_WRITABLE 事件,则调用 wfileProc 处理可写事件;
  • [8]:如果就绪事件的 mask 中设置了 AE_BARRIER 标志,则在这里延后处理可读事件;

    通常情况下,Redis 都会优先处理可读事件,再处理可写事件,这有助于服务器尽快处理请求并回复结果。但当设置了 AE_BARRIER 时,Redis 会优先处理写请求。

  • [9]:调用 processTimeEvents 处理时间事件。

processTimeEvents 的实现如下:

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    // [1] 从链表头开始向后遍历
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();
    while(te) {
        long long id;
        // [2] 删除已经失效的节点
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            ...
            zfree(te);
            te = next;
            continue;
        }
        ...
        // [3] 当前节点执行事件到了,执行并更新下次执行时间
        if (te->when <= now) {
            int retval;
            id = te->id;
            te->refcount++;
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        // [4] 处理下一个节点
        te = te->next;
    }
    return processed;
}
  • [1]:从 timeEventHead 链表头开始,遍历时间事件;
  • [2]:如果时间事件已被删除,则将其从链表上移除;
  • [3]:当前事件已到达执行时间,则执行 aeTimeEvent.timeProc 指向的 serverCron() 函数。该函数会执行定时任务并返回下次再执行的时间间隔,如果返回值等于 AE_NOMORE 代表该事件需要被删除,然后将 aeTimeEvent.id 设置为 AE_DELETED_EVENT_ID,以便在下一轮循环中删除;
  • [4]:处理下一个事件。

由于 Redis 中只有 serverCron 一种时间事件,所以这里直接遍历所有时间事件也不会有性能问题。

serverCron 定时任务细节

serverCron() 在 Redis 中扮演着相当重要的角色,其内部主要承载着以下任务:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    // [1] 启动定时看门狗
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    ...
    // [2] 如果启用 dynamic_hz,则根据客户端的数量调整 server.hz 的值
    server.hz = server.config_hz;
    if (server.dynamic_hz) {
        ...
    }
    ...
    // [3] 更新 server 指标信息
    run_with_period(100) {
        ... // Metric
    }
    // [4] 设置 lruclock
    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);
    // [5] 更新内存指标信息
    cronUpdateMemoryStats();
    // [6] 判断服务器关闭标识
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        ...
        server.shutdown_asap = 0;
    }
    ...
    // [7] 管理客户端资源
    clientsCron();
    // [8] 删除过期 key、压缩内存数据库、进行 rehash 操作
    databasesCron();
  • [1]:启动定时看门狗,防止定时任务执行时间太长;
  • [2]:如果启用了 server.dynamic_hz,则根据客户端的数量调整 server.hz 的值。这意味着当连接的客户端数量增多时,Redis 服务器会相应地提高定时任务的执行频率,上限为 CONFIG_MAX_HZ (500)
  • [3]:更新 server 统计指标信息;
  • [4]:设置 lruclock;
  • [5]:更新内存指标信息;
  • [6]:判断服务器关闭标识,如果收到了停机信号,则退出主程序;
  • [7]:管理客户端资源,释放超时的客户端连接;
  • [8]删除过期 key、压缩内存数据库、根据需要进行 rehash 操作
        // [9] 存在延迟的 AOF 重写操作,则在这里完成
        if (!hasActiveChildProcess() &&
            server.aof_rewrite_scheduled) {
            rewriteAppendOnlyFileBackground();
        }
        if (hasActiveChildProcess() || ldbPendingChildren()) {
            run_with_period(1000) receiveChildInfo();
            // [10] 主进程收尾
            checkChildrenDone();
        } else {
            // [11] 条件满足时,后台生成 RDB
            ...
            rsiptr = rdbPopulateSaveInfo(&rsi);
            rdbSaveBackground(server.rdb_filename,rsiptr);
            ...
            // [12] 条件满足时,后台重写 AOF
            ...
            rewriteAppendOnlyFileBackground();
            ...
        }
        ...
        // [13] 存在延迟的 AOF 落盘操作,则在这里完成
        if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)
            flushAppendOnlyFile(0);
        // [14] 由 period 宏控制的正常周期内,也需要落盘 AOF
        run_with_period(1000) {
            if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
                flushAppendOnlyFile(0);
        }
        ...
    
  • [9] - [14]RDB 后台生成和 AOF 重写、AOF 落盘相关内容
        // [15] 重新连接到主服务器、检测传输失败、启动后台 RDB 传输等
        if (server.failover_state != NO_FAILOVER) {
            run_with_period(100) replicationCron();
        } else {
            run_with_period(1000) replicationCron();
        }
        // [16] cluster 定时任务
        run_with_period(100) {
            if (server.cluster_enabled) clusterCron();
        }
        // [17] sentinel 定时任务
        if (server.sentinel_mode) sentinelTimer();
        ...
        // [18] 如果没有足够的任务处理,则停用 I/O 线程
        stopThreadedIOIfNeeded();
        ...
        // [19] 补偿被 AOF 重写推迟的 RDB 后台生成任务
        if (!hasActiveChildProcess() &&
            server.rdb_bgsave_scheduled &&
            (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
             server.lastbgsave_status == C_OK))
        {
            rdbSaveInfo rsi, *rsiptr;
            rsiptr = rdbPopulateSaveInfo(&rsi);
            if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
                server.rdb_bgsave_scheduled = 0;
        }
    
        // [20] 触发模块的 cron 循环事件
        RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
        moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, 0, &ei);
        // [21] 返回 serverCron 执行频率
        server.cronloops++;
        return 1000/server.hz;
    }
    
  • [15] - [18]:cluster、sentinel、I/O 线程相关操作;
  • [19]:补偿被 AOF 重写推迟的 RDB 后台生成任务;
  • [20]:触发模块的 cron 循环事件;
  • [21]:返回 serverCron 执行频率:
    • 可以通过 server.hz 控制 serverCron 的执行频率,默认为 10,表示每秒执行 10 次,每次间隔 100ms;
    • 如果启用了 server.dynamic_hz,当客户端数量增多时,server.hz 最多可以增至 500,即每 2ms 执行一次。