如无特殊说明,本文涉及的源码均位于 rdb.c 和 rdb.h 中


RDB 简介

Redis 是一款内存数据库,它的核心数据结构全部保存在易失性存储设备中,一旦机器重启,所有数据都将丢失。因此,Redis 提供了相应的持久化功能,可以将数据保存到磁盘文件中,以确保在 Redis 重启后能够从磁盘文件中重新加载数据。

Redis 支持两种持久化机制:RDB 和 AOF。其中,RDB 是 “Redis Database” 的缩写,它会在不同的时间点将内存中的数据库快照 (snapshot) 以二进制形式写入磁盘。

Redis 有两种生成 RDB 的方式:

  • 第一种是通过手动执行命令来生成 RDB:
    • save:由主进程执行,执行命令的客户端会被阻塞,且为了保证镜像的一致性,save 期间 Redis 是只读的。
    • bgsave:调用 glibc 的 fork 函数创建一个子进程来生成快照,父进程继续处理客户端请求。
  • 第二种是通过 serverCron 定时任务自动生成 RDB。

RDB 机制的常用配置如下:

  • save <seconds> <changes>在 seconds 时间内如果变更的键数量不少于 changes,则生成一次 RDB。示例如下:

    save 900 1 # 900 秒内,至少变更 1 次,才会自动备份 
    save 120 10 # 120 秒内,至少变更 10 次,才会自动备份 
    save 60 10000 # 60 秒内,至少变更 10000 次,才会自动备份
    save # 停用 RDB
    # save ... # 注释掉 save ,也会停用 RDB
    	

    最终该配置会被存储到 server.saveparam->secondsserver.saveparam->changes 中。

  • dbfilename:快照备份文件名字,默认为 dump.rdb;

  • dir:快照备份文件保存的目录,默认为当前目录;

  • rdbcompression:对于存储到磁盘中的快照,是否启动 LZF 压缩算法:

    • yes:默认启用
    • no:不想消耗 CPU 资源,可关闭
  • rdbchecksum:是否启动 CRC64 算法进行数据校验:

    • yes:默认启用。开启后,大约增加 10% 左右的 CPU 消耗
    • no:如果希望获得最大的性能提升,可以选择关闭

类 Unix 系统的进程技术简介

在正式介绍 RDB 生成机制之前,我们先来了解一些前置技术。

fork() 函数

Redis 内部通过调用 C 语言的 fork() 函数创建子进程,该函数在每个进程中都返回一个值。其中,子进程返回的值为 0,而主进程返回的值为子进程的 pid。

fork 后的子进程拥有在 fork 时间点之前与父进程完全相同的内存快照,后续即便父进程修改了自身的数据,也不会影响到子进程。因此,我们可以安全的在子进程中生成 RDB。

写时复制技术 (Copy On Write)

子进程刚产生时,子进程与父进程共享页表,此时父子进程可以共同读取同一个内存空间。只有当任意一个进程试图修改页表项时,内核才会为该页表项创建一个新拷贝。此后,父子进程不再共享该页表项。这就是类 Unix 系统的 Copy On Write(写时复制)技术。示意图如下:

写时复制技术

由于该技术的存在,在子进程生成 RDB 的过程中,父子进程都可以读取同一份内存数据。只有当父进程执行新的写操作时,才会触发“内存复制”过程。这样既保证了 Redis 的可用性,也保证了生成镜像内容的一致性。

当然,频繁的写操作会引发频繁的缺页中断,从而影响性能。因此,在生成 RDB 时,最好避免密集的写操作(例如 rehash)。

waitpid() 函数

C 语言的 waitpid() 函数用于等待子进程结束,其声明如下:

