首页 » NoSQL » 【Redis深入】aof与reaof源码剖析

【Redis深入】aof与reaof源码剖析

原文 http://blog.csdn.net/baiye_xing/article/details/76407939

2017-07-31 08:20:02阅读(606)

引入

如果你对Redis的 aof 和 reaof 机制还不太了解,请点击Redis持久化机制的介绍

我们先来简单的回顾一下aof持久化,它是通过保存redis服务器所执行的写命令来记录数据库状态的。通过配置选项来设置刷新(写入同步)到磁盘上aof文件的间隔(默认是每秒)。

reaof是因为AOF文件的内容会越来越多,为了解决AOF文件体积膨胀的问题,重写AOF机制出现了,其实AOF重写并不需要对现有的AOF进行读取或写入操作,而是通过读取服务器当前的数据库状态来实现的。

重写触发机制

Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发(默认) 当前没有BGREWRITEAOF子线程正在运行,才可以执行。 源码剖析 1.feedAppendOnlyFile()
/*
 * 将命令追加到 AOF 文件中,
 * 如果 AOF 重写正在进行,那么也将命令追加到 AOF 重写缓存中。
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];
    //使用 SELECT 命令,显式设置数据库,确保之后的命令被设置到正确的数据库
    if (dictid != server.aof_selected_db) {
        char seldb[64];
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
    // EXPIRE 、 PEXPIRE 和 EXPIREAT 命令
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        //将 EXPIRE 、 PEXPIRE 和 EXPIREAT 都翻译成 PEXPIREAT
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    // SETEX 和 PSETEX 命令
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        //将两个命令都翻译成 SET 和 PEXPIREAT
        //SET
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        // PEXPIREAT
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    // 其他命令
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }
    /*
     * 将命令追加到 AOF 缓存中,
     * 在重新进入事件循环之前,这些命令会被冲洗到磁盘上,
     * 并向客户端返回一个回复。
     */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
    /*
     * 如果 BGREWRITEAOF 正在进行,
     * 那么我们还需要将命令追加到重写缓存中,
     * 从而记录当前正在重写的 AOF 文件和数据库当前状态的差异。
     */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    // 释放
    sdsfree(buf);
}

流程图

【Redis深入】aof与reaof源码剖析

注:对于缓存块的大小,因为程序需要不断对这个缓存执行 append 操作,而分配一个非常大的空间并不总是可能的,也可能产生大量的复制工作, 所以这里使用多个大小为 AOF_RW_BUF_BLOCK_SIZE 的空间来保存命令。默认每个缓存块的大小是10MB。源码如下:

