Skip to content

一个高性能的批处理AOF

AOF简介

什么是AOF?

AOF(Append Only File)是Redis的持久化机制之一,它通过记录所有的写命令来保证数据的持久性。与RDB快照相比,AOF能提供更好的数据安全性,因为它几乎可以做到不丢失数据

为什么需要AOF?

Redis作为一个内存数据库,所有数据都存在内存中。这带来了极致的性能,但同时也面临着数据易失性的问题:

  • 服务器断电会导致数据丢失
  • 程序崩溃会导致内存数据消失
  • 系统重启需要重新加载数据

AOF通过持久化写命令来解决这些问题,它将每个写命令追加到文件末尾,在Redis重启时重放这些命令来恢复数据。

为什么需要高性能的AOF?

传统的AOF实现面临几个关键挑战:

  1. 写入性能:频繁的磁盘I/O可能成为性能瓶颈
  2. 数据安全性:需要在性能和数据安全性之间做权衡
  3. 资源占用:大量小写入会导致系统调用过多

本文提出的批处理AOF方案通过以下特性解决这些问题:

  • 智能批处理:动态调整批次大小,平衡延迟和吞吐量
  • 低延迟优先:基于时间的批处理策略,确保数据及时持久化
  • 资源优化:使用内存池和NIO,减少系统开销
  • 可靠性保证:完整的错误处理和原子性文件操作

这是笔者整个项目中最不自卑,最觉得能拿出手的一部分。

项目演进历史

早期版本的问题

其实在笔者的仓库当中存在一个更简陋的redis,这个已经是重构一遍的版本了(是黑历史)。 当时用了一个很糟糕的双缓冲队列,后来我发现根本不是必要的,大致就是对大命令做了特殊优化。但是提升非常微小,大概是20%左右对比JDK官方的阻塞队列。 最后直接弃用了,因为实现思路太过稚嫩,方向完全是错误的。

迭代过程

这个版本经过了两次重要的迭代优化:

第一次迭代:引入批处理

第一次迭代的主要改进是引入了批处理机制:

  • 在命令处理器处理完命令后,会把承载命令的ByteBuf放入阻塞队列中
  • 采用生产者消费者模型
  • 设置了最小批次(16个命令)和最大批次(50个命令)的限制
  • 大命令进行特殊处理,直接写入
  • 实现了简单的背压机制,通过计算超时时间来控制

第二次迭代:性能瓶颈分析与优化

为什么需要迭代呢? 因为我在Benchmark的过程中发现,我的AOF的写入性能足够好,真正的瓶颈在网络IO和多路复用上。 所以完全是不用担心AOF的效率,那么这个时候背压机制就不是被需要的了。这是因为背压是为了上游命令生产的太多了,下游消费不过来对上游进行阻塞。 但很显然,我的性能瓶颈在命令处理以及网络IO来处理命令上。因此背压机制在一代后被淘汰了。

第一代实现中的问题:

  • 根据ByteBuf数量进行聚合可能导致不必要的延迟
  • 当实际命令量很少时,等待聚合会增加写入延迟
  • 阻塞队列大小的确定比较困难
  • 无界队列会带来潜在的内存问题

最终优化版本

因此在第二次迭代当中,我舍弃了这一切。 最后的实现模式是:

  • 保留阻塞队列的生产者消费者模型,但移除了背压机制
  • 保留最大命令上限,删除最小命令的下限
  • 改为以固定的延迟时间为导向,仿照Kafka的累积器模式
  • 实现智能的刷盘机制,替代Redis的EVERYSEC模式
  • 引入队列满时的降级处理:直接同步写入

这是一个更为成熟的设计,为什么? 因为对于redis来说,如果你想提供一个更为高效的模式,你不应该等待一定要到最小批次再去进行写入,是自断一臂的操作。

核心实现解析

命令处理流程

首先当respcommandhandler处理到了一个写命令时,会执行以下操作:

  1. 检查命令是否需要AOF持久化
  2. 创建临时ByteBuf用于编码命令
  3. 将命令写入AOF系统
  4. 释放临时资源
java
 if (needAof) {
            try {
                final ByteBuf tempBuf = PooledByteBufAllocator.DEFAULT.buffer();
                try {
                    respArray.encode(respArray, tempBuf);
                    final byte[] commandBytes = new byte[tempBuf.readableBytes()];
                    tempBuf.readBytes(commandBytes);
                    redisContext.writeAof(commandBytes);
                    log.debug("[AOF] 写命令已持久化: {}", commandType);
                } finally {
                    tempBuf.release();
                }
            } catch (Exception e) {
                log.error("[AOF] 持久化失败,命令: {}, 错误: {}", commandType, e.getMessage(), e);
            }
        }