pid_t waitpid(pid_t pid,int *status,int options);
其参数列表如下:
  • pid
    • < -1:等待进程组编号为参数绝对值的所有子进程;
    • == -1:等待当前进程的所有子进程,这也是 Redis 这里传入的参数值;
    • == 0:等待进程组号与目前进程相同的任何子进程;
    • > 0:等待进程号为 pid 的子进程。
  • status:用于保存已结束的子进程的状态信息,可以通过以下几个宏来读取子进程的状态:
    • WIFEXITED():子进程是否正常结束;
    • WEXITSTATUS():子进程的退出码;
    • WIFSIGNALED():子进程是否是因为信号终止的;
    • WTERMSIG():不过信号代码,在 WIFSIGNALED(status) == true 时使用;
    • 其它略 …
  • options:控制函数的行为:
    • WNOHANG:非阻塞,如果没有进程结束,直接返回 0;
    • 其它略 …

其返回值如下:

  • pid_t:结束进程的 pid。如果出错则返回 -1。当启用非阻塞模式时,如果没有结束的子进程,则返回 0。

RDB 定时生成策略

RDB 文件的定时生成任务由 serverCron() 函数负责,核心实现如下:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // [1] 如果生成 RDB 、重写 AOF 的后台进程仍在运行
    if (hasActiveChildProcess() || ldbPendingChildren())
    {
        // 接收子进程的变更信息
        run_with_period(1000) receiveChildInfo();
        // 进行收尾工作
        checkChildrenDone();
    } else {
        // [2] 否则判断是否要进行新一轮的 RDB 持久化
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
            if (server.dirty >= sp->changes && // 判断脏 key 数量
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                rdbSaveInfo rsi, *rsiptr;
                // 生成控制 RDB 生成的辅助变量 rdbSaveInfo
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
        ...
    }
    ...
    // [3] 补偿被 AOF 重写推迟的 RDB 后台生成任务
    if ...
    ...
}
  • [1]:通过 hasActiveChildProcess() 判断 RDB 子进程是否正在运行。若子进程正在运行,则执行以下两个操作:
    1. 调用 receiveChildInfo() 接收子进程修改了的 server 属性,由于父子进程的内存空间互不影响,因此这里需要通过 pipe 管道将子进程修改的数据更新到父进程
    2. 调用 checkChildrenDone() 检查子进程的执行进度。一旦执行完成,则进行收尾工作,详细内容将在 下文 介绍;
  • [2]:以下条件均满足时,在后台生成新的 RDB:
    1. server.dirty 大于 server.saveparams 中的 配置

      server.dirty 记录上一次生成 RDB 后,Redis 服务器变更了多少个 key

    2. 距离上次手动执行 SAVE 命令的时间间隔 (server.lastsave) 大于 sp->seconds 的配置;
    3. 距离上次后台生成 RDB 的时间间隔 (server.lastbgsave_try) 已经超过了 CONFIG_BGSAVE_RETRY_DELAY == 5 秒,或者当前的后台生成 RDB 状态 server.lastbgsave_status == C_OK
  • [3]:补偿被 AOF 重写推迟的 RDB 后台生成任务。

RDB 后台持久化过程

实际生产中很少使用 SAVE 命令生成 RDB,因此本文我们只讲解 RDB 的后台生成过程。

fork 子进程

rdbSaveBackground() 函数负责在后台生成 RDB 文件,核心实现如下:

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    // [1] 如果仍有在生成 RDB 的子进程,则返回
    if (hasActiveChildProcess()) return C_ERR;
    // 缓存当前的脏数据量
    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL); // 更新时间戳
    // [2] fork 子进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        int retval; // 子进程执行这里的代码
        redisSetProcTitle("redis-rdb-bgsave");
        // [3] 将子进程绑定到用户配置的 cpu 列表上
        redisSetCpuAffinity(server.bgsave_cpulist);
        // [4] 调用 rdbSave 生成 RDB 文件
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            // [5] 将子进程修改的内容,通过 pipe 管道发送给主进程
            sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
        }
        // [6] 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        // [7] 主进程执行这里
        ...
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        return C_OK;
    }
    return C_OK;
}
  • [1]:如果仍有子进程正在处理 RDB,则直接返回。防止 RDB 定时生成任务与 BGSAVE 指令重叠;
  • [2]:fork 用于生成 RDB 的子进程;
  • [3]:子进程将自己绑定到用户配置的 CPU 列表 bgsave_cpulist 上,减少不必要的进程上下文切换;
  • [4]:子进程调用 rdbSave() 函数生成 RDB 文件;
  • [5]:将子进程在 rdbSave 中修改的内容,通过 pipe 管道发送给主进程
  • [6]:处理结束后,退出子进程;
  • [7]:主进程执行这里的代码,更新 server 的运行时数据,server.rdb_child_type 会记录刚刚 fork 的子进程 ID,如果这个值不等于 -1,表示存在子进程。

