赞
踩
1.引用jar包
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.kafka配置文件
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: applog
      auto-offset-reset: latest
      bootstrap-servers: 127.0.0.1:9092
      #集群使用逗号隔开
3.创建kafka工具类ConsumerHandler
package com.netintech.kafka.kafkajob;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.*;
public class ConsumerHandler {
// 本例中使用一个consumer将消息放入后端队列,你当然可以使用前一种方法中的多实例按照某张规则同时把消息放入后端队列
private KafkaConsumerconsumer;
private ExecutorService executors;
private Properties props;
public ConsumerHandler(String servers,String commit,String intervalms,String timeoutms,String groupId, String topic) {
props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", groupId);
props.put("enable.auto.commit", commit);
props.put("auto.commit.interval.ms", intervalms);
props.put("session.timeout.ms", timeoutms);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum,Class> cls, String topic) throws Exception {
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
Constructor constructor = cls.getConstructor(new Class[]{ConsumerRecord.class});
while (true) {
//kafka为空重连
if(consumer!=null){
ConsumerRecordsrecords = consumer.poll(200);
for (final ConsumerRecord record : records) {
Runnable object =(Runnable) constructor.newInstance(new Object[]{record});
executors.submit(object);
}
}else{
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
}
}
public void stop(){
if (consumer != null) {
consumer.wakeup();
}
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}
4.创建多线程消费任务类OneWork
package com.netintech.kafka.impl;
import com.alibaba.fastjson.JSONObject;
import com.netintech.kafka.bean.Test;
import com.netintech.kafka.service.TestService;
import com.netintech.kafka.task.SendVehicleInfo;
import com.netintech.kafka.utils.SpringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;
/**
* 多线程kafka消费类
*/
public class OneWork implements Runnable {
//日志类
private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);
private ConsumerRecordconsumerRecord;
public OneWork(ConsumerRecord record) {
this.consumerRecord = record;
}
@Override
public void run() {
try{
//执行消费数据处理方法consumerRecord.value()--消费数据
SendVehicleInfo.jsonToWebService(consumerRecord.value());
}catch (Exception e){
LOG.info("异常错误信息:"+e.getMessage());
}
}
}
5.开启消费
// Settings injected from the application configuration file.
// Kafka broker address list.
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String servers;
// Whether offsets are committed automatically.
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String commit;
// NOTE(review): the key auto.commit.interval.ms is not present in the YAML shown earlier in
// this document; presumably it must be defined at the top level of the configuration - confirm.
@Value("${auto.commit.interval.ms}")
private String intervalms;
// NOTE(review): the key session.timeout.ms is likewise not present in the YAML shown earlier;
// presumably it must be defined as well - confirm.
@Value("${session.timeout.ms}")
private String timeoutms;
开启消费的方法
// groupId (consumer group) and topic (topic to consume) are placeholders that the surrounding
// code must supply; the same topic value is passed to execute() below.
ConsumerHandler consumers = new ConsumerHandler(servers, commit, intervalms, timeoutms, groupId, topic);
// Load the worker task class reflectively so it can be handed to execute(). The name must be
// the fully qualified name of OneWork, i.e. match its package declaration.
Class<?> cl = Class.forName("com.netintech.kafka.impl.OneWork");
// Start consuming: num is the number of worker threads, topic is the topic being consumed.
consumers.execute(num, cl, topic);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。