当前位置:   article > 正文

SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表_spring boot kafka 多线程消费

spring boot kafka 多线程消费

SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

 更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。

一、背景

        因业务发展需要,需要对接kafka,快速批量接收消息日志,避免消息日志累积过多,必须做到数据处理后,动态插入到库表(相同表结构,不同表名)下,并且还要支持批量事务提交,实现消息快速消费。(注意:源码文章最后有获取方式)

二、核心代码

2.1、开启批量、并发消费

  1. kafka:
  2.     bootstrap-servers: 10.1.*.*:9092     #服务器的ip及端口,可以写多个,服务器之间用“:”间隔
  3.     producer: #生产者配置 
  4.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  5.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  6.     consumer: #消费者配置
  7.       #指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
  8.       group-id: myGroup                 #设置消费者的组id defaultGroup
  9.       enable-auto-commit: true  #设置自动提交offset
  10.       auto-commit-interval: 2000  #默认值为5000
  11.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12.       #值的反序列化方式
  13.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14.       auto-offset-reset: latest
  15.       max-poll-records: 2000  #批量一次最大拉取数据量 默认500
  16.     listener:
  17.       # poll-timeout: 1000
  18.       type: batch  # 开启批量消费
  19.       concurrency: 3  #指定listener 容器中的线程数,用于提高并发量
  20.     properties:
  21.       session:
  22.         timeout:
  23.           ms: 120000  #默认10000
  24.         max:
  25.           poll:
  26.             interval:
  27.               ms: 600000  #默认300000(5分钟)

       说明:type: batch  # 开启批量消费, max-poll-records: 2000,批量消费每次最多消费记录数。这里设置 max-poll-records是2000,并不是说如果没有达到2000条消息,我们就一直等待。而是说一次poll最多返回的记录数为2000。concurrency: 3  #指定listener 容器中的线程数,用于提高并发量。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。例如:设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition。

2.2、多线程异步配置

    具体配置参加前面文章:SpringBoot使用@Async实现多线程异步

    注意:在启动类上需要加上注解@EnableAsync,开启异步。

2.3、redis相关配置

1、yml相关配置:

  1. spring:
  2.   redis:
  3.     # 地址
  4.     host: 127.0.0.1
  5.     # 端口,默认为6379
  6.     port: 6379
  7.     # 密码
  8.     # 连接超时时间
  9.     timeout: 10s
  10.     lettuce:
  11.       pool:
  12.         # 连接池中的最小空闲连接
  13.         min-idle: 0
  14.         # 连接池中的最大空闲连接
  15.         max-idle: 8
  16.         # 连接池的最大数据库连接数
  17.         max-active: 8
  18.         # #连接池最大阻塞等待时间(使用负值表示没有限制)
  19.         max-wait: -1ms

2、RedisConfig配置

  1. package com.wonders.config;
  2. import com.fasterxml.jackson.annotation.JsonAutoDetect;
  3. import com.fasterxml.jackson.annotation.PropertyAccessor;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import org.springframework.cache.annotation.CachingConfigurerSupport;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.data.redis.connection.RedisConnectionFactory;
  9. import org.springframework.data.redis.core.RedisTemplate;
  10. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
  11. import org.springframework.data.redis.serializer.StringRedisSerializer;
  12. /**
  13.  * 〈自定义redis序列化方式〉
  14.  * @author yangyalin
  15.  * @create 2018/11/1
  16.  * @since 1.0.0
  17.  */
  18. @Configuration
  19. public class RedisConfig extends CachingConfigurerSupport {
  20.     /**
  21.      * @Author yangyalin
  22.      * @Description redisTemplate序列化使用的jdkSerializeable, 存储二进制字节码(默认), 所以自定义序列化类
  23.      * 用于存储可视化内容
  24.      * @Date 15:07 2018/11/1
  25.      * @Param [redisConnectionFactory]
  26.      * @return org.springframework.data.redis.core.RedisTemplate<java.lang.Object,java.lang.Object>
  27.      **/
  28.     @Bean
  29.     public RedisTemplate<ObjectObject> redisTemplate(RedisConnectionFactory redisConnectionFactory){
  30.         RedisTemplate<Object,Object> redisTemplate=new RedisTemplate();
  31.         redisTemplate.setConnectionFactory(redisConnectionFactory);
  32.         //使用jackson2JsonRedisSerializer替换默认序列化
  33.         Jackson2JsonRedisSerializer jackson2JsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);
  34.         ObjectMapper objectMapper=new ObjectMapper();
  35.         objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
  36.         objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
  37.         jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
  38.         //设置keyvalue的序列化规则
  39.         redisTemplate.setKeySerializer(new StringRedisSerializer());
  40.         redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
  41.         redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
  42.         redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
  43.         redisTemplate.afterPropertiesSet();
  44.         return redisTemplate;
  45.     }
  46. }

2.4、动态表名

  1.     <!--插入到kafka日志临时表中-->
  2.     <insert id="insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">
  3.       INSERT INTO ${logTableName}("EVN_LOG_ID""TABLE_NAME""OPERATION""PK_VALUE1""PK_VALUE2",
  4.            "PK_VALUE3""PK_VALUE4""PK_VALUE5""TRANS_FLAG""PKS""BASE_CODE""PLA_BRANCH_CODE",
  5.            "CREATE_TIME","MSG_PRODUCE_TIME")
  6.       VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},
  7.             #{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},
  8.             #{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR}, 'Y',
  9.             #{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},
  10.             #{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})
  11.     </insert>

    说明:1、#{} :会根据参数的类型进行处理,当传入String类型,则会为参数加上双引号(占位符);2、${} :将参数取出不做任何处理,直接放入语句中,就是简单的字符串替换(替换符)。

2.5、sql批量提交

  1. public void batchInsert(List<KafkaMsgInfo> kafkaMsgInfoList) throws Exception{
  2.         //如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交
  3.         // 创建session实列
  4.         SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");
  5.         // 开启批量处理模式 BATCH 、关闭自动提交事务 false
  6.         SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);
  7.         KafkaMsgConfigMapper KafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);
  8.         int BATCH = 1000;
  9.         for (int i = 0,size=kafkaMsgInfoList.size(); i < size; i++) {
  10.             //循环插入 + 开启批处理模式
  11.             KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));
  12.             if (i != 0 && i % BATCH == 0) {
  13.                 sqlSession .commit();
  14.             }
  15.         }
  16.         // 一次性提交事务
  17.         sqlSession.commit();
  18.         // 关闭资源
  19.         sqlSession.close();
  20.     }
2.6、业务代码
  1.  @KafkaListener(topics = {"${mykafka.topics:mytopic}"})
  2.     public void myMQConsumer(List<String> msgList){
  3.         log.info("接收到的消息条数size:"+msgList.size());
  4.         //计算程序耗时时间
  5.         StopWatch stopWatch = new StopWatch();
  6.         // 开始计时
  7.         stopWatch.start();
  8.         this.getKafkaMsgAndDel(msgList);  //2、接收kafka日志并解析
  9.         stopWatch.stop();
  10.         log.info("本次任务耗时(秒):" + stopWatch.getLastTaskTimeMillis()/1000 + "s");
  11.     }

三、测试结果

序号kafka数量(万条)消耗(秒)
113
21013
3100120

 

更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。

回复:源码,可以获取该项目对应的源码及表结构,开箱即可使用。

推荐文章:

    1、SpringBoot使用@Async实现多线程异步;

    2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;

    3、SpringBoot用线程池ThreadPoolExecutor处理百万级数据

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/394057
推荐阅读
相关标签
  

闽ICP备14008679号