Appearance
一个高性能的批处理AOF
AOF简介
什么是AOF?
AOF(Append Only File)是Redis的持久化机制之一,它通过记录所有的写命令来保证数据的持久性。与RDB快照相比,AOF能提供更好的数据安全性,因为它几乎可以做到不丢失数据。
为什么需要AOF?
Redis作为一个内存数据库,所有数据都存在内存中。这带来了极致的性能,但同时也面临着数据易失性的问题:
- 服务器断电会导致数据丢失
- 程序崩溃会导致内存数据消失
- 系统重启需要重新加载数据
AOF通过持久化写命令来解决这些问题,它将每个写命令追加到文件末尾,在Redis重启时重放这些命令来恢复数据。
为什么需要高性能的AOF?
传统的AOF实现面临几个关键挑战:
- 写入性能:频繁的磁盘I/O可能成为性能瓶颈
- 数据安全性:需要在性能和数据安全性之间做权衡
- 资源占用:大量小写入会导致系统调用过多
本文提出的批处理AOF方案通过以下特性解决这些问题:
- 智能批处理:动态调整批次大小,平衡延迟和吞吐量
- 低延迟优先:基于时间的批处理策略,确保数据及时持久化
- 资源优化:使用内存池和NIO,减少系统开销
- 可靠性保证:完整的错误处理和原子性文件操作
这是笔者整个项目中最不自卑,最觉得能拿出手的一部分。
项目演进历史
早期版本的问题
其实在笔者的仓库当中存在一个更简陋的redis,这个已经是重构一遍的版本了(是黑历史)。 当时用了一个很糟糕的双缓冲队列,后来我发现根本不是必要的,大致就是对大命令做了特殊优化。但是提升非常微小,大概是20%左右对比JDK官方的阻塞队列。 最后直接弃用了,因为实现思路太过稚嫩,方向完全是错误的。
迭代过程
这个版本经过了两次重要的迭代优化:
第一次迭代:引入批处理
第一次迭代的主要改进是引入了批处理机制:
- 在命令处理器处理完命令后,会把承载命令的
ByteBuf放入阻塞队列中 - 采用生产者消费者模型
- 设置了最小批次(16个命令)和最大批次(50个命令)的限制
- 对大命令进行特殊处理,直接写入
- 实现了简单的背压机制,通过计算超时时间来控制
第二次迭代:性能瓶颈分析与优化
为什么需要迭代呢? 因为我在Benchmark的过程中发现,我的AOF的写入性能足够好,真正的瓶颈在网络IO和多路复用上。 所以完全是不用担心AOF的效率,那么这个时候背压机制就不是被需要的了。这是因为背压是为了上游命令生产的太多了,下游消费不过来对上游进行阻塞。 但很显然,我的性能瓶颈在命令处理以及网络IO来处理命令上。因此背压机制在一代后被淘汰了。
第一代实现中的问题:
- 根据
ByteBuf数量进行聚合可能导致不必要的延迟 - 当实际命令量很少时,等待聚合会增加写入延迟
- 阻塞队列大小的确定比较困难
- 无界队列会带来潜在的内存问题
最终优化版本
因此在第二次迭代当中,我舍弃了这一切。 最后的实现模式是:
- 保留阻塞队列的生产者消费者模型,但移除了背压机制
- 保留最大命令上限,删除最小命令的下限
- 改为以固定的延迟时间为导向,仿照Kafka的累积器模式
- 实现智能的刷盘机制,替代Redis的EVERYSEC模式
- 引入队列满时的降级处理:直接同步写入
这是一个更为成熟的设计,为什么? 因为对于redis来说,如果你想提供一个更为高效的模式,你不应该等待一定要到最小批次再去进行写入,是自断一臂的操作。
核心实现解析
命令处理流程
首先当respcommandhandler处理到了一个写命令时,会执行以下操作:
- 检查命令是否需要AOF持久化
- 创建临时ByteBuf用于编码命令
- 将命令写入AOF系统
- 释放临时资源
java
if (needAof) {
try {
final ByteBuf tempBuf = PooledByteBufAllocator.DEFAULT.buffer();
try {
respArray.encode(respArray, tempBuf);
final byte[] commandBytes = new byte[tempBuf.readableBytes()];
tempBuf.readBytes(commandBytes);
redisContext.writeAof(commandBytes);
log.debug("[AOF] 写命令已持久化: {}", commandType);
} finally {
tempBuf.release();
}
} catch (Exception e) {
log.error("[AOF] 持久化失败,命令: {}, 错误: {}", commandType, e.getMessage(), e);
}
}用一块池化内存拿到这个具体的命令,然后写入到aof当中,经过多层接口会来到aofmanager
AOF写入流程
当命令到达AOF管理器时,会进行如下处理:
- 验证命令有效性
- 分配ByteBuf并写入数据
- 通过线程安全机制写入批处理器
- 确保资源正确释放
java
public void appendBytes(final byte[] commandBytes) throws IOException {
// 1. 参数验证
validateCommandBytes(commandBytes);
// 2. 分配ByteBuf并写入数据
final ByteBuf byteBuf = allocator.buffer(commandBytes.length);
boolean committedToBatchWriter = false;
try {
byteBuf.writeBytes(commandBytes);
// 3. 线程安全的写入操作
writeLock.lock();
try {
batchWriter.write(byteBuf);
committedToBatchWriter = true;
} finally {
writeLock.unlock();
}
} catch (final Exception e) {
throw new IOException("Failed to write bytes to AOF file", e);
} finally {
// 4. 确保资源释放
if (!committedToBatchWriter) {
ReferenceCountUtil.release(byteBuf);
}
}
}把这部分内容写入到AOF的批处理器当中,接下来会被AOF的批处理器进行处理。
批处理器核心组件
AOF批处理器(AofBatchWriter)包含以下关键组件:
- 底层写入器:负责实际的文件操作
- 阻塞队列:用于存储待写入的命令
- 写入线程:负责从队列获取并处理命令
- 刷盘策略:控制数据持久化的时机和方式
java
/** 底层文件写入器 */
private final Writer writer;
/** 默认写入队列大小 */
private static final int DEFAULT_QUEUE_SIZE = 1000;
/** 命令写入队列 */
private final BlockingQueue<ByteBuf> writeQueue;
/** 写入处理线程 */
private final Thread writeThread;
/** 运行状态标志 */
private final AtomicBoolean running = new AtomicBoolean(true);
/** 批处理参数 */
public static final int MAX_BATCH_SIZE = 50;
/** 批处理超时时间(纳秒) - 低延迟优先策略 */
private static final long BATCH_TIMEOUT_NANOS = 5_000_000L; // 5毫秒
/** 大命令阈值(字节) */
private static final int LARGE_COMMAND_THRESHOLD = 512 * 1024;
/** AOF 刷盘策略 */
private final AofSyncPolicy syncPolicy;命令写入策略
批处理器对命令的处理策略:
- 大命令处理:超过阈值的命令直接写入
- 小命令处理:进入队列等待批处理
- 队列满时的降级处理:直接同步写入
java
public void write(ByteBuf byteBuf) throws IOException {
int byteSize = byteBuf.readableBytes();
if (byteSize > LARGE_COMMAND_THRESHOLD) {
writeLargeCommand(byteBuf, byteSize);
} else {
writeToQueue(byteBuf);
}
}大命令处理实现:
- 直接调用底层写入器
- 根据刷盘策略决定是否立即刷盘
- 确保资源释放
java
private void writeLargeCommand(ByteBuf byteBuf, int byteSize) throws IOException {
log.debug("处理大命令,大小: {}KB", byteSize / 1024);
try {
ByteBuffer byteBuffer = byteBuf.nioBuffer();
writer.write(byteBuffer);
handleSyncPolicy();
log.debug("大命令直接写入完成,大小: {}KB", byteSize / 1024);
} catch (Exception e) {
throw new IOException("大命令写入失败,大小: " + byteSize + " bytes", e);
} finally {
byteBuf.release();
}
}刷盘策略处理:
- ALWAYS:每次写入后立即刷盘
- EVERYSEC:标记待刷盘,由定时器处理
- NO:不主动刷盘
java
private void handleSyncPolicy() throws IOException {
if (syncPolicy == AofSyncPolicy.ALWAYS) {
performFlush();
} else if (syncPolicy == AofSyncPolicy.EVERYSEC) {
hasPendingFlush.set(true);
}
}小命令处理流程:
- 尝试放入队列
- 队列满时降级为同步写入
- 异常处理和资源释放
java
private void writeToQueue(ByteBuf byteBuf) throws IOException {
try {
boolean success = writeQueue.offer(byteBuf);
if (!success) {
handleQueueFull(byteBuf);
}
} catch (Exception e) {
throw new IOException("写入AOF失败", e);
}
}
private void handleQueueFull(ByteBuf byteBuf) throws IOException {
log.warn("AOF队列满,直接同步写入 - 队列大小: {}", writeQueue.size());
try {
ByteBuffer byteBuffer = byteBuf.nioBuffer();
writer.write(byteBuffer);
handleSyncPolicy();
} catch (Exception e) {
throw e;
} finally {
byteBuf.release();
}
}批处理主循环实现
写入线程的初始化和启动:
- 创建守护线程
- 设置线程名称
- 启动处理循环
java
// 启动写入线程
this.writeThread = new Thread(this::processWriteQueue);
this.writeThread.setName("AOF-Writer-Thread");
this.writeThread.setDaemon(true);
this.writeThread.start();主循环实现:
- 维护命令批次数组
- 循环收集和处理命令
- 异常处理
- 资源清理
java
public void processWriteQueue() {
ByteBuf[] batch = new ByteBuf[MAX_BATCH_SIZE];
while (running.get() || !writeQueue.isEmpty()) {
int batchSize = 0;
try {
// 收集批次数据
batchSize = collectBatch(batch);
if (batchSize > 0) {
// 批量写入磁盘
writeBatch(batch, batchSize);
}
} catch (Exception e) {
log.error("AOF 批处理过程中发生异常", e);
} finally {
// 清理当前批次的资源
cleanupBatch(batch, batchSize);
}
}
log.info("AOF 批处理主循环已退出");
}批次收集机制
批次收集过程的三个主要步骤:
- 等待第一个命令(阻塞操作)
- 将第一个命令加入批次
- 在超时时间内收集更多命令
java
private int collectBatch(ByteBuf[] batch) {
// 第一步:等待第一个元素(阻塞等待)
ByteBuf firstItem = waitForFirstItem();
if (firstItem == null) {
return 0; // 被中断或停止运行
}
// 第二步:将第一个元素加入批次
batch[0] = firstItem;
int batchSize = 1;
// 第三步:在超时时间内收集更多元素
long batchStartTime = System.nanoTime();
batchSize += collectAdditionalItems(batch, batchSize, batchStartTime);
return batchSize;
}等待第一个命令的实现:
- 使用阻塞方式等待
- 处理中断情况
- 返回null表示需要退出
java
private ByteBuf waitForFirstItem() {
try {
return writeQueue.take();
} catch (InterruptedException e) {
if (!running.get()) {
return null;
}
Thread.currentThread().interrupt();
return null;
}
}收集额外命令的实现:
- 检查批次大小限制
- 计算剩余超时时间
- 尝试获取更多命令
- 超时或队列空时返回
java
private int collectAdditionalItems(ByteBuf[] batch, int currentSize, long batchStartTime) {
int additionalCount = 0;
while (currentSize + additionalCount < MAX_BATCH_SIZE) {
long elapsed = System.nanoTime() - batchStartTime;
if (elapsed >= BATCH_TIMEOUT_NANOS) {
break; // 超时,立即处理当前批次
}
// 计算剩余超时时间
long remainingTimeout = BATCH_TIMEOUT_NANOS - elapsed;
ByteBuf item = pollWithTimeout(remainingTimeout);
if (item == null) {
break; // 超时或队列为空,处理当前批次
}
batch[currentSize + additionalCount] = item;
additionalCount++;
}
return additionalCount;
}超时等待实现:
- 使用带超时的poll操作
- 处理中断异常
- 返回null表示获取失败
java
private ByteBuf pollWithTimeout(long timeoutNanos) {
try {
return writeQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
if (!running.get()) {
return null;
}
Thread.currentThread().interrupt();
return null;
}
}批量写入实现
批量写入的主要步骤:
- 计算总字节数
- 分配合适大小的缓冲区
- 将所有命令写入缓冲区
- 执行实际的文件写入
- 根据刷盘策略处理
java
private void writeBatch(ByteBuf[] batch, int batchSize) {
if (batchSize <= 0) return;
try {
int totalBytes = 0;
for (int i = 0; i < batchSize; i++) {
totalBytes += batch[i].readableBytes();
}
ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
for (int i = 0; i < batchSize; i++) {
buffer.put(batch[i].nioBuffer());
}
buffer.flip();
writer.write(buffer);
batchCount.incrementAndGet();
totalBatchedCommands.addAndGet(batchSize);
// 根据 AOF 刷盘策略决定是否立即刷盘或标记待刷盘
if (syncPolicy == AofSyncPolicy.ALWAYS) {
performFlush(); // ALWAYS 模式直接刷盘
} else if (syncPolicy == AofSyncPolicy.EVERYSEC) { // EVERYSEC模式:标记有待刷盘数据
hasPendingFlush.set(true);
}
// NO 模式下不主动刷盘
} catch (Exception e) {
log.error("Failed to write batch to AOF file", e);
throw new RuntimeException("批次写入失败", e);
}
}然后稍微来说一下我们的底层写入器AofWriter 里面比较关键的组件是,FileChannel和RandomAccessFile,文件实际大小(用于预分配空间),以及一些重写组建,我们后面再说。 这里来稍微说一下为什么采用这个只在开头预分配的机制,有几个原因。 首先是避免运行时的抖动,可能在高峰期会有大量的写入请求,如果每次都要去扩展文件大小,会反而导致性能下降。 其次是因为AOF文件的大小通常是可预测的,尤其是在高负载的情况下。 因为他是可调整的参数,所以可以根据实际情况来调整,使用的时候可以比较灵活来确定,因为AOF的日志大小大致是可控的。
底层写入器实现
关键组件说明
AofWriter是整个AOF系统的底层实现,负责直接与文件系统交互。其核心组件包括:
- FileChannel:提供高效的文件写入能力
- RandomAccessFile:支持文件随机访问操作
- 文件大小追踪:用于空间预分配和管理
- 重写相关组件:支持AOF重写功能
java
/** AOF 文件对象 */
private File file;
/** 文件通道,用于写入操作 */
private FileChannel channel;
/** 随机访问文件,用于文件操作 */
private RandomAccessFile raf;
/** 是否启用预分配空间 */
private boolean isPreallocated;
/** 文件实际大小(不包含预分配空间) */
private AtomicLong realSize;
/** 重写状态标志 */
private final AtomicBoolean rewriting = new AtomicBoolean(false);
/** 默认重写缓冲区大小 */
public static final int DEFAULT_REWRITE_BUFFER_SIZE = 100000;
/** 重写缓冲区队列 */
BlockingQueue<ByteBuf> rewriteBufferQueue;
/** Redis 核心接口 */
private final RedisCore redisCore;
/** 默认预分配空间大小(4MB) */
private static final int DEFAULT_PREALLOCATE_SIZE = 4 * 1024 * 1024;预分配机制设计
预分配机制的设计考虑了以下几个关键因素:
- 性能优化:避免频繁的文件大小调整导致的性能抖动
- 可预测性:利用AOF文件大小的可预测特性
- 灵活配置:支持根据实际需求调整预分配参数
- 空间效率:未使用的预分配空间不会占用实际磁盘空间
初始化过程:
- 获取文件当前大小
- 执行空间预分配
- 将文件指针移动到实际数据末尾
java
this.realSize = new AtomicLong(channel.size());
if(isPreallocated){
preAllocated(DEFAULT_PREALLOCATE_SIZE);
}
this.channel.position(this.realSize.get());
private void preAllocated(int defaultPreallocateSize) throws IOException {
long currentSize = 0;
try{
currentSize = this.channel.size();
}catch(IOException e){
log.error("获取文件长度时发生错误",e);
}
long newSize = currentSize + defaultPreallocateSize;
if(this.raf != null){
this.raf.setLength(newSize);
}
else if(this.channel != null){
this.channel.truncate(newSize);
}
this.channel.position(currentSize);
this.realSize.set(currentSize);
}写入操作实现
写入操作的完整流程:
- 验证写入器状态
- 执行文件写入
- 更新文件实际大小
- 处理重写相关逻辑
java
@Override
public int write(ByteBuffer buffer) throws IOException {
// 检查是否已关闭
if (channel == null || !channel.isOpen()) {
throw new IOException("AOF Writer 已关闭,无法执行写入操作");
}
// 1. 写入到文件
int written = writtenFullyTo(channel, buffer);
realSize.addAndGet(written);
// 2. 如果正在重写,复制数据到重写缓冲区
if (isRewriting()) {
copyToRewriteBuffer(buffer);
}
return written;
}完整写入实现:
- 保存原始位置信息
- 循环写入直到完成
- 恢复缓冲区状态
- 错误处理
java
private int writtenFullyTo(FileChannel channel, ByteBuffer buffer) {
int originalPosition = buffer.position();
int originalLimit = buffer.limit();
int totalBytes = buffer.remaining();
try {
int written = 0;
while (written < totalBytes) {
written += channel.write(buffer);
}
return written;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
buffer.position(originalPosition);
buffer.limit(originalLimit);
}
}重写缓冲区实现
重写过程中的数据处理:
- 从内存池分配新的ByteBuf
- 复制当前写入的数据
- 尝试放入重写队列
- 确保资源正确释放
java
private void copyToRewriteBuffer(ByteBuffer buffer) {
if (rewriteBufferQueue == null) {
return;
}
ByteBuf bufferCopy = null;
try {
// 1. 从池中分配ByteBuf
bufferCopy = allocator.buffer(buffer.remaining());
// 2. 复制数据到ByteBuf(保持原buffer位置不变)
int originalPosition = buffer.position();
bufferCopy.writeBytes(buffer.duplicate());
buffer.position(originalPosition);
// 3. 尝试添加到队列
if (tryOfferToRewriteQueue(bufferCopy)) {
bufferCopy = null; // 成功添加,转移所有权,不要释放
}
} finally {
// 4. 使用ReferenceCountUtil安全释放
ReferenceCountUtil.safeRelease(bufferCopy);
}
}重写队列操作实现:
- 使用超时机制避免阻塞
- 处理队列满的情况
- 完整的错误处理
java
private boolean tryOfferToRewriteQueue(ByteBuf buffer) {
try {
if (rewriteBufferQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) {
return true; // 成功添加
} else {
log.warn("重写AOF文件的缓冲区已满,丢弃数据");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("添加到重写缓冲区时被中断", e);
return false;
} catch (Exception e) {
log.error("添加到重写缓冲区时发生错误", e);
return false;
}
}AOF重写实现
重写任务流程
重写功能的入口实现:
- 验证重写状态
- 检查系统状态
- 启动重写线程
- 返回执行结果
java
@Override
public boolean bgrewrite() throws IOException {
if (rewriting.get()) {
log.warn("正在进行AOF重写,无法再次执行");
return false;
}
// 检查RedisCore是否可用
if (redisCore == null) {
log.warn("RedisCore未设置,无法执行AOF重写");
return false;
}
rewriting.set(true);
Thread rewriteThread = new Thread(this::rewriteTask);
rewriteThread.start();
return true;
}重写任务的完整实现:
- 准备重写资源
- 执行重写操作
- 完成重写过程
- 清理临时资源
java
private void rewriteTask() {
File rewriteFile = null;
RandomAccessFile rewriteRaf = null;
FileChannel rewriteChannel = null;
try {
log.info("开始重写aof");
// 1. 准备重写资源
RewriteResources resources = prepareRewriteResources();
rewriteFile = resources.file;
rewriteRaf = resources.raf;
rewriteChannel = resources.channel;
// 2. 执行重写操作
performRewrite(rewriteChannel);
// 3. 完成重写
finishRewrite(rewriteChannel, rewriteRaf, rewriteFile);
rewriteChannel = null;
rewriteRaf = null;
log.info("重写AOF文件完成");
} catch (IOException e) {
log.error("重写AOF文件时发生错误", e);
cleanupTempFile(rewriteFile);
} finally {
cleanupResources(rewriteChannel, rewriteRaf);
}
}重写资源准备
重写资源准备过程:
- 创建临时文件
- 初始化文件访问对象
- 设置文件位置
java
private RewriteResources prepareRewriteResources() throws IOException {
File rewriteFile = File.createTempFile("redis_aof_temp", ".aof", file.getParentFile());
RandomAccessFile rewriteRaf = new RandomAccessFile(rewriteFile, "rw");
FileChannel rewriteChannel = rewriteRaf.getChannel();
rewriteChannel.position(0);
return new RewriteResources(rewriteFile, rewriteRaf, rewriteChannel);
}重写操作执行
重写操作的两个主要步骤:
- 重写数据库内容
- 应用重写缓冲区数据
java
private void performRewrite(FileChannel rewriteChannel) throws IOException {
// 重写数据库内容
rewriteDatabases(rewriteChannel);
// 应用重写缓冲区
log.info("开始缓冲区的重写");
applyRewriteBuffer(rewriteChannel);
}
private void rewriteDatabases(FileChannel rewriteChannel) throws IOException {
final RedisDB[] dataBases = redisCore.getDataBases();
for (int i = 0; i < dataBases.length; i++) {
final RedisDB db = dataBases[i];
if (db.size() > 0) {
log.info("正在重写数据库{}", i);
writeSelectCommand(i, rewriteChannel);
writeDatabaseToAof(db, rewriteChannel);
}
}
}
private void applyRewriteBuffer(final FileChannel rewriteChannel) {
int appliedCommands = 0;
int totalBytes = 0;
try {
final int batchSize = 1000;
final List<ByteBuf> buffers = new ArrayList<>(batchSize);
while (rewriteBufferQueue.drainTo(buffers, batchSize) > 0) {
for (final ByteBuf buffer : buffers) {
try {
ByteBuffer nioBuffer = buffer.nioBuffer();
final int written = writtenFullyTo(rewriteChannel, nioBuffer);
totalBytes += written;
appliedCommands++;
} finally {
// 使用ReferenceCountUtil安全释放
ReferenceCountUtil.safeRelease(buffer);
}
}
buffers.clear();
}
log.info("重写AOF文件的缓冲区已应用,应用了{}条命令,总字节数: {}",
appliedCommands, totalBytes);
} catch (Exception e) {
log.error("重写AOF文件的缓冲区应用时发生错误", e);
}
}重写完成处理
重写完成的处理步骤:
- 强制刷盘确保数据持久化
- 关闭重写资源
- 替换原有AOF文件
java
private void finishRewrite(FileChannel rewriteChannel, RandomAccessFile rewriteRaf, File rewriteFile) throws IOException {
rewriteChannel.force(true);
closeRewriteResources(rewriteChannel, rewriteRaf);
replaceAofFile(rewriteFile);
}文件替换的安全实现:
- 保存当前文件状态
- 关闭现有资源
- 执行文件替换
- 重新打开文件
java
private void replaceAofFile(final File rewriteFile) {
RandomAccessFile oldRaf = null;
FileChannel oldChannel = null;
try {
// 1. 保存当前资源引用并关闭
oldRaf = this.raf;
oldChannel = this.channel;
this.raf = null;
this.channel = null;
closeFileResources(oldChannel, oldRaf);
// 2. 执行文件替换操作
performFileReplacement(rewriteFile);
// 3. 重新打开文件
reopenFile();
log.info("文件重新打开完成,当前位置: {}", this.channel.position());
} catch (IOException e) {
log.error("替换AOF文件时发生错误", e);
handleReopenFailure();
}
}文件替换的安全机制
文件替换过程的安全保障:
- 创建备份文件
- 执行安全的文件替换
- 成功后删除备份
- 失败时进行回滚
java
private void performFileReplacement(File rewriteFile) throws IOException {
File backupFile = null;
try {
// 创建备份
backupFile = FileUtils.createBackupFile(file, ".bak");
if (backupFile != null) {
log.info("创建备份文件{}", backupFile.getAbsolutePath());
}
// 替换文件
FileUtils.safeRenameFile(rewriteFile, file);
log.info("重写AOF文件完成,替换原文件");
// 删除备份
deleteBackupFile(backupFile);
} catch (Exception e) {
log.error("重命名文件时发生错误", e);
restoreFromBackup(backupFile);
throw e;
}
}AOF加载器实现
文件加载流程
AOF加载器的主要职责:
- 解析AOF文件内容
- 重放Redis命令
- 恢复数据状态
- 处理加载异常
文件打开实现:
- 验证文件状态
- 初始化文件访问对象
- 错误处理和资源清理
java
public void openFile() throws IOException {
final File file = new File(fileName);
if (!file.exists() || file.length() == 0) {
log.info("AOF文件不存在或为空: {}", fileName);
return;
}
try {
this.raf = new RandomAccessFile(file, "r");
this.channel = raf.getChannel();
log.info("AOF文件打开成功: {}, 文件大小: {} bytes", fileName, channel.size());
} catch (IOException e) {
// 如果打开失败,确保资源被清理
closeFile();
throw new IOException("打开AOF文件失败: " + fileName, e);
}
}加载过程实现
完整的加载过程:
- 读取文件内容
- 解析RESP命令
- 执行命令恢复
- 清理资源
java
public void load() {
if (channel == null) {
log.info("AOF文件通道未初始化,跳过加载操作");
return;
}
ByteBuf commands = null;
try {
log.info("开始加载AOF文件: {}", fileName);
// 1. 读取AOF文件的完整内容
commands = readFileContent();
// 2. 逐个处理RESP命令
int successCount = processCommands(commands);
log.info("AOF文件加载完成,成功执行 {} 条命令", successCount);
} catch (Exception e) {
log.error("AOF文件加载失败: {}", fileName, e);
throw new RuntimeException("AOF加载过程中发生错误", e);
} finally {
if(commands !=null && commands.refCnt() >0){
commands.release();
}
closeFile();
}
}文件读取策略
文件读取的两种策略:
- 小文件直接读取
- 大文件分块处理
小文件读取实现:
java
private ByteBuf readFileContent() throws IOException{
long fileSize = channel.size();
if(fileSize == 0){
return Unpooled.EMPTY_BUFFER;
}
if(fileSize > Integer.MAX_VALUE){
return readLargeFile();
}
int size = (int) fileSize;
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
try{
ByteBuffer byteBuffer = buffer.nioBuffer(0, size);
int totalRead = 0;
while(totalRead < size){
int read = channel.read(byteBuffer);
if(read == -1){
break;
}
totalRead += read;
}
buffer.writerIndex(totalRead);
return buffer;
}catch(IOException e){
buffer.release();
throw e;
}
}大文件读取实现:
java
private ByteBuf readLargeFile() {
CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
ByteBuf currentBuf = null;
try{
long reamining = channel.size();
while(reamining > 0){
int chunkSize = Math.min(BUFFER_SIZE, (int) reamining);
currentBuf = PooledByteBufAllocator.DEFAULT.directBuffer(chunkSize);
ByteBuffer byteBuffer = currentBuf.nioBuffer(0, chunkSize);
int read = channel.read(byteBuffer);
if(read == -1){
currentBuf.release();
break;
}
currentBuf.writerIndex(read);
composite.addComponent(true,currentBuf);
reamining -= read;
currentBuf = null;
}
return composite;
}catch(IOException e){
if(currentBuf != null){
currentBuf.release();
}
composite.release();
throw new RuntimeException(e);
}
}命令处理实现
命令处理的主要步骤:
- 解析RESP格式命令
- 验证命令有效性
- 执行命令恢复
- 处理执行异常
java
private int processCommands(ByteBuf commands) {
int succesCount = 0;
while(commands.isReadable()){
int position = commands.readerIndex();
commands.markReaderIndex();
try{
Resp command = Resp.decode(commands);
if(executeCommand(command,position)){
succesCount++;
}
}catch(Exception e){
handleCommandError(commands, position,e);
}
}
return succesCount;
}命令执行实现:
- 验证命令格式
- 解析命令参数
- 通过RedisCore执行
- 处理执行结果
java
private boolean executeCommand(Resp respArray,int position) {
if(!(respArray instanceof RespArray)){
log.warn("命令不是RespArray类型,在{}",position);
return false;
}
RespArray command = (RespArray) respArray;
if(!isValiedCommand(command)){
log.warn("命令无效,在{}",position);
return false;
}
return executeRedisCommand(command,position);
}java
private boolean executeRedisCommand(final RespArray command, final int position) {
try {
// 1. 解析命令名称
final String commandName = ((BulkString) command.getContent()[0])
.getContent().getString().toUpperCase();
// 2. 解析命令参数
final Resp[] content = command.getContent();
final String[] args = new String[content.length - 1];
for (int i = 1; i < content.length; i++) {
if (content[i] instanceof BulkString) {
args[i - 1] = ((BulkString) content[i]).getContent().getString();
} else {
log.warn("命令参数格式错误,位置: {}", position);
return false;
}
}
// 3. 通过RedisCore接口执行命令
final boolean success = redisCore.executeCommand(commandName, args);
if (!success) {
log.warn("命令执行失败: {} 在位置: {}", commandName, position);
}
return success;
} catch (Exception e) {
log.error("命令执行异常,位置: {}", position, e);
return false;
}
}错误处理机制
错误处理的完整流程:
- 定位错误位置
- 跳过错误数据
- 寻找下一个有效命令
- 记录错误信息
java
private void handleCommandError(ByteBuf commands, int position, Exception e) {
if (!commands.isReadable()) {
return;
}
log.warn("命令解析错误,位置: {}, 错误: {}", position, e.getMessage());
int startPosition = commands.readerIndex();
boolean foundNextCommand = false;
if (commands.isReadable()) {
commands.readByte(); // 跳过当前错误字节
}
while(commands.isReadable()) {
int currentPos = commands.readerIndex();
byte b = commands.readByte();
if(isRespPrefix(b)) {
commands.readerIndex(currentPos);
foundNextCommand = true;
break;
}
}
if (!foundNextCommand) {
log.warn("未找到有效的下一个命令,跳过剩余数据");
commands.readerIndex(commands.writerIndex());
}
int skippedBytes = commands.readerIndex() - startPosition;
if (skippedBytes > 0) {
log.debug("跳过了 {} 字节的无效数据", skippedBytes);
}
}系统总结
核心特性
高效的批处理机制
- 基于时间窗口的动态批次控制
- 智能的批次大小调整(0-50条命令)
- 大命令直接写入优化(>512KB)
智能的持久化策略
- 多级刷盘机制,支持不同安全级别
- 预分配空间管理(4MB块),减少文件系统开销
- 原子性文件替换,确保AOF文件完整性
可靠的容错机制
- 完整的错误处理链
- 基于备份的安全文件替换
- 数据一致性保护机制
性能优化要点
写入性能优化
- 使用
FileChannel进行零拷贝IO - 批量写入合并小I/O操作
- 池化
ByteBuf管理,降低GC压力 - 预分配空间减少文件系统开销
- 使用
延迟控制优化
- 5ms固定延迟窗口限制
- 取消最小批次约束
- 大命令快速通道处理
- 队列满时同步写入降级
资源管理优化
- 严格的
ByteBuf引用计数管理 - 智能的内存池化策略
- 动态的空间预分配机制
- 严格的
设计亮点
高效的批处理策略
- 结合固定延迟窗口(5ms)和最大批次限制(50条)
- 在保证低延迟的同时最大化批处理效率
- 动态适应不同的写入负载场景
可靠的文件操作
- 引入备份机制的原子性文件替换
- 基于
FileChannel的高效IO操作 - 完整的异常恢复机制
智能的资源管理
- 基于池化的内存管理降低GC压力
- 预分配空间机制减少文件系统开销
- 智能的刷盘策略平衡性能和可靠性
这个实现展示了如何通过深入理解业务需求和系统瓶颈,来设计一个既高效又可靠的持久化系统。通过将批处理、内存池化和智能刷盘等技术有机结合,最终实现了一个在性能、可靠性和可维护性三个维度都达到较高水平的解决方案。
