赞
踩
kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。
Kafka的消息顺序消费是指消费者按照消息的顺序逐条消费消息的过程。Kafka的分区(Partition)是消息的基本单位,每个分区中的消息按照顺序进行存储。在一个分区中,消息的顺序是有序的,这意味着先发送的消息会被存储在分区的前部,而后发送的消息会被追加到分区的末尾。
Kafka通过分区的方式实现消息的顺序性,消费者可以订阅一个或多个分区来消费消息。当消费者从分区中拉取消息时,Kafka会按照消息在分区中的顺序返回给消费者。这样就保证了消费者将按照消息的顺序进行消费。
需要注意的是,Kafka的多个分区是并行处理的,每个分区的消息可以独立进行消费。因此,在多个分区并行消费的情况下,消费者之间的消息顺序可能无法保证。但是,对于单个分区的消息消费,Kafka会确保按照消息的顺序进行消费。
为了实现消息的顺序消费,可以根据业务需求将相关消息发送到同一个分区,并且使用单个消费者实例来消费该分区的消息。这样就可以保证消息在整个分区中按照顺序进行处理。同时,Kafka还提供了分区器(Partitioner)机制,可以根据消息的键(key)来决定消息被发送到哪个分区,从而进一步控制消息的顺序消费。
1、单个分区消费:创建一个单独的消费者实例来消费一个分区的消息。这样可以确保在单个分区内的消息按顺序消费。但是需要注意,如果有多个分区,不同分区的消息仍可能以并发方式进行消费。
2、指定分区消费:通过指定消费者订阅的特定分区,可以确保只消费指定分区的消息。这样,可以通过将相关消息发送到同一个分区来保证消息的顺序消费。
3、按键分区:Kafka允许根据消息的键(key)来决定将消息发送到哪个分区。如果消息的键是相同的,Kafka会将它们发送到同一个分区。因此,可以根据消息的键来保证消息的顺序消费。
现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。
两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。 使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。
在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况。
处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。
消息发送:
kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");
监听代码示例:
KafkaListenerDemo.java
@Component @Slf4j public class KafkaListenerDemo { // 消费到的数据缓存 private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>(); // 数据存储 private Map<String, String> DATA_MAP = new ConcurrentHashMap<>(); private WeakRefHashLock weakRefHashLock; public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) { this.weakRefHashLock = weakRefHashLock; } @KafkaListener(topics = "TOPIC_INSERT") public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{ // 模拟顺序异常,也就是insert后消费,这里线程sleep Thread.sleep(1000); String id = record.value(); log.info("接收到insert :: {}", id); Lock lock = weakRefHashLock.lock(id); lock.lock(); try { log.info("开始处理 {} 的insert", id); // 模拟 insert 业务处理 Thread.sleep(1000); // 从缓存中获取 是否存在有update数据 if (UPDATE_DATA_MAP.containsKey(id)){ // 缓存数据存在,执行update doUpdate(id); } log.info("处理 {} 的insert 结束", id); }finally { lock.unlock(); } acknowledgment.acknowledge(); } @KafkaListener(topics = "TOPIC_UPDATE") public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{ String id = record.value(); log.info("接收到update :: {}", id); Lock lock = weakRefHashLock.lock(id); lock.lock(); try { // 测试使用,不做数据库的校验 if (!DATA_MAP.containsKey(id)){ // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存 log.info("消费顺序异常,将update数据 {} 加入缓存", id); UPDATE_DATA_MAP.put(id, id); }else { doUpdate(id); } }finally { lock.unlock(); } acknowledgment.acknowledge(); } void doUpdate(String id) throws InterruptedException{ // 模拟 update log.info("开始处理update::{}", id); Thread.sleep(1000); log.info("处理update::{} 结束", id); } }
日志(代码中已模拟必现消费顺序异常的场景):
接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束
观察日志,此方案可正常处理不同Topic在存在数据关联的消费顺序问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。