如无特殊说明,本文涉及的源码均位于 networking.c 与 networking.h 中,且为了节省篇幅,次要代码将被移除。


Redis Server 的核心是一个事件驱动系统,它持续监听与客户端的 TCP 连接,一旦有数据可读,就会触发事件处理过程,从数据流中读取来自客户端的请求。然而,TCP 传输的特点是会出现 “拆包” 和 “粘包” 现象,因此 Redis Server 必须正确地从流中解析请求。解析完毕后再执行该请求,并将执行结果将信息返回给客户端。

接下来我们详细介绍这三个过程的源码实现。

Redis Client 的请求类型

Redis 客户端有两种请求类型:PROTO_REQ_INLINEPROTO_REQ_MULTIBULK

  1. PROTO_REQ_INLINE 内联请求:通常为 telnet 等测试工具的请求类型,请求直接以普通文本形式发送。

    例如:通过 telnet 发送 set key 8 命令时的 tcpdump 数据为 7365 7420 6b65 7920 380d 0a,即 set key 8\r\n

  2. PROTO_REQ_MULTIBULK 多批量请求:表示 Redis 官方客户端的请求类型,请求会通过 RESP 协议进行封装。

    例如:通过 redis-cli 发送 set key 8 请求时的 tcpdump 数据为 2a33 0d0a 2433 0d0a 7365 740d 0a24 330d 0a6b 6579 0d0a 2431 0d0a 380d 0a,即 *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$1\r\n8\r\n

    PROTO_REQ_MULTIBULK 类型数据包的开头都为 * 字符,这也是 Redis Server 判断请求类型的依据。

请求的解析过程

《Redis 网络连接管理》 一文中,我们介绍过 readQueryFromClient() 函数负责读取来自客户端的数据流,源码如下:

void readQueryFromClient(connection *conn) {
    // [1] 从 connection 中获取 client 数据结构
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    // [2] 如果开启了 I/0 线程,则交给 I/0 线程去处理,这里直接返回
    if (postponeClientRead(c)) return;
    /* 原子更增加服务器总读取操作的计数器 */
    atomicIncr(server.stat_total_reads_processed, 1);
    // [3] 设置最大读取字节数
    readlen = PROTO_IOBUF_LEN;
    // [4] 如果是多批量请求,且发生了拆包,这里做特殊处理
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        // bulklen 为指令总长度,remaining 为剩余未读长度 + 2(\r\n)
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        // 如果剩余未读长度小于 readlen,则更新 readlen 为 remaining
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    // [5] 计算当前查询缓冲区已被占用的长度
    qblen = sdslen(c->querybuf);
    // 并更新 querybuf_peak 变量以跟踪查询缓冲区的最大长度
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 确保查询缓冲区有足够的空间来存放数据,如果不够则扩容
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // [6] 读取来自 socket 中的数据流
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        // 异常处理...
    } else if (nread == 0) {
        // 断开连接 ...
        return;
    } else if (c->flags & CLIENT_MASTER) { // 如果 client 是集群主节点
        // 将查询缓冲区的数据追加到主服务器的挂起查询缓冲区
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread); //更新 sds 数据长度
    c->lastinteraction = server.unixtime;
    // 增加这个 master 客户端的读取复制偏移量
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    //[7]查询缓冲长度超了,则断开连接
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // free...
        return;
    }

    // [8] 处理已经读取到缓冲区中的数据
    processInputBuffer(c);
}
  • [1]:从 connection 中获取 client 数据结构;

  • [2]:如果开启了 I/O 线程,则推迟处理;

    postponeClientRead() 会在启用了 I/O 线程的情况下,将当前 client 添加到 server.clients_pending_read 链表,同时将 client.flagsCLIENT_PENDING_READ 标志置 1,等待 I/O 线程去处理。

  • [3]:将 readlen 设置为请求最大字节数,默认为 16KB;

  • [4]:如果 c->reqtype == PROTO_REQ_MULTIBULK,这表明当前客户端发送的是多批量请求,并且之前未读取完整(即发生了数据拆包,这是因为默认情况下 c->reqtype 的值为 0)。在这种情况下,我们需要根据剩余未读取的数据大小来更新 readlen,以确保正确处理数据拆包的边界。

  • [5]: 计算当前查询缓冲区已被占用的长度,并更新 querybuf_peak 变量以跟踪查询缓冲区的最大长度,如果缓冲区的 SDS 字符串剩余空间不足,则对其进行扩容;

  • [6]:将 socket 中的数据同步读取到查询缓冲区,并更新 SDS 中的数据长度标记。如果这次请求的客户端是集群中的 master 节点,还会将查询缓冲区的数据追加到主服务器的挂起查询缓冲区,同时更新这个 master 客户端的读取复制偏移量;

  • [7]:如果查询缓冲区的长度超过了服务器的最大查询缓冲区长度 (server.client_max_querybuf_len),则关闭客户端连接;

  • [8]:处理已经读取到缓冲区中的数据。