// 每个缓存块的大小
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */
2.flushAppendOnlyFile()
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    // 缓冲区中没有任何内容,直接返回
    if (sdslen(server.aof_buf) == 0) return;
    // 策略为每秒 FSYNC 
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        // 是否有 SYNC 正在后台进行?
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
    // 每秒 fsync ,并且强制写入为假
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /*
         * 当 fsync 策略为每秒钟一次时, fsync 在后台执行。
         * 如果后台仍在执行 FSYNC ,那么我们可以延迟写操作一两秒
         * (如果强制执行 write 的话,服务器主线程将阻塞在 write 上面)
         */
        if (sync_in_progress) {
            // 有 fsync 正在后台进行 。。。
            if (server.aof_flush_postponed_start == 0) {
                /*
                 * 前面没有推迟过 write 操作,这里将推迟写操作的时间记录下来
                 * 然后就返回,不执行 write 或者 fsync
                 */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* 
                 * 如果之前已经因为 fsync 而推迟了 write 操作
                 * 但是推迟的时间不超过 2 秒,那么直接返回
                 * 不执行 write 或者 fsync
                 */
                return;
            }
            /* 
             * 如果后台还有 fsync 在执行,并且 write 已经推迟 >= 2 秒
             * 那么执行写操作(write 将被阻塞)
             */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). 
            Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /*
     * 执行到这里,程序会对 AOF 文件进行写入。
     * 清零延迟 write 的时间记录
     */
    server.aof_flush_postponed_start = 0;
    /* 
     * 执行单个 write 操作,如果写入设备是物理的话,那么这个操作应该是原子的
     * 当然,如果出现像电源中断这样的不可抗现象,那么 AOF 文件也是可能会出现问题的
     * 这时就要用 redis-check-aof 程序来进行修复。
     */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;
        // 将日志的记录频率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }
        // 如果写入出错,那么尝试将该情况写入到日志里面
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }
            // 尝试移除新追加的不完整内容
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }
        /* Handle the AOF write error. */
        // 处理写入 AOF 文件时出现的错误
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            server.aof_last_write_status = REDIS_ERR;
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        // 写入成功,更新最后写入状态
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }
    // 更新写入后的 AOF 文件大小
    server.aof_current_size += nwritten;
    /* 
     * 如果 AOF 缓存的大小足够小的话,那么重用这个缓存,
     * 否则的话,释放 AOF 缓存。
     */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        // 清空缓存中的内容,等待重用
        sdsclear(server.aof_buf);
    } else {
        // 释放缓存
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
    /* 
     * 如果 no-appendfsync-on-rewrite 选项为开启状态,
     * 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话,
     * 那么不执行 fsync 
     */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;
    // 总是执行 fsnyc
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        // 更新最后一次执行 fsnyc 的时间
        server.aof_last_fsync = server.unixtime;
    // 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // 放到后台执行
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        // 更新最后一次执行 fsync 的时间
        server.aof_last_fsync = server.unixtime;
    }
    // 其实上面无论执行 if 部分还是 else 部分都要更新 fsync 的时间
    // 可以将代码挪到下面来
    // server.aof_last_fsync = server.unixtime;
}

分析:由配置选项来设置刷新(写入同步)到磁盘上aof文件的间隔(默认是每秒)。然后该过程就由flushAppendOnlyFile()函数执行,此外还会记录刷新AOF文件的时间,以便在判断是否reaof时作参考。

