赞
踩
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
可以根据情况只配置生产着或消费者
spring:
kafka:
# 以逗号分隔的地址列表,用于建立与 Kafka 集群的初始连接 (kafka 默认的端口号为 9092)
bootstrap-servers: ip:port,ip:port,ip:port
# 生产者配置
producer:
# 消息重发的次数
retries: 0
# 一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小
buffer-memory: 33444432
# 键的序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
acks: all
# 消费者配置
consumer:
max:
poll:
records: 1
broker-id: 0
# 自动提交的时间间隔 在 spring boot 2.X 版本中这里采用的是值的类型为 Duration 需要符合特定的格式,如 1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是 true,为了避免出现重复数据和数据丢失,可以把它设置为 false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 更换消费组就可以从头消费数据,同一个消费组用户不会重复消费数据
# groupid可以消费的时候再指定
group-id: capb_group
listener:
# 在侦听器容器中运行的线程数。
concurrency: 3
# 手工ack,调用ack后立刻提交offset
ack-mode: manual_immediate
private static final String TOPIC_NAME = "my_topic_name";
@Test
public void testSendKafka() {
kafkaTemplate.send(TOPIC_NAME, "{\"json\":\"ok\"}").addCallback(success -> {
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.out.println("发送成功:topic=" + topic + ", partition=" + partition + ", offset=" + offset);
}, failue -> {
System.out.println("发送消息失败:" + failue.getMessage());
});
}
每个groupId都可以完整消费指定topic的所有数据,要想重新消费所有数据可以更换groupid组
// 可以在配置文件中灵活配置
// @KafkaListener(topics = {"my_topic_name"}, groupId = "my_groupId")
@KafkaListener(topics = {"my_topic_name"})
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
System.out.println(JSON.parseObject(record.value().toString(), KafkaEvent.class));
// 消费完后手动通知已消费,在上面有配置
ack.acknowledge();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。