rdbSave() 生成 RDB 文件

rdbSave() 函数负责生成 RDB 文件,核心实现如下:

int rdbSave(char *filename, rdbSaveInfo *rsi) {
    rio rdb;
    ...
    // [1] 根据 pid 生成临时文件名,然后打开文件
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    // [2] 初始化 rio 变量
    rioInitWithFile(&rdb,fp);
    startSaving(RDBFLAGS_NONE);
    // [3] 根据用户配置设定 sutosync
    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
    // [4] 将内存数据写入临时文件
    if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
        goto werr;
    }
    // [5] 刷盘
    if (fflush(fp)) goto werr;
    if (fsync(fileno(fp))) goto werr;
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;
    // [6] 替换旧文件
    if (rename(tmpfile,filename) == -1) {
        stopSaving(0);
        return C_ERR;
    }
    // [7] 更新 server 中的属性
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;
werr: ...
}
  • [1]:调用 C 标准库的 fopen() 打开一个临时文件用于保存 RDB 数据,命名规则为 temp-<pid>.rdb
  • [2]:初始化 rio 变量,该变量指向 rioFileIO,负责读写文件;
  • [3]:如果配置了 server.rdb_save_incremental_fsync,则将该配置赋值给 rio->io.file.autosync。当系统缓存的数据量大于该属性值时,触发一次 fsync 操作;
  • [4]:调用 rdbSaveRio 将内存快照数据写入临时文件;
  • [5]:调用 fflushfsync 函数将缓存数据落盘到文件;
  • [6]:重命名临时文件,替换旧的 RDB 文件;
  • [7]:更新 server 中的相关属性,该操作会触发 COW。

RDB 文件生成期间会频繁进行磁盘 I/O 操作。因此,Redis 封装了 rio 抽象层,以便统一处理不同的存储介质,简化 I/O 操作。相关的结构题类型为 _rio

  typedef struct _rio rio;
  struct _rio {
      // 读操作
      size_t (*read)(struct _rio *, void *buf, size_t len);
      // 写操作
      size_t (*write)(struct _rio *, const void *buf, size_t len);
      // 刷盘操作
      int (*flush)(struct _rio *);
      ...
      // 存储介质共用体 io
      union {
          /* 内存 buffer */
          struct {
              sds ptr;
              off_t pos;
          } buffer;
          /* Stdio 标准 I/O */
          struct {
              FILE *fp;
              off_t buffered;
              off_t autosync;
          } file;
          /* 网络连接对象 (used to read from socket) */
          struct {
              connection *conn;
              off_t pos;
              sds buf;
              size_t read_limit;
              size_t read_so_far;
          } conn;
          /* 管道 FD 描述符 (used to write to pipe). */
          struct {
              int fd;
              off_t pos;
              sds buf;
          } fd;
      } io;
  };
  
  • readwriteflush:对底层存储介质的读、写、刷盘操作函数指针;
  • io:底层存储介质句柄,支持 file 文件、内存 buffer、conn 连接和管道文件描述符 fd。

写入 RDB 数据

RDB 文件标记类型

