命令处理过程
本文详细介绍了 Redis 6 内联请求 (INLINE) 与多批次请求 (MULTIBULK) 命令类型, 以及这两种请求的解析、 执行以及响应的详细过程。
如无特殊说明,本文涉及的源码均位于 networking.c 与 networking.h 中,且为了节省篇幅,次要代码将被移除。
Redis Server 的核心是一个事件驱动系统,它持续监听与客户端的 TCP 连接,一旦有数据可读,就会触发事件处理过程,从数据流中读取来自客户端的请求。然而,TCP 传输的特点是会出现 “拆包” 和 “粘包” 现象,因此 Redis Server 必须正确地从流中解析请求。解析完毕后再执行该请求,并将执行结果将信息返回给客户端。
接下来我们详细介绍这三个过程的源码实现。
Redis Client 的请求类型
Redis 客户端有两种请求类型:PROTO_REQ_INLINE
和 PROTO_REQ_MULTIBULK
:
PROTO_REQ_INLINE
内联请求:通常为 telnet 等测试工具的请求类型,请求直接以普通文本形式发送。例如:通过 telnet 发送
set key 8
命令时的 tcpdump 数据为7365 7420 6b65 7920 380d 0a
,即set key 8\r\n
。PROTO_REQ_MULTIBULK
多批量请求:表示 Redis 官方客户端的请求类型,请求会通过 RESP 协议进行封装。例如:通过 redis-cli 发送
set key 8
请求时的 tcpdump 数据为2a33 0d0a 2433 0d0a 7365 740d 0a24 330d 0a6b 6579 0d0a 2431 0d0a 380d 0a
,即*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$1\r\n8\r\n
。PROTO_REQ_MULTIBULK
类型数据包的开头都为*
字符,这也是 Redis Server 判断请求类型的依据。
请求的解析过程
在 《Redis 网络连接管理》 一文中,我们介绍过 readQueryFromClient()
函数负责读取来自客户端的数据流,源码如下:
void readQueryFromClient(connection *conn) {
// [1] 从 connection 中获取 client 数据结构
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
// [2] 如果开启了 I/0 线程,则交给 I/0 线程去处理,这里直接返回
if (postponeClientRead(c)) return;
/* 原子更增加服务器总读取操作的计数器 */
atomicIncr(server.stat_total_reads_processed, 1);
// [3] 设置最大读取字节数
readlen = PROTO_IOBUF_LEN;
// [4] 如果是多批量请求,且发生了拆包,这里做特殊处理
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
// bulklen 为指令总长度,remaining 为剩余未读长度 + 2(\r\n)
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
// 如果剩余未读长度小于 readlen,则更新 readlen 为 remaining
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
// [5] 计算当前查询缓冲区已被占用的长度
qblen = sdslen(c->querybuf);
// 并更新 querybuf_peak 变量以跟踪查询缓冲区的最大长度
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// 确保查询缓冲区有足够的空间来存放数据,如果不够则扩容
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// [6] 读取来自 socket 中的数据流
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
// 异常处理...
} else if (nread == 0) {
// 断开连接 ...
return;
} else if (c->flags & CLIENT_MASTER) { // 如果 client 是集群主节点
// 将查询缓冲区的数据追加到主服务器的挂起查询缓冲区
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread); //更新 sds 数据长度
c->lastinteraction = server.unixtime;
// 增加这个 master 客户端的读取复制偏移量
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
atomicIncr(server.stat_net_input_bytes, nread);
//[7]查询缓冲长度超了,则断开连接
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
// free...
return;
}
// [8] 处理已经读取到缓冲区中的数据
processInputBuffer(c);
}
[1]
:从connection
中获取client
数据结构;[2]
:如果开启了 I/O 线程,则推迟处理;postponeClientRead()
会在启用了 I/O 线程的情况下,将当前client
添加到server.clients_pending_read
链表,同时将client.flags
的CLIENT_PENDING_READ
标志置 1,等待 I/O 线程去处理。[3]
:将readlen
设置为请求最大字节数,默认为 16KB;[4]
:如果c->reqtype == PROTO_REQ_MULTIBULK
,这表明当前客户端发送的是多批量请求,并且之前未读取完整(即发生了数据拆包,这是因为默认情况下c->reqtype
的值为 0)。在这种情况下,我们需要根据剩余未读取的数据大小来更新readlen
,以确保正确处理数据拆包的边界。[5]
: 计算当前查询缓冲区已被占用的长度,并更新querybuf_peak
变量以跟踪查询缓冲区的最大长度,如果缓冲区的 SDS 字符串剩余空间不足,则对其进行扩容;[6]
:将 socket 中的数据同步读取到查询缓冲区,并更新 SDS 中的数据长度标记。如果这次请求的客户端是集群中的 master 节点,还会将查询缓冲区的数据追加到主服务器的挂起查询缓冲区,同时更新这个 master 客户端的读取复制偏移量;[7]
:如果查询缓冲区的长度超过了服务器的最大查询缓冲区长度 (server.client_max_querybuf_len
),则关闭客户端连接;[8]
:处理已经读取到缓冲区中的数据。
上述函数读取到查询缓冲区中的数据可能是内联请求,也可能是多批量请求,如果是多批量请求,那么本次调用读取到的请求很可能并不完整。这些情况都需要交给 processInputBuffer()
函数处理:
void processInputBuffer(client *c) {
// [1] 循环,直到处理完读取缓冲区中的数据
while(c->qb_pos < sdslen(c->querybuf)) {
// [2] 判断是否要终止处理:
// 如果客户端正处于某种中止状态,跳出
if (c->flags & CLIENT_BLOCKED) break;
// 如果当前客户端的请求数据已经读取完毕,则跳出循环
if (c->flags & CLIENT_PENDING_COMMAND) break;
// 如果收到请求的服务器处于 Lua 脚本执行状态,
// 且发送请求的客户端是 Master 节点,也中止处理,用于累积复制流
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
// 如果客户端包含以下两个状态,表示需要关闭连接,也立刻终止处理
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
// [3] 如果是第一次处理这个请求,则设定请求类型
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
// 如果请求以 * 开头,则表示 PROTO REQ MULTIBULK
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
// [4] 根据请求类型,调用不同的函数处理
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
if (server.gopher_enabled && !server.io_threads_do_reads &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
// 到这里,请求就读取完毕了
if (c->argc == 0) {
// [5] 如果命令未携带参数,则重置客户端
resetClient(c);
} else { // 如果有参数
// [6] 如果启用了 I/0 线程,则跳出循环先不处理
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
// 否则先执行命令,再重置客户端 (即返回响应)
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
// [8] 命令执行成功,
// 删除查询缓冲中已经处理过的请求,并初始化 qb_pos
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}
[1]
:client.qb_pos
为查询缓冲区最新读取位置,该位置小于查询缓冲区内容长度时,循环继续执行;[2]
:当客户端处于特殊状态时,跳出循环;[3]
:当该函数第一次处理这个命令的数据流时,根据数据流中的第一个字符设定请求类型:- Redis 的官方客户端会在发送的请求前附加
*{lineNumber}\r\n
字符串。如果服务端发现请求中的第一个字符为*
,则将这个请求的类型设置为PROTO_REQ_MULTIBULK
; - 如果用户通过 telnet 等工具测试 Redis 服务,服务器将会收到普通字符串,进而将其类型设置为
PROTO_REQ_INLINE
。
- Redis 的官方客户端会在发送的请求前附加
[4]
:根据请求类型,调用不同的函数处理;[5]
:到这里,客户端请求就解析完毕了。如果命令没有携带参数,则调用resetClient()
直接重置客户端。该步骤主要用于主从同步场景,主从同步时会通过发送空行请求来重置客户端;[6]
:如果命令携带了参数,但是客户端设置了CLIENT_PENDING_READ
标志,表明服务器启用了 I/O 线程,所以先不处理请求;[7]
:如果命令携带了参数,且未启用 I/O 线程,则调用processCommandAndResetClient()
执行命令并返回响应;[8]
:命令执行成功后,删除查询缓冲中已经处理过的请求,并初始化client.qb_pos
。
MULTIBULK 请求解析流程
当客户端的请求类型为 PROTO_REQ_MULTIBULK
时,调用 processMultibulkBuffer()
函数进行处理:
int processMultibulkBuffer(client *c) {
char *newline = NULL;
int ok;
long long ll;
// [1] 读取 Multibulk 请求头部的指令行数
if (c->multibulklen == 0) {
...
newline = strchr(c->querybuf+c->qb_pos,'\r');
...
ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
...
/* 更新 qb_pos:
* newline
* |
* v
* *123123\r\n...\r\n...\r\n
* ^ ^
* | |
* querybuf qb_pos
*/
c->qb_pos = (newline-c->querybuf)+2;
...
c->multibulklen = ll; // 存储指令行数
...
}
...
// [2] 每次处理一行数据,直到处理完
while(c->multibulklen) {
[1]
:如果c->multibulklen == 0
,则说明上一个指令已经解析完毕,这里开始解析新的请求了,所以通过\r\n
分隔符获取当前请求中参数的数量,赋值给client.multibulklen
,并将client.qb_pos
的值更新为请求中第一个参数的首地址;[2]
:一次处理一个参数,直到解析完;// [3] 如果还没读取到参数的长度,则先读取长度 if (c->bulklen == -1) { // [4] newline 指向长度信息后边的 '\r' newline = strchr(c->querybuf+c->qb_pos,'\r'); ... // [5] 有个 \n 字符没读取到,跳出继续读取 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) break; // [6] 如果当前行不以 $ 开头,说明是非法请求 if (c->querybuf[c->qb_pos] != '$') { ... return C_ERR; } // [7] 读取当前参数的长度信息 ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { ... return C_ERR; } else if (ll > 16384 && authRequired(c)) { ... return C_ERR; } // qb_pos 指向当前参数首地址 c->qb_pos = newline-c->querybuf+2; // [8] 优化超大参数 if (ll >= PROTO_MBULK_BIG_ARG) { if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf)); } } // [9] 更新当前参数的长度 c->bulklen = ll; }
[3]
:RESP 协议会在每个请求参数前添加 “${n}
” 前缀,以指定请求参数的长度,例如$3\r\nset\r\n
。因此,当client.bulklen == 0
时,表示还未读取当前参数的长度,所以在这里读取;[4]
:通过\r\n
获取参数长度,用newline
指向长度信息后边的\r
;[5]
:比较长度信息的字符个数与查询缓冲中的剩余有效内容的字符个数,如果当前长度信息字符数大于缓冲区剩余内容长度 - 2,说明还有一个\n
字符没读取到,因此退出等待下一次 I/O 操作;[6]
:如果当前行不以$
开头,说明是非法请求,返回错误;[7]
:跳过$
符号读取当前参数的长度信息,并将client.qb_pos
指向当前参数首地址;[8]
:如果当前参数是一个超大参数,且剩余空间不足了,则清除缓冲区中已经被处理的数据并适当扩容;[9]
:将当前参数的长度存入client.bulklen
;if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { // [10] 未读取完全,跳出接着读取 break; } else { if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { // [11] 超大参数处理 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); c->argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); sdsclear(c->querybuf); } else { // [12] 小参数处理 c->argv[c->argc++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); c->argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; } } // [13] multibulklen 降到 0 时,表明请求解析成功 if (c->multibulklen == 0) return C_OK; return C_ERR; }
[10]
:如果查询缓冲的未处理字符串长度,小于当前参数的长度,说明当前参数未读完整,退出等待下一次 I/O 操作;[11]
:如果是超大参数,经过上边的处理,此时缓冲区中只有当前参数自己,所以直接使用查询缓冲区中的内容创建一个redisObject
作为参数(redisObject
指针直接指向该地址),同时新申请一块内存作为新的查询缓冲区;创建的
redisObject
参数顺次追加到client.argv[]
数组中[12]
:如果是小参数,则调用createStringObject()
复制缓冲区中的参数到新对象空间;创建的
redisObject
小参数也顺次追加到client.argv[]
数组中[13]
:如果请求解析成功,返回C_OK
,否则返回C_ERR
等下次 I/O 时接着读取请求的剩余部分。
INLINE 请求解析流程
当客户端的请求类型为 PROTO_REQ_INLINE
时,调用 processInlineBuffer()
函数解析命令,这里就不展开了。总之就是在没有 RESP 协议来帮助解析的场景下,需要通过分隔符解析字符串中的参数。此外,内联请求场景还支持处理 Gopher 协议。
命令的执行过程
上文我们讲过,processInputBuffer() 函数读取到完整的请求后,会调用 processCommandAndResetClient()
执行命令并重置客户端,此时请求参数已经全部存储在 client.argv[]
数组中。在该数组中,argv[0]
代表指令名称,例如 GET、SET 等,而 argv[1]
、argv[2]
等则表示请求的参数列表,比如 key、value 等。processCommandAndResetClient()
的源码如下:
int processCommandAndResetClient(client *c) {
int deadclient = 0;
client *old_client = server.current_client;
// 先将当前客户端设为 current_client
server.current_client = c;
// 执行命令
if (processCommand(c) == C_OK) {
// 执行成功后,做一些收尾工作
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
// current_client 恢复至原客户端
server.current_client = old_client;
return deadclient ? C_ERR : C_OK;
}
resetCommand() 函数
Redis 通过 该结构体作为 在 lua 脚本中调用 命令执行完毕后,最终由 networking.c 中提供了 待回复内容写入缓冲区,且 processCommandAndResetClient()
函数的核心是 processCommand()
函数:int processCommand(client *c) {
// [1] Mudule Filter
moduleCallCommandFilters(c);
// [2] 优先处理退出命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
// [3] 根据指令名称,查找指令格式
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
...
[1]
:触发 Mudule Filter;[2]
:优先处理退出命令;[3]
:根据用户请求中的指令名称,查找 redisCommand
:server.h/redisCommand
结构体存储命令的格式: struct redisCommand {
char *name; // 指令名称
redisCommandProc *proc; // 指令处理函数指针
int arity; // 请求参数的数量,包括指令名称本身
...
int id; // 请求 id
};
>
server.c/redisCommandTable[]
数组的元素存在,Redis Server 在启动时会调用 initServerConfig() 函数加载 redisCommandTable
,并将其存储 redisServer.commands
命令字典中。redisCommandTable
结构体格式如下: struct redisCommand redisCommandTable[] = {
...
{"get",getCommand,2,
"read-only fast @string",
0,NULL,1,1,1,0,0,0},
...
{"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
...
}
// [4] 执行以下逻辑:
// 获取请求类型
int is_read_command = (c->cmd->flags & CMD_READONLY) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
...
// 如果启用了用户验证,则在这里完成验证
if (authRequired(c)) {...}
// 权鉴
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
// cluster 模式特别处理
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand)) {...}
// 如果服务器配置了内存最大限制,则按需淘汰数据
if (server.maxmemory && !server.lua_timedout) {...}
// tracking 机制要求记录查询过的 key
if (server.tracking_clients) trackingLimitUsedSlots();
// 如果当前服务是 master,当持久化异常了或者正常的从节点数量小于阈值,拒绝执行请求
int deny_write_type = writeCommandsDeniedByDiskError();
...
// 如果当前服务是 slave,且客户端并非 master,则拒绝写命令
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
is_write_command) {...}
// 如果客户端处于 pub/sub 模式,而且使用的是 RESP2 协议,
// 则只允许执行 ping、sub、unsub、psub、punsub、reset 这几种命令
// 其他类型命令都拒绝执行
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand && ... {...}
// 如果当前服务是 slave,且与 master 断连,则拒绝执行 is_denystale_command 类型命令
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
is_denystale_command) {...}
// 服务器正在加载数据期间,只允许执行执行特定命令
if (server.loading && is_denyloading_command) {...}
// lua 脚本执行超时期间,只有特定命令能执行
if (server.lua_timedout &&
c->cmd->proc != authCommand && ... {...}
// 阻止 slave 发送访问密钥空间的命令
if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {...}
// 如果服务器已暂停,则将非 slave 客户端阻塞
if (!(c->flags & CLIENT_SLAVE) &&
((server.client_pause_type == CLIENT_PAUSE_ALL) ||
(server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
{...}
[4]
:该部分内容主要涉及 ACL、Tracking、数据淘汰、特殊场景过滤等内容。 /* [5] Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
c->cmd->proc != resetCommand)
{
// 如果当前 client 处于事务上下文,将非 exec、discard、watch、reset 命令放入事务队列
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 非事务上下文,或者是 exec、discard、watch、reset 命令,则直接执行
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
[5]
:到了这一步,开始真正地执行命令:如果当前 client 处于事务上下文,且命令处理函数不是 execCommand
、discardCommand
、multiCommand
、resetCommand
这四种,则将命令放入事务队列;否则直接调用 call()
函数运行命令。call() 函数
server.c/call()
函数的源码如下:void call(client *c, int flags) {
...
// [1] 清空传播控制标志
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
...
// [2] 调用命令处理函数 proc,执行命令处理逻辑
c->cmd->proc(c);
...
// [3] 根据状态设定 propagate_flags 标志,再根据 propagate_flags 将命令记录到 AOF 文件或发送到从服务器
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
// 产生脏数据,即修改了数据库
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
...
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
[1]
:命令执行前,重置传播控制标志 (CLIENT_FORCE_AOF
、CLIENT_FORCE_REPL
、CLIENT_PREVENT_PROP
),这些标志应该只在命令执行过程中开启,因为 call
可能会递归调用,所以执行前先清除这些标志;[2]
:调用命令处理函数 proc
,执行命令处理逻辑。命令执行完毕后,执行以下逻辑:CLIENT_CLOSE_AFTER_COMMAND
,则将其修改为 CLIENT_CLOSE_AFTER_REPLY
,以便尽快关闭客户端;CLIENT_FORCE_REPL
和 CLIENT_FORCE_AOF
标志转移到真实客户端里;redis.call()
函数,Redis 会构建伪客户端调用 call()
函数,并将真实客户端 client
记录到 server.lua_caller
中。这样一来命令执行过程中打开的 CLIENT_FORCE_REPL
和 CLIENT_FORCE_AOF
都是暂存在伪客户端里的,所以需要转移这些标志。[3]
:根据状态设定 propagate_flags
标志,再根据 propagate_flags
将命令记录到 AOF 文件或发送到从服务器。propagate_flags
标志根据以下条件生成:PROPAGATE_AOF
和 PROPAGATE_REPL
标志;client
被开启了 CLIENT_FORCE_REPL
,则设置 PROPAGATE_REPL
标志;client
被开启了 CLIENT_FORCE_AOF
,则设置 PROPAGATE_AOF
标志;client
被开启了 CLIENT_PREVENT_REPL_PROP
且未开启 CMD_CALL_PROPAGATE_REPL
,则清除 PROPAGATE_REPL
标志;client
被开启了 CLIENT_PREVENT_AOF_PROP
且未开启 CMD_CALL_PROPAGATE_AOF
,则清除 PROPAGATE_AOF
标志; // [4] 补偿先前被去清空的传播控制标志
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// [5] 记录 also_propagate 中存放的额外传播控制命令
if (server.also_propagate.numops) {...}
server.also_propagate = prev_also_propagate;
...
// [6] 如果是查询类型的命令,那么记忆该命令,后续当 key 变化时通知客户端(Tracking 机制)
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags
& CLIENT_LUA && server.lua_caller)
? server.lua_caller : c;
if (caller->flags & CLIENT_TRACKING &&
!(caller->flags & CLIENT_TRACKING_BCAST))
{
// 存储当前请求的所有 keys
trackingRememberKeys(caller);
}
}
// clean
...
}
[4]
:命令执行前会清空 client
中的传播控制标志,如果 client.flag
中本来存在这些标志,那么这里需要恢复这些的标志;[5]
:server.also_propagate
中存放了一系列需要额外传播的命令,这里将它们记录到 AOF 或者复制到从服务器;[6]
:Redis 6 新增的 Tracking 机制:如果执行的是查询请求,那么 Redis 会存储该命令中的 keys,后续当这些 key 变化时,需要通知客户端。commandProcessed()
处理后续逻辑:void commandProcessed(client *c) {
if (c->flags & CLIENT_BLOCKED) return;
// [1] 重置客户端
resetClient(c);
long long prev_offset = c->reploff;
// [2] 更新已同步命令的偏移量
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
// [3] 如果客户端是 master,则将这个命令复制到当前节点的从节点
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
[1]
:重置客户端,该操作不会清空 client
的回复缓冲区(下文介绍);[2]
:如果客户端是集群中的 master 节点,且不处于事务上下文,则更新已同步命令的偏移量 client.reploff
;[3]
:如果客户端是集群的 master 节点,则调用 replicationFeedSlavesFromMasterStream
将该命令复制到当前从节点的下级从节点上。返回响应的过程
client
中定义了两个回复缓冲区,用于缓存返回给客户端的响应数据:char buf[PROTO_REPLY_CHUNK_BYTES]
:字符数组,大小为 16KB,通过 client.bufpos
属性记录最新的写入位置;list *reply
:SDS 字符串链表,当回复内容较长时使用。addReply()
、addReplySds()
、addReplyProto()
等一系列函数,用于将响应数据写入缓冲区,这里我们以 addReplySds()
函数为例讲解,其它的流程均与之类似:void addReplySds(client *c, sds s) {
// [1] 如果客户端允许响应回复,则将其加到阻塞队列
if (prepareClientToWrite(c) != C_OK) {
/* The caller expects the sds to be free'd. */
sdsfree(s);
return;
}
// [2] 先尝试将其写入 client.buf
if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
// buf 存不下,则写入 list
_addReplyProtoToList(c,s,sdslen(s));
sdsfree(s);
}
[1]
:先通过 prepareClientToWrite()
判断客户端是否允许回复数据,如果允许则这个 client
追加到 server.clients_pending_write
;[2]
:然后尝试将回复数据写入 client.buf
,如果数据太长写不下,则将其写入 client.reply
链表。addReplyXxx
系列方法会在客户端请求执行期间出错时会被调用,存入错误信息,以及在客户端请求执行成功后调用 resetClient()
重置客户端时被调用,存入成功信息。client
被加入 server.clients_pending_write
队列上以后,接下来就是将缓冲区中的数据写入 TCP 发送缓冲了。该步骤由 handleClientsWithPendingWrites()
函数完成,该函数会在事件循环 休眠之前 的 beforeSleep()
函数中被调用,或者在 I/O 线程停止之前被调用,实现如下:int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
// [1] 遍历 clients_pending_write
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
...
// [2] 写入 tcp 发送缓冲
if (writeToClient(c,0) == C_ERR) continue;
// [3] 如果回复缓冲中数据太多,则注册回调慢慢发送
if (clientHasPendingReplies(c)) {
int ae_barrier = 0;
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_barrier = 1;
}
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
}
return processed;
}
[1]
:遍历 clients_pending_write
链表,取下节点;[2]
:将节点内容写入 TCP 发送缓冲;[3]
:如果 client
的回复缓冲区中还有数据,说明无法一次写入完全,则将 sendReplyToClient()
函数注册到事件处理器,监听可写事件。