用一块池化内存拿到这个具体的命令,然后写入到aof当中,经过多层接口会来到aofmanager

AOF写入流程

当命令到达AOF管理器时,会进行如下处理:

  1. 验证命令有效性
  2. 分配ByteBuf并写入数据
  3. 通过线程安全机制写入批处理器
  4. 确保资源正确释放
java
public void appendBytes(final byte[] commandBytes) throws IOException {
        // 1. 参数验证
        validateCommandBytes(commandBytes);
        
        // 2. 分配ByteBuf并写入数据
        final ByteBuf byteBuf = allocator.buffer(commandBytes.length);
        boolean committedToBatchWriter = false;

        try {
            byteBuf.writeBytes(commandBytes);
            
            // 3. 线程安全的写入操作
            writeLock.lock();
            try {
                batchWriter.write(byteBuf);
                committedToBatchWriter = true;
            } finally {
                writeLock.unlock();
            }
        } catch (final Exception e) {
            throw new IOException("Failed to write bytes to AOF file", e);
        } finally {
            // 4. 确保资源释放
            if (!committedToBatchWriter) {
                ReferenceCountUtil.release(byteBuf);
            }
        }
    }

把这部分内容写入到AOF的批处理器当中,接下来会被AOF的批处理器进行处理。

批处理器核心组件

AOF批处理器(AofBatchWriter)包含以下关键组件:

  1. 底层写入器:负责实际的文件操作
  2. 阻塞队列:用于存储待写入的命令
  3. 写入线程:负责从队列获取并处理命令
  4. 刷盘策略:控制数据持久化的时机和方式
java
/** 底层文件写入器 */
private final Writer writer;
/** 默认写入队列大小 */
private static final int DEFAULT_QUEUE_SIZE = 1000;
/** 命令写入队列 */
private final BlockingQueue<ByteBuf> writeQueue;
/** 写入处理线程 */
private final Thread writeThread;
/** 运行状态标志 */
private final AtomicBoolean running = new AtomicBoolean(true);
/** 批处理参数 */
public static final int MAX_BATCH_SIZE = 50;
/** 批处理超时时间(纳秒) - 低延迟优先策略 */
private static final long BATCH_TIMEOUT_NANOS = 5_000_000L; // 5毫秒
/** 大命令阈值(字节) */
private static final int LARGE_COMMAND_THRESHOLD = 512 * 1024;
/** AOF 刷盘策略 */
private final AofSyncPolicy syncPolicy;

命令写入策略

批处理器对命令的处理策略:

  1. 大命令处理:超过阈值的命令直接写入
  2. 小命令处理:进入队列等待批处理
  3. 队列满时的降级处理:直接同步写入
java
public void write(ByteBuf byteBuf) throws IOException {
        int byteSize = byteBuf.readableBytes();

        if (byteSize > LARGE_COMMAND_THRESHOLD) {
            writeLargeCommand(byteBuf, byteSize);
        } else {
            writeToQueue(byteBuf);
        }
    }

大命令处理实现:

  • 直接调用底层写入器
  • 根据刷盘策略决定是否立即刷盘
  • 确保资源释放
java
private void writeLargeCommand(ByteBuf byteBuf, int byteSize) throws IOException {
        log.debug("处理大命令,大小: {}KB", byteSize / 1024);

        try {
            ByteBuffer byteBuffer = byteBuf.nioBuffer();
            writer.write(byteBuffer);
            handleSyncPolicy();

            log.debug("大命令直接写入完成,大小: {}KB", byteSize / 1024);
        } catch (Exception e) {
            throw new IOException("大命令写入失败,大小: " + byteSize + " bytes", e);
        } finally {
            byteBuf.release();
        }
    }

刷盘策略处理:

  • ALWAYS:每次写入后立即刷盘
  • EVERYSEC:标记待刷盘,由定时器处理
  • NO:不主动刷盘
java
 private void handleSyncPolicy() throws IOException {
        if (syncPolicy == AofSyncPolicy.ALWAYS) {
            performFlush();
        } else if (syncPolicy == AofSyncPolicy.EVERYSEC) {
            hasPendingFlush.set(true);
        }
    }