首先,我们先来了解下 RDB 文件的标记类型,Redis 会在 RDB 文件的每一部分内容之前添加一个类型字节,以标志内容的类型:

标志含义:
RDB_OPCODE_IDLE键空闲时间,用于 LRU 算法
RDB_OPCODE_FREQ键 LFU 计数,用于 LFU 算法
RDB_OPCODE_AUXRDB 辅助字段
RDB_OPCODE_MODULE_AUXmodule 自定义类型的辅助字段
RDB_OPCODE_RESIZEDB数据库大小和过期字典大小
RDB_OPCODE_EXPIRETIME_MS键过期时间戳,单位 ms
RDB_OPCODE_EXPIRETIME键过期时间戳,单位 s
RDB_OPCODE_SELECTDB数据库索引标志
RDB_OPCODE_EOF文件结束标志

rdbSaveRio()

然后,我们再来看 rdbSaveRio 的实现,该函数负责将内存数据写入 RDB 文件,源码如下:

int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    ...
    // [1] 写入 RDB_VERSION
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    // [2] 写入辅助字段
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
    // [3] 触发模块回调
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
    // [4] 遍历所有编号的 db
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        // [5] 写入 OPCODE_SELECTDB 标志,跟数据库 id
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(rdb,j) == -1) goto werr;
        // [6] 写入 OPCODE_RESIZEDB 标志和数据库大小、过期字典大小
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
        // [7] 遍历所有键值对,写入 rdb
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
            ...
        }
        dictReleaseIterator(di);
        di = NULL;
    }

    // [8] 所有 db 写完后,持久化 lua 脚本内容
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
    // [9] 写入 EOF 文件结束标志
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    // [10] 写入 CRC64 校验码
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;
werr:...
}
  • [1]:首先写入一个 REDIS<RDB_VERSION> 标记,以记录 RDB 文件的版本号,防止将来恢复时使用了不匹配的版本;

  • [2]:调用 rdbSaveInfoAuxFields() 依次写入以下辅助字段:

    • redis-ver:Redis Server 的版本号;
    • redis-bits:Redis 是 64 位还是 32 位;
    • ctime:RDB 创建时间;
    • used-mem:当前内存使用量。

    如果传入的第三个参数 rsi 不为空,还会接着写入以下字段:

    • repl-stream-db:存储 server.slaveseldb 属性;
    • repl-id:存储 server.replid 属性;
    • repl-offset:存储 server.master_repl_offset 属性。

    这三个字段用于主从复制。

  • [3]:触发 module 指定的回调;

  • [4]:遍历所有 redisDB:

    • [5]:写入 OPCODE_SELECTDB 标志,和数据库 id;
    • [6]:写入 OPCODE_RESIZEDB 标志,和数据库大小、过期字典大小;
    • [7]遍历所有键值对,调用 rdbSaveKeyValuePair() 将 key、value、expire 写入 RDB
  • [8]:所有 db 内容都写完后,将 lua 脚本内容也写入 RDB;

  • [9]:写入 RDB_OPCODE_EOF 文件结束标志;

  • [10]:计算文件 hash,用于文件完整性校验。

上述实现的核心是 rdbSaveKeyValuePair(),该函数负责将一个键值对写入磁盘文件,实现如下:

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    ...
    // [1] 写入过期时间
    if (expiretime != -1) {
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }
    // [2] lru lfu
    if (savelru) {
        ...
    }
    if (savelfu) {
        ...
    }
    // [3] 写入键值对类型标志
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    // 写入 key
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    // 写入 value
    if (rdbSaveObject(rdb,val,key) == -1) return -1;
    ...
    return 1;
}
  • [1]:如果设置了过期时间,则写入 RDB_OPCODE_EXPIRETIME_MS 标志和过期时间;
  • [2]:根据内存淘汰算法,记录空闲的时间或 LFU 计数;
  • [3]:首先写入键值对的类型标记,然后写入 key,最后写入 value。

