如无特殊说明,本文涉及的源码均位于 networking.c 中


在 Redis 6 之前,网络 I/O 数据的读写都是在 aeMain 大循环中以单线程方式处理的。然而,在数据请求量大的情况下,网络 I/O 数据的读写会占用大量的 CPU 时间,容易成为性能的瓶颈。因此,Redis 6 引入了 I/O 线程机制,将不同客户端的 I/O 数据读写操作分配给不同的 I/O 线程进行处理,从而提升网络吞吐量。

我们可以通过 io-threads 配置项设置 I/O 线程的数量,默认值为 1,表示不启用 I/O 线程。

接下来我们详细介绍 Redis I/O 线程的源码实现。

POSIX 线程标准概述

在正式介绍 Redis I/O 线程之前,我们先来了解一下类 Unix 系统上的多线程实现。有过 Linux C 开发经验的同学应该对 pthread 非常熟悉,在 Linux 系统上创建线程,通常需要使用第三方库 libpthread.so。libpthread 是 POSIX 线程标准的实现,为开发者提供了在应用程序中创建、管理和同步多个线程的支持。以下是 libpthread 中常用的 API。

创建线程 pthread_create

POSIX 标准定义了如下函数用于创建线程:

int pthread_create(pthread_t *thread, 
    const pthread_attr_t *attr, 
    void *(*start_routine)(void *),
    void *arg);
  • thread:线程标识符,表示进程内的唯一线程 ID;
  • attr:设置线程属性;
  • start_routine:线程运行函数指针;
  • arg:线程运行函数的参数。

互斥量 pthread_mutex_t

POSIX 标准定义了表示互斥量的结构体 pthread_mutex_t,通过对该结构体加锁可以保证在任一时刻只有一个线程可以访问加锁资源。

通过 pthread_mutex_init 函数可以初始化互斥量:

int pthread_mutex_init(pthread_mutex_t *mutex,
    const pthread_mutexattr_t *attr);
  • mutex:要初始化的互斥锁;
  • attr:互斥量的属性,一般为 NULL,表示默认属性。

通过 pthread_mutex_lock 函数可以完成加锁操作:

int pthread_mutex_lock(pthread_mutex_t *mutex);
  • 对互斥量加锁。该操作会抢夺互斥量,如果互斥量被其他线程抢占,则加锁失败阻塞线程,直到锁定该互斥量的线程解锁,当前线程才能再次抢夺互斥量。

通过 pthread_mutex_unlock 函数可以完成解锁操作:

int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • 对互斥量解锁。在解锁的同时,将阻塞在该锁上的其它线程全部唤醒,至于哪个先被唤醒,取决于系统调度。

条件变量 pthread_cond_t

POSIX 标准定义了 pthread_cond_t,这个数据类型表示条件变量,类似于 Java AQS ConditionObject,用于等待特定条件的发生。

以下是条件变量的常用 API:

  • 初始化条件变量:

        int pthread_cond_init(
            pthread_cond_t *cond,
            const pthread_condattr_t *attr
        );
        
    • cond:待初始化的条件变量;
    • attr:条件变量的属性,一般为 NULL,表示默认属性。
  • 销毁条件变量:

        int pthread_cond_destroy(pthread_cond_t *cond);
        
  • 等待条件变量:

        // 永久等待
        int pthread_cond_wait(
            pthread_cond_t *cond,
            pthread_mutex_t *mutex);
        // 超时等待
        int pthread_cond_timedwait(
            pthread_cond_t *cond,
            pthread_mutex_t *mutex,
            const struct timespec *abstime);
        
    • cond:要等待的条件变量;
    • mutex:线程互斥锁,有条件变量的地方一定要互斥锁
    • abstime:超时的绝对时间戳。

Redis I/O 线程的初始化

Redis 在 networking.c/redisServer 结构体中定义了如下变量:

  • io_threads[]pthread_t 数组,存储所有线程的标识符;
  • io_threads_mutex[]pthread_mutex_t 数组,存储启停 I/O 线程的互斥量;
  • io_threads_opint 型变量,标志当前 I/O 线程执行的是 read (0) 还是 write (1);
  • io_threads_list[]list 数组,存储每个线程需要处理的客户端;
  • io_threads_pending[]redisAtomic unsigned long 原子长整型数组,记录每个线程待处理的客户端数量。

Redis Server 启动时,会在 server.c/InitServerLast 函数中调用 initThreadedIO() 创建 I/O 线程:

