RDB 快照
本文详细介绍了 Redis 6 的 RDB 后台生成策略,包括 fork 子进程、定时生成策略、写时拷贝 (COW)、主进程收尾、RDB 文件二进制结构、RDB 加载原理等内容。
如无特殊说明,本文涉及的源码均位于 rdb.c 和 rdb.h 中
RDB 简介
Redis 是一款内存数据库,它的核心数据结构全部保存在易失性存储设备中,一旦机器重启,所有数据都将丢失。因此,Redis 提供了相应的持久化功能,可以将数据保存到磁盘文件中,以确保在 Redis 重启后能够从磁盘文件中重新加载数据。
Redis 支持两种持久化机制:RDB 和 AOF。其中,RDB 是 “Redis Database” 的缩写,它会在不同的时间点将内存中的数据库快照 (snapshot) 以二进制形式写入磁盘。
Redis 有两种生成 RDB 的方式:
- 第一种是通过手动执行命令来生成 RDB:
save
:由主进程执行,执行命令的客户端会被阻塞,且为了保证镜像的一致性,save
期间 Redis 是只读的。bgsave
:调用 glibc 的fork
函数创建一个子进程来生成快照,父进程继续处理客户端请求。
- 第二种是通过
serverCron
定时任务自动生成 RDB。
RDB 机制的常用配置如下:
save <seconds> <changes>
:在 seconds 时间内如果变更的键数量不少于 changes,则生成一次 RDB。示例如下:save 900 1 # 900 秒内,至少变更 1 次,才会自动备份 save 120 10 # 120 秒内,至少变更 10 次,才会自动备份 save 60 10000 # 60 秒内,至少变更 10000 次,才会自动备份 save # 停用 RDB # save ... # 注释掉 save ,也会停用 RDB
最终该配置会被存储到
server.saveparam->seconds
和server.saveparam->changes
中。dbfilename
:快照备份文件名字,默认为 dump.rdb;dir
:快照备份文件保存的目录,默认为当前目录;rdbcompression
:对于存储到磁盘中的快照,是否启动 LZF 压缩算法:yes
:默认启用no
:不想消耗 CPU 资源,可关闭
rdbchecksum
:是否启动 CRC64 算法进行数据校验:yes
:默认启用。开启后,大约增加 10% 左右的 CPU 消耗no
:如果希望获得最大的性能提升,可以选择关闭
类 Unix 系统的进程技术简介
在正式介绍 RDB 生成机制之前,我们先来了解一些前置技术。
fork() 函数
Redis 内部通过调用 C 语言的 fork()
函数创建子进程,该函数在每个进程中都返回一个值。其中,子进程返回的值为 0,而主进程返回的值为子进程的 pid。
fork
后的子进程拥有在 fork 时间点之前与父进程完全相同的内存快照,后续即便父进程修改了自身的数据,也不会影响到子进程。因此,我们可以安全的在子进程中生成 RDB。
写时复制技术 (Copy On Write)
子进程刚产生时,子进程与父进程共享页表,此时父子进程可以共同读取同一个内存空间。只有当任意一个进程试图修改页表项时,内核才会为该页表项创建一个新拷贝。此后,父子进程不再共享该页表项。这就是类 Unix 系统的 Copy On Write(写时复制)技术。示意图如下:
![写时复制技术](images/redis-unix-cow.webp)
由于该技术的存在,在子进程生成 RDB 的过程中,父子进程都可以读取同一份内存数据。只有当父进程执行新的写操作时,才会触发“内存复制”过程。这样既保证了 Redis 的可用性,也保证了生成镜像内容的一致性。
当然,频繁的写操作会引发频繁的缺页中断,从而影响性能。因此,在生成 RDB 时,最好避免密集的写操作(例如 rehash)。
waitpid() 函数
C 语言的 waitpid()
函数用于等待子进程结束,其声明如下:
pid_t waitpid(pid_t pid,int *status,int options);
pid
:< -1
:等待进程组编号为参数绝对值的所有子进程;== -1
:等待当前进程的所有子进程,这也是 Redis 这里传入的参数值;== 0
:等待进程组号与目前进程相同的任何子进程;> 0
:等待进程号为pid
的子进程。
status
:用于保存已结束的子进程的状态信息,可以通过以下几个宏来读取子进程的状态:WIFEXITED()
:子进程是否正常结束;WEXITSTATUS()
:子进程的退出码;WIFSIGNALED()
:子进程是否是因为信号终止的;WTERMSIG()
:不过信号代码,在WIFSIGNALED(status) == true
时使用;- 其它略 …
options
:控制函数的行为:WNOHANG
:非阻塞,如果没有进程结束,直接返回 0;- 其它略 …
其返回值如下:
pid_t
:结束进程的 pid。如果出错则返回 -1。当启用非阻塞模式时,如果没有结束的子进程,则返回 0。
RDB 定时生成策略
RDB 文件的定时生成任务由 serverCron() 函数负责,核心实现如下:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// [1] 如果生成 RDB 、重写 AOF 的后台进程仍在运行
if (hasActiveChildProcess() || ldbPendingChildren())
{
// 接收子进程的变更信息
run_with_period(1000) receiveChildInfo();
// 进行收尾工作
checkChildrenDone();
} else {
// [2] 否则判断是否要进行新一轮的 RDB 持久化
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
if (server.dirty >= sp->changes && // 判断脏 key 数量
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
// 生成控制 RDB 生成的辅助变量 rdbSaveInfo
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
...
}
...
// [3] 补偿被 AOF 重写推迟的 RDB 后台生成任务
if ...
...
}
[1]
:通过hasActiveChildProcess()
判断 RDB 子进程是否正在运行。若子进程正在运行,则执行以下两个操作:- 调用
receiveChildInfo()
接收子进程修改了的server
属性,由于父子进程的内存空间互不影响,因此这里需要通过 pipe 管道将子进程修改的数据更新到父进程; - 调用
checkChildrenDone()
检查子进程的执行进度。一旦执行完成,则进行收尾工作,详细内容将在 下文 介绍;
- 调用
[2]
:以下条件均满足时,在后台生成新的 RDB:server.dirty
大于server.saveparams
中的 配置;server.dirty
记录上一次生成 RDB 后,Redis 服务器变更了多少个 key- 距离上次手动执行
SAVE
命令的时间间隔 (server.lastsave
) 大于sp->seconds
的配置; - 距离上次后台生成 RDB 的时间间隔 (
server.lastbgsave_try
) 已经超过了CONFIG_BGSAVE_RETRY_DELAY == 5
秒,或者当前的后台生成 RDB 状态server.lastbgsave_status == C_OK
。
[3]
:补偿被 AOF 重写推迟的 RDB 后台生成任务。
RDB 后台持久化过程
实际生产中很少使用 SAVE
命令生成 RDB,因此本文我们只讲解 RDB 的后台生成过程。
fork 子进程
rdbSaveBackground()
函数负责在后台生成 RDB 文件,核心实现如下:
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
// [1] 如果仍有在生成 RDB 的子进程,则返回
if (hasActiveChildProcess()) return C_ERR;
// 缓存当前的脏数据量
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL); // 更新时间戳
// [2] fork 子进程
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
int retval; // 子进程执行这里的代码
redisSetProcTitle("redis-rdb-bgsave");
// [3] 将子进程绑定到用户配置的 cpu 列表上
redisSetCpuAffinity(server.bgsave_cpulist);
// [4] 调用 rdbSave 生成 RDB 文件
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
// [5] 将子进程修改的内容,通过 pipe 管道发送给主进程
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
// [6] 退出子进程
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
// [7] 主进程执行这里
...
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
return C_OK;
}
return C_OK;
}
[1]
:如果仍有子进程正在处理 RDB,则直接返回。防止 RDB 定时生成任务与BGSAVE
指令重叠;[2]
:fork 用于生成 RDB 的子进程;[3]
:子进程将自己绑定到用户配置的 CPU 列表bgsave_cpulist
上,减少不必要的进程上下文切换;[4]
:子进程调用rdbSave()
函数生成 RDB 文件;[5]
:将子进程在rdbSave
中修改的内容,通过 pipe 管道发送给主进程;[6]
:处理结束后,退出子进程;[7]
:主进程执行这里的代码,更新server
的运行时数据,server.rdb_child_type
会记录刚刚 fork 的子进程 ID,如果这个值不等于 -1,表示存在子进程。
rdbSave() 生成 RDB 文件
rdbSave()
函数负责生成 RDB 文件,核心实现如下:
int rdbSave(char *filename, rdbSaveInfo *rsi) {
rio rdb;
...
// [1] 根据 pid 生成临时文件名,然后打开文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
// [2] 初始化 rio 变量
rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);
// [3] 根据用户配置设定 sutosync
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
// [4] 将内存数据写入临时文件
if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
goto werr;
}
// [5] 刷盘
if (fflush(fp)) goto werr;
if (fsync(fileno(fp))) goto werr;
if (fclose(fp)) { fp = NULL; goto werr; }
fp = NULL;
// [6] 替换旧文件
if (rename(tmpfile,filename) == -1) {
stopSaving(0);
return C_ERR;
}
// [7] 更新 server 中的属性
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
werr: ...
}
[1]
:调用 C 标准库的fopen()
打开一个临时文件用于保存 RDB 数据,命名规则为temp-<pid>.rdb
;[2]
:初始化rio
变量,该变量指向rioFileIO
,负责读写文件;[3]
:如果配置了server.rdb_save_incremental_fsync
,则将该配置赋值给rio->io.file.autosync
。当系统缓存的数据量大于该属性值时,触发一次fsync
操作;[4]
:调用rdbSaveRio
将内存快照数据写入临时文件;[5]
:调用fflush
、fsync
函数将缓存数据落盘到文件;[6]
:重命名临时文件,替换旧的 RDB 文件;[7]
:更新server
中的相关属性,该操作会触发 COW。
RDB 文件生成期间会频繁进行磁盘 I/O 操作。因此,Redis 封装了 rio 抽象层,以便统一处理不同的存储介质,简化 I/O 操作。相关的结构题类型为
_rio
:typedef struct _rio rio; struct _rio { // 读操作 size_t (*read)(struct _rio *, void *buf, size_t len); // 写操作 size_t (*write)(struct _rio *, const void *buf, size_t len); // 刷盘操作 int (*flush)(struct _rio *); ... // 存储介质共用体 io union { /* 内存 buffer */ struct { sds ptr; off_t pos; } buffer; /* Stdio 标准 I/O */ struct { FILE *fp; off_t buffered; off_t autosync; } file; /* 网络连接对象 (used to read from socket) */ struct { connection *conn; off_t pos; sds buf; size_t read_limit; size_t read_so_far; } conn; /* 管道 FD 描述符 (used to write to pipe). */ struct { int fd; off_t pos; sds buf; } fd; } io; };
read
、write
、flush
:对底层存储介质的读、写、刷盘操作函数指针;io
:底层存储介质句柄,支持 file 文件、内存 buffer、conn 连接和管道文件描述符 fd。
写入 RDB 数据
RDB 文件标记类型
首先,我们先来了解下 RDB 文件的标记类型,Redis 会在 RDB 文件的每一部分内容之前添加一个类型字节,以标志内容的类型:
标志 | 含义: |
---|---|
RDB_OPCODE_IDLE | 键空闲时间,用于 LRU 算法 |
RDB_OPCODE_FREQ | 键 LFU 计数,用于 LFU 算法 |
RDB_OPCODE_AUX | RDB 辅助字段 |
RDB_OPCODE_MODULE_AUX | module 自定义类型的辅助字段 |
RDB_OPCODE_RESIZEDB | 数据库大小和过期字典大小 |
RDB_OPCODE_EXPIRETIME_MS | 键过期时间戳,单位 ms |
RDB_OPCODE_EXPIRETIME | 键过期时间戳,单位 s |
RDB_OPCODE_SELECTDB | 数据库索引标志 |
RDB_OPCODE_EOF | 文件结束标志 |
rdbSaveRio()
然后,我们再来看 rdbSaveRio
的实现,该函数负责将内存数据写入 RDB 文件,源码如下:
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
...
// [1] 写入 RDB_VERSION
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
// [2] 写入辅助字段
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
// [3] 触发模块回调
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
// [4] 遍历所有编号的 db
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
// [5] 写入 OPCODE_SELECTDB 标志,跟数据库 id
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
// [6] 写入 OPCODE_RESIZEDB 标志和数据库大小、过期字典大小
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
// [7] 遍历所有键值对,写入 rdb
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
...
}
dictReleaseIterator(di);
di = NULL;
}
// [8] 所有 db 写完后,持久化 lua 脚本内容
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL;
}
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
// [9] 写入 EOF 文件结束标志
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
// [10] 写入 CRC64 校验码
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:...
}
[1]
:首先写入一个REDIS<RDB_VERSION>
标记,以记录 RDB 文件的版本号,防止将来恢复时使用了不匹配的版本;[2]
:调用rdbSaveInfoAuxFields()
依次写入以下辅助字段:redis-ver
:Redis Server 的版本号;redis-bits
:Redis 是 64 位还是 32 位;ctime
:RDB 创建时间;used-mem
:当前内存使用量。
如果传入的第三个参数
rsi
不为空,还会接着写入以下字段:repl-stream-db
:存储server.slaveseldb
属性;repl-id
:存储server.replid
属性;repl-offset
:存储server.master_repl_offset
属性。
这三个字段用于主从复制。
[3]
:触发 module 指定的回调;[4]
:遍历所有 redisDB:[5]
:写入OPCODE_SELECTDB
标志,和数据库 id;[6]
:写入OPCODE_RESIZEDB
标志,和数据库大小、过期字典大小;[7]
:遍历所有键值对,调用rdbSaveKeyValuePair()
将 key、value、expire 写入 RDB;
[8]
:所有 db 内容都写完后,将 lua 脚本内容也写入 RDB;[9]
:写入RDB_OPCODE_EOF
文件结束标志;[10]
:计算文件 hash,用于文件完整性校验。
上述实现的核心是 rdbSaveKeyValuePair()
,该函数负责将一个键值对写入磁盘文件,实现如下:
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
...
// [1] 写入过期时间
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
// [2] lru lfu
if (savelru) {
...
}
if (savelfu) {
...
}
// [3] 写入键值对类型标志
if (rdbSaveObjectType(rdb,val) == -1) return -1;
// 写入 key
if (rdbSaveStringObject(rdb,key) == -1) return -1;
// 写入 value
if (rdbSaveObject(rdb,val,key) == -1) return -1;
...
return 1;
}
[1]
:如果设置了过期时间,则写入RDB_OPCODE_EXPIRETIME_MS
标志和过期时间;[2]
:根据内存淘汰算法,记录空闲的时间或 LFU 计数;[3]
:首先写入键值对的类型标记,然后写入 key,最后写入 value。
Redis 向 RDB 写入内容时,调用了多种 rdbSaveXxx()
函数,不同的后缀用于写入不同类型的数据,以 rdbSaveType
为例:
int rdbSaveType(rio *rdb, unsigned char type) {
return rdbWriteRaw(rdb,&type,1);
}
static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
if (rdb && rioWrite(rdb,p,len) == 0)
return -1;
return len;
}
这类方法最终是通过 rio 抽象层中的 rioWrite()
完成磁盘写入的。最终一个键值对在 RDB 中的格式如下:
[RDB_OPCODE_EXPIRETIME_MS<time>]<键值对标志><key><value>
主进程收尾工作
上文 讲过,主进程在 fork 子进程后,更新一下 server.rdb_child_type
等信息便返回执行其它定时任务了。等到下次运行定时任务,再执行 checkChildrenDone()
判断 RDB 是否生成完毕,如果处理完成再执行收尾工作。实现如下:
void checkChildrenDone(void) {
int statloc = 0;
pid_t pid;
// [1] 以非阻塞方式,判断子进程是否完毕
if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {
// [2] 获取已经结束子进程的退出码、中断信号
int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
// 如果退出码为 255,表示是被 kill 的,更新信息
if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
bysignal = SIGUSR1;
exitcode = 1;
}
if (pid == -1) {
...// waitpid 函数执行异常
} else if (pid == server.child_pid) {
// [3] 如果结束的子进程是 RDB 进程
if (server.child_type == CHILD_TYPE_RDB) {
// 更新状态
backgroundSaveDoneHandler(exitcode, bysignal);
} else if ...
if (!bysignal && exitcode == 0) receiveChildInfo();
resetChildState();
} ...
}
}
[1]
:调用 C 语言的 waitpid,以非阻塞的方式(WNOHANG
控制)判断是否有已经结束的子进程;[2]
:如果存在结束的子进程,则获取其退出码。如果子进程是被信号终止的,还要获取其信号代码;[3]
:如果结束的子进程是 RDB 生成进程,则调用backgroundSaveDoneHandler()
函数收尾。该函数内部会更新server.dirty
、server.server.lastsave
、server.lastbgsave_status
等属性,最后还会将 RDB 文件发送给等待全量同步的从服务器。
RDB 文件示例
在分析 RDB 加载过程之前,我们需要先了解一下 RDB 文件的结构。
这里我们重新启动一个 Redis Server,然后将 k1-v1
、k2-v2
、k3-v3
这三个键值对存入 Redis,再利用 hexdump
工具观察生成的 RDB 文件,可以得到如下内容:
00000000 52 45 44 49 53 30 30 30 39 fa 09 72 65 64 69 73 |REDIS0009..redis|
00000010 2d 76 65 72 06 36 2e 32 2e 31 33 fa 0a 72 65 64 |-ver.6.2.13..red|
00000020 69 73 2d 62 69 74 73 c0 40 fa 05 63 74 69 6d 65 |is-bits.@..ctime|
00000030 c2 f0 40 60 65 fa 08 75 73 65 64 2d 6d 65 6d c2 |..@`e..used-mem.|
00000040 60 b6 0c 00 fa 0c 61 6f 66 2d 70 72 65 61 6d 62 |`.....aof-preamb|
00000050 6c 65 c0 00 fe 00 fb 03 00 00 02 6b 33 02 76 33 |le.........k3.v3|
00000060 00 02 6b 32 02 76 32 00 02 6b 31 02 76 31 ff fa |..k2.v2..k1.v1..|
00000070 db b5 5b f9 5f b5 80 |..[._..|
00000077
而在实际工作中,我们更多的是通过 redis-check-rdb
工具来分析 RDB,该工具输出的内容更加直观,示例如下:
[offset 0] Checking RDB file dump.rdb
[offset 27] AUX FIELD redis-ver = '6.2.13'
[offset 41] AUX FIELD redis-bits = '64'
[offset 53] AUX FIELD ctime = '1700806896'
[offset 68] AUX FIELD used-mem = '833120'
[offset 84] AUX FIELD aof-preamble = '0'
[offset 86] Selecting DB ID 0
[offset 119] Checksum OK
[offset 119] \o/ RDB looks OK! \o/
[info] 3 keys read
[info] 0 expires
[info] 0 already expired
在 RDB 中,数据是按照顺序排列的,每条数据都以 AUX 编码和内容的形式存储。因此,解析 RDB 文件时,从文件头开始向后逐步读取即可。
RDB 文件加载过程
Redis 在 启动过程 中调用 loadDataFromDisk()
来完成 RDB 的加载。详细的调用链路如下:loadDataFromDisk -> rdbLoad -> rdbLoadRio
。最终,在 rdbLoadRio()
函数中完成对 RDB 文件的加载。结合前面的 RDB 文件内容样式,分析起来就非常容易了:
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
...
redisDb *db = server.db+0;
char buf[1024];
...
// [1] 读取 REDIS0009 字符串
if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
// 如果开头不是 REDIS,则格式错误
if (memcmp(buf,"REDIS",5) != 0) {
... ERR
}
// 获取 RDB 版本,如果不对就报错
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
ERR
}
[1]
:读取 RDB 开头的九个字节,正常情况下会读取到字符串REDIS
+ 版本号0009
,然后校验头部信息:如果 RDB 不以字符串 “REDIS” 开头,且 RDB 版本号小于 1、大于RDB_VERSION
,则报错;
while(1) {
sds key;
robj *val;
// [2] 读取一个字节,判断是不是 RDB_OPCODE,如果是,处理
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
if (type == RDB_OPCODE_EXPIRETIME) {
...
continue; /* Read next opcode. */
} else if ...
// [3] 如果是普通键值对,则获取 key、value
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
goto eoferr;
val = rdbLoadObject(type,rdb,key,&error);
// [4] 如果是空的 key,则直接丢弃,否则报错推出
if (val == NULL) {
...
}
// [5] 如果 key 过期了,且当前服务是 master,则丢弃 key
else if (iAmMaster() &&
!(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
expiretime != -1 && expiretime < now)
{
sdsfree(key);
decrRefCount(val);
expired_keys_skipped++;
}
else {
robj keyobj;
initStaticStringObject(keyobj,key);
// [6] 正常状态下, 将键值对加载到内存
int added = dbAddRDBLoad(db,key,val);
...
//LRU、LFU、过期时间省略。..
}
...
}
[2]
:接着读取一个字节的信息,判断是不是RDB_OPCODE
,如果是则处理相应的类型,例如 FREQ、EXPIRETIME、EOF 等;[3]
:如果不是RDB_OPCODE
,则表明是普通的键值对,然后调用rdbGenericLoadStringObject
将 key 封装为 SDS 字符串、调用rdbLoadObject
将 value 封装为redisObject
;[4]
:如果获取到的 value 为 NULL,且是因为空的 key 导致的,则丢弃 key,否则报错;[5]
:如果 key 过期了,且当前服务是 master 节点,则丢弃 key;[6]
:正常状态下,调用dbAddRDBLoad
将键值对加载到内存;
// [7] 当 RDB 文件版本大于等于 5 时,检验 CRC64
if (rdbver >= 5) {
... memrev64ifbe
}
...
}
[7]
:当 RDB 文件版本大于等于 5 且启用了rdb_checksum
时,检验文件的 CRC64。
至此,Redis RDB 备份机制的主要内容就介绍完了。