Redis 向 RDB 写入内容时,调用了多种 rdbSaveXxx() 函数,不同的后缀用于写入不同类型的数据,以 rdbSaveType 为例:

int rdbSaveType(rio *rdb, unsigned char type) {
    return rdbWriteRaw(rdb,&type,1);
}
static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return len;
}

这类方法最终是通过 rio 抽象层中的 rioWrite() 完成磁盘写入的。最终一个键值对在 RDB 中的格式如下:

[RDB_OPCODE_EXPIRETIME_MS<time>]<键值对标志><key><value>

主进程收尾工作

上文 讲过,主进程在 fork 子进程后,更新一下 server.rdb_child_type 等信息便返回执行其它定时任务了。等到下次运行定时任务,再执行 checkChildrenDone() 判断 RDB 是否生成完毕,如果处理完成再执行收尾工作。实现如下:

void checkChildrenDone(void) {
    int statloc = 0;
    pid_t pid;
    // [1] 以非阻塞方式,判断子进程是否完毕
    if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {
        // [2] 获取已经结束子进程的退出码、中断信号
        int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;
        int bysignal = 0;
        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
        // 如果退出码为 255,表示是被 kill 的,更新信息
        if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
            bysignal = SIGUSR1;
            exitcode = 1;
        }
        if (pid == -1) {
            ...// waitpid 函数执行异常
        } else if (pid == server.child_pid) {
            // [3] 如果结束的子进程是 RDB 进程
            if (server.child_type == CHILD_TYPE_RDB) {
                // 更新状态
                backgroundSaveDoneHandler(exitcode, bysignal);
            } else if ...
            if (!bysignal && exitcode == 0) receiveChildInfo();
            resetChildState();
        } ...
    }
}
  • [1]:调用 C 语言的 waitpid,以非阻塞的方式(WNOHANG 控制)判断是否有已经结束的子进程;
  • [2]:如果存在结束的子进程,则获取其退出码。如果子进程是被信号终止的,还要获取其信号代码;
  • [3]:如果结束的子进程是 RDB 生成进程,则调用 backgroundSaveDoneHandler() 函数收尾。该函数内部会更新 server.dirtyserver.server.lastsaveserver.lastbgsave_status 等属性,最后还会将 RDB 文件发送给等待全量同步的从服务器。

RDB 文件示例

在分析 RDB 加载过程之前,我们需要先了解一下 RDB 文件的结构。

这里我们重新启动一个 Redis Server,然后将 k1-v1k2-v2k3-v3 这三个键值对存入 Redis,再利用 hexdump 工具观察生成的 RDB 文件,可以得到如下内容:

