Appearance
高容错特性
Kafka 提供高度的容错能力:在包含 n 个副本的集群中,可以容忍 n-1 个节点失败而保持系统可用。
Consumer Group 机制
消费者可以组成 Consumer Group,每条消息只会被同一组内的一个消费者处理。这种设计完美匹配 Kafka 的分区特性:当一个主题有多个分区时,消息会动态分配给多个消费者,显著提高并发处理的吞吐量。
java
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final ConsumerCoordinator coordinator;
public ConsumerRecords<K, V> poll(Duration timeout) {
// 检查是否需要重新平衡
coordinator.poll(time.timer(timeout));
// 获取分配的分区
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
fetcher.fetchedRecords();
if (!records.isEmpty())
return new ConsumerRecords<>(records);
return ConsumerRecords.empty();
}
}
高可用性与再平衡机制
如果某个消费者无法处理其分配的分区,其他消费者会接管这些分区,触发再平衡(rebalance)。这种机制确保即使部分消费者失效,整体系统仍能继续运行,同时提供负载均衡效果。
java
public class ConsumerCoordinator extends AbstractCoordinator {
public boolean needRejoin() {
return subscriptions.partitionsAutoAssigned() &&
(rejoinNeededOrPending() || assignmentExpired());
}
private void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// 处理分区分配结果
PartitionAssignor.Assignment assignment = assignor.onAssignment(
assignmentBuffer, partitionAssignor.name());
// 更新分区分配
subscriptions.assignFromSubscribed(assignment.partitions());
}
}
批量处理机制
Kafka 使用批量发送机制,以消息集合为单位进行发送,大幅提高推送效率。整体架构遵循:
- Producer 负责向 Broker 推送消息
- Consumer 负责从 Broker 拉取消息
消息发送流程
- 拦截器处理:记录处理时间、消息数量、错误数量,可过滤不必要信息,执行数据清洗,处理敏感信息
- 序列化:对 key 和 value 进行序列化
- 批量累积:将消息累积成批次后发送
java
public final class RecordAccumulator {
private final int batchSize;
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) {
// 尝试添加到现有批次
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (dq.isEmpty() || dq.getLast().tryAppend(timestamp, key, value, callback) == null) {
// 创建新批次
ProducerBatch batch = new ProducerBatch(tp, produceRequestBuilder,
timestamp, key, value, callback);
dq.addLast(batch);
}
}
// 返回结果
return new RecordAppendResult(...);
}
}
Leader 和 Follower 机制
Kafka 集群中的 Broker 承担不同角色:
- 每个 Topic 分为多个分区(Partition)
- 每个分区的数据在多个 Broker 上存储形成副本
- 每个分区的副本中,选出一个 Leader 副本负责处理所有读写请求
- 其他副本作为 Follower,负责从 Leader 同步数据
副本失败处理机制
当 Producer 发送消息时,会指定 acks
参数,决定 Kafka 返回确认前需满足的条件。如果某个 Follower 副本同步失败:
- Kafka 通过 ISR (In-Sync Replicas) 机制管理副本状态
- 失败的 Follower 会从 ISR 集合中移除
- Producer 会收到一个错误
- 根据配置的
retries
参数进行重试 - 由于问题 Follower 已被移出 ISR,即使设置了
acks=all
,重试也可能成功 - 如果重试达到最大次数仍失败,抛出异常,由应用程序决定如何处理
java
public class KafkaProducer<K, V> implements Producer<K, V> {
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// 获取分区的元数据
Cluster cluster = metadata.fetch();
// 计算目标分区
int partition = partition(record, cluster);
// 序列化记录
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// 发送到累加器
RecordAccumulator.RecordAppendResult result = accumulator.append(
topicPartition,
timestamp,
serializedKey,
serializedValue,
headers,
callback,
maxBlockTimeMs);
// 如果需要立即发送
if (result.batchIsFull || result.newBatchCreated) {
sender.wakeup();
}
return result.future;
}
}
重试机制
java
public class Sender implements Runnable {
private void sendProducerData(long now) {
// 获取准备发送的批次
Map<Integer, List<ProducerBatch>> batches = accumulator.drain(
cluster,
maxSize,
now);
for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
NodeBatch batch = entry.getValue();
// 发送批次
sendProducerBatch(batch);
}
}
private void handleProduceResponse(ProduceResponse.PartitionResponse response,
ProducerBatch batch,
long now) {
if (response.error != Errors.NONE) {
// 处理错误
if (canRetry(batch, response.error)) {
// 重试发送
reenqueueBatch(batch, now);
} else {
// 完成发送,带有错误
completeBatch(batch, response.error);
}
} else {
// 成功完成发送
completeBatch(batch, null);
}
}
private boolean canRetry(ProducerBatch batch, Errors error) {
return batch.attempts() < this.retries && error.exception().isRetriable();
}
}
分区选择策略
消息写入分区遵循以下原则:
- 如果指定了 Partition,则写入指定的 Partition
- 如果设置了数据的 Key,则根据 Key 的哈希值选择 Partition
- 如果既未指定 Partition 也未设置 Key,则使用轮询方式选择 Partition
存储结构
分区在服务器上表现为独立的文件夹,每个分区下包含多组 Segment 文件:
- 每组 Segment 文件包含
.log
、.index
、.timeindex
文件 .log
文件实际存储消息数据.index
和.timeindex
文件作为索引用于快速检索信息
java
public class LogSegment {
private final FileRecords log; // 日志文件
private final LazyIndex<OffsetIndex> lazyOffsetIndex; // 偏移量索引
private final LazyIndex<TimeIndex> lazyTimeIndex; // 时间戳索引
private final TransactionIndex txnIndex; // 事务索引
private final long baseOffset; // 基础偏移量
}
消息结构
存储在 Log 中的消息包含:
- Offset:序列号,标识每条消息在分区内的位置
- 消息大小:占用 4 字节,描述消息大小
- 消息体:实际存储的消息数据(经过压缩)
存储策略
Kafka 存储策略基于时间和大小:
- 无论消息是否被消费,都会保存所有满足保留条件的消息
- 读取特定消息的复杂度为 O(1)
- 删除过期文件不会提高 Kafka 性能
消息查找机制
查找特定 Offset 的消息过程:
- 定位消息所在的 Segment 文件(如查找 Offset 为 368801 的消息)
- 打开该 Segment 的索引文件
- 计算相对 Offset(相对于该 Segment 起始 Offset)
- 使用二分查找定位小于等于目标相对 Offset 的索引项
- 获取对应的物理偏移位置
- 打开数据文件,从该物理位置开始顺序扫描直到找到目标消息
虽然过程看似复杂,但被认为是 O(1) 复杂度,原因是:
- 二分查找定位 Segment 的复杂度是 O(log(Segment 文件数))
- 但 Segment 数量有限,可视为常数操作
- 基于文件偏移量可以直接定位,不需要遍历整个日志
注意:在实际应用中,消费者数量应与分区数量匹配。如果消费者数量多于分区数量,多余的消费者会处于空闲状态。