赞
踩
SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表
更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。
一、背景
因业务发展需要,需要对接kafka,快速批量接收消息日志,避免消息日志累积过多,必须做到数据处理后,动态插入到库表(相同表结构,不同表名)下,并且还要支持批量事务提交,实现消息快速消费。(注意:源码文章最后有获取方式)
二、核心代码
2.1、开启批量、并发消费
- kafka:
- bootstrap-servers: 10.1.*.*:9092 #服务器的ip及端口,可以写多个,服务器之间用“:”间隔
- producer: #生产者配置
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer: #消费者配置
- #指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
- group-id: myGroup #设置消费者的组id default:Group
- enable-auto-commit: true #设置自动提交offset
- auto-commit-interval: 2000 #默认值为5000
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- #值的反序列化方式
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- auto-offset-reset: latest
- max-poll-records: 2000 #批量一次最大拉取数据量 默认500
- listener:
- # poll-timeout: 1000
- type: batch # 开启批量消费
- concurrency: 3 #指定listener 容器中的线程数,用于提高并发量
- properties:
- session:
- timeout:
- ms: 120000 #默认10000
- max:
- poll:
- interval:
- ms: 600000 #默认300000(5分钟)
说明:type: batch # 开启批量消费, max-poll-records: 2000,批量消费每次最多消费记录数。这里设置 max-poll-records是2000,并不是说如果没有达到2000条消息,我们就一直等待。而是说一次poll最多返回的记录数为2000。concurrency: 3 #指定listener 容器中的线程数,用于提高并发量。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。例如:设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition。
2.2、多线程异步配置
具体配置参加前面文章:SpringBoot使用@Async实现多线程异步
注意:在启动类上需要加上注解@EnableAsync,开启异步。
2.3、redis相关配置
1、yml相关配置:
- spring:
- redis:
- # 地址
- host: 127.0.0.1
- # 端口,默认为6379
- port: 6379
- # 密码
- # 连接超时时间
- timeout: 10s
- lettuce:
- pool:
- # 连接池中的最小空闲连接
- min-idle: 0
- # 连接池中的最大空闲连接
- max-idle: 8
- # 连接池的最大数据库连接数
- max-active: 8
- # #连接池最大阻塞等待时间(使用负值表示没有限制)
- max-wait: -1ms
2、RedisConfig配置
- package com.wonders.config;
-
- import com.fasterxml.jackson.annotation.JsonAutoDetect;
- import com.fasterxml.jackson.annotation.PropertyAccessor;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.springframework.cache.annotation.CachingConfigurerSupport;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
- import org.springframework.data.redis.serializer.StringRedisSerializer;
-
- /**
- * 〈自定义redis序列化方式〉
- * @author yangyalin
- * @create 2018/11/1
- * @since 1.0.0
- */
- @Configuration
- public class RedisConfig extends CachingConfigurerSupport {
- /**
- * @Author yangyalin
- * @Description redisTemplate序列化使用的jdkSerializeable, 存储二进制字节码(默认), 所以自定义序列化类
- * 用于存储可视化内容
- * @Date 15:07 2018/11/1
- * @Param [redisConnectionFactory]
- * @return org.springframework.data.redis.core.RedisTemplate<java.lang.Object,java.lang.Object>
- **/
- @Bean
- public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
- RedisTemplate<Object,Object> redisTemplate=new RedisTemplate();
- redisTemplate.setConnectionFactory(redisConnectionFactory);
- //使用jackson2JsonRedisSerializer替换默认序列化
- Jackson2JsonRedisSerializer jackson2JsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);
- ObjectMapper objectMapper=new ObjectMapper();
- objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
- objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
- jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
- //设置key和value的序列化规则
- redisTemplate.setKeySerializer(new StringRedisSerializer());
- redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
- redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
- redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
- redisTemplate.afterPropertiesSet();
- return redisTemplate;
- }
- }
2.4、动态表名
- <!--插入到kafka日志临时表中-->
- <insert id="insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">
- INSERT INTO ${logTableName}("EVN_LOG_ID", "TABLE_NAME", "OPERATION", "PK_VALUE1", "PK_VALUE2",
- "PK_VALUE3", "PK_VALUE4", "PK_VALUE5", "TRANS_FLAG", "PKS", "BASE_CODE", "PLA_BRANCH_CODE",
- "CREATE_TIME","MSG_PRODUCE_TIME")
- VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},
- #{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},
- #{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR}, 'Y',
- #{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},
- #{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})
- </insert>
说明:1、#{} :会根据参数的类型进行处理,当传入String类型,则会为参数加上双引号(占位符);2、${} :将参数取出不做任何处理,直接放入语句中,就是简单的字符串替换(替换符)。
2.5、sql批量提交
- public void batchInsert(List<KafkaMsgInfo> kafkaMsgInfoList) throws Exception{
- //如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交
- // 创建session实列
- SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");
- // 开启批量处理模式 BATCH 、关闭自动提交事务 false
- SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);
- KafkaMsgConfigMapper KafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);
- int BATCH = 1000;
- for (int i = 0,size=kafkaMsgInfoList.size(); i < size; i++) {
- //循环插入 + 开启批处理模式
- KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));
- if (i != 0 && i % BATCH == 0) {
- sqlSession .commit();
- }
- }
- // 一次性提交事务
- sqlSession.commit();
- // 关闭资源
- sqlSession.close();
- }
2.6、业务代码
- @KafkaListener(topics = {"${mykafka.topics:mytopic}"})
- public void myMQConsumer(List<String> msgList){
- log.info("接收到的消息条数size:"+msgList.size());
- //计算程序耗时时间
- StopWatch stopWatch = new StopWatch();
- // 开始计时
- stopWatch.start();
- this.getKafkaMsgAndDel(msgList); //2、接收kafka日志并解析
- stopWatch.stop();
- log.info("本次任务耗时(秒):" + stopWatch.getLastTaskTimeMillis()/1000 + "s");
- }
三、测试结果
序号 | kafka数量(万条) | 消耗(秒) |
1 | 1 | 3 |
2 | 10 | 13 |
3 | 100 | 120 |
更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。
回复:源码,可以获取该项目对应的源码及表结构,开箱即可使用。
推荐文章:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。