小命令处理流程:

  1. 尝试放入队列
  2. 队列满时降级为同步写入
  3. 异常处理和资源释放
java
private void writeToQueue(ByteBuf byteBuf) throws IOException {
        try {
            boolean success = writeQueue.offer(byteBuf);
            if (!success) {
                handleQueueFull(byteBuf);
            }
        } catch (Exception e) {
            throw new IOException("写入AOF失败", e);
        }
    }

private void handleQueueFull(ByteBuf byteBuf) throws IOException {
        log.warn("AOF队列满,直接同步写入 - 队列大小: {}", writeQueue.size());

        try {
            ByteBuffer byteBuffer = byteBuf.nioBuffer();
            writer.write(byteBuffer);
            handleSyncPolicy();
        } catch (Exception e) {
            throw e;
        } finally {
            byteBuf.release();
        }
    }

批处理主循环实现

写入线程的初始化和启动:

  • 创建守护线程
  • 设置线程名称
  • 启动处理循环
java
// 启动写入线程
        this.writeThread = new Thread(this::processWriteQueue);
        this.writeThread.setName("AOF-Writer-Thread");
        this.writeThread.setDaemon(true);
        this.writeThread.start();

主循环实现:

  1. 维护命令批次数组
  2. 循环收集和处理命令
  3. 异常处理
  4. 资源清理
java
    public void processWriteQueue() {
        ByteBuf[] batch = new ByteBuf[MAX_BATCH_SIZE];

        while (running.get() || !writeQueue.isEmpty()) {
            int batchSize = 0;

            try {
                // 收集批次数据
                batchSize = collectBatch(batch);
                if (batchSize > 0) {
                    // 批量写入磁盘
                    writeBatch(batch, batchSize);
                }
            } catch (Exception e) {
                log.error("AOF 批处理过程中发生异常", e);
            } finally {
                // 清理当前批次的资源
                cleanupBatch(batch, batchSize);
            }
        }

        log.info("AOF 批处理主循环已退出");
    }

批次收集机制

批次收集过程的三个主要步骤:

  1. 等待第一个命令(阻塞操作)
  2. 将第一个命令加入批次
  3. 在超时时间内收集更多命令
java
private int collectBatch(ByteBuf[] batch) {
        // 第一步:等待第一个元素(阻塞等待)
        ByteBuf firstItem = waitForFirstItem();
        if (firstItem == null) {
            return 0; // 被中断或停止运行
        }

        // 第二步:将第一个元素加入批次
        batch[0] = firstItem;
        int batchSize = 1;

        // 第三步:在超时时间内收集更多元素
        long batchStartTime = System.nanoTime();
        batchSize += collectAdditionalItems(batch, batchSize, batchStartTime);

        return batchSize;
    }

等待第一个命令的实现:

  • 使用阻塞方式等待
  • 处理中断情况
  • 返回null表示需要退出
java
private ByteBuf waitForFirstItem() {
        try {
            return writeQueue.take();
        } catch (InterruptedException e) {
            if (!running.get()) {
                return null;
            }
            Thread.currentThread().interrupt();
            return null;
        }
}

收集额外命令的实现:

  1. 检查批次大小限制
  2. 计算剩余超时时间
  3. 尝试获取更多命令
  4. 超时或队列空时返回
java
 private int collectAdditionalItems(ByteBuf[] batch, int currentSize, long batchStartTime) {
        int additionalCount = 0;

        while (currentSize + additionalCount < MAX_BATCH_SIZE) {
            long elapsed = System.nanoTime() - batchStartTime;
            if (elapsed >= BATCH_TIMEOUT_NANOS) {
                break; // 超时,立即处理当前批次
            }

            // 计算剩余超时时间
            long remainingTimeout = BATCH_TIMEOUT_NANOS - elapsed;

            ByteBuf item = pollWithTimeout(remainingTimeout);
            if (item == null) {
                break; // 超时或队列为空,处理当前批次
            }

            batch[currentSize + additionalCount] = item;
            additionalCount++;
        }

        return additionalCount;
    }

超时等待实现:

  • 使用带超时的poll操作
  • 处理中断异常
  • 返回null表示获取失败
java
private ByteBuf pollWithTimeout(long timeoutNanos) {
        try {
            return writeQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            if (!running.get()) {
                return null;
            }
            Thread.currentThread().interrupt();
            return null;
        }
    }

批量写入实现

批量写入的主要步骤:

  1. 计算总字节数
  2. 分配合适大小的缓冲区
  3. 将所有命令写入缓冲区
  4. 执行实际的文件写入
  5. 根据刷盘策略处理
