网络连接管理
本文详细介绍了 Redis 6 的客户端网络连接管理过程,包括 connection 与 client 数据结构、网络连接过程的处理以及网络断开后的处理过程。
如无特殊说明,本文涉及的源码均位于 connection.c 与 connection.h 中
Redis 服务启动后,便可接受来自客户端的连接,本文将详细介绍 Redis Server 如何管理这些来自客户端的连接。
数据结构
Redis Server 中负责存储客户端连接的结构体是 connection.h 中的 connection
:
struct connection {
ConnectionType *type;
ConnectionState state;
short int flags;
short int refs;
int last_errno;
void *private_data;
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
int fd;
};
type
:用于操作连接通道的函数集:typedef struct ConnectionType { void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask); int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler); int (*write)(struct connection *conn, const void *data, size_t data_len); int (*read)(struct connection *conn, void *buf, size_t buf_len); void (*close)(struct connection *conn); int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); ... } ConnectionType;
state
:连接状态定义,包括以下几种:typedef enum { CONN_STATE_NONE = 0, CONN_STATE_CONNECTING, CONN_STATE_ACCEPTING, CONN_STATE_CONNECTED, CONN_STATE_CLOSED, CONN_STATE_ERROR } ConnectionState;
last_errno
:该连接的最新错误信息;private_data
:存放附加数据;(conn|write|read)_handler
:连接、读取、写入操作的回调函数;fd
:连接的文件描述符。
当 Redis 服务器收到新的连接请求时,它会在连接事件处理器 acceptTcpHandler
中调用 connCreateAcceptedSocket()
函数,以创建一个 connection
结构体实例。这些 connection
的 type
属性都会被设置为 connection.c/CT_Socket
,它内部注册了一大批用于 I/O 操作的函数指针:
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
...
};
除了网络连接信息,Redis 服务还会保存这个连接所属的 Client 信息,客户端信息由 server.h/client
结构体保存:
typedef struct client {
...
sds querybuf; // 查询缓冲,存放客户端请求数据
size_t qb_pos; // 查询缓冲最新读取位置
sds pending_querybuf;
size_t querybuf_peak; // 客户端单次请求数据量峰值
...
int reqtype; // 请求数据的协议类型
int multibulklen; // 当前解析的命令请求中尚未处理的命令参数数量
long bulklen; // 当前读取命令参数长度
list *reply; // 链表回复缓冲区
unsigned long long reply_bytes; // 链表回复缓冲区字节数
...
uint64_t flags; //客户端标志
...
int bufpos; // 固定回复缓冲区最新操作位置
char buf[PROTO_REPLY_CHUNK_BYTES]; // 固定回复缓冲区
} client;
了解了上述结构体以后,接下来我们详细介绍下客户端网络连接的处理过程。
客户端连接事件处理
Redis 服务启动后,会为 AE_READABLE
文件事件注册连接事件处理器 acceptTcpHandler。acceptTcpHandler
函数会处理来自客户端的连接,创建数据交换套接字,并为该套接字注册后续的事件回调函数:
#define MAX_ACCEPTS_PER_CALL 1000
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// [1] 控制每次处理的请求数量
while(max--) {
// [2] 接受连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
// [3] fork 后台进程时,不要继承该连接
anetCloexec(cfd);
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// [4] connCreateAcceptedSocket 创建 connection 结构体
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
[1]
:每次事件循环最多接受MAX_ACCEPTS_PER_CALL
(1000) 个连接请求,防止进程被长时间阻塞;[2]
:调用 C 语言的accept
函数接受新的客户端连接,并返回数据交换套接字文件描述符。[3]
:将 Socket 文件描述符的标志位设置为非继承,这样当主进程 fork 子进程时,会自动关闭子进程的文件描述符;Redis 的后台进程无需与用户进行交互,比如 RDB 生成和 Rehash 等任务,它们只需拥有当前内存数据库的快照即可。因此,关闭子进程的套接字连接可以有效地避免发生竞争。
[4]
:connCreateAcceptedSocket
函数创建并返回connection
结构体。
acceptCommonHandler
函数拿到 connCreateAcceptedSocket
创建的 connection
后,再完成创建 client
结构体、为 Socket 注册文件事件处理器等后续逻辑:
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
char conninfo[100];
UNUSED(ip);
...
// [1] 如果 client 数量加上 Cluster 数量超过限制,返回错误
if (listLength(server.clients) + getClusterConnectionsCount()
>= server.maxclients) {
...
server.stat_rejected_conn++;
connClose(conn);
return;
}
// [2] 创建 client 结构体
if ((c = createClient(conn)) == NULL) {
...
connClose(conn);
return;
}
c->flags |= flags;
// [3] 设置 clientAcceptHandler 回调,并立刻调用
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
...
return;
}
}
[1]
:如果 client 数量加上 Cluster 数量超过限制,返回错误信息;[2]
:创建 client 结构体;[3]
:将clientAcceptHandler
设置为 accept 回调,不过由于执行acceptCommonHandler
方法时已经处于 accept 过程中了,所以这个回调会被立刻调用。
创建 client
结构体的方法为 createClient
:
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
// [1] 如果 connection 不为 null,则创建普通客户端
if (conn) {
// [2] 设定非阻塞模式
connNonBlock(conn);
connEnableTcpNoDelay(conn);
// [3] 启用 keepalive
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
// [4] 设置可读事件回调
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
// [5] 选择 0 号数据库,并初始化 client 属性
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1);
c->id = client_id;
c->resp = 2;
c->co ...
// [6] 将 client 结构体添加到 redisServer 结构体
if (conn) linkClient(c);
// [7] 初始化 client 事务上下文
initClientMultiState(c);
return c;
}
[1]
:如果connection
不为 null,则为连接创建普通客户端,否则创建伪客户端:通常情况下,Redis 命令必须在客户端上下文中执行。然而,当命令需要在其他上下文环境中执行时,就需要创建一个伪客户端来进行操作,典型的例子就是 Lua 脚本的执行上下文。
[2]
、[3]
、[4]
:如果是真实的客户端连接,则完成以下调用:[2]
:将连接设置为非阻塞模式;[3]
:启用 keepalive 保持连接活跃;[4]
:为 socket 注册AE_READABLE
事件的处理函数 networking.c/readQueryFromClient()。connSetReadHandler
内部会将readQueryFromClient()
注册到上文介绍的ConnectionType.read
函数指针上,后续当有数据到达时,便会调用readQueryFromClient()
处理来自客户端的请求。
[5]
:选择 0 号数据库,并初始化 client 属性;[6]
:将client
结构体添加到redisServer
结构体;[7]
:初始化 client 事务上下文。
与客户端断开连接
当客户端发送 quit
命令,或 socket 连接断开 时,服务器会调用 networking.c/freeClientAsync
函数将 client
添加到 server.clients_to_close
链表。而后在 AE Loop 执行 beforeSleep()
钩子函数时,再调用 freeClientsInAsyncFreeQueue()
函数遍历这个链表,将链上的节点交给 freeClient
函数关闭。freeClient
主要完成以下工作:
- 如果断开连接的客户端是集群中的 Master 节点,则缓存这个客户端的信息,并将主从状态转换为待连接状态以便后续与主节点重新建立连接;
- 否则释放查询缓存等内存空间,并取消客户端监听的 watcher,取消客户端订阅的所有 topic;
- 如果断开连接的客户端是集群中的从节点,则将其从
server.monitors
或server.slaves
中删除,并减少server.repl_good_slaves_count
的计数; - 最终调用
unlinkClient
函数,关闭 Socket 连接。
另外,在时间事件处理器 serverCron
中也会调用 clientsCron
关闭超时未发送命令的客户端:
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
// [1] 每次处理 iterations 个客户端
int numclients = listLength(server.clients);
int iterations = numclients/server.hz;
mstime_t now = mstime();
if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
numclients : CLIENTS_CRON_MIN_ITERATIONS;
int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
int zeroidx = (curr_peak_mem_usage_slot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
ClientsPeakMemInput[zeroidx] = 0;
ClientsPeakMemOutput[zeroidx] = 0;
while(listLength(server.clients) && iterations--) {
client *c;
listNode *head;
listRotateTailToHead(server.clients);
head = listFirst(server.clients);
c = listNodeValue(head);
// [2] 关闭超时的客户端连接
if (clientsCronHandleTimeout(c,now)) continue;
// 压缩客户端的查询缓存
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
// 统计几秒内使用内存最多的客户端
if (clientsCronTrackClientsMemUsage(c)) continue;
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
}
[1]
:每次处理numclients/server.hz
个客户端。由于serverCron
每秒调用server.hz
次,所以每秒都会将所有客户端处理一轮;[2]
:关闭超时server.maxidletime
时间内没有发送请求的客户端连接;