Appearance
Kafka Streams 实现点赞系统
本文将探讨如何利用 Kafka Streams 框架,高效且实时地统计文章点赞数,并生成排行榜。我们将分析传统 Redis 方案在大数据量下的局限性,并详细阐述 Kafka Streams 如何解决这些挑战,同时提供一个简化版的 Demo 实现流程。
业务场景概述
我们的场景是一个技术分享平台,用户可以给文章点赞。核心目标是:
- 统计一定时间内的点赞数。
- 展示某一时间段内的文章点赞排行榜。
传统方案的挑战:Redis + Lua 滑动窗口的弊端
传统的解决方案可能会考虑使用 Redis 配合 Lua 脚本实现滑动窗口来统计点赞数。这种方案在数据量不大(例如每天几万、几十万点赞量)时,确实具有实现简单、直接、延迟低的优点。
然而,当平台规模持续增长,点赞量达到每天数亿甚至每秒数十万的级别时,Redis 方案将面临严峻的挑战,使其不再是最佳选择:
大数据量下的“Why Not Redis?”深入思考
这里需要深入思考一个问题:Redis 真的不能承担这种压力吗? 对于点赞这种业务,如果平台规模不大,比如每天几万、几十万的点赞量,Redis 配合 Lua 脚本实现滑动窗口是完全可行的,甚至可能是最优解,因为它简单、直接、延迟极低。
但我们需要考虑更极端的情况:如果我们的技术分享平台发展壮大,点赞量达到每天数亿甚至每秒数十万的级别呢? 这时,传统的 Redis 方案就会面临严峻的挑战:
- 内存爆炸: 为了维护“一定时间”内的点赞数,例如统计过去一小时甚至过去一天的点赞排行榜,Redis 需要在内存中存储所有窗口内活跃的文章ID和对应的点赞事件数据。这将导致 Redis 的内存占用呈线性甚至指数级增长。当数据量大到一定程度,Redis 的内存会迅速耗尽,或需要投入高昂的硬件成本。
- 频繁淘汰带来的计算瓶颈: 无论是通过 ZSet 存储事件,还是周期性地清理旧数据,在海量数据和高并发写入下,Redis 的 过期策略、Key 的删除、以及内部数据结构的调整 都会带来巨大的 CPU 消耗和 IO 压力。Redis 虽然快,但它毕竟是单线程处理命令(部分功能如持久化可异步),大量的“清理旧数据”操作会阻塞主线程,严重影响其他业务读写性能,导致整体吞吐量下降和延迟急剧增加。
- 单点与扩展性挑战: 即使使用 Redis 集群,对于需要聚合计算的场景,维护跨节点的数据一致性以及滑动窗口逻辑的复杂性也会大大增加。而扩展计算能力,往往需要与存储容量紧密耦合,不够灵活。
因此,在 数据量巨大、时间窗口较长(例如分钟级、小时级甚至天级)、对内存占用和计算资源消耗有严格控制的场景下,让 Redis 去承担这种持续、高频、大范围的聚合计算,确实不是一个最佳的选择。Redis 更适合作为高速缓存或存储最终聚合结果的介质。
Kafka Streams 解决方案
使用 Kafka Streams 可以完美解决上述问题。它是一个强大的流处理库,能够提供对数据处理的支持框架。大致的流程是:通过上游的 Kafka topic 接收数据,经过处理后输出到下游的 Kafka topic。它能够完美与 Kafka 进行集成,具备以下优势:
- 实时流处理能力: Kafka Streams 能够以流式方式处理点赞事件,在不增加 Redis 压力的前提下,实时计算点赞数并生成排行榜。
- 容错性: Kafka Streams 的状态存储是容错的。它底层使用 RocksDB 进行本地状态存储,RocksDB 的数据可以定期同步到 Kafka 的一个内部 Topic 中(Changelog Topic),确保即使应用实例崩溃,也能从 Kafka 恢复状态,保证计算的准确性,无需担心数据丢失。
- 开发简便性: 其操作逻辑远比编写 Lua 脚本方便,且代码可维护性更高。
- 系统解耦: 通过事件驱动的方式,将点赞统计逻辑与核心业务逻辑解耦,提高了系统的灵活性和可扩展性。
- 数据利用: 处理后的数据可以方便地输出到其他 Kafka Topic,为未来的数据分析和利用提供了可能。
- 处理长时大流量场景: 在处理长时间、大流量的场景上,Kafka Streams 提供了远比 Redis 更强的能力。
Demo 场景下的简化与生产环境的考量
在这里,我们需要明确一点:为了专注于演示 Kafka Streams 的核心流处理逻辑,我当前这个 Demo 对用户点赞的源头处理进行了简化。
在实际生产环境中,用户点赞通常是一个更复杂的业务流程:
- 用户发起点赞请求。
- 业务服务层进行幂等性检查: 在真正处理点赞之前,服务会查询数据库(或缓存),判断该用户是否已经对该文章点赞。如果已经点赞,可能返回“已点赞”的提示,或处理为取消点赞。这保证了业务逻辑的正确性。
- 点赞记录落库: 如果是新的点赞,会将
ArticleLike
记录持久化到数据库,这是为了数据的一致性和支持用户后续的取消点赞等操作。 - 异步发送事件: 数据库操作成功后,点赞事件会异步地发送到 Kafka。这种解耦方式可以避免发送 Kafka 的延迟影响用户响应时间,并提高系统吞吐量。
由于我们 Demo 的核心目标是展示 Kafka Streams 如何处理流入的事件流,为了快速模拟点赞事件的产生,我采取了以下测试策略来生成点赞数据,而非严格模拟生产环境下的用户点赞业务逻辑:
ArticleLike
实体类,这些数据是需要落库的:
java
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("article_likes")
public class ArticleLike {
@TableId(type = IdType.AUTO)
private Long id;
@TableField("article_id")
private Long articleId;
@TableField("user_id")
private Long userId;
@TableField("like_time")
private Date likeTime;
}
按照 Redis + Lua 的设计,此时点赞数据应该被放入 ZSet 进行滑动窗口处理。现在,我们将其替换为 Kafka Streams。
1. 生成点赞事件并落库
为了模拟点赞行为,我们首先生成一个随机用户 ID,并检查该用户是否已对文章点赞。这是为了模拟实际业务中的幂等性检查:
java
Long userId = (long) (random.nextInt(10000) + 1);
// 检查是否已经点赞过
QueryWrapper<ArticleLike> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("article_id", articleId)
.eq("user_id", userId);
ArticleLike existingLike = articleLikeMapper.selectOne(queryWrapper);
if (existingLike != null) {
log.info("User {} already liked article {}, generating new userId", userId, articleId);
// 如果已经点赞,重新生成用户ID
userId = (long) (random.nextInt(10000) + 10001); // 使用更大的范围避免重复
}
接着,将点赞记录持久化到数据库:
java
ArticleLike articleLike = new ArticleLike();
articleLike.setArticleId(articleId);
articleLike.setUserId(userId);
articleLike.setLikeTime(new Date());
articleLikeMapper.insert(articleLike);
2. 发送点赞事件到 Kafka
点赞记录落库成功后,我们将点赞事件异步发送到 Kafka。发送的事件是一个 ArticleLikeEvent
对象,包含文章 ID、用户 ID、操作类型(点赞/取消点赞)和时间戳。
java
articleLikeProducerService.sendArticleLikeEvent(articleId, userId, "LIKE");
sendArticleLikeEvent
方法内部会构建 ArticleLikeEvent
对象,并将其序列化为 JSON 字符串发送到 Kafka:
ArticleLikeEvent
结构:
java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ArticleLikeEvent {
private Long articleId;
private Long userId;
private String action; // LIKE, UNLIKE
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "Asia/Shanghai")
private Date timestamp;
}
事件发送逻辑:
java
private static final String TOPIC = "article-likes-events"; // 定义 Kafka Topic 名称
java
ArticleLikeEvent event = new ArticleLikeEvent(
articleId,
userId,
action,
new Date());
String eventJson = objectMapper.writeValueAsString(event); // 将事件对象序列化为 JSON 字符串
String key = articleId.toString(); // 将文章 ID 作为 Kafka 消息的 Key,保证相同文章的点赞事件发送到同一个分区
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, key, eventJson);
发送到 Kafka 的数据示例:
- Key:
"1001"
(文章ID) - Value:
{"articleId":1001,"userId":12345,"action":"LIKE","timestamp":"2025-08-01 13:00:00"}
Kafka Streams 处理流程
Kafka Streams 的核心处理逻辑定义在一个 KStream
Bean 方法中。这里我们配置了 article-like-events
作为输入 Topic。
java
@Configuration
@EnableKafkaStreams
@Slf4j
public class ArticleLikeStreamsConfig {
private static final String INPUT_TOPIC = "article-like-events";
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private final ObjectMapper objectMapper;
private final ArticleRankingService articleRankingService;
...
}
Kafka Streams 会使用一个传入 StreamsBuilder
并返回 KStream
的 Bean 方法来做处理。JsonSerde
是用于序列化和反序列化 JSON 数据的工具。
java
@Bean
public KStream<String, String> articleLikeStream(StreamsBuilder streamsBuilder) {
log.info("Initializing Article Like Kafka Streams topology...");
try {
JsonSerde<ArticleLikeEvent> likeEventSerde = new JsonSerde<>(ArticleLikeEvent.class, objectMapper);
// 1. 从输入Topic读取点赞事件
KStream<String, String> sourceStream = streamsBuilder.stream(
INPUT_TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
log.info("Created source stream for topic: {}", INPUT_TOPIC);
// 2. 添加日志来监控消息接收
sourceStream.foreach((key, value) -> {
log.info("Received message from Kafka: key={}, value={}", key, value);
});
// ... 后续处理
} catch (Exception e) {
log.error("Error initializing Kafka Streams topology", e);
throw new RuntimeException("Kafka Streams initialization failed", e);
}
}
1. 反序列化与初步过滤
首先,我们从 streamsBuilder
的输入 Topic (article-like-events
) 中读取原始的 String
类型数据。通过 mapValues
操作将 JSON 字符串反序列化为 ArticleLikeEvent
对象,并对无效事件进行过滤。
java
KStream<String, ArticleLikeEvent> likeEventStream = sourceStream.mapValues((key, value) -> {
try {
if (value == null || value.trim().isEmpty()) {
log.warn("Received null or empty value for key: {}", key);
return null;
}
ArticleLikeEvent event = objectMapper.readValue(value, ArticleLikeEvent.class);
log.info("Successfully deserialized ArticleLikeEvent: articleId={}, userId={}, action={}",
event.getArticleId(), event.getUserId(), event.getAction());
return event;
} catch (Exception e) {
log.error("Error deserializing ArticleLikeEvent for key: {}, value: {}", key, value, e);
return null;
}
}).filter((key, event) -> {
boolean isValid = event != null && event.getArticleId() != null;
if (!isValid) {
log.warn("Filtered out invalid event: key={}", key);
}
return isValid;
});
// ... 后续处理
mapValues
输入/输出示例:- 输入 (key, value):
("1001", "{\"articleId\":1001,\"userId\":12345,\"action\":\"LIKE\",\"timestamp\":\"2025-08-01 13:00:00\"}")
- 输出 (key, value):
("1001", ArticleLikeEvent{articleId=1001, userId=12345, action='LIKE', timestamp=Fri Aug 01 13:00:00 JST 2025})
- 输入 (key, value):
filter
过滤逻辑: 确保event
非空且articleId
存在。
2. 过滤“LIKE”事件
我们只关注点赞("LIKE")事件,因此需要再次进行过滤,忽略“UNLIKE”或其他类型的事件。
java
KStream<String, ArticleLikeEvent> likeStream = likeEventStream
.filter((key, event) -> {
boolean isLike = "LIKE".equals(event.getAction());
log.info("Filtering event: articleId={}, action={}, isLike={}",
event.getArticleId(), event.getAction(), isLike);
return isLike;
});
filter
过滤逻辑: 仅保留action
为 "LIKE" 的事件。
3. 分组、窗口聚合与计数
接下来是核心的统计逻辑。因为我们需要统计每个文章的点赞数并生成排行榜,所以步骤如下:
- 按照文章 ID 进行分组 (
map
+groupByKey
): 将流中的事件按articleId
重新设置 key。 - 定义时间窗口 (
windowedBy
): 设置一个时间窗口,例如 5 秒(为了便于测试,实际应用中会是更长的时间窗口,例如 5 分钟、1 小时等)。 - 统计点赞数 (
count
): 在每个时间窗口内,对每个文章 ID 的点赞事件进行计数。 - 指定状态存储 (
Materialized.as
):count
操作需要维护每个键的计数。Kafka Streams 会利用其底层的 RocksDB 本地状态存储来高效地管理和更新每个文章 ID 在当前时间窗口内的点赞数。RocksDB 是一种高性能的嵌入式键值存储,它能够将状态持久化到本地磁盘,并优化读写性能,尤其擅长处理大规模的增量更新和聚合计算。
java
KStream<String, Long> articleLikeCounts = likeStream
.map((key, event) -> {
String newKey = event.getArticleId().toString();
log.info("Mapping event to new key: oldKey={}, newKey={}, articleId={}",
key, newKey, event.getArticleId());
return new KeyValue<>(newKey, event);
})
.groupByKey(Grouped.with(Serdes.String(), likeEventSerde)) // 按照新的文章ID作为Key进行分组,值仍然是ArticleLikeEvent
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5))) // 定义5秒的时间窗口
.count(Materialized.as("article-like-counts-store")) // 在窗口内进行计数,并将结果存储到名为 "article-like-counts-store" 的状态存储中
map
输出示例:- 输入 (key, value):
("1001", ArticleLikeEvent{...})
- 输出 (key, value):
("1001", ArticleLikeEvent{...})
(这里只是将articleId
作为新的 Key,Value 保持不变,以便groupByKey
使用)
- 输入 (key, value):
groupByKey
输出: 形成KGroupedStream<String, ArticleLikeEvent>
,其中每个 Key 对应一个文章 ID,其 Value 是该文章 ID 下的所有ArticleLikeEvent
。windowedBy
: 在每个 Key Group 中,将数据按 5 秒的时间窗口进行切分。count
输出: 得到一个KTable<Windowed<String>, Long>
,表示每个窗口内每个文章 ID 的点赞总数。
4. 转换回流并更新 Redis
最后,我们将 KTable
转换回 KStream
(toStream()
),并对结果进行格式化,将时间窗口信息提取出来,然后将最终的点赞统计结果更新到 Redis 中。
java
.toStream()
.map((windowedKey, count) -> {
String articleId = windowedKey.key();
long windowStart = windowedKey.window().start();
long windowEnd = windowedKey.window().end();
// 格式化时间窗口
LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(windowStart), ZoneId.systemDefault());
LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(windowEnd), ZoneId.systemDefault());
String timeWindow = startTime.format(TIME_FORMATTER) + " - " + endTime.format(TIME_FORMATTER);
log.info("Window aggregation result: Article {} like count: {} in window: {}",
articleId, count, timeWindow);
// 调用服务更新Redis排行榜
articleRankingService.updateArticleLikeCount(Long.parseLong(articleId), count, timeWindow);
// 返回一个可以继续链式操作的值,例如将文章ID和点赞数作为新的KV对
return new KeyValue<>(articleId, count);
});
toStream()
输出示例:- 输入 (Windowed<String> key, Long value):
("1001"@1678886400000/1678886405000, 50L)
(文章 ID 1001 在某个 5 秒窗口内有 50 个点赞) - 输出 (String key, Long value):
("1001", 50L)
(经过map
转换为普通KStream
,并触发updateArticleLikeCount
方法)
- 输入 (Windowed<String> key, Long value):
5. 更新 Redis 排行榜
articleRankingService.updateArticleLikeCount
方法负责将计算出的点赞数存储到 Redis。我们使用 Redis 的 Sorted Set (ZSet) 来存储排行榜数据。
java
public void updateArticleLikeCount(Long articleId, Long likeCount, String timeWindow) {
try {
// 为每个时间窗口创建一个独立的排行榜 Key
String rankingKey = RANKING_KEY_PREFIX + timeWindow.replace(" ", "_").replace(":", "-");
log.info("Updating Redis ranking: articleId={}, likeCount={}, timeWindow={}, rankingKey={}",
articleId, likeCount, timeWindow, rankingKey);
// 将文章ID和点赞数存储到有序集合中(分数为点赞数),实现排行榜功能
redisTemplate.opsForZSet().add(rankingKey, articleId.toString(), likeCount.doubleValue());
// 同时更新一个"当前总排行榜" Key,方便前端直接查询最新总榜
redisTemplate.opsForZSet().add(CURRENT_RANKING_KEY, articleId.toString(), likeCount.doubleValue());
// 为每个时间窗口的排行榜 Key 设置过期时间,例如 24 小时,避免 Redis 内存无限增长
redisTemplate.expire(rankingKey, Duration.ofHours(24));
} catch (Exception e) {
log.error("Error updating article ranking for article: {}", articleId, e);
}
}
Redis 中存储的数据示例:
假设 RANKING_KEY_PREFIX
为 "article:likes:ranking:"
,CURRENT_RANKING_KEY
为 "article:likes:current:ranking"
。
针对特定时间窗口的排行榜 Key-Value 示例:
- Key:
"article:likes:ranking:2025-08-01_13-00-00_-_2025-08-01_13-00-05"
(该 Key 会在 24 小时后过期) - Value (ZSet 成员):
("1001", 50.0)
,("1002", 35.0)
,("1003", 20.0)
"1001"
: 文章 ID50.0
: 点赞数 (ZSet 的 Score)
- Key:
当前总排行榜 Key-Value 示例:
- Key:
"article:likes:current:ranking"
(该 Key 会持续更新,不设置独立过期时间,除非业务需要) - Value (ZSet 成员):
("1001", 150.0)
,("1004", 120.0)
,("1002", 100.0)
(这是累积的点赞数)
- Key:
查询排行榜
要查询排行榜,可以直接从 CURRENT_RANKING_KEY
这个 Redis ZSet 中获取数据。例如,获取点赞数前 N 位的文章:
java
// 获取当前总排行榜前10名的文章
Set<ZSetOperations.TypedTuple<String>> topArticles = redisTemplate.opsForZSet().reverseRangeWithScores(CURRENT_RANKING_KEY, 0, 9);
if (topArticles != null) {
for (ZSetOperations.TypedTuple<String> tuple : topArticles) {
System.out.println("Article ID: " + tuple.getValue() + ", Likes: " + tuple.getScore());
}
}
通过以上步骤,我们成功地利用 Kafka Streams 实现了高吞吐、可扩展、容错的实时点赞统计和排行榜功能,同时避免了传统方案中 Redis 可能面临的压力问题。Kafka Streams 提供了一种优雅的方式来处理流式数据,为大数据量的实时计算场景提供了强大的支持。