java
private void writeBatch(ByteBuf[] batch, int batchSize) {
        if (batchSize <= 0) return;

        try {
            int totalBytes = 0;
            for (int i = 0; i < batchSize; i++) {
                totalBytes += batch[i].readableBytes();
            }
            ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
            for (int i = 0; i < batchSize; i++) {
                buffer.put(batch[i].nioBuffer());
            }
            buffer.flip();
            writer.write(buffer);
            batchCount.incrementAndGet();
            totalBatchedCommands.addAndGet(batchSize);

            // 根据 AOF 刷盘策略决定是否立即刷盘或标记待刷盘
            if (syncPolicy == AofSyncPolicy.ALWAYS) {
                performFlush(); // ALWAYS 模式直接刷盘
            } else if (syncPolicy == AofSyncPolicy.EVERYSEC) { // EVERYSEC模式:标记有待刷盘数据
                hasPendingFlush.set(true);
            }
            // NO 模式下不主动刷盘
        } catch (Exception e) {
            log.error("Failed to write batch to AOF file", e);
            throw new RuntimeException("批次写入失败", e);
        }
    }

然后稍微来说一下我们的底层写入器AofWriter 里面比较关键的组件是,FileChannel和RandomAccessFile,文件实际大小(用于预分配空间),以及一些重写组建,我们后面再说。 这里来稍微说一下为什么采用这个只在开头预分配的机制,有几个原因。 首先是避免运行时的抖动,可能在高峰期会有大量的写入请求,如果每次都要去扩展文件大小,会反而导致性能下降。 其次是因为AOF文件的大小通常是可预测的,尤其是在高负载的情况下。 因为他是可调整的参数,所以可以根据实际情况来调整,使用的时候可以比较灵活来确定,因为AOF的日志大小大致是可控的。

底层写入器实现

关键组件说明

AofWriter是整个AOF系统的底层实现,负责直接与文件系统交互。其核心组件包括:

  1. FileChannel:提供高效的文件写入能力
  2. RandomAccessFile:支持文件随机访问操作
  3. 文件大小追踪:用于空间预分配和管理
  4. 重写相关组件:支持AOF重写功能
java
/** AOF 文件对象 */
    private File file;
    /** 文件通道,用于写入操作 */
    private FileChannel channel;
    /** 随机访问文件,用于文件操作 */
    private RandomAccessFile raf;
    /** 是否启用预分配空间 */
    private boolean isPreallocated;
    /** 文件实际大小(不包含预分配空间) */
    private AtomicLong realSize;
    /** 重写状态标志 */
    private final AtomicBoolean rewriting = new AtomicBoolean(false);
    /** 默认重写缓冲区大小 */
    public static final int DEFAULT_REWRITE_BUFFER_SIZE = 100000;
    /** 重写缓冲区队列 */
    BlockingQueue<ByteBuf> rewriteBufferQueue;
    /** Redis 核心接口 */
    private final RedisCore redisCore;
    /** 默认预分配空间大小(4MB) */
    private static final int DEFAULT_PREALLOCATE_SIZE = 4 * 1024 * 1024;

预分配机制设计

预分配机制的设计考虑了以下几个关键因素:

  1. 性能优化:避免频繁的文件大小调整导致的性能抖动
  2. 可预测性:利用AOF文件大小的可预测特性
  3. 灵活配置:支持根据实际需求调整预分配参数
  4. 空间效率:未使用的预分配空间不会占用实际磁盘空间

初始化过程:

  1. 获取文件当前大小
  2. 执行空间预分配
  3. 将文件指针移动到实际数据末尾
java
this.realSize = new AtomicLong(channel.size());

            if(isPreallocated){
                preAllocated(DEFAULT_PREALLOCATE_SIZE);
            }

            this.channel.position(this.realSize.get());


private void preAllocated(int defaultPreallocateSize) throws IOException {
        long currentSize = 0;
        try{
            currentSize = this.channel.size();
        }catch(IOException e){
            log.error("获取文件长度时发生错误",e);
        }
        long newSize = currentSize + defaultPreallocateSize;
        if(this.raf != null){
            this.raf.setLength(newSize);
        }
        else if(this.channel != null){
            this.channel.truncate(newSize);
        }

        this.channel.position(currentSize);
        this.realSize.set(currentSize);
    }

写入操作实现

