<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.7</version>
</dependency>
spring:
  kafka:
    bootstrap-servers: localhost:9092 # used to bootstrap the connection; the rest of the cluster is discovered dynamically, so not every broker needs to be listed
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 # producer buffer size; default 32MB = 33554432
      batch-size: 163840 # batch size per request, in bytes; default 16KB = 16384
      retries: 1 # number of retries when a send fails
      acks: 1
      properties:
        linger:
          ms: 500 # rather than sending each record immediately, the producer waits up to this delay so more records can accumulate; used together with batch-size, whichever is reached first triggers the send
        max:
          request:
            size: 1048576 # maximum size of a request, in bytes
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: radar # default consumer group
      max-poll-records: 2000 # maximum number of records pulled in one poll
      enable-auto-commit: false # auto-commit of consumed offsets; false = disabled
      auto-commit-interval: 4000 # auto-commit interval, in ms
      auto-offset-reset: earliest
      heartbeat-interval: 10000 # expected time between heartbeats to the consumer coordinator, in ms
      fetch-max-wait: 500
    listener:
      ack-mode: manual_immediate # manual_immediate = commit immediately after manual ack; batch = auto-ack per batch; record = auto-ack per record
      type: batch # batch consumption
      missing-topics-fatal: false # do not fail when a topic is missing; must be false for topic auto-creation
    template:
      default-topic: radar
      partitions: 7
      replications: 1
Two producer settings deserve special attention: batch-size and linger.ms.

batch-size is the number of bytes in one batch of messages; once this many bytes accumulate, the batch is sent. The default is 16384 (16KB). Depending on whether your workload needs low latency or high throughput, you can decrease or increase it, and experiment with different values to find the right balance between the two.

linger.ms is a delay in milliseconds; the default is 0, meaning records are sent immediately. When set, the producer waits up to that many milliseconds so that more records can be sent together. When both batch-size and linger.ms are set, a batch is sent as soon as either condition is met.

Unlike batch-size, linger.ms has no dedicated Spring Boot property and must be set through properties, as in the YAML above; several other parameters work the same way.

Setting listener.type to batch enables batch consumption in the listener.

max.poll.records is the maximum number of records returned by one poll; the default is 500. It can be raised as needed, but be careful: if one poll pulls more than the consumer can process in time, the consumer can stall, which causes its own problems.

enable-auto-commit: false together with listener.ack-mode: manual_immediate means an offset commit takes effect immediately after a message is manually acknowledged.
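As a cross-check (this snippet is not from the original post; the class name is mine), here is the same producer tuning expressed directly against the kafka-clients API. It shows that batch.size, linger.ms, and max.request.size are plain Kafka client properties, which is why Spring Boot only exposes linger.ms through the properties map:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerTuningSketch {
    // Builds the same producer settings as the YAML above, using the raw client API.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);        // batch.size: send once 160KB accumulate
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);            // linger.ms: or after 500ms, whichever comes first
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // max.request.size: 1MB cap per request
        return props;
    }
}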
Alternatively, if topic auto-creation is enabled in the broker's server.properties, no configuration class is needed at all; just set spring.kafka.listener.missing-topics-fatal to false, so that a missing topic is created automatically instead of raising an error (see my blog post on that topic for details).
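For reference (this snippet is not from the original post; the partition and replication defaults shown are illustrative), the broker-side settings involved look like this in server.properties:

# server.properties (Kafka broker)
auto.create.topics.enable=true
num.partitions=7
default.replication.factor=1

Otherwise, the following configuration class creates the topic at project startup with the configured partition and replica counts: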
package com.newatc.collect.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName KafkaConfiguration
 * @Description Kafka configuration class: reads the properties above and creates the topic
 * @Date 2022-6-28 09:55:26
 * @Author yanyulin
 **/
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    @Value("${spring.kafka.template.partitions}")
    private Integer partitions;

    @Value("${spring.kafka.template.replications}")
    private Short replications;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Integer getPartitions() {
        return partitions;
    }

    public void setPartitions(Integer partitions) {
        this.partitions = partitions;
    }

    public Short getReplications() {
        return replications;
    }

    public void setReplications(Short replications) {
        this.replications = replications;
    }

    /**
     * Creates the topic automatically at startup, with the configured
     * partition and replica counts.
     * @return the NewTopic bean
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic(topic, partitions, replications);
    }
}
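As an aside (not from the original post), spring-kafka 2.3+ also provides TopicBuilder, which constructs the same bean more fluently:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@Bean
public NewTopic radarTopic() {
    // Equivalent to new NewTopic("radar", 7, (short) 1)
    return TopicBuilder.name("radar").partitions(7).replicas(1).build();
}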
Sending messages just takes an injected KafkaTemplate, wrapped here in a small producer component:

package com.newatc.collect.config;
import com.newatc.collect.util.PartitionEnum;
import javax.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @ClassName KafkaProducer
 * @Description Kafka message sending component
 * @Date 2022-6-27 11:20:12
 * @Author yanyulin
 **/
@Component
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    /**
     * Sends radar-reported data to the Kafka topic, routing each type to its own partition.
     * @param type data type; used as the record key and to pick the partition
     * @param data payload to send
     */
    public void sendData(String type, String data) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, data);
        kafkaTemplate.send(producerRecord);
    }

    /**
     * Same as sendData, but also logs the message at debug level.
     * @param type data type; used as the record key and to pick the partition
     * @param message payload to send
     */
    public void sendMessage(String type, String message) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, message);
        kafkaTemplate.send(producerRecord);
        log.debug("Sent {} data to kafka: {}", type, message);
    }
}
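The PartitionEnum used by the producer lives in com.newatc.collect.util but is not shown in the post. A minimal sketch of what it might look like, assuming it maps each radar data type to a fixed partition (the constant names besides FLOW_STATS and all partition numbers are hypothetical):

package com.newatc.collect.util;

import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: maps each radar data type to a fixed topic partition. */
public enum PartitionEnum {
    REAL_TIME_DATA("realTimeData", 0), // real-time track data -> partition 0 (matches the listener below)
    FLOW_STATS("flowStats", 1);        // flow statistics -> partition 1 (illustrative)

    private final String type;
    private final Integer partition;

    PartitionEnum(String type, Integer partition) {
        this.type = type;
        this.partition = partition;
    }

    public String getType() {
        return type;
    }

    // type -> partition lookup used by KafkaProducer.sendData(...)
    public static Map<String, Integer> getMap() {
        Map<String, Integer> map = new HashMap<>();
        for (PartitionEnum e : values()) {
            map.put(e.type, e.partition);
        }
        return map;
    }
}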
Wherever data needs to be sent, just inject the KafkaProducer:

@Autowired
private KafkaProducer producer;

producer.sendData(PartitionEnum.FLOW_STATS.getType(), JSONObject.toJSONString(flowStats));
To consume messages, annotate a method with @KafkaListener; the topic and partition to listen on can be specified explicitly:
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    // Service that persists a batch to ClickHouse; its type is not shown in the post, so the name here is assumed
    @Resource
    private RealTimeDataClickHouseService realTimeDataClickHouseService;

    /**
     * Real-time radar track data must be rendered in under 1s, so it is consumed as soon as it arrives.
     * @param records the batch of records returned by one poll
     * @param ack handle for manual offset acknowledgment
     */
    @KafkaListener(
        groupId = "${spring.kafka.consumer.group-id}",
        topicPartitions = { @TopicPartition(topic = "${spring.kafka.template.default-topic}", partitions = { "0" }) }
    )
    public void receiverRealTimeDataRecord(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        int size = records.size();
        log.debug("RealTimeData RECV MSG COUNT: {}\n", size);
        List<String> data = new ArrayList<>();
        for (ConsumerRecord<String, String> consumerRecord : records) {
            data.add(consumerRecord.value());
        }
        realTimeDataClickHouseService.saveAll(data);
        log.debug("\n[RealTimeData] batch of {} consumed", size);
        // Acknowledge that this batch (and everything before it) has been consumed
        ack.acknowledge();
    }
}
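The RealTimeDataClickHouseService injected above is project-specific and not included in the post; all the listener needs is a saveAll method. A hypothetical minimal interface, just so the snippet compiles:

import java.util.List;

// Hypothetical interface inferred from the listener above; the real
// implementation would batch-insert the records into ClickHouse.
public interface RealTimeDataClickHouseService {
    void saveAll(List<String> data);
}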