如无特殊说明,本文涉及的源码均位于 connection.c 与 connection.h 中


Redis 服务启动后,便可接受来自客户端的连接,本文将详细介绍 Redis Server 如何管理这些来自客户端的连接。

数据结构

Redis Server 中负责存储客户端连接的结构体是 connection.h 中的 connection

struct connection {
    ConnectionType *type;
    ConnectionState state;
    short int flags;
    short int refs;
    int last_errno;
    void *private_data;
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;
};
  • type:用于操作连接通道的函数集:
        typedef struct ConnectionType {
            void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
            int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
            int (*write)(struct connection *conn, const void *data, size_t data_len);
            int (*read)(struct connection *conn, void *buf, size_t buf_len);
            void (*close)(struct connection *conn);
            int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
            ...
        } ConnectionType;
        
  • state:连接状态定义,包括以下几种:
        typedef enum {
            CONN_STATE_NONE = 0,
            CONN_STATE_CONNECTING,
            CONN_STATE_ACCEPTING,
            CONN_STATE_CONNECTED,
            CONN_STATE_CLOSED,
            CONN_STATE_ERROR
        } ConnectionState;
        
  • last_errno:该连接的最新错误信息;
  • private_data:存放附加数据;
  • (conn|write|read)_handler:连接、读取、写入操作的回调函数;
  • fd:连接的文件描述符。

当 Redis 服务器收到新的连接请求时,它会在连接事件处理器 acceptTcpHandler 中调用 connCreateAcceptedSocket() 函数,以创建一个 connection 结构体实例。这些 connectiontype 属性都会被设置为 connection.c/CT_Socket,它内部注册了一大批用于 I/O 操作的函数指针:

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    ...
};

除了网络连接信息,Redis 服务还会保存这个连接所属的 Client 信息,客户端信息由 server.h/client 结构体保存:

typedef struct client {
    ...
    sds querybuf; // 查询缓冲,存放客户端请求数据
    size_t qb_pos; // 查询缓冲最新读取位置
    sds pending_querybuf;
    size_t querybuf_peak; // 客户端单次请求数据量峰值
    ...
    int reqtype; // 请求数据的协议类型
    int multibulklen; // 当前解析的命令请求中尚未处理的命令参数数量
    long bulklen; // 当前读取命令参数长度
    list *reply; // 链表回复缓冲区
    unsigned long long reply_bytes; // 链表回复缓冲区字节数
    ...
    uint64_t flags; //客户端标志
    ...
    int bufpos; // 固定回复缓冲区最新操作位置
    char buf[PROTO_REPLY_CHUNK_BYTES]; // 固定回复缓冲区
} client;

了解了上述结构体以后,接下来我们详细介绍下客户端网络连接的处理过程。

客户端连接事件处理

Redis 服务启动后,会为 AE_READABLE 文件事件注册连接事件处理器 acceptTcpHandleracceptTcpHandler 函数会处理来自客户端的连接,创建数据交换套接字,并为该套接字注册后续的事件回调函数:

#define MAX_ACCEPTS_PER_CALL 1000

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    // [1] 控制每次处理的请求数量
    while(max--) {
        // [2] 接受连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        // [3] fork 后台进程时,不要继承该连接
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // [4] connCreateAcceptedSocket 创建 connection 结构体
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
  • [1]:每次事件循环最多接受 MAX_ACCEPTS_PER_CALL (1000) 个连接请求,防止进程被长时间阻塞;
  • [2]:调用 C 语言的 accept 函数接受新的客户端连接,并返回数据交换套接字文件描述符。
  • [3]:将 Socket 文件描述符的标志位设置为非继承,这样当主进程 fork 子进程时,会自动关闭子进程的文件描述符;

    Redis 的后台进程无需与用户进行交互,比如 RDB 生成和 Rehash 等任务,它们只需拥有当前内存数据库的快照即可。因此,关闭子进程的套接字连接可以有效地避免发生竞争。

  • [4]connCreateAcceptedSocket 函数创建并返回 connection 结构体。

acceptCommonHandler 函数拿到 connCreateAcceptedSocket 创建的 connection 后,再完成创建 client 结构体、为 Socket 注册文件事件处理器等后续逻辑:

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    UNUSED(ip);
    ...
    // [1] 如果 client 数量加上 Cluster 数量超过限制,返回错误
    if (listLength(server.clients) + getClusterConnectionsCount()
        >= server.maxclients) {
        ... 
        server.stat_rejected_conn++;
        connClose(conn);
        return;
    }
    // [2] 创建 client 结构体
    if ((c = createClient(conn)) == NULL) {
        ...
        connClose(conn);
        return;
    }
    c->flags |= flags;
    // [3] 设置 clientAcceptHandler 回调,并立刻调用
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        ...
        return;
    }
}
  • [1]:如果 client 数量加上 Cluster 数量超过限制,返回错误信息;
  • [2]:创建 client 结构体;
  • [3]:将 clientAcceptHandler 设置为 accept 回调,不过由于执行 acceptCommonHandler 方法时已经处于 accept 过程中了,所以这个回调会被立刻调用。

