赞
踩
SpringBoot为Kafka提供了两种配置方式
SpringBoot提供了KafkaProperties
配置类,且会将spring.kafka
开头的配置项值注入的配置类中
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
// 配置项
}
在使用Kafka配置项时只需要将其注入即可
// 表明此类是springboot的配置类
@Configuration
// 使@ConfigurationProperties注解的类生效
@EnableConfigurationProperties({KafkaProperties.class})
// 开启spring kafka的能力
@EnableKafka
public class KafkaConfig {
// 此时kafkaProperties已经注入了配置文件的值
private final KafkaProperties kafkaProperties;
}
在spring的配置文件application.yml
中配置如下:
spring:
kafka:
# Kafka服务端监听地址端口,集群用逗号分隔
bootstrap-servers: 192.168.31.249:9092
consumer:
# 消费者组ID,在消费者实例没有指定消费者组的时候生效
group-id: test01
# 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。
enable-auto-commit: true
# 每次自动提交offset的时间间隔,当enable-auto-commit设置为true时生效,默认值为5000,单位ms
auto-commit-interval: 500
# kafka服务(实际是zookeeper)中没有初始化的offset时,如果offset是以下值的回应:
# earliest:自动复位offset为smallest的offset
# latest:自动复位offset为largest的offset
# anything else:向consumer抛出异常
# none:如果整个消费者组中没有以往的offset,则抛出异常
auto-offset-reset: latest
# message的key的解码类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# message的value的解码类
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 单次消费获取数据的最大条数
max-poll-records: 500
# 每次fetch请求时,server应该返回的最小字节数。
# 如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认值为1,单位bytes
fetch-min-size: 1
# 如果没有足够的数据能够满足fetch.min.bytes(fetch-min-size),
# 则此项配置是指在应答fetch请求之前,server会阻塞的最大时间,默认值为100,单位ms
fetch-max-wait: 100
# 如果设置为read_committed,则consumer会缓存消息,直到收到消息对应的事务控制消息。
# 若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息
# 默认值为read_uncommitted
isolation-level: read_uncommitted
producer:
# producer需要server接收到数据之后发出的确认接收的信号
# acks=0:设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
# acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
# acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
acks: 1
# 设置大于0的值将使客户端重新发送任何数据。
# 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retries: 4
# producer将试图批处理消息记录,以减少请求次数,这项配置控制默认的批量处理消息字节数,默认值16384,单位bytes
batch-size: 16384
# producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常, 默认值33554432,单位bytes
buffer-memory: 33554432
# key的序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4','zstd')
# 默认为none
compression-type: none
创建流程:
// 根据生产者工厂构建kafkaTemplate
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
return kafkaTemplate;
}
// 将一个生产者工厂注册到spring容器中
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
KafkaTemplate提供了几个发送消息的接口如下:
topic:指定要发送消息的topic名称
partition:向指定的partition发送消息
key:消息的key
data:消息的data
timestamp:时间信息,一般默认为当前时间
record:ProducerRecord
结构,是对key和value的一层封装,直接发送key和value,也会在内部被封装成ProducerRecord
然后再发送出去
message:包含消息头(topic、partition、字符集)等信息和消息的封装格式
// 向默认的topic发送消息
public ListenableFuture<SendResult<K, V>> sendDefault(V data);
public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
// 向指定的topic发送消息
public ListenableFuture<SendResult<K, V>> send(String topic, V data);
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
public ListenableFuture<SendResult<K, V>> send(Message<?> message)
由此利用spring-kafka发送一条数据的代码如下:
Kafka配置类KafkaConfig.java
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
}
测试类SpringBootDemoMqKafkaApplicationTests
测试消息发送
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class SpringBootDemoMqKafkaApplicationTests {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 每隔1s向numb主题发送“hello,kakfa”的消息
@Test
public void test_send() throws InterruptedException {
while (true) {
kafkaTemplate.send("numb", "hello, kafka");
TimeUnit.SECONDS.sleep(1);
}
}
}
使用kafka-console-consumer.sh
查看消息
./bin/kakfa-console-consumer.sh --bootstrap-server localhost:9092 --topic numb --partition 0 --from-begining
>>hello
2.2.2 中发送消息后,无法感知数据是否成功被发送到broker,这样就可能存在生产者数据提交时消息丢失。SpringKafka提供了生产者消息回调ProducerListener
,分别针对消息提交成功和提交失败进行不同的回调。
public interface ProducerListener<K, V> {
// 成功回调
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}
// 失败回调
default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
}
}
KafkaTemplate
设置ProducerListener
public void setProducerListener(@Nullable ProducerListener<K, V> producerListener) {
this.producerListener = producerListener;
}
测试带回调的KafkaTemplate发送消息
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory,
ProducerListener<String, String> producerListener) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setProducerListener(producerListener);
return kafkaTemplate;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
// Kafka消息回调
@Bean
public ProducerListener<String, String> producerListener() {
return new ProducerListener<String, String>() {
@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
System.out.println("发送成功");
}
@Override
public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata,
Exception exception) {
System.out.println("发送失败");
}
};
}
}
测试类SpringBootDemoMqKafkaApplicationTests
测试消息发送
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class SpringBootDemoMqKafkaApplicationTests {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
public void test_send_kv_with_listener() throws InterruptedException {
while (true) {
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "data");
TimeUnit.SECONDS.sleep(1);
}
}
}
测试结果:
发送成功
发送成功
发送成功
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。