1. Kafka consumer configuration class
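- import java.util.Arrays;
- import java.util.Properties;
- import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.DependsOn;
-
- @Configuration
- @DependsOn("applicationContextUtil") // ApplicationContextUtil (step 2) must be ready before the listener thread is created
- @Slf4j
- public class KafkaConfig {
-
-     @Value("${spring.kafka.bootstrap-servers}")
-     private String kafkaIps; // e.g. 172.168.16.18:8092,172.35.138.28:8099
-
-     public static KafkaConsumer<String, String> kafkaConsumer;
-
-     // A @Bean method is expected to return the bean it defines. This method only
-     // performs start-up work, so it is run as an initialization callback instead.
-     @PostConstruct
-     public void loadKafkaConfig() {
-         Properties props = new Properties();
-         props.put("bootstrap.servers", kafkaIps);
-         props.put("group.id", "GROUP_ID_888");
-         props.put("enable.auto.commit", "true");
-         props.put("auto.commit.interval.ms", "1000");
-         props.put("session.timeout.ms", "30000");
-         props.put("max.poll.records", 1000);
-         props.put("auto.offset.reset", "earliest");
-         props.put("key.deserializer", StringDeserializer.class.getName());
-         props.put("value.deserializer", StringDeserializer.class.getName());
-
-         kafkaConsumer = new KafkaConsumer<String, String>(props);
-         // Topics the consumer subscribes to; several can be subscribed at once
-         kafkaConsumer.subscribe(Arrays.asList("one", "two"));
-         log.info("Subscribed successfully. Kafka config: {}", props);
-         // Start the message listener thread
-         KafkaListenerJob kafkaListenerJob = new KafkaListenerJob();
-         Thread t = new Thread(kafkaListenerJob);
-         t.start();
-     }
- }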
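The consumer settings above can also be looked at in isolation. The following is a minimal sketch, not the author's code: the class name `ConsumerPropsSketch` and the method `build` are hypothetical, and only `java.util.Properties` is used so that it runs without Kafka on the classpath. It stores every value as a string with `setProperty`, because `Properties.getProperty` returns null for a value that was stored as a non-String object (such as the `Integer` passed for `max.poll.records` above).

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original post.
final class ConsumerPropsSketch {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are committed automatically, at most once per interval
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "30000");
        // Upper bound on the number of records returned by one poll() call
        props.setProperty("max.poll.records", "1000");
        // Only applies when the group has no committed offset yet
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "GROUP_ID_888");
        System.out.println(props.getProperty("max.poll.records")); // prints "1000"
    }
}
```

The deserializer entries are left out here only to keep the sketch free of Kafka imports; a real consumer still needs them.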
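2. To work around the fact that @Autowired cannot inject beans inside a thread object, add a class that implements the ApplicationContextAware interface.
(@Autowired only injects into objects that are themselves Spring beans. An object obtained with new xxx() is not managed by Spring, so its @Autowired fields are never populated.
This is why, after creating a thread with new, every object injected with @Autowired inside the thread turns out to be null.)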
-
- import org.springframework.beans.BeansException;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
-
- @Component
- public class ApplicationContextUtil implements ApplicationContextAware {
-
-     // Set once by Spring while this bean is being initialized
-     private static ApplicationContext applicationContext = null;
-
-     @Override
-     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-         ApplicationContextUtil.applicationContext = applicationContext;
-     }
-
-     // Look up a bean by name; returns null if the context has not been set yet
-     public static Object getBeanByName(String beanName) {
-         if (applicationContext == null) {
-             return null;
-         }
-         return applicationContext.getBean(beanName);
-     }
-
-     // Look up a bean by type; unlike getBeanByName, this does not check for a missing context
-     public static <T> T getBean(Class<T> type) {
-         return applicationContext.getBean(type);
-     }
- }
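The idea behind this utility class can be shown without Spring. The sketch below is an analogy under stated assumptions, not Spring's implementation: `BeanHolderSketch` is a hypothetical static registry standing in for the ApplicationContext, and `GreetingService` and `GreetingJob` are made-up names. It shows that an object created with new can still reach a shared dependency through a static lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ApplicationContextUtil: a static registry
// plays the role of the Spring ApplicationContext.
final class BeanHolderSketch {
    private static final Map<Class<?>, Object> BEANS = new ConcurrentHashMap<>();

    static <T> void register(Class<T> type, T bean) {
        BEANS.put(type, bean);
    }

    static <T> T getBean(Class<T> type) {
        Object bean = BEANS.get(type);
        if (bean == null) {
            throw new IllegalStateException("No bean registered for " + type.getName());
        }
        return type.cast(bean);
    }
}

// Hypothetical dependency that the job needs
interface GreetingService {
    String greet(String name);
}

// Created with new, so nothing is injected; the dependency is looked up instead
final class GreetingJob implements Runnable {
    volatile String result;

    @Override
    public void run() {
        GreetingService service = BeanHolderSketch.getBean(GreetingService.class);
        result = service.greet("kafka");
    }
}

final class BeanHolderDemo {
    public static void main(String[] args) throws InterruptedException {
        BeanHolderSketch.register(GreetingService.class, name -> "hello " + name);
        GreetingJob job = new GreetingJob();
        Thread t = new Thread(job);
        t.start();
        t.join();
        System.out.println(job.result); // prints "hello kafka"
    }
}
```

The lookup happens inside run(), after the registry has been filled. Looking the dependency up too early, before the holder is populated, fails in the same way a lookup against a null ApplicationContext would.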
3. Kafka message listener thread class
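- import com.alibaba.fastjson.JSON; // assumed from JSON.parseObject; the original does not show its imports
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
-
- // This class is created with new in KafkaConfig, so it is not annotated with
- // @Component: a second, Spring-managed instance would never be run.
- @Slf4j
- public class KafkaListenerJob implements Runnable {
-
-     // Fetched from the context, because @Autowired does not work in an object created with new
-     private OdsWmsAdcMapper odsWmsAdcMapper = ApplicationContextUtil.getBean(OdsWmsAdcMapper.class);
-
-     @Override
-     public void run() {
-         log.info("Kafka message listener task started.");
-         // Listen for messages
-         while (true) {
-             // On kafka-clients 2.0+, prefer poll(Duration.ofMillis(6000)); poll(long) is deprecated there
-             ConsumerRecords<String, String> records = KafkaConfig.kafkaConsumer.poll(6000);
-             log.info("Number of records read: {}", records.count());
-             for (ConsumerRecord<String, String> record : records) {
-                 // Handle each record separately so one bad message does not abort the batch
-                 try {
-                     OdsWmsAdc odsWmsAdc = JSON.parseObject(record.value(), OdsWmsAdc.class);
-                     log.info("Inserting record: {}", odsWmsAdc);
-                     odsWmsAdcMapper.insert(odsWmsAdc);
-                 } catch (Exception e) {
-                     log.error("Failed to consume message!", e);
-                 }
-             }
-         }
-     }
- }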
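The try/catch inside the for loop is what keeps one malformed message from stopping the rest of the batch. A minimal sketch of that pattern follows, with plain strings standing in for Kafka records. The names `RecordLoopSketch` and `processBatch` are hypothetical, and collecting the failed values in a list is an addition for illustration; the listener above only logs them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of per-record error isolation; not the author's code.
final class RecordLoopSketch {

    // Runs the handler on every value and returns the values whose handler threw.
    static List<String> processBatch(List<String> values, Consumer<String> handler) {
        List<String> failed = new ArrayList<>();
        for (String value : values) {
            try {
                handler.accept(value);
            } catch (Exception e) {
                // The failure is recorded and the loop moves on to the next value
                failed.add(value);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Integer> stored = new ArrayList<>();
        List<String> failed = processBatch(
                Arrays.asList("1", "oops", "3"),
                value -> stored.add(Integer.parseInt(value)));
        System.out.println(stored + " " + failed); // prints "[1, 3] [oops]"
    }
}
```

With auto-commit enabled as in step 1, a record that fails this way is not redelivered, so keeping the failed values somewhere (a list here, possibly a separate topic or table in practice) is one way to avoid losing them silently.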