创建 client 结构体的方法为 createClient

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    // [1] 如果 connection 不为 null,则创建普通客户端
    if (conn) {
        // [2] 设定非阻塞模式
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        // [3] 启用 keepalive
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        // [4] 设置可读事件回调
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    // [5] 选择 0 号数据库,并初始化 client 属性
    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id, client_id, 1);
    c->id = client_id;
    c->resp = 2;
    c->co ...
    // [6] 将 client 结构体添加到 redisServer 结构体
    if (conn) linkClient(c);
    // [7] 初始化 client 事务上下文
    initClientMultiState(c);
    return c;
}
  • [1]:如果 connection 不为 null,则为连接创建普通客户端,否则创建伪客户端:

    通常情况下,Redis 命令必须在客户端上下文中执行。然而,当命令需要在其他上下文环境中执行时,就需要创建一个伪客户端来进行操作,典型的例子就是 Lua 脚本的执行上下文。

  • [2][3][4]:如果是真实的客户端连接,则完成以下调用:

    • [2]:将连接设置为非阻塞模式;
    • [3]:启用 keepalive 保持连接活跃;
    • [4]:为 socket 注册 AE_READABLE 事件的处理函数 networking.c/readQueryFromClient()connSetReadHandler 内部会将 readQueryFromClient() 注册到上文介绍的 ConnectionType.read 函数指针上,后续当有数据到达时,便会调用 readQueryFromClient() 处理来自客户端的请求。
  • [5]:选择 0 号数据库,并初始化 client 属性;

  • [6]:将 client 结构体添加到 redisServer 结构体;

  • [7]:初始化 client 事务上下文。

与客户端断开连接

当客户端发送 quit 命令,或 socket 连接断开 时,服务器会调用 networking.c/freeClientAsync 函数将 client 添加到 server.clients_to_close 链表。而后在 AE Loop 执行 beforeSleep() 钩子函数时,再调用 freeClientsInAsyncFreeQueue() 函数遍历这个链表,将链上的节点交给 freeClient 函数关闭。freeClient 主要完成以下工作:

  1. 如果断开连接的客户端是集群中的 Master 节点,则缓存这个客户端的信息,并将主从状态转换为待连接状态以便后续与主节点重新建立连接;
  2. 否则释放查询缓存等内存空间,并取消客户端监听的 watcher,取消客户端订阅的所有 topic;
  3. 如果断开连接的客户端是集群中的从节点,则将其从 server.monitorsserver.slaves 中删除,并减少 server.repl_good_slaves_count 的计数;
  4. 最终调用 unlinkClient 函数,关闭 Socket 连接。

另外,在时间事件处理器 serverCron 中也会调用 clientsCron 关闭超时未发送命令的客户端:

#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
    // [1] 每次处理 iterations 个客户端
    int numclients = listLength(server.clients);
    int iterations = numclients/server.hz;
    mstime_t now = mstime();
    if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
        iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
                     numclients : CLIENTS_CRON_MIN_ITERATIONS;
    int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
    int zeroidx = (curr_peak_mem_usage_slot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
    ClientsPeakMemInput[zeroidx] = 0;
    ClientsPeakMemOutput[zeroidx] = 0;
    while(listLength(server.clients) && iterations--) {
        client *c;
        listNode *head;
        listRotateTailToHead(server.clients);
        head = listFirst(server.clients);
        c = listNodeValue(head);
        // [2] 关闭超时的客户端连接
        if (clientsCronHandleTimeout(c,now)) continue;
        // 压缩客户端的查询缓存
        if (clientsCronResizeQueryBuffer(c)) continue;
        if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
        // 统计几秒内使用内存最多的客户端
        if (clientsCronTrackClientsMemUsage(c)) continue;
        if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
    }
}
  • [1]:每次处理 numclients/server.hz 个客户端。由于 serverCron 每秒调用 server.hz 次,所以每秒都会将所有客户端处理一轮;
  • [2]:关闭超时 server.maxidletime 时间内没有发送请求的客户端连接;