赞
踩
实现在Spring Boot项目中监听Kafka指定topic中的消息,有两种实现思路:
一种是使用Spring Boot提供的@KafkaListener注解
另外一种是使用Kafka提供的原生Java客户端,消费者通过定时任务或者采用while(true){…}循环进行消息拉取,这种方式可以避免与parent版本出现冲突
目录
二、while(true){…}用.poll()方式进行消息拉取
导入依赖
- <!-- spring-kafka -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>版本号</version>
- </dependency>
配置文件
- # kafka地址
- spring.kafka.bootstrap-servers=127.0.0.1:9092
- # 消费者组ID
- spring.kafka.consumer.group-id=1
- # 键反序列化方式
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- # 值反序列化方式
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- # 是否自动提交偏移量
- spring.kafka.consumer.enable-auto-commit=true
监听类配置
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MyKafkaConsumer {
-
- @KafkaListener(topics = {"test","dev"}) //在这里指定要监听的topic,可以监听多个
- public void listenToMessage(String message){
- System.out.println("使用注解监听到的消息"+message);
- }
- }
运行效果
导入依赖
- <!--kafka-->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>版本号</version>
- </dependency>
配置文件
这里以properties文件为例
- #建立与kafka集群连接的host/port组,请通过控制台公网访问获取
- bootstrap.servers=127.0.0.1:9092
- #用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group
- group.id=1
- #键的反序列化方式
- key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- #值的反序列化方式
- value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- #自动提交
- enable.auto.commit=true
编写配置类
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
- import java.io.BufferedInputStream;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.URL;
- import java.util.ArrayList;
- import java.util.Enumeration;
- import java.util.List;
- import java.util.Properties;
-
- public class MqsConsumer {
-
- public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";//配置文件
-
- private KafkaConsumer<Object, Object> consumer;
-
- MqsConsumer(String path)
- {
- Properties props = new Properties();
- try {
- InputStream in = new BufferedInputStream(new FileInputStream(path));
- props.load(in);
- }catch (IOException e)
- {
- e.printStackTrace();
- return;
- }
- consumer = new KafkaConsumer<Object, Object>(props);
- }
-
- public MqsConsumer()
- {
- Properties props = new Properties();
- try {
- props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
- }catch (IOException e)
- {
- e.printStackTrace();
- return;
- }
- consumer = new KafkaConsumer<Object, Object>(props);
- }
- public void consume(List topics)
- {
- consumer.subscribe(topics);
- }
-
- public ConsumerRecords<Object, Object> poll(long timeout)
- {
- return consumer.poll(timeout);
- }
-
- public void close()
- {
- consumer.close();
- }
-
- /**
- * get classloader from thread context if no classloader found in thread
- * context return the classloader which has loaded this class
- *
- * @return classloader*/
-
-
- public static ClassLoader getCurrentClassLoader()
- {
- ClassLoader classLoader = Thread.currentThread()
- .getContextClassLoader();
- if (classLoader == null)
- {
- classLoader = MqsConsumer.class.getClassLoader();
- }
- return classLoader;
- }
-
- /**
- * 从classpath 加载配置信息
- *
- * @param configFileName 配置文件名称
- * @return 配置信息
- * @throws IOException*/
-
-
- public static Properties loadFromClasspath(String configFileName) throws IOException
- {
- ClassLoader classLoader = getCurrentClassLoader();
- Properties config = new Properties();
-
- List<URL> properties = new ArrayList<URL>();
- Enumeration<URL> propertyResources = classLoader
- .getResources(configFileName);
- while (propertyResources.hasMoreElements())
- {
- properties.add(propertyResources.nextElement());
- }
-
- for (URL url : properties)
- {
- InputStream is = null;
- try
- {
- is = url.openStream();
- config.load(is);
- }
- finally
- {
- if (is != null)
- {
- is.close();
- is = null;
- }
- }
- }
-
- return config;
- }
采用while(true){...}进行消息拉取
- import com.rococo.mqs.consumer.MqsConsumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import java.util.Arrays;
-
-
- public class KafkaController {
- @PostConstruct
- public void testConsumer() throws Exception {
- MqsConsumer consumer = new MqsConsumer();
- consumer.consume(Arrays.asList("test"));//监听的topic
- try {
- while (true){
- //timeout表示消费者在没有可用消息时愿意等待的最大时间。等待期间,一旦有消息到达,poll() 会立即返回,并从新开始计时。
- ConsumerRecords<Object, Object> records = consumer.poll(1000);
- System.out.println("the numbers of topic:" + records.count());
- for (ConsumerRecord<Object, Object> record : records)
- {
- Object value = record.value();
- System.out.println(value);
- }
- }
- }catch (Exception e)
- {
- // 异常处理
- e.printStackTrace();
- }finally {
- consumer.close();
- }
- }
- }
执行效果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。