上述函数读取到查询缓冲区中的数据可能是内联请求,也可能是多批量请求,如果是多批量请求,那么本次调用读取到的请求很可能并不完整。这些情况都需要交给 processInputBuffer() 函数处理:

void processInputBuffer(client *c) {
    // [1] 循环,直到处理完读取缓冲区中的数据
    while(c->qb_pos < sdslen(c->querybuf)) {
        // [2] 判断是否要终止处理:
        // 如果客户端正处于某种中止状态,跳出
        if (c->flags & CLIENT_BLOCKED) break;
        // 如果当前客户端的请求数据已经读取完毕,则跳出循环
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        // 如果收到请求的服务器处于 Lua 脚本执行状态,
        //  且发送请求的客户端是 Master 节点,也中止处理,用于累积复制流
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        // 如果客户端包含以下两个状态,表示需要关闭连接,也立刻终止处理
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        // [3] 如果是第一次处理这个请求,则设定请求类型
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                // 如果请求以 * 开头,则表示 PROTO REQ MULTIBULK
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        // [4] 根据请求类型,调用不同的函数处理
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        // 到这里,请求就读取完毕了
        if (c->argc == 0) {
            // [5] 如果命令未携带参数,则重置客户端
            resetClient(c);
        } else { // 如果有参数
            // [6] 如果启用了 I/0 线程,则跳出循环先不处理
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            // 否则先执行命令,再重置客户端 (即返回响应)
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }

    // [8] 命令执行成功,
        // 删除查询缓冲中已经处理过的请求,并初始化 qb_pos
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
  • [1]client.qb_pos 为查询缓冲区最新读取位置,该位置小于查询缓冲区内容长度时,循环继续执行;
  • [2]:当客户端处于特殊状态时,跳出循环;
  • [3]:当该函数第一次处理这个命令的数据流时,根据数据流中的第一个字符设定请求类型:
    • Redis 的官方客户端会在发送的请求前附加 *{lineNumber}\r\n 字符串。如果服务端发现请求中的第一个字符为 *,则将这个请求的类型设置为 PROTO_REQ_MULTIBULK
    • 如果用户通过 telnet 等工具测试 Redis 服务,服务器将会收到普通字符串,进而将其类型设置为 PROTO_REQ_INLINE
  • [4]:根据请求类型,调用不同的函数处理;
  • [5]:到这里,客户端请求就解析完毕了。如果命令没有携带参数,则调用 resetClient() 直接重置客户端。该步骤主要用于主从同步场景,主从同步时会通过发送空行请求来重置客户端;
  • [6]:如果命令携带了参数,但是客户端设置了 CLIENT_PENDING_READ 标志,表明服务器启用了 I/O 线程,所以先不处理请求
  • [7]:如果命令携带了参数,且未启用 I/O 线程,则调用 processCommandAndResetClient() 执行命令并返回响应;
  • [8]:命令执行成功后,删除查询缓冲中已经处理过的请求,并初始化 client.qb_pos

MULTIBULK 请求解析流程

当客户端的请求类型为 PROTO_REQ_MULTIBULK 时,调用 processMultibulkBuffer() 函数进行处理:

int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;
    // [1] 读取 Multibulk 请求头部的指令行数
    if (c->multibulklen == 0) {
        ...
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        ...
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        ...
        /* 更新 qb_pos:
         *        newline
         *           |
         *           v
         *   *123123\r\n...\r\n...\r\n
         *   ^          ^
         *   |          |
         * querybuf   qb_pos
         */
        c->qb_pos = (newline-c->querybuf)+2;
        ...
        c->multibulklen = ll; // 存储指令行数
        ...
    }
    ...
    // [2] 每次处理一行数据,直到处理完
    while(c->multibulklen) {
  • [1]:如果 c->multibulklen == 0,则说明上一个指令已经解析完毕,这里开始解析新的请求了,所以通过 \r\n 分隔符获取当前请求中参数的数量,赋值给 client.multibulklen,并将 client.qb_pos 的值更新为请求中第一个参数的首地址;
  • [2]:一次处理一个参数,直到解析完;
            // [3] 如果还没读取到参数的长度,则先读取长度
            if (c->bulklen == -1) {
                // [4] newline 指向长度信息后边的 '\r'
                newline = strchr(c->querybuf+c->qb_pos,'\r');
                ...
                // [5] 有个 \n 字符没读取到,跳出继续读取
                if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                    break;
                // [6] 如果当前行不以 $ 开头,说明是非法请求
                if (c->querybuf[c->qb_pos] != '$') {
                    ...
                    return C_ERR;
                }
                // [7] 读取当前参数的长度信息
                ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
                if (!ok || ll < 0 ||
                    (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                    ...
                    return C_ERR;
                } else if (ll > 16384 && authRequired(c)) {
                    ...
                    return C_ERR;
                }
                // qb_pos 指向当前参数首地址
                c->qb_pos = newline-c->querybuf+2;
                // [8] 优化超大参数
                if (ll >= PROTO_MBULK_BIG_ARG) {
                    if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                        sdsrange(c->querybuf,c->qb_pos,-1);
                        c->qb_pos = 0;
                        c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
                    }
                }
                // [9] 更新当前参数的长度
                c->bulklen = ll;
            }
    
  • [3]:RESP 协议会在每个请求参数前添加 “${n}” 前缀,以指定请求参数的长度,例如 $3\r\nset\r\n。因此,当 client.bulklen == 0 时,表示还未读取当前参数的长度,所以在这里读取;
  • [4]:通过 \r\n 获取参数长度,用 newline 指向长度信息后边的 \r
  • [5]:比较长度信息的字符个数与查询缓冲中的剩余有效内容的字符个数,如果当前长度信息字符数大于缓冲区剩余内容长度 - 2,说明还有一个 \n 字符没读取到,因此退出等待下一次 I/O 操作;
  • [6]:如果当前行不以 $ 开头,说明是非法请求,返回错误;
  • [7]:跳过 $ 符号读取当前参数的长度信息,并将 client.qb_pos 指向当前参数首地址;
  • [8]:如果当前参数是一个超大参数,且剩余空间不足了,则清除缓冲区中已经被处理的数据并适当扩容;
  • [9]:将当前参数的长度存入 client.bulklen
            if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
                // [10] 未读取完全,跳出接着读取
                break;
            } else {
                if (c->qb_pos == 0 &&
                    c->bulklen >= PROTO_MBULK_BIG_ARG &&
                    sdslen(c->querybuf) == (size_t)(c->bulklen+2))
                {
                    // [11] 超大参数处理
                    c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                    c->argv_len_sum += c->bulklen;
                    sdsIncrLen(c->querybuf,-2);
                    c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                    sdsclear(c->querybuf);
                } else {
                    // [12] 小参数处理
                    c->argv[c->argc++] =
                        createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                    c->argv_len_sum += c->bulklen;
                    c->qb_pos += c->bulklen+2;
                }
                c->bulklen = -1;
                c->multibulklen--;
            }
        }
    
        // [13] multibulklen 降到 0 时,表明请求解析成功
        if (c->multibulklen == 0) return C_OK;
        return C_ERR;
    }
    
  • [10]:如果查询缓冲的未处理字符串长度,小于当前参数的长度,说明当前参数未读完整,退出等待下一次 I/O 操作;
  • [11]:如果是超大参数,经过上边的处理,此时缓冲区中只有当前参数自己,所以直接使用查询缓冲区中的内容创建一个 redisObject 作为参数(redisObject 指针直接指向该地址),同时新申请一块内存作为新的查询缓冲区;

    创建的 redisObject 参数顺次追加到 client.argv[] 数组中

  • [12]:如果是小参数,则调用 createStringObject() 复制缓冲区中的参数到新对象空间;

    创建的 redisObject 小参数也顺次追加到 client.argv[] 数组中

  • [13]:如果请求解析成功,返回 C_OK,否则返回 C_ERR 等下次 I/O 时接着读取请求的剩余部分。