写入操作的完整流程:

  1. 验证写入器状态
  2. 执行文件写入
  3. 更新文件实际大小
  4. 处理重写相关逻辑
java
 @Override
    public int write(ByteBuffer buffer) throws IOException {
        // 检查是否已关闭
        if (channel == null || !channel.isOpen()) {
            throw new IOException("AOF Writer 已关闭,无法执行写入操作");
        }

        // 1. 写入到文件
        int written = writtenFullyTo(channel, buffer);
        realSize.addAndGet(written);

        // 2. 如果正在重写,复制数据到重写缓冲区
        if (isRewriting()) {
            copyToRewriteBuffer(buffer);
        }

        return written;
    }

完整写入实现:

  1. 保存原始位置信息
  2. 循环写入直到完成
  3. 恢复缓冲区状态
  4. 错误处理
java
private int writtenFullyTo(FileChannel channel, ByteBuffer buffer) {
        int originalPosition = buffer.position();
        int originalLimit = buffer.limit();
        int totalBytes = buffer.remaining();

        try {
            int written = 0;
            while (written < totalBytes) {
                written += channel.write(buffer);
            }
            return written;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            buffer.position(originalPosition);
            buffer.limit(originalLimit);
        }
    }

重写缓冲区实现

重写过程中的数据处理:

  1. 从内存池分配新的ByteBuf
  2. 复制当前写入的数据
  3. 尝试放入重写队列
  4. 确保资源正确释放
java
 private void copyToRewriteBuffer(ByteBuffer buffer) {
        if (rewriteBufferQueue == null) {
            return;
        }

        ByteBuf bufferCopy = null;
        try {
            // 1. 从池中分配ByteBuf
            bufferCopy = allocator.buffer(buffer.remaining());

            // 2. 复制数据到ByteBuf(保持原buffer位置不变)
            int originalPosition = buffer.position();
            bufferCopy.writeBytes(buffer.duplicate());
            buffer.position(originalPosition);

            // 3. 尝试添加到队列
            if (tryOfferToRewriteQueue(bufferCopy)) {
                bufferCopy = null; // 成功添加,转移所有权,不要释放
            }

        } finally {
            // 4.  使用ReferenceCountUtil安全释放
            ReferenceCountUtil.safeRelease(bufferCopy);
        }
    }

重写队列操作实现:

  • 使用超时机制避免阻塞
  • 处理队列满的情况
  • 完整的错误处理