00000000  52 45 44 49 53 30 30 30  39 fa 09 72 65 64 69 73  |REDIS0009..redis|
00000010  2d 76 65 72 06 36 2e 32  2e 31 33 fa 0a 72 65 64  |-ver.6.2.13..red|
00000020  69 73 2d 62 69 74 73 c0  40 fa 05 63 74 69 6d 65  |is-bits.@..ctime|
00000030  c2 f0 40 60 65 fa 08 75  73 65 64 2d 6d 65 6d c2  |..@`e..used-mem.|
00000040  60 b6 0c 00 fa 0c 61 6f  66 2d 70 72 65 61 6d 62  |`.....aof-preamb|
00000050  6c 65 c0 00 fe 00 fb 03  00 00 02 6b 33 02 76 33  |le.........k3.v3|
00000060  00 02 6b 32 02 76 32 00  02 6b 31 02 76 31 ff fa  |..k2.v2..k1.v1..|
00000070  db b5 5b f9 5f b5 80                              |..[._..|
00000077

而在实际工作中,我们更多的是通过 redis-check-rdb 工具来分析 RDB,该工具输出的内容更加直观,示例如下:

[offset 0] Checking RDB file dump.rdb
[offset 27] AUX FIELD redis-ver = '6.2.13'
[offset 41] AUX FIELD redis-bits = '64'
[offset 53] AUX FIELD ctime = '1700806896'
[offset 68] AUX FIELD used-mem = '833120'
[offset 84] AUX FIELD aof-preamble = '0'
[offset 86] Selecting DB ID 0
[offset 119] Checksum OK
[offset 119] \o/ RDB looks OK! \o/
[info] 3 keys read
[info] 0 expires
[info] 0 already expired

在 RDB 中,数据是按照顺序排列的,每条数据都以 AUX 编码和内容的形式存储。因此,解析 RDB 文件时,从文件头开始向后逐步读取即可。

RDB 文件加载过程

Redis 在 启动过程 中调用 loadDataFromDisk() 来完成 RDB 的加载。详细的调用链路如下:loadDataFromDisk -> rdbLoad -> rdbLoadRio。最终,在 rdbLoadRio() 函数中完成对 RDB 文件的加载。结合前面的 RDB 文件内容样式,分析起来就非常容易了:

int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
    ...
    redisDb *db = server.db+0;
    char buf[1024];
    ...
    // [1] 读取 REDIS0009 字符串
    if (rioRead(rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    // 如果开头不是 REDIS,则格式错误
    if (memcmp(buf,"REDIS",5) != 0) {
        ... ERR
    }
    // 获取 RDB 版本,如果不对就报错
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > RDB_VERSION) {
        ERR
    }
  • [1]:读取 RDB 开头的九个字节,正常情况下会读取到字符串 REDIS + 版本号 0009,然后校验头部信息:如果 RDB 不以字符串 “REDIS” 开头,且 RDB 版本号小于 1、大于 RDB_VERSION,则报错;

    while(1) {
        sds key;
        robj *val;

        // [2] 读取一个字节,判断是不是 RDB_OPCODE,如果是,处理
        if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
        if (type == RDB_OPCODE_EXPIRETIME) {
            ...
            continue; /* Read next opcode. */
        } else if ...
        // [3] 如果是普通键值对,则获取 key、value
        if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
            goto eoferr;
        val = rdbLoadObject(type,rdb,key,&error);
        // [4] 如果是空的 key,则直接丢弃,否则报错推出
        if (val == NULL) {
            ...
        } 
        // [5] 如果 key 过期了,且当前服务是 master,则丢弃 key
        else if (iAmMaster() &&
            !(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
            expiretime != -1 && expiretime < now)
        {
            sdsfree(key);
            decrRefCount(val);
            expired_keys_skipped++;
        }
        else {
            robj keyobj;
            initStaticStringObject(keyobj,key);
            // [6] 正常状态下, 将键值对加载到内存
            int added = dbAddRDBLoad(db,key,val);
            ...
            //LRU、LFU、过期时间省略。..
        }
        ...
    }
开始循环向后读取:
  • [2]:接着读取一个字节的信息,判断是不是 RDB_OPCODE,如果是则处理相应的类型,例如 FREQ、EXPIRETIME、EOF 等;
  • [3]:如果不是 RDB_OPCODE,则表明是普通的键值对,然后调用 rdbGenericLoadStringObject 将 key 封装为 SDS 字符串、调用 rdbLoadObject 将 value 封装为 redisObject
  • [4]:如果获取到的 value 为 NULL,且是因为空的 key 导致的,则丢弃 key,否则报错;
  • [5]:如果 key 过期了,且当前服务是 master 节点,则丢弃 key;
  • [6]:正常状态下,调用 dbAddRDBLoad 将键值对加载到内存;
    // [7] 当 RDB 文件版本大于等于 5 时,检验 CRC64
    if (rdbver >= 5) {
        ... memrev64ifbe
    }
    ...
}
  • [7]:当 RDB 文件版本大于等于 5 且启用了 rdb_checksum 时,检验文件的 CRC64。

至此,Redis RDB 备份机制的主要内容就介绍完了。