INLINE 请求解析流程

当客户端的请求类型为 PROTO_REQ_INLINE 时,调用 processInlineBuffer() 函数解析命令,这里就不展开了。总之就是在没有 RESP 协议来帮助解析的场景下,需要通过分隔符解析字符串中的参数。此外,内联请求场景还支持处理 Gopher 协议。

命令的执行过程

上文我们讲过,processInputBuffer() 函数读取到完整的请求后,会调用 processCommandAndResetClient() 执行命令并重置客户端,此时请求参数已经全部存储在 client.argv[] 数组中。在该数组中,argv[0] 代表指令名称,例如 GET、SET 等,而 argv[1]argv[2] 等则表示请求的参数列表,比如 key、value 等。processCommandAndResetClient() 的源码如下:

int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    // 先将当前客户端设为 current_client
    server.current_client = c;
    // 执行命令
    if (processCommand(c) == C_OK) {
        // 执行成功后,做一些收尾工作
        commandProcessed(c);
    }
    if (server.current_client == NULL) deadclient = 1;
    // current_client 恢复至原客户端
    server.current_client = old_client;
    return deadclient ? C_ERR : C_OK;
}

resetCommand() 函数

processCommandAndResetClient() 函数的核心是 processCommand() 函数:

int processCommand(client *c) {
    // [1] Mudule Filter
    moduleCallCommandFilters(c);
    // [2] 优先处理退出命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    // [3] 根据指令名称,查找指令格式
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    ...
  • [1]:触发 Mudule Filter;
  • [2]:优先处理退出命令;
  • [3]:根据用户请求中的指令名称,查找 redisCommand

    Redis 通过 server.h/redisCommand 结构体存储命令的格式:

          struct redisCommand {
              char *name; // 指令名称
              redisCommandProc *proc; // 指令处理函数指针
              int arity; // 请求参数的数量,包括指令名称本身
              ...
              int id; // 请求 id
          };
        > 

    该结构体作为 server.c/redisCommandTable[] 数组的元素存在,Redis Server 在启动时会调用 initServerConfig() 函数加载 redisCommandTable,并将其存储 redisServer.commands 命令字典中。redisCommandTable 结构体格式如下:

          struct redisCommand redisCommandTable[] = {
              ...
              {"get",getCommand,2,
              "read-only fast @string",
              0,NULL,1,1,1,0,0,0},
              ...
              {"set",setCommand,-3,
              "write use-memory @string",
              0,NULL,1,1,1,0,0,0},
              ...
          }
          
    它内部定义了 Redis 支持的所有指令的格式。
    // [4] 执行以下逻辑: 
    // 获取请求类型
    int is_read_command = (c->cmd->flags & CMD_READONLY) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
    ...
    // 如果启用了用户验证,则在这里完成验证
    if (authRequired(c)) {...}
    // 权鉴
    int acl_errpos;
    int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
    // cluster 模式特别处理
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand)) {...}
    // 如果服务器配置了内存最大限制,则按需淘汰数据
    if (server.maxmemory && !server.lua_timedout) {...}
    // tracking 机制要求记录查询过的 key
    if (server.tracking_clients) trackingLimitUsedSlots();
    // 如果当前服务是 master,当持久化异常了或者正常的从节点数量小于阈值,拒绝执行请求
    int deny_write_type = writeCommandsDeniedByDiskError();
    ...
    // 如果当前服务是 slave,且客户端并非 master,则拒绝写命令
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        is_write_command) {...}
    // 如果客户端处于 pub/sub 模式,而且使用的是 RESP2 协议,
    //   则只允许执行 ping、sub、unsub、psub、punsub、reset 这几种命令
    //   其他类型命令都拒绝执行
    if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
        c->cmd->proc != pingCommand && ... {...}
    // 如果当前服务是 slave,且与 master 断连,则拒绝执行 is_denystale_command 类型命令
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        is_denystale_command) {...}
    // 服务器正在加载数据期间,只允许执行执行特定命令
    if (server.loading && is_denyloading_command) {...}
    // lua 脚本执行超时期间,只有特定命令能执行
    if (server.lua_timedout &&
          c->cmd->proc != authCommand && ... {...}
    // 阻止 slave 发送访问密钥空间的命令
    if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {...}
    // 如果服务器已暂停,则将非 slave 客户端阻塞
    if (!(c->flags & CLIENT_SLAVE) && 
        ((server.client_pause_type == CLIENT_PAUSE_ALL) ||
        (server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
    {...}
  • [4]:该部分内容主要涉及 ACL、Tracking、数据淘汰、特殊场景过滤等内容。
    /* [5] Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)
    {
        // 如果当前 client 处于事务上下文,将非 exec、discard、watch、reset 命令放入事务队列
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 非事务上下文,或者是 exec、discard、watch、reset 命令,则直接执行
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    
    return C_OK;
}
  • [5]:到了这一步,开始真正地执行命令:如果当前 client 处于事务上下文,且命令处理函数不是 execCommanddiscardCommandmultiCommandresetCommand 这四种,则将命令放入事务队列;否则直接调用 call() 函数运行命令。

call() 函数

server.c/call() 函数的源码如下:

void call(client *c, int flags) {
    ...
    // [1] 清空传播控制标志
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    ...
    // [2] 调用命令处理函数 proc,执行命令处理逻辑
    c->cmd->proc(c);
    ...
    // [3] 根据状态设定 propagate_flags 标志,再根据 propagate_flags 将命令记录到 AOF 文件或发送到从服务器
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;
        // 产生脏数据,即修改了数据库
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
        ...
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
  • [1]:命令执行前,重置传播控制标志 (CLIENT_FORCE_AOFCLIENT_FORCE_REPLCLIENT_PREVENT_PROP),这些标志应该只在命令执行过程中开启,因为 call 可能会递归调用,所以执行前先清除这些标志;
  • [2]:调用命令处理函数 proc,执行命令处理逻辑。命令执行完毕后,执行以下逻辑:
    1. 如果客户端的标记包含 CLIENT_CLOSE_AFTER_COMMAND,则将其修改为 CLIENT_CLOSE_AFTER_REPLY,以便尽快关闭客户端;
    2. 如果当前正在加载数据,且当前命令执行的是 lua 脚本,则清除慢日志,清除命令统计,即 lua 命令不录入这两个项目中;
    3. 如果当前客户端是 lua 的伪客户端,则将 CLIENT_FORCE_REPLCLIENT_FORCE_AOF 标志转移到真实客户端里;

      在 lua 脚本中调用 redis.call() 函数,Redis 会构建伪客户端调用 call() 函数,并将真实客户端 client 记录到 server.lua_caller 中。这样一来命令执行过程中打开的 CLIENT_FORCE_REPLCLIENT_FORCE_AOF 都是暂存在伪客户端里的,所以需要转移这些标志。

    4. 记录慢日志,并统计命令信息。
  • [3]:根据状态设定 propagate_flags 标志,再根据 propagate_flags 将命令记录到 AOF 文件或发送到从服务器。propagate_flags 标志根据以下条件生成:
    1. 如果执行命令导致数据被修改,则设置 PROPAGATE_AOFPROPAGATE_REPL 标志;
    2. 如果 client 被开启了 CLIENT_FORCE_REPL,则设置 PROPAGATE_REPL 标志;
    3. 如果 client 被开启了 CLIENT_FORCE_AOF,则设置 PROPAGATE_AOF 标志;
    4. 如果 client 被开启了 CLIENT_PREVENT_REPL_PROP 且未开启 CMD_CALL_PROPAGATE_REPL,则清除 PROPAGATE_REPL 标志;
    5. 如果 client 被开启了 CLIENT_PREVENT_AOF_PROP 且未开启 CMD_CALL_PROPAGATE_AOF,则清除 PROPAGATE_AOF 标志;
    // [4] 补偿先前被去清空的传播控制标志
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    // [5] 记录 also_propagate 中存放的额外传播控制命令
    if (server.also_propagate.numops) {...}
    server.also_propagate = prev_also_propagate;
    ...
    // [6] 如果是查询类型的命令,那么记忆该命令,后续当 key 变化时通知客户端(Tracking 机制)
     if (c->cmd->flags & CMD_READONLY) {
        client *caller = (c->flags 
            & CLIENT_LUA && server.lua_caller) 
            ? server.lua_caller : c;
        if (caller->flags & CLIENT_TRACKING &&
            !(caller->flags & CLIENT_TRACKING_BCAST))
        {
            // 存储当前请求的所有 keys
            trackingRememberKeys(caller);
        }
     }
     // clean
     ...
}
  • [4]:命令执行前会清空 client 中的传播控制标志,如果 client.flag 中本来存在这些标志,那么这里需要恢复这些的标志;
  • [5]server.also_propagate 中存放了一系列需要额外传播的命令,这里将它们记录到 AOF 或者复制到从服务器;
  • [6]:Redis 6 新增的 Tracking 机制:如果执行的是查询请求,那么 Redis 会存储该命令中的 keys,后续当这些 key 变化时,需要通知客户端。

命令执行完毕后,最终由 commandProcessed() 处理后续逻辑:

void commandProcessed(client *c) {
    if (c->flags & CLIENT_BLOCKED) return;
    // [1] 重置客户端
    resetClient(c);
    long long prev_offset = c->reploff;
    // [2] 更新已同步命令的偏移量
    if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
        c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
    }
    // [3] 如果客户端是 master,则将这个命令复制到当前节点的从节点
    if (c->flags & CLIENT_MASTER) {
        long long applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}
  • [1]:重置客户端,该操作不会清空 client 的回复缓冲区(下文介绍);
  • [2]:如果客户端是集群中的 master 节点,且不处于事务上下文,则更新已同步命令的偏移量 client.reploff
  • [3]:如果客户端是集群的 master 节点,则调用 replicationFeedSlavesFromMasterStream 将该命令复制到当前从节点的下级从节点上。

返回响应的过程

client 中定义了两个回复缓冲区,用于缓存返回给客户端的响应数据:

  • char buf[PROTO_REPLY_CHUNK_BYTES]:字符数组,大小为 16KB,通过 client.bufpos 属性记录最新的写入位置;
  • list *reply:SDS 字符串链表,当回复内容较长时使用。

networking.c 中提供了 addReply()addReplySds()addReplyProto() 等一系列函数,用于将响应数据写入缓冲区,这里我们以 addReplySds() 函数为例讲解,其它的流程均与之类似:

void addReplySds(client *c, sds s) {
    // [1] 如果客户端允许响应回复,则将其加到阻塞队列
    if (prepareClientToWrite(c) != C_OK) {
        /* The caller expects the sds to be free'd. */
        sdsfree(s);
        return;
    }
    // [2] 先尝试将其写入 client.buf
    if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
        // buf 存不下,则写入 list
        _addReplyProtoToList(c,s,sdslen(s));
    sdsfree(s);
}
  • [1]:先通过 prepareClientToWrite() 判断客户端是否允许回复数据,如果允许则这个 client 追加到 server.clients_pending_write
  • [2]:然后尝试将回复数据写入 client.buf,如果数据太长写不下,则将其写入 client.reply 链表。

addReplyXxx 系列方法会在客户端请求执行期间出错时会被调用,存入错误信息,以及在客户端请求执行成功后调用 resetClient() 重置客户端时被调用,存入成功信息。

待回复内容写入缓冲区,且 client 被加入 server.clients_pending_write 队列上以后,接下来就是将缓冲区中的数据写入 TCP 发送缓冲了。该步骤由 handleClientsWithPendingWrites() 函数完成,该函数会在事件循环 休眠之前beforeSleep() 函数中被调用,或者在 I/O 线程停止之前被调用,实现如下:

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    // [1] 遍历 clients_pending_write
    int processed = listLength(server.clients_pending_write);
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        ...
        // [2] 写入 tcp 发送缓冲
        if (writeToClient(c,0) == C_ERR) continue;

        // [3] 如果回复缓冲中数据太多,则注册回调慢慢发送
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_barrier = 1;
            }
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
  • [1]:遍历 clients_pending_write 链表,取下节点;
  • [2]:将节点内容写入 TCP 发送缓冲;
  • [3]:如果 client 的回复缓冲区中还有数据,说明无法一次写入完全,则将 sendReplyToClient() 函数注册到事件处理器,监听可写事件。