赞
踩
在公司使用Kafka的过程中,存在跨区消费顺序错乱导致的数据一致性问题
而公司的业务场景,要求数据必须严格一致,所以这个问题一直让我们头疼
最近,我特别抽出一段时间,就这个问题的产生场景做了一些测试,并结合之前的一些处理方案,整理出了这篇博客.
其中博客内容的一,二两点介绍了我们实际业务场景遇到的问题和之前尝试的一些解决方案,不敢兴趣的可以从第三点开始阅读
本篇博客要点如下:
我们使用Kafka的主要场景是:
生产端推送因交易产生的OGG队列文件给消费端-->消费端对数据进行处理-->数据落地到MongoDB
消费次序错乱引发的问题主要表现在以下几个方面:
1. 数据和源端不一致(这个是遭到投诉最多的问题)
2. 数据重复(在不自定义Mongo主键的情况下,更新队列优先于插入队列处理)
3. 数据缺失(在自定义Mongo主键的情况下,更新队列优先于插入队列处理(插入队列写入失败,更新队列因分片键原因没有写入交易日期,导致数据缺失))
1. 更新队列API替换(针对数据重复和数据缺失场景)
对于MongoTemplate API,更新时使用updateFirst或者updateMulti 方法 替换 upsert方法
替代方法,在数据库没有该条数据时,不会插入, 该方案能够基本上解决数据重复和数据缺失的问题
但会加重数据和源端不一致的问题
2. 临时补丁上线(针对数据缺失场景)
该补丁主要用于解决数据缺失的问题, 会在一定时间内自动补全数据库中不包含结算日期数据的日期信息,
目前,该补丁大大缓解了数据缺失的问题
3. 自定义分区
通过自定义分区设置,将同一笔交易的所有数据发往同一个分区,保证同一笔交易按照发生时间的先后顺序进行消费, 就目前的观测来看很好的改善了数据一致性的问题
对于kafka的使用,整体上可分为如下四种场景:
a. 单分区生产,单线程消费
b. 单分区生产,多线程消费
c. 多分区生产,单线程消费
d. 多分区生产,多线程消费
测试时针对以下四种场景,不采用自定义分区
其中 :测试的key为1~100的随机整数, 值为从0其递增的整数
topic的创建和分区数的设置通过kafka manager工具,测试时多线程消费采用Java提供的线程池ThreadPoolExecutor
日志使用的是slf4j框架,可以在程序运行的时候打印出执行时间,使用线程等关键信息
通过打印日志值的情况,消费数据的时间,线程使用等消息来判断是否存在乱序问题
我测试时操作Kafka使用的是SpringBoot框架
SpringBoot框架需要导入的依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
如果使用原生的Kafka需要导入如下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.3.0</version>
</dependency>
生产端的代码
package pers.xmr.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.Random; /** * @author xmr * @date 2019/7/31 15:10 * @description kafka生产端测试 */ public class MyProducer { public static void main(String[] args) { Properties properties = MyProducer.getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); Random random = new Random(); int i = 0; while (true) { producer.send(new ProducerRecord<String, String>("xmr_topic2", String.valueOf(random.nextInt(100)),String.valueOf(i++))); } } private static Properties getProperties() { Properties props = new Properties(); props.put("bootstrap.servers", "10.213.32.96:9092,10.213.32.97:9092,10.213.32.98:9092"); props.put("acks", "all"); //判断是否发送成功,不成功会阻塞所有消息,性能低,但是可靠性高 props.put("retries", 0); // 请求失败不自动重试,启用重试,可能会出现消息重复 props.put("batch.size", 16384); //缓存区域大小 props.put("linger.ms", 1); props.put("buffer.memory", 33554432); // 生产者可用的缓存总量 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } }
消费端代码, 包含单线程消费和多线程消费
package pers.xmr.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.*; /** * @author xmr * @date 2019/7/31 15:45 * @description */ @Service public class MyConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); private KafkaConsumer<String, String> consumer; public static void main(String[] args) { Properties props = MyConsumer.getProperties(); MyConsumer myConsumer = new MyConsumer(); myConsumer.multiThread(props); myConsumer.singleThread(props); } @Async public void multiThread(Properties props) { consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Collections.singletonList("xmr_topic2")); execute(); } private void execute() { try { ExecutorService executors = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (final ConsumerRecord record : records) { //获取新这个partition中的最后一条记录的offset并加1 那么这个位置就是下一次要提交的offset ConsumerRunner consumerRunner = new ConsumerRunner(consumer, record); executors.submit(consumerRunner); } consumer.commitAsync(); } } finally { consumer.close(); } } private void singleThread(Properties props) { consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Collections.singletonList("xmr_topic2")); ConsumerRunner consumerRunner = new ConsumerRunner(consumer); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { logger.info("offset = " + record.offset() + "partation = " + record.partition()+ "key= " + record.key()+ "value= " + record.value()); } } } public static Properties getProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", "10.213.32.96:9092,10.213.32.97:9092,10.213.32.98:9092"); props.put("group.id", "test2"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } } class ConsumerRunner implements Runnable { private Logger logger = LoggerFactory.getLogger(getClass()); KafkaConsumer consumer; ConsumerRecord record; ConsumerRunner(KafkaConsumer consumer) { this.consumer = consumer; } ConsumerRunner(KafkaConsumer consumer, ConsumerRecord record) { this.consumer = consumer; this.record = record; } @Override public void run() { logger.info("offset = " + record.offset() + "partation = " + record.partition()+ "key= " + record.key()+ "value= " + record.value()); } }
1. 单分区生产,单线程消费
日志截取如下:
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246988partation = 0key= 28value= 4246988
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246989partation = 0key= 42value= 4246989
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246990partation = 0key= 25value= 4246990
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246991partation = 0key= 95value= 4246991
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246992partation = 0key= 6value= 4246992
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246993partation = 0key= 96value= 4246993
10:00:04.710 [main] INFO pers.xmr.kafka.MyConsumer - offset = 4246994partation = 0key= 35value= 4246994
从日志中可以看到, 主线程完全按照推送顺序进行消费,不存在顺序错乱问题
2. 单分区生产,多线程消费
日志截取如下:
10:12:21.469 [pool-1-thread-2] INFO pers.xmr.kafka.ConsumerRunner - offset = 3001649partation = 0key= 21value= 3001649
10:12:21.469 [pool-1-thread-4] INFO pers.xmr.kafka.ConsumerRunner - offset = 3001650partation = 0key= 1value= 3001650
10:12:21.469 [pool-1-thread-2] INFO pers.xmr.kafka.ConsumerRunner - offset = 3001651partation = 0key= 16value= 3001651
10:12:21.470 [pool-1-thread-2] INFO pers.xmr.kafka.ConsumerRunner - offset = 3001653partation = 0key= 25value= 3001653
10:12:21.469 [pool-1-thread-1] INFO pers.xmr.kafka.ConsumerRunner - offset = 3001652partation = 0key= 55value= 3001652
10:12:21.471 [pool-1-thread-1] INFO pers.xmr.kafka.ConsumerRunner - offset = 3001655partation = 0key= 39value= 3001655
从日志中可以看到,尽管不同线程去消费同一分区的数据,但从时间上来看,消费的顺序确实是按照偏移量从小到大,因此也不存在顺序错乱问题
3. 多分区生产,单线程消费
日志截取如下 :
10:05:26.650 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058594partation = 1key= 25value= 6231830
10:05:26.650 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058595partation = 1key= 16value= 6231840
10:05:26.650 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058596partation = 1key= 65value= 6231843
10:05:26.650 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058597partation = 1key= 16value= 6231845
10:05:26.650 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058598partation = 1key= 29value= 6231847
10:05:26.650 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058599partation = 1key= 73value= 6231848
10:05:26.660 [main] INFO pers.xmr.kafka.MyConsumer - offset = 1058599partation = 2key= 73value= 6231835
这里,分区内消费顺序是一致的,但是分区间的顺序是错乱的比如说值为6231835,6231843的两条数据如果对应于同一笔交易,就会出现数据一致性的问题
4. 多分区生产,多线程消费
日志截取如下 :
10:07:30.844 [pool-1-thread-3] INFO pers.xmr.kafka.ConsumerRunner - offset = 1162772partation = 2key= 74value= 5816608
10:07:30.963 [pool-1-thread-2] INFO pers.xmr.kafka.ConsumerRunner - offset = 1722708partation = 3key= 3value= 5385007
10:07:30.963 [pool-1-thread-2] INFO pers.xmr.kafka.ConsumerRunner - offset = 1722711partation = 3key= 28value= 5385022
从以上三条日志可以看出,多分区,多线程的场景下,在不自定义分区的情形,面临的顺序错乱问题是很严峻的
总结 : 多分区生产, 单线程消费. 和多分区生产,多线程消费两种场景会产生数据一致性问题
注意 : 多分区生产,单线程消费 和单分区生产,多线程消费可能和我们预期的结果有所出入
这里, 有两个误区 :
a. kafka只能保证数据分区内有序, 不能保证数据全局有序
b. 同一时刻,kafka的一个分区的数据只能被一个线程消费, 但不意味着,一个线程只会消费一个分区的数据
针对多分区产生的问题,我们通过自定义分区来解决,
自定义分区的要点:
a. 保证数据基本均匀的落在每个分区
b. 保证同一笔交易对应的所有流水在同一个分区
由于我们的Kafka生产端是使用OGG推送数据,自定义分区代码不够纯粹,所以关于自定义分区,我仅仅给出一个最简单的demo实现,其中关键是实现Partitioner接口
package pers.xmr.kafka; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class ProducerPartition { public static void main(String args[]) { //1.配置生产者属性 Properties props = new Properties(); // Kafka服务端的主机名和端口号,可以是多个 props.put("bootstrap.servers", "ip:9092"); //配置发送的消息是否等待应答 props.put("acks", "all"); //配置消息发送失败的重试 props.put("retries", 0); // 批量处理数据的大小:16kb props.put("batch.size", 16384); // 设置批量处理数据的延迟,单位:ms props.put("linger.ms", 1); // 设置内存缓冲区的大小 props.put("buffer.memory", 33554432); //数据在发送之前一定要序列化 // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //设置分区 props.put("partitioner.class", "cn.ysjh.Partition"); //2.实例化KafkaProducer KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 50; i < 100; i++) { //3.调用Producer的send方法,进行消息的发送,每条待发送的消息,都必须封装为一个Record对象,接口回调 producer.send(new ProducerRecord<String, String>("test", "hello"+i), new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { if(arg0!=null) { System.out.println(arg0.partition()+"--"+arg0.offset()); } } }); } //4.close释放资源 producer.close(); } }
package pers.xmr.kafka; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; public class Partition implements Partitioner{ @Override public void configure(Map<String, ?> arg0) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) { // TODO Auto-generated method stub return 0; } }
显然, 多分区生产,单线程消费在进行自定义分区处理后, 一定能够保证同一笔交易按照发生时间先后顺序进行消费(因为都在一个分区里,依序消费)
那么,多分区,多线程场景下使用自定义分区会完全解决顺序错乱的问题么?
我采用当前最流行的两种多线程消费的框架来进行测试, 为了保证测试场景紧贴实际业务,我把我们的业务代码封装到了这两种框架里进行测试
1. 使用Java提供的线程池ThreadPoolExecutor
实现要点 : 创建一个指定线程数量的线程池, 并将消费的逻辑封装在一个实现Runnable的类里面, 或者使用匿名内部类或者lambda表达式实现
测试结果 :
2019-08-01 10:51:00:443 INFO [pool-1-thread-5] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 707812011734TXN_STS: U
2019-08-01 10:51:00:445 INFO [pool-1-thread-2] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 707812011734TXN_STS: T
2019-08-01 10:51:00:530 INFO [pool-1-thread-4] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到插入队列数据! AC_DT: 20190720LOG_NO: 708613011826TXN_STS: U
2019-08-01 10:51:00:558 INFO [pool-1-thread-2] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 708613011826TXN_STS: U
2019-08-01 10:51:00:560 INFO [pool-1-thread-5] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 708613011826TXN_STS: T
从日志打印情况来看, 尽管使用了自定义分区,但是同一笔交易的流水被不同的线程处理, 严重不符合预期!
2. 使用SpringBoot提供的kafka api
实现要点 : 在需要监听的方法前添加如下注解 : @KafkaListener(topics = "ogg_etl_serial") , topics后面跟的是需要监听的topic
下面介绍一个最简单的实现:
package pers.xmr.listener; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; /** * @author xmr * @date 2019/8/1 11:26 * @description */ @Service public class KafkaListener { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "ogg_etl_serial") public void listen0(List<ConsumerRecord<?, ?>> records, Acknowledgment ack, Consumer<?, ?> consumer){ for (ConsumerRecord<?, ?> record : records) { logger.info("offset = " + record.offset() + "partation = " + record.partition()+ "key= " + record.key()+ "value= " + record.value()); } ack.acknowledge(); } }
这部分比较关键,所以多截取一些运行日志, 测试结果 : 2019-08-01 14:24:07:592 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到插入队列数据! AC_DT: 20190720LOG_NO: 708385011794TXN_STS: U 2019-08-01 14:24:07:641 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 708385011794TXN_STS: U 2019-08-01 14:24:07:812 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 708385011794TXN_STS: T 2019-08-01 14:24:07:878 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] org.mongodb.driver.connection - Opened connection [connectionId{localValue:12}] to 10.213.32.84:50000 2019-08-01 14:24:08:245 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到插入队列数据! AC_DT: 20190720LOG_NO: 707299011675TXN_STS: U 2019-08-01 14:24:08:292 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 707299011675TXN_STS: U 2019-08-01 14:24:08:409 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 707299011675TXN_STS: T 2019-08-01 14:24:08:553 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到插入队列数据! AC_DT: 20190720LOG_NO: 707577011707TXN_STS: U 2019-08-01 14:24:08:595 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 707577011707TXN_STS: U 2019-08-01 14:24:08:753 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 707577011707TXN_STS: T 2019-08-01 14:24:08:845 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到插入队列数据! AC_DT: 20190720LOG_NO: 708286011783TXN_STS: U 2019-08-01 14:24:09:100 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 708286011783TXN_STS: U 2019-08-01 14:24:09:208 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1] c.y.b.p.e.s.i.HpstjnlServiceImpl - 读取到更新队列数据! AC_DT : 20190720LOG_NO: 708286011783TXN_STS: T 在这里可以看到,同一笔交易的所有流水被同一个线程按照推送过来的时间先后顺序进行消费,符合预期!
通过上述的一些测试,想要在自定义分区的前提下,保证消费顺序的一直性
需要摒弃Java提供的线程池,使用springBoot提供的kafka api,
由于框架把基本的实现都给我们封装好,所以使用起来非常简单,而且效果很好!
最终: 通过自定义分区 + springBoot提供的kafka多线程消费方式, 基本上能够解决跨分区消费引发的乱序问题!
以上,是我的一些理解,希望能对大家遇到类似问题时提供一些帮助!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。