3.rewriteAppendOnlyFileBackground()
//后台重写 AOF 文件(BGREWRITEAOF)
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;
    // 已经有进程在进行 AOF 重写了
    if (server.aof_child_pid != -1) return REDIS_ERR;
    // 记录 fork 开始前的时间,计算 fork 耗时用
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];
        /* Child */
        // 关闭网络连接 fd
        closeListeningSockets(0);
        // 为进程设置名字,方便记认
        redisSetProcTitle("redis-aof-rewrite");
        // 创建临时文件,并进行 AOF 重写
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();
            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            // 发送重写成功信号
            exitFromChild(0);
        } else {
            // 发送重写失败信号
            exitFromChild(1);
        }
    } else {
        /* Parent */
        // 记录执行 fork 所消耗的时间
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        // 记录 AOF 重写的信息
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        // 关闭字典自动 rehash
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. 
         *
         * 将 aof_selected_db 设为 -1 ,
         * 强制让 feedAppendOnlyFile() 下次执行时引发一个 SELECT 命令,
         * 从而确保之后新添加的命令会设置到正确的数据库中
         */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

分析:如果启动了重写触发机制(最前面讲过),那么主线程会调用rewriteAppendOnlyFileBackground()函数,开启一个子进程,而不会开启子线程,这是为了在避免使用锁的情况下,保证数据的安全性。子进程然后调用rewriteAppendOnlyFile()函数进行重写,而主线程不会阻塞。

4.rewriteAppendOnlyFile()
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    /*
     * 创建临时文件
     * 注意这里创建的文件名和 rewriteAppendOnlyFileBackground() 创建的文件名稍有不同
     */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }
    // 初始化文件 io
    rioInitWithFile(&aof,fp);
    // 设置每写入 REDIS_AOF_AUTOSYNC_BYTES 字节
    // 就执行一次 FSYNC 
    // 防止缓存中积累太多命令内容,造成 I/O 阻塞时间过长
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        // 指向键空间
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        // 创建键空间迭代器
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }
        /* SELECT the new DB 
         *
         * 首先写入 SELECT 命令,确保之后的数据会被插入到正确的数据库上
         */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
        /* Iterate this DB writing every entry 
         *
         * 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中
         */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            // 取出键
            keystr = dictGetKey(de);
            // 取出值
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);
            // 取出过期时间
            expiretime = getExpire(db,&key);
            /* 
             * 如果键已经过期,那么跳过它,不保存
             */
            if (expiretime != -1 && expiretime < now) continue;
            /* Save the key and associated value 
             *
             * 根据值的类型,选择适当的命令来保存值
             */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* 
             * 保存键的过期时间
             */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                // 写入 PEXPIREAT expiretime 命令
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        // 释放迭代器
        dictReleaseIterator(di);
    }
    // 冲洗并关闭新 AOF 文件
    if (fflush(fp) == EOF) goto werr;
    if (aof_fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    /* 
     * 原子地改名,用重写后的新 AOF 文件覆盖旧 AOF 文件
     */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;
werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

分析:这个函数被 REWRITEAOF 和 BGREWRITEAOF 两个命令调用。将还原当前数据集的命令写入到 filename 指定的文件中。为了最小化重建数据集所需执行的命令数量,Redis 会尽可能地使用接受可变参数数量的命令,比如 RPUSH 、SADD 和 ZADD 等。不过单个命令每次处理的元素数量不能超过 REDIS_AOF_REWRITE_ITEMS_PER_CMD 的限制。

5.backgroundRewriteDoneHandler()
//当子线程完成 AOF 重写时,父进程调用这个函数。
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        char tmpfile[256];
        long long now = ustime();
        redisLog(REDIS_NOTICE,
            "Background AOF rewrite terminated with success");
        // 打开保存新 AOF 文件内容的临时文件
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        // 将累积的重写缓存写入到临时文件中
        // 这个函数调用的 write 操作会阻塞主进程
        if (aofRewriteBufferWrite(newfd) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
        /* 
         * 剩下的工作就是将临时文件改名为 AOF 程序指定的文件名,
         * 并将新文件的 fd 设为 AOF 程序的写目标。
         *
         * 不过这里有一个问题 ——
         * 我们不想 close(2) 或者 rename(2) 在删除旧文件时阻塞。
         *
         * 以下是两个可能的场景:
         *1)
         * AOF 被关闭,这个是一次单次的写操作。
         * 临时文件会被改名为 AOF 文件。
         * 本来已经存在的 AOF 文件会被 unlink ,这可能会阻塞服务器。
         * 2) 
         * AOF 被开启,并且重写后的 AOF 文件会立即被用于接收新的写入命令。
         * 当临时文件被改名为 AOF 文件时,原来的 AOF 文件描述符会被关闭。
         * 因为 Redis 会是最后一个引用这个文件的进程,
         * 所以关闭这个文件会引起 unlink ,这可能会阻塞服务器。
         * 为了避免出现阻塞现象,程序会将 close(2) 放到后台线程执行,
         * 这样服务器就可以持续处理请求,不会被中断。
         */
        if (server.aof_fd == -1) {
                        oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }
        /* 
         * 对临时文件进行改名,替换现有的 AOF 文件。
         *
         * 旧的 AOF 文件不会在这里被 unlink ,因为 oldfd 引用了它。
         */
        if (rename(tmpfile,server.aof_filename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF file: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        if (server.aof_fd == -1) {
            /* 
             * AOF 被关闭,直接关闭 AOF 文件,
             * 因为关闭 AOF 本来就会引起阻塞,所以这里就算 close 被阻塞也无所谓
             */
            close(newfd);
        } else {
            /* 
             * 用新 AOF 文件的 fd 替换原来 AOF 文件的 fd
             */
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            // 因为前面进行了 AOF 重写缓存追加,所以这里立即 fsync 一次
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            // 强制引发 SELECT
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            // 更新 AOF 文件的大小
            aofUpdateCurrentSize();
            // 记录前一次重写时的大小
            server.aof_rewrite_base_size = server.aof_current_size;
            /* 
             * 清空 AOF 缓存,因为它的内容已经被写入过了,没用了
             */
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }
        server.aof_lastbgrewrite_status = REDIS_OK;
        redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
        /* 
         * 如果是第一次创建 AOF 文件,那么更新 AOF 状态
         */
        if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
            server.aof_state = REDIS_AOF_ON;
        /* 
         * 异步关闭旧 AOF 文件
         */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    // BGREWRITEAOF 重写出错
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = REDIS_ERR;
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    // 未知错误
    } else {
        server.aof_lastbgrewrite_status = REDIS_ERR;
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }
cleanup:
    // 清空 AOF 缓冲区
    aofRewriteBufferReset();
    // 移除临时文件
    aofRemoveTempFile(server.aof_child_pid);
    // 重置默认属性
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;
}

分析:当子进程完成AOF重写操作时,子进程会向父进程发送一个信号,父进程在接收到该信号后,会调用这个函数,它将AOF重写缓冲区中的所有内容写入到新的AOF文件中,保证数据的一致性,但它调用的 write 操作会阻塞主进程。

reaof完整的流程图

【Redis深入】aof与reaof源码剖析

画图不易啊,如需使用请注明出处,谢谢!

注:蓝色方框和黑色线条表示主进程,红色方框和红色线条表示fork的子进程,而绿色线条表示缓冲块,蓝色AOF文件方框表示最新的AOF文件。

1.图中的4个步骤

(1)在每次事件结束调用前,将aof_buf_blocks刷新(写入同步)到磁盘上aof文件(默认是每秒)。该过程就由flushAppendOnlyFile()函数执行,此外还会记录刷新AOF文件的时间,以便在判断是否reaof时作参考。

(2)如果达到了aof重写的条件,即AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发(默认),当前并没有BGREWRITEAOF子线程正在运行。然后调用rewriteAppendOnlyFileBackground(),该函数会fork()出一个子进程,用于重写文件。

(3)rewriteAppendOnlyFile()函数会遍历16个数据库,将对象写入tmpfile中。

(4)当子进程完成AOF重写,它会向父进程发送一个信号,父进程在接收到该信号后,会调用backgroundRewriteDoneHandler()函数

2.流程分析

如果客户端有命令执行,然后判断是否开启了AOF标识,若开启,则将命令放入aof_buf_blocks中,继续判断是否有子进程在运行,若有,则说明正在进行reaof,就将命令放入aof_rewrite_buf_blocks中。

服务器有文件事件和时间事件,而时间事件是通过ServerCron()函数执行的。该函数会一直查看是否有reaof或需要刷新事件。

若有reaof事件,用户调用 BGREWRITEAOF,Redis 调用rewriteAppendOnlyFileBackground()函数,它执行 fork() ,然后子进程在临时文件中对 AOF 文件进行重写,完成后子进程结束,通知父进程。

父进程会捕捉子进程的退出信号,如果子进程的退出状态是 OK ,那么父进程将新输入命令的缓存追加到临时文件,然后使用 rename(2) 对临时文件改名,用它代替旧的 AOF 文件,

到现在,后台 AOF 重写已经全部完成了。



本人才疏学浅,若有错,请指出,谢谢!
如果你有更好的建议,可以留言我们一起讨论,共同进步!
衷心的感谢您能耐心的读完本篇博文!

参考链接
1.Redis持久化之AOF
2.《Redis源码学习札记》AOF

最新发布

CentOS专题

关于本站

5ibc.net旗下博客站精品博文小部分原创、大部分从互联网收集整理。尊重作者版权、传播精品博文,让更多编程爱好者知晓!

小提示

按 Ctrl+D 键,
把本文加入收藏夹