void initThreadedIO(void) {
    // [1] 初始化 io 线程标志为停用状态
    server.io_threads_active = 0;
    // [2] 如果 io_threads_num 被设置为 1,禁用 io 线程
    if (server.io_threads_num == 1) return;
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        ...
        exit(1);
    }
    for (int i = 0; i < server.io_threads_num; i++) {
        // [3] i == 0 的表示主线程,跳过
        io_threads_list[i] = listCreate();
        if (i == 0) continue;
        pthread_t tid;
        // [4] 初始化互斥量,并存入数组
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        // [5] 先锁定所有互斥量,然后创建 io 线程
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            ...
            exit(1);
        }
        io_threads[i] = tid;
    }
}
  • [1]:I/O 线程默认处于 停用状态,由 io_threads_active 属性标识,所以一开始先将标志设为 0;
  • [2]:获取用户配置,如果 io_threads_num 为 1,表示禁用 I/O 线程,退出初始化过程、如果 io_threads_num 超过了允许的最大值,直接报错并退出程序;
  • [3]:创建每个线程的客户端容器,如果 i == 0,表示当前线程是主线程,已经存在了直接跳过;
  • [4]:初始化 I/O 线程的互斥量,并存入 io_threads_mutex 数组;
  • [5]:先锁定所有 I/O 线程的互斥量,然后创建 I/O 线程,线程的执行函数为 IOThreadMain,线程的 ID 为从 1 开始的数组下标(0 为主线程,不需要再创建了)。

多线程场景下的请求解析

Redis 命令解析过程 一文中,我们介绍过当 Redis 启用了 I/O 线程时,主线程会将 client 添加到 server.clients_pending_read 链表,等待 I/O 线程去处理。而 server.clients_pending_read 链表上客户端的线程分配工作,则是由主线程的 beforeSleep() 触发的,实现如下:

int handleClientsWithPendingReadsUsingThreads(void) {
    // [1] 如果未启用 io 线程,直接返回
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    ...
    // [2] 将待处理的客户端划分到每个 io 线程的客户端队列
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li); // 创建迭代器
    int item_id = 0;
    while((ln = listNext(&li))) { // 迭代 pending 队列
        client *c = listNodeValue(ln);
        // 通过取余分配 io 线程
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    // [3] 设置 io_threads_op 以及每个线程的 io_threads_pending
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    // [4] 主线程处理分配给自己的客户端
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    // [5] 等待所有线程都处理完毕
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    // [6] 主线程遍历所有客户端,执行里边的命令
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        serverAssert(!(c->flags & CLIENT_BLOCKED));
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            continue;
        }
        processInputBuffer(c);
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
    server.stat_io_reads_processed += processed;

    return processed;
}
  • [1]:如果 I/O 线程处于未开启状态,或者 io_threads_do_reads 配置未使能,则直接返回;

  • [2]:将 pending 队列上待处理的客户端分配到各个 I/O 线程的队列中;

    在主线程中提前分配好待处理的客户端,这样每个 I/O 线程只需要处理自己的客户端即可,避免了线程之间的竞争。

  • [3]:设置 io_threads_op 以及每个线程的待处理客户端数量 io_threads_pending[id]

    I/O 线程会不断检测自己的 io_threads_pending,当这个值不为 0 时开始工作,处理完成后再将这个值归零。

  • [4]:主线程读取并解析分配给自己的客户端的请求,这里调用的是 readQueryFromClient

  • [5]:主线程解析完自己的客户端请求后,等待其它 I/O 线程也处理完毕。判断方法为所有 I/O 线程的 io_threads_pending 总和是否为 0,如果为 0 表示已经处理完毕,可以继续向下执行;

  • [6]:由于 I/O 线程只负责读取并解析数据,不负责指令的执行,因此主线程在这里以单线程的方式统一处理,处理函数为 processPendingCommandsAndResetClient()

    由上可知,虽然 Redis 6 引入了 I/O 线程,但这些线程只负责 I/O 操作,最终命令的执行依旧是由主线程自己完成的,这么做的原因如下:

    • Redis 作为内存数据库,命令执行操作通常不会成为性能瓶颈,网络 I/O 才是开销大头。
    • 单线程处理内存数据,极大的降低了系统设计的复杂度,且避免了加锁设计带来的额外性能损耗。
    • Redis 命令执行的有序性衍生出了许多的应用场景,显然 Redis 并不想放弃这个领域的市场。

processPendingCommandsAndResetClient 的实现也很简单:

int processPendingCommandsAndResetClient(client *c) {
    if (c->flags & CLIENT_PENDING_COMMAND) {
        c->flags &= ~CLIENT_PENDING_COMMAND;
        if (processCommandAndResetClient(c) == C_ERR) {
            return C_ERR;
        }
    }
    return C_OK;
}

如果客户端状态为 CLIENT_PENDING_COMMAND,则表示数据已经解析完毕,进而调用 processCommandAndResetClient() 处理。

I/O 线程核心逻辑

IOThreadMain 是创建 I/O 线程时指定的执行函数,该函数负责处理分配给当前线程的客户端请求:

