当前位置:   article > 正文

kafka消费者监听(java)_kafka监听topic消费java写法

kafka监听topic消费java写法

1、kafka消费者配置类

  1. @Configuration
  2. @Slf4j
  3. public class KafkaConfig {
  4. @Value("${spring.kafka.bootstrap-servers}")
  5. private String kafkaIps;// 172.168.16.18:8092,172.35.138.28:8099
  6. public static KafkaConsumer<String, String> kafkaConsumer;
  7. @Bean
  8. public void loadKafkaConfig() {
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers", kafkaIps);
  11. props.put("group.id", "GROUP_ID_888");
  12. props.put("enable.auto.commit", "true");
  13. props.put("auto.commit.interval.ms", "1000");
  14. props.put("session.timeout.ms", "30000");
  15. props.put("max.poll.records", 1000);
  16. props.put("auto.offset.reset", "earliest");
  17. props.put("key.deserializer", StringDeserializer.class.getName());
  18. props.put("value.deserializer", StringDeserializer.class.getName());
  19. kafkaConsumer = new KafkaConsumer<String, String>(props);
  20. kafkaConsumer.subscribe(Arrays.asList("one","two"));// 消费者订阅的topic, 可同时订阅多个
  21. log.info("消息订阅成功!kafka配置:" + props.toString());
  22. //启动消息监听线程
  23. KafkaListenerJob kafkaListenerJob = new KafkaListenerJob();
  24. Thread t = new Thread(kafkaListenerJob);
  25. t.start();
  26. }
  27. }

2、为了解决线程内中无法使用@Autowired注入Bean,新增一个类,实现ApplicationContextAware接口。

(@Autowired注入Spring Bean,则当前类必须也是Spring Bean才能注入成功,不能用new xxx()来获得对象,这种方式获得的对象也无法使用@Autowired注解注入Bean。
因此,当我们在new一个线程之后,发现线程里使用@Autowired注入的对象都是空的)

  1. import org.springframework.beans.BeansException;
  2. import org.springframework.context.ApplicationContext;
  3. import org.springframework.context.ApplicationContextAware;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class ApplicationContextUtil implements ApplicationContextAware {
  7. private static ApplicationContext applicationContext = null;
  8. @Override
  9. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  10. ApplicationContextUtil.applicationContext = applicationContext;
  11. }
  12. public static Object getBeanByName(String beanName) {
  13. if (applicationContext == null) {
  14. return null;
  15. }
  16. return applicationContext.getBean(beanName);
  17. }
  18. public static <T> T getBean(Class<T> type) {
  19. return applicationContext.getBean(type);
  20. }
  21. }

3、kafka消息监听线程类

  1. @Slf4j
  2. @Component
  3. public class KafkaListenerJob implements Runnable{
  4. private OdsWmsAdcMapper odsWmsAdcMapper = ApplicationContextUtil.getBean(OdsWmsAdcMapper.class);
  5. @Override
  6. public void run() {
  7. log.info("kafka消息监听任务已启动!");
  8. //进行消息监听
  9. while (true) {
  10. ConsumerRecords<String, String> records = KafkaConfig.kafkaConsumer.poll(6000);
  11. log.info("-------------读取数据条数----------" + records.count());
  12. for (ConsumerRecord<String, String> record : records) {
  13. try {
  14. OdsWmsAdc odsWmsAdc = JSON.parseObject(record.value(), OdsWmsAdc.class);
  15. log.info("---------插入数据---------" + odsWmsAdc.toString());
  16. odsWmsAdcMapper.insert(odsWmsAdc);
  17. } catch (Exception e) {
  18. log.error("消息消费异常!", e);
  19. }
  20. }
  21. }
  22. }
  23. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/906307
推荐阅读
相关标签
  

闽ICP备14008679号