赞
踩
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.3.9</version>
</dependency>
2.springboot
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
</parent>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
package com.example.demo_kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.time.Duration; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Component public class ConsumerHandler { //kafka消费对象 private KafkaConsumer<Object, Object> consumer; //线程池对象 private ExecutorService executors; //kafka属性配置() public static Properties initConfig() { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.1.240:9092"); props.put("group.id", "test01"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } //初始化kafka连接 @PostConstruct public void initKafkaConfig() { Properties properties = initConfig(); consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton("test001")); } /** * 多线程消费kafka数据 * @param workerNum */ public void execute(int workerNum) { executors = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); while (true) { ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100)); if (!consumerRecords.isEmpty()) { for (final ConsumerRecord record : consumerRecords) { executors.submit(new Worker(record)); } } } } }
业务处理
package com.example.demo_kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; public class Worker implements Runnable { private ConsumerRecord<String, String> consumerRecord; public Worker(ConsumerRecord record) { this.consumerRecord = record; } @Override public void run() { // 这里写你的消息处理逻辑只是简单地打印消息 System.out.println(Thread.currentThread().getId()+" "+Thread.currentThread().getName() + " consumed " + consumerRecord.partition() + "th message with offset: " + consumerRecord.offset()+"=======>"+ consumerRecord.value()); } }
package com.example.demo_kafka; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.util.concurrent.ListenableFuture; @EnableScheduling @SpringBootApplication public class DemoKafkaApplication { public static void main(String[] args) { SpringApplication.run(DemoKafkaApplication.class, args); } @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private ConsumerHandler consumers; @Scheduled(cron = "0/30 * * * * ? ") public void test() { System.err.println("定时消费"); consumers.execute(10); } @Scheduled(cron = "0/10 * * * * ? ") public void test01() { System.err.println("定时生产"); for (int i = 0; i < 10000; i++) { try{ String message = "{\"x1\": 2133, \"x2\": 2477, \"y1\": 1568, \"y2\": 1888, \"conf\": 0.791015625, \"label_id\": 0, \"label_name\": \"trash\"}"; ListenableFuture<SendResult<String, String>> topic_test = kafkaTemplate.send("test001", message); System.err.println(message); }catch (Exception e){ e.printStackTrace(); } } } }
使用队列进行接收消息,在使用多线程进行消费信息#####
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。