java
    private boolean tryOfferToRewriteQueue(ByteBuf buffer) {
        try {
            if (rewriteBufferQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) {
                return true; // 成功添加
            } else {
                log.warn("重写AOF文件的缓冲区已满,丢弃数据");
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("添加到重写缓冲区时被中断", e);
            return false;
        } catch (Exception e) {
            log.error("添加到重写缓冲区时发生错误", e);
            return false;
        }
    }

AOF重写实现

重写任务流程

重写功能的入口实现:

  1. 验证重写状态
  2. 检查系统状态
  3. 启动重写线程
  4. 返回执行结果
java
 @Override
    public boolean bgrewrite() throws IOException {
        if (rewriting.get()) {
            log.warn("正在进行AOF重写,无法再次执行");
            return false;
        }

        // 检查RedisCore是否可用
        if (redisCore == null) {
            log.warn("RedisCore未设置,无法执行AOF重写");
            return false;
        }

        rewriting.set(true);
        Thread rewriteThread = new Thread(this::rewriteTask);
        rewriteThread.start();
        return true;
    }

重写任务的完整实现:

  1. 准备重写资源
  2. 执行重写操作
  3. 完成重写过程
  4. 清理临时资源
java
private void rewriteTask() {
        File rewriteFile = null;
        RandomAccessFile rewriteRaf = null;
        FileChannel rewriteChannel = null;

        try {
            log.info("开始重写aof");

            // 1. 准备重写资源
            RewriteResources resources = prepareRewriteResources();
            rewriteFile = resources.file;
            rewriteRaf = resources.raf;
            rewriteChannel = resources.channel;

            // 2. 执行重写操作
            performRewrite(rewriteChannel);

            // 3. 完成重写
            finishRewrite(rewriteChannel, rewriteRaf, rewriteFile);
            rewriteChannel = null;
            rewriteRaf = null;

            log.info("重写AOF文件完成");

        } catch (IOException e) {
            log.error("重写AOF文件时发生错误", e);
            cleanupTempFile(rewriteFile);
        } finally {
            cleanupResources(rewriteChannel, rewriteRaf);
        }
    }

重写资源准备

重写资源准备过程:

  1. 创建临时文件
  2. 初始化文件访问对象
  3. 设置文件位置
java
    private RewriteResources prepareRewriteResources() throws IOException {
        File rewriteFile = File.createTempFile("redis_aof_temp", ".aof", file.getParentFile());
        RandomAccessFile rewriteRaf = new RandomAccessFile(rewriteFile, "rw");
        FileChannel rewriteChannel = rewriteRaf.getChannel();
        rewriteChannel.position(0);

        return new RewriteResources(rewriteFile, rewriteRaf, rewriteChannel);
    }

重写操作执行

重写操作的两个主要步骤:

  1. 重写数据库内容
  2. 应用重写缓冲区数据
java
private void performRewrite(FileChannel rewriteChannel) throws IOException {
        // 重写数据库内容
        rewriteDatabases(rewriteChannel);

        // 应用重写缓冲区
        log.info("开始缓冲区的重写");
        applyRewriteBuffer(rewriteChannel);
    }

    private void rewriteDatabases(FileChannel rewriteChannel) throws IOException {
        final RedisDB[] dataBases = redisCore.getDataBases();
        for (int i = 0; i < dataBases.length; i++) {
            final RedisDB db = dataBases[i];
            if (db.size() > 0) {
                log.info("正在重写数据库{}", i);
                writeSelectCommand(i, rewriteChannel);
                writeDatabaseToAof(db, rewriteChannel);
            }
        }
    }

    private void applyRewriteBuffer(final FileChannel rewriteChannel) {
        int appliedCommands = 0;
        int totalBytes = 0;

        try {
            final int batchSize = 1000;
            final List<ByteBuf> buffers = new ArrayList<>(batchSize);

            while (rewriteBufferQueue.drainTo(buffers, batchSize) > 0) {
                for (final ByteBuf buffer : buffers) {
                    try {
                        ByteBuffer nioBuffer = buffer.nioBuffer();
                        final int written = writtenFullyTo(rewriteChannel, nioBuffer);
                        totalBytes += written;
                        appliedCommands++;
                    } finally {
                        //  使用ReferenceCountUtil安全释放
                        ReferenceCountUtil.safeRelease(buffer);
                    }
                }
                buffers.clear();
            }
            log.info("重写AOF文件的缓冲区已应用,应用了{}条命令,总字节数: {}",
                    appliedCommands, totalBytes);
        } catch (Exception e) {
            log.error("重写AOF文件的缓冲区应用时发生错误", e);
        }
    }

重写完成处理

重写完成的处理步骤:

  1. 强制刷盘确保数据持久化
  2. 关闭重写资源
  3. 替换原有AOF文件
java
    private void finishRewrite(FileChannel rewriteChannel, RandomAccessFile rewriteRaf, File rewriteFile) throws IOException {
        rewriteChannel.force(true);
        closeRewriteResources(rewriteChannel, rewriteRaf);
        replaceAofFile(rewriteFile);
    }

文件替换的安全实现:

  1. 保存当前文件状态
  2. 关闭现有资源
  3. 执行文件替换
  4. 重新打开文件
java
private void replaceAofFile(final File rewriteFile) {
        RandomAccessFile oldRaf = null;
        FileChannel oldChannel = null;

        try {
            // 1. 保存当前资源引用并关闭
            oldRaf = this.raf;
            oldChannel = this.channel;
            this.raf = null;
            this.channel = null;
            closeFileResources(oldChannel, oldRaf);

            // 2. 执行文件替换操作
            performFileReplacement(rewriteFile);

            // 3. 重新打开文件
            reopenFile();
            log.info("文件重新打开完成,当前位置: {}", this.channel.position());

        } catch (IOException e) {
            log.error("替换AOF文件时发生错误", e);
            handleReopenFailure();
        }
    }

文件替换的安全机制

文件替换过程的安全保障:

  1. 创建备份文件
  2. 执行安全的文件替换
  3. 成功后删除备份
  4. 失败时进行回滚
java
    private void performFileReplacement(File rewriteFile) throws IOException {
        File backupFile = null;
        try {
            // 创建备份
            backupFile = FileUtils.createBackupFile(file, ".bak");
            if (backupFile != null) {
                log.info("创建备份文件{}", backupFile.getAbsolutePath());
            }

            // 替换文件
            FileUtils.safeRenameFile(rewriteFile, file);
            log.info("重写AOF文件完成,替换原文件");

            // 删除备份
            deleteBackupFile(backupFile);

        } catch (Exception e) {
            log.error("重命名文件时发生错误", e);
            restoreFromBackup(backupFile);
            throw e;
        }
    }

AOF加载器实现

文件加载流程

AOF加载器的主要职责:

  1. 解析AOF文件内容
  2. 重放Redis命令
  3. 恢复数据状态
  4. 处理加载异常

文件打开实现:

  1. 验证文件状态
  2. 初始化文件访问对象
  3. 错误处理和资源清理
java
public void openFile() throws IOException {
        final File file = new File(fileName);
        if (!file.exists() || file.length() == 0) {
            log.info("AOF文件不存在或为空: {}", fileName);
            return;
        }

        try {
            this.raf = new RandomAccessFile(file, "r");
            this.channel = raf.getChannel();
            log.info("AOF文件打开成功: {}, 文件大小: {} bytes", fileName, channel.size());
        } catch (IOException e) {
            // 如果打开失败,确保资源被清理
            closeFile();
            throw new IOException("打开AOF文件失败: " + fileName, e);
        }
    }

加载过程实现

完整的加载过程:

  1. 读取文件内容
  2. 解析RESP命令
  3. 执行命令恢复
  4. 清理资源
java
public void load() {
        if (channel == null) {
            log.info("AOF文件通道未初始化,跳过加载操作");
            return;
        }

        ByteBuf commands = null;
        try {
            log.info("开始加载AOF文件: {}", fileName);
            
            // 1. 读取AOF文件的完整内容
            commands = readFileContent();

            // 2. 逐个处理RESP命令
            int successCount = processCommands(commands);

            log.info("AOF文件加载完成,成功执行 {} 条命令", successCount);
        } catch (Exception e) {
            log.error("AOF文件加载失败: {}", fileName, e);
            throw new RuntimeException("AOF加载过程中发生错误", e);
        } finally {
            if(commands !=null && commands.refCnt() >0){
                commands.release();
            }
            closeFile();
        }
    }

文件读取策略

文件读取的两种策略:

  1. 小文件直接读取
  2. 大文件分块处理

小文件读取实现:

java
 private ByteBuf readFileContent() throws IOException{
        long fileSize = channel.size();
        if(fileSize == 0){
            return Unpooled.EMPTY_BUFFER;
        }

        if(fileSize > Integer.MAX_VALUE){
            return readLargeFile();
        }
        int size = (int) fileSize;
        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
        try{
            ByteBuffer byteBuffer = buffer.nioBuffer(0, size);
            int totalRead = 0;
            while(totalRead < size){
                int read = channel.read(byteBuffer);
                if(read == -1){
                    break;
                }
                totalRead += read;
            }
            buffer.writerIndex(totalRead);
            return buffer;
        }catch(IOException e){
            buffer.release();
            throw e;
        }
    }

大文件读取实现:

java
    private ByteBuf readLargeFile() {
        CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
        ByteBuf currentBuf = null;
        try{
            long reamining = channel.size();
            while(reamining > 0){
                int chunkSize = Math.min(BUFFER_SIZE, (int) reamining);
                currentBuf = PooledByteBufAllocator.DEFAULT.directBuffer(chunkSize);
                ByteBuffer byteBuffer = currentBuf.nioBuffer(0, chunkSize);
                int read = channel.read(byteBuffer);
                if(read == -1){
                    currentBuf.release();
                    break;
                }
                currentBuf.writerIndex(read);
                composite.addComponent(true,currentBuf);
                reamining -= read;
                currentBuf = null;
            }
            return composite;
        }catch(IOException e){
            if(currentBuf != null){
                currentBuf.release();
            }
            composite.release();
            throw new RuntimeException(e);
        }
    }

命令处理实现

命令处理的主要步骤:

  1. 解析RESP格式命令
  2. 验证命令有效性
  3. 执行命令恢复
  4. 处理执行异常
java
    private int processCommands(ByteBuf commands) {
        int succesCount = 0;
        while(commands.isReadable()){
            int position = commands.readerIndex();
            commands.markReaderIndex();
            try{
                Resp command = Resp.decode(commands);
                if(executeCommand(command,position)){
                    succesCount++;
                }
            }catch(Exception e){
                handleCommandError(commands, position,e);
            }
        }
        return succesCount;
    }

命令执行实现:

  1. 验证命令格式
  2. 解析命令参数
  3. 通过RedisCore执行
  4. 处理执行结果
java
    private boolean executeCommand(Resp respArray,int position) {
        if(!(respArray instanceof RespArray)){
            log.warn("命令不是RespArray类型,在{}",position);
            return false;
        }
        RespArray command = (RespArray) respArray;
        if(!isValiedCommand(command)){
            log.warn("命令无效,在{}",position);
            return false;
        }
        return executeRedisCommand(command,position);
    }
java
    private boolean executeRedisCommand(final RespArray command, final int position) {
        try {
            // 1. 解析命令名称
            final String commandName = ((BulkString) command.getContent()[0])
                    .getContent().getString().toUpperCase();

            // 2. 解析命令参数
            final Resp[] content = command.getContent();
            final String[] args = new String[content.length - 1];
            for (int i = 1; i < content.length; i++) {
                if (content[i] instanceof BulkString) {
                    args[i - 1] = ((BulkString) content[i]).getContent().getString();
                } else {
                    log.warn("命令参数格式错误,位置: {}", position);
                    return false;
                }
            }

            // 3. 通过RedisCore接口执行命令
            final boolean success = redisCore.executeCommand(commandName, args);

            if (!success) {
                log.warn("命令执行失败: {} 在位置: {}", commandName, position);
            }

            return success;
        } catch (Exception e) {
            log.error("命令执行异常,位置: {}", position, e);
            return false;
        }
    }

错误处理机制

错误处理的完整流程:

  1. 定位错误位置
  2. 跳过错误数据
  3. 寻找下一个有效命令
  4. 记录错误信息
java
private void handleCommandError(ByteBuf commands, int position, Exception e) {
        if (!commands.isReadable()) {
            return;
        }

        log.warn("命令解析错误,位置: {}, 错误: {}", position, e.getMessage());

        int startPosition = commands.readerIndex();
        boolean foundNextCommand = false;

        if (commands.isReadable()) {
            commands.readByte(); // 跳过当前错误字节
        }

        while(commands.isReadable()) {
            int currentPos = commands.readerIndex();
            byte b = commands.readByte();
            if(isRespPrefix(b)) {
                commands.readerIndex(currentPos);
                foundNextCommand = true;
                break;
            }
        }

        if (!foundNextCommand) {
            log.warn("未找到有效的下一个命令,跳过剩余数据");
            commands.readerIndex(commands.writerIndex());
        }

        int skippedBytes = commands.readerIndex() - startPosition;
        if (skippedBytes > 0) {
            log.debug("跳过了 {} 字节的无效数据", skippedBytes);
        }
    }

系统总结

核心特性

  1. 高效的批处理机制

    • 基于时间窗口的动态批次控制
    • 智能的批次大小调整(0-50条命令)
    • 大命令直接写入优化(>512KB)
  2. 智能的持久化策略

    • 多级刷盘机制,支持不同安全级别
    • 预分配空间管理(4MB块),减少文件系统开销
    • 原子性文件替换,确保AOF文件完整性
  3. 可靠的容错机制

    • 完整的错误处理链
    • 基于备份的安全文件替换
    • 数据一致性保护机制

性能优化要点

  1. 写入性能优化

    • 使用FileChannel进行零拷贝IO
    • 批量写入合并小I/O操作
    • 池化ByteBuf管理,降低GC压力
    • 预分配空间减少文件系统开销
  2. 延迟控制优化

    • 5ms固定延迟窗口限制
    • 取消最小批次约束
    • 大命令快速通道处理
    • 队列满时同步写入降级
  3. 资源管理优化

    • 严格的ByteBuf引用计数管理
    • 智能的内存池化策略
    • 动态的空间预分配机制

设计亮点

  1. 高效的批处理策略

    • 结合固定延迟窗口(5ms)和最大批次限制(50条)
    • 在保证低延迟的同时最大化批处理效率
    • 动态适应不同的写入负载场景
  2. 可靠的文件操作

    • 引入备份机制的原子性文件替换
    • 基于FileChannel的高效IO操作
    • 完整的异常恢复机制
  3. 智能的资源管理

    • 基于池化的内存管理降低GC压力
    • 预分配空间机制减少文件系统开销
    • 智能的刷盘策略平衡性能和可靠性

这个实现展示了如何通过深入理解业务需求系统瓶颈,来设计一个既高效又可靠的持久化系统。通过将批处理内存池化智能刷盘等技术有机结合,最终实现了一个在性能可靠性可维护性三个维度都达到较高水平的解决方案。