void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    // [1] 为当前线程绑定 CPU,减少线程切换
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        // [2] 自旋获取当前线程的待处理客户端数量
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }
        // [3] 如果不存在待处理的客户端,则尝试一次加/解锁
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(getIOPendingCount(id) != 0);

        // [4] 遍历 io_threads_list,
        //   根据 io_threads_op 执行 I/O 读写操作
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // [5] 清空 io_threads_list,
        //   并重置 io_threads_pending[id] 为 0
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}
  • [1]:尽可能将当前线程绑定到用户配置的 CPU 列表上,减少不必要的线程切换;
  • [2]:不断检查 io_threads_pending[id],如果不为 0 则开始处理任务,否则不断自旋;
  • [3]:如果自旋结束依旧没有数据到达,则对 io_threads_mutex[id] 互斥量完成一次加、解锁操作,然后继续下一轮自旋。之所以这里要做一次锁操作,是为了让主线程能够停用 I/O 线程;
  • [4]:遍历 io_threads_list[id] 里的客户端,根据 io_threads_op 执行 I/O 读或者写操作;
  • [5]:清空 io_threads_list[id] 列表,并重置 io_threads_pending[id] 为 0。

多线程场景下的响应过程

《Redis 命令处理过程》一文中讲过,Redis 会将响应数据写到 client 的回复缓冲区,并将客户端放到 server.clients_pending_write,最后在 handleClientsWithPendingWrites() 函数中将客户端回复缓冲区内的数据写入 TCP 发送缓冲区,完成发送。

handleClientsWithPendingWrites() 函数是在 beforeSleep() 函数中由 handleClientsWithPendingWritesUsingThreads() 调用的,handleClientsWithPendingWritesUsingThreads() 的实现如下:

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0;
    // [1] 如果禁用了 I/O 线程,或需要停止 I/O 线程
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        // 则单线程处理 pending 链表上的客户端
        return handleClientsWithPendingWrites();
    }
    // [2] 如果要启用 I/O 线程,但 I/O 线程未启动,则在这里启动
    if (!server.io_threads_active) startThreadedIO();
    // [3] 将所有客户端分配到 I/O 线程里处理
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {...}
    ...
    return processed;
}
  • [1]:该函数首先判断 I/O 线程是否启用,如果 I/O 线程被禁用,或者需要停用,则调用 handleClientsWithPendingWrites() 由主线程自己处理;
  • [2]:如果需要使用 I/O 线程,但当前 I/O 线程已经被停用,则在这里重新启动;
  • [3]:将 server.clients_pending_write 上的所有客户端分配到 I/O 线程里处理,具体流程与上文的请求解析过程一样,这里不再赘述。

I/O 线程状态的切换

I/O 线程的停用

I/O 线程初始化 部分我们了解过,Redis 启动时会先锁定所有的互斥量,也就是默认情况下 I/O 线程是不启用的,这是因为 IOThreadMain 会不断自旋,造成 CPU 资源的浪费。只有满足以下条件时,才会启用 I/O 线程:

  1. io-threads 配置的线程数量大于 1;
  2. 待处理的客户端数量大于等于 I/O 线程数量的两倍。

判断方法 stopThreadedIOIfNeeded() 实现如下:

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);
    // [1] 如果 io 线程数配置为 1,只使用主线程
    if (server.io_threads_num == 1) return 1;
    // [2] 如果待处理客户端数量小于 io_threads_num*2,停用 io 线程
    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

其中,停用 I/O 线程的函数 stopThreadedIO() 的实现如下:

void stopThreadedIO(void) {
    // [1] 先处理完可能剩余的客户端读请求
    handleClientsWithPendingReadsUsingThreads();
    serverAssert(server.io_threads_active == 1);
    // [2] 阻塞所有 I/O 线程
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    server.io_threads_active = 0; // 关闭 I/O 线程标志
}
  • [1]:首先处理 server.clients_pending_read 上可能还存在的等待解析的客户端;
  • [2]:通过互斥量,将所有 I/O 线程阻塞。在上文中的 IOThreadMain 函数的第三步中,I/O 线程会尝试加/解锁一次,如果主线程成功获取到了锁,那么就可以将 I/O 线程阻塞。

I/O 线程的启用

既然待处理的客户端数量较少时会停用 I/O 线程,那么当请求量增加时,也会重新启用 I/O 线程。

判断的地方仍然位于上文的 handleClientsWithPendingWritesUsingThreads() 函数中。如果 I/O 线程已经被停用,并且 stopThreadedIOIfNeeded() 返回 0,这意味着请求量已经足够大,因此接下来会调用 startThreadedIO() 来启动线程。

至此,Redis 的网络 I/O 多线程处理机制就介绍完了。