赞
踩
本文主要介绍了Springboot项目整合kafka的最简单的方式.
1.引入Maven
2.增加消费者和生产者配置
3.初始化读取配置
4.进行消费和生产消息
1. 引入Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
2. 增加消费者和生产者配置
这里需要三个配置文件,application.yml,消费者配置文件,生产者配置文件.
在application.yml中指定消费者和生产者配置文件的名称.
application.yml:
kafka:
consumerTopic: topic1 #消费的topic名称
consumerConfig: kafka_consumer.properties #配置文件名称
consumerCommit: true #消费者是否提交偏移量
producerTopic: topic2 #发送的topic名称
producerConfig: kafka_producer.properties #配置文件名称
producerEnable: true #生产者是否生产开关
kafka_consumer.properties:
#消费者组ID
group.id=test
#broker地址 中间以逗号分隔
bootstrap.servers=10.100.100.100:9092
#zookeeper地址
zookeeper.connect=10.100.100.100:2181
#是否自动提交偏移量 建议设置成false 自己控制偏移量提交
enable.auto.commit=false
#提交offset到zookeeper的时间间隔
auto.commit.interval.ms=1000
#重要 消费的起始位置 earliest表示从最早记录开始消费 latest表示从最新记录消费
auto.offset.reset=earliest
#每次拉取的最大条数
max.poll.records=100
#处理的超时时间
session.timeout.ms=100000
#单条消息的最大大小
max.request.size=104857600
kafka_producer.properties
#broker地址
bootstrap.servers=10.100.100.100:9092
#重要 副本确认模式 producer的消息发送确认机制
acks=1
#单条消息的最大大小
max.request.size=104857600
#生产缓冲区大小
buffer.memory=104857600
#重试次数
retries=3
3. 初始化读取配置.
(1)创建配置类,利用Springboot自动加载application.yml中配置文件中kafka的相关参数.
@Component
@ConfigurationProperties(prefix = "kafka")
public class KafkaConfig {
//消费TOPIC
private String consumerTopic;
//消费者相关配置文件路径
private String consumerConfig;
//消费者是否提交偏移量
private boolean consumerCommit;
//生产者生产TOPIC
private String producerTopic;
//生产者相关配置文件路径
private String producerConfig;
//生产者是否生产开关
private boolean producerEnable;
public String getConsumerTopic() {
return consumerTopic;
}
public void setConsumerTopic(String consumerTopic) {
this.consumerTopic = consumerTopic;
}
public String getConsumerConfig() {
return consumerConfig;
}
public void setConsumerConfig(String consumerConfig) {
this.consumerConfig = consumerConfig;
}
public boolean isConsumerCommit() {
return consumerCommit;
}
public void setConsumerCommit(boolean consumerCommit) {
this.consumerCommit = consumerCommit;
}
public String getProducerTopic() {
return producerTopic;
}
public void setProducerTopic(String producerTopic) {
this.producerTopic = producerTopic;
}
public String getProducerConfig() {
return producerConfig;
}
public void setProducerConfig(String producerConfig) {
this.producerConfig = producerConfig;
}
public boolean isProducerEnable() {
return producerEnable;
}
public void setProducerEnable(boolean producerEnable) {
this.producerEnable = producerEnable;
}
/**
*@Description 将给定topic列表字符串拆解成列表
*@Param [topics] topic列表字符串,中间逗号分隔
*@Return java.util.List<java.lang.String> topic列表
*/
public static List<String> getTopicLists(String topics) {
String[] items = topics.split(",");
List<String> result = new LinkedList<>();
for (String item : items) {
if (!StringUtils.isBlank(item)) {
result.add(StringUtils.trim(item));
}
}
return result;
}
}
(2)定义一个service类,加载生产者和消费者真正配置文件,完成生产对象和消费对象的初始化.
@Autowired
private KafkaConfig config;
//消费对象
private Consumer<String, String> messageConsumer;
//生产对象
private Producer<String, String> messageProducer;
/**
*@Description Kafka集群的初始化 包含加载配置文件,消费者、生产者的初始化
*/
public void init() {
//初始化生产者
if (messageProducer == null && config.isProducerEnable()) {
messageProducer = new KafkaProducer<>(PropertyFileUtil.load(config.getProducerConfig()),
new StringSerializer(), new StringSerializer());
}
//初始化消费者
if (messageConsumer == null) {
//已经在配置中配置了消费者组等等信息
messageConsumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
new StringDeserializer(), new StringDeserializer());
//订阅主题
messageConsumer.subscribe(config.getTopicLists(config.getConsumerTopic()));
}
}
public class PropertyFileUtil {
/**
*@Description 加载配置文件
*/
public static Properties load(String path) {
InputStream stream = findInUserDir(path);
if (stream == null) {
stream = findInResource(path);
}
if (stream == null) {
return null;
}
Properties properties = new Properties();
try {
properties.load(stream);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
return properties;
}
private static InputStream findInUserDir(String file) {
String userDir = System.getProperty("user.dir");
if (userDir != null) {
File f = new File(userDir, file);
if (f.exists() && f.isFile()) {
try {
FileInputStream in = new FileInputStream(f);
logger.info("加载配置:" + f.getAbsolutePath());
return in;
} catch (FileNotFoundException e) {
return null;
}
}
}
return null;
}
private static InputStream findInResource(String file) {
logger.info("加载配置:classpath:/" + file);
return PropertyFileUtil.class.getResourceAsStream("/" + file);
}
}
4. 进行真正的生产和消费.
生产:
/**
*@Description Kafka消息发送方式,这里是异步发送
*/
public void produce(String key, String value) {
//首先判断如果value为空则跳过
if(StringUtils.isBlank(value)) {
return;
}
if (messageProducer != null) {
String topic = config.getProducerTopic();
messageProducer.send(
new ProducerRecord<>(topic, key, value),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
if (ex != null) {
// 发生错误
LOGGER.error("kafka生产数据异常,异常数据为-->key:{},value:{}",key,value,ex);
throw new RuntimeException("kafka生产数据异常...");
} else {
// 发送成功
LOGGER.info("{}异步生产完毕",key);
}
}
});
}
}
消费:
//默认为false
private volatile boolean reqClose;
/**
*@Description 消费者消费消息方法
*/
public void consume() {
//计数器
long retry = 0;
//因为要不断获取消息,所以需要循环
while (!reqClose) {
try {
ConsumerRecords<String, String> records = messageConsumer.poll(1000);
//会一次返回多个值,所以需要遍历
for (ConsumerRecord<String, String> record : records) {
if (reqClose) {
LOGGER.info("跳出消費..");
break;
}
String key = record.key();
String value = record.value();
//这里写真正的业务逻辑,处理完再提offset,避免丢失数据
}
/**
* 手动提交offset,第一次消费数据的时候会从初始值,也就是你当前消费者保存的offset开始消费,并且会持续消费
* 下一条数据,因为虽然我们没有提交offset,但是内存中还维护了一个offset,但是一旦我们关闭了消费者,
* 再下一次启动时,又会从当前消费者保存的offset开始消费,那也就会造成重复消费.
*/
if (config.isConsumerCommit()) {
//同步提交,线程会阻塞到当前offset提交成功.
messageConsumer.commitSync();
}
} catch (Exception ex) {
if (retry % 60 == 0) {
// 防止Kafka失联时,错误过多,控制1分钟出一次日志
LOGGER.error("Kafka Operation Failed", ex);
retry++;
}
}
}
// 释放资源
this.close();
}
完整的service处理类,这里定义了一个接口,然后使用一个实现类去实现.
public interface KafkaService {
/**
*@Description 初始化Kafka生产者及消费者
*/
public void init();
/**
*@Description 消费kafka消息
*/
public void consume();
/**
*@Description 生产kafka消息
*@Param [key, value] 消息的key和value
*/
public void produce(String key, String value);
/**
*@Description 关闭释放资源
*/
void reqClose();
/**
* 重置kafka消费的offset
*/
void resetOffset();
/**
*@Description 处理消息的业务方法
*/
void consumeRecord(String key,String msg);
}
@Service
public class KafkaServiceImpl implements KafkaService {
private static Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);
@Autowired
private KafkaConfig config;
private volatile boolean reqClose;
private volatile boolean closed;
private Consumer<String, String> messageConsumer;
private Producer<String, String> messageProducer;
/**
*@Description Kafka集群的初始化 包含加载配置文件,消费者、生产者的初始化
*/
@Override
public void init() {
LOGGER.info("初始化生产者..");
if (messageProducer == null && config.isProducerEnable()) {
messageProducer = new KafkaProducer<>(PropertyFileUtil.load(config.getProducerConfig()),
new StringSerializer(), new StringSerializer());
}
LOGGER.info("初始化消费者..");
if (messageConsumer == null) {
//已经在配置中配置了消费者组等等信息
messageConsumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
new StringDeserializer(), new StringDeserializer());
//订阅主题
messageConsumer.subscribe(config.getTopicLists(config.getConsumerTopic()));
}
}
/**
*@Description Kafka消息发送方式
*/
@Override
public void produce(String key, String value) {
//首先判断如果value为空则跳过
if(StringUtils.isBlank(value)) {
return;
}
if (messageProducer != null) {
String topic = config.getProducerTopic();
messageProducer.send(
new ProducerRecord<>(topic, key, value),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
if (ex != null) {
// 发生错误
LOGGER.error("kafka生产数据异常,异常数据为-->key:{},value:{}",key,value,ex);
throw new RuntimeException("kafka生产数据异常...");
} else {
// 发送成功
LOGGER.info("{}异步生产完毕",key);
}
}
});
}
}
/**
*@Description 消费者消费消息方法
*/
@Override
public void consume() {
//计数器
long retry = 0;
//因为要不断获取消息,所以需要循环
while (!reqClose) {
try {
ConsumerRecords<String, String> records = messageConsumer.poll(1000);
//会一次返回多个值,所以需要遍历
for (ConsumerRecord<String, String> record : records) {
if (reqClose) {
LOGGER.info("跳出消費..");
break;
}
String key = record.key();
String value = record.value();
//处理具体的逻辑业务,一条一条处理,处理完再提交offset,避免丢失数据
consumeRecord(key,value);
}
/**
* 手动提交offset,第一次消费数据的时候会从初始值,也就是你当前消费者保存的offset开始消费,并且会持续消费
* 下一条数据,因为虽然我们没有提交offset,但是内存中还维护了一个offset,但是一旦我们关闭了消费者,
* 再下一次启动时,又会从当前消费者保存的offset开始消费,那也就会造成重复消费.
*/
if (config.isConsumerCommit()) {
//同步提交,线程会阻塞到当前offset提交成功.
messageConsumer.commitSync();
}
} catch (Exception ex) {
if (retry % 60 == 0) {
// 防止Kafka失联时,错误过多,控制1分钟出一次日志
LOGGER.error("Kafka Operation Failed", ex);
retry++;
}
}
}
// 释放资源
this.close();
}
/**
*@Description 处理消息的业务方法
*/
@Override
public void consumeRecord(String key,String msg) {
LOGGER.info("接收到kafka消息,key:{}",key);
}
/**
*@Description 重置消费者偏移量
*/
@Override
public void resetOffset() {
Consumer<String, String> consumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
new StringDeserializer(), new StringDeserializer());
List<String> topics = config.getTopicLists(config.getConsumerTopic());
for (String topic : topics) {
consumer.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = consumer.poll(2000);
Set<TopicPartition> topicList = consumer.assignment();
Map<TopicPartition, Long> endMap = consumer.endOffsets(topicList);
Map<TopicPartition, Long> beginmap = consumer.beginningOffsets(topicList);
long singleTpLagSize = 1000000;
for (TopicPartition tp : topicList) {
long endOffset = endMap.get(tp);
long beginOffset = beginmap.get(tp);
long aimOffset = endOffset - singleTpLagSize;
if (aimOffset > 0 && aimOffset >= beginOffset) {
consumer.seek(tp, endOffset-singleTpLagSize);
} else {
consumer.seek(tp, beginOffset);
}
}
consumer.commitSync();
}
}
/**
* 是否关闭kafka得长轮询
* @return
*/
public boolean isClosed() {
return closed;
}
/**
* 关闭资源
*/
private void close() {
try {
if (messageConsumer != null) {
messageConsumer.close();
}
if (messageProducer != null) {
messageProducer.close();
}
} catch (Exception ex) {
ex.printStackTrace();
}
closed = true;
LOGGER.info("Kafka資源釋放完畢!");
}
/**
* 关闭kafka得循环消费逻辑
*/
public void reqClose() {
this.reqClose = true;
}
}
江湖路远,相逢即为缘分,如略有帮助,一键三联可否,谢谢啦
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。