赞
踩
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.14.RELEASE</version>
</dependency>
spring: kafka: consumer: bootstrap-servers: 192.168.2.130:9092 auto-offset-reset: earliest #enable-auto-commit: true #消费者自动提交的间隔 #auto-commit-interval: 5S #通过ack-mode设置自动提交(推荐) enable-auto-commit: false # 错误处理 key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: spring: json: trusted: packages: "com.example.kafak2.user" spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer listener: #listener类型为单条记录single类型(默认为single单条消费模式) type: single #offset提交模式为record ack-mode: record
关键属性解释:
listener-type
:指定消费者监听器的类型,提供参数:
single
:单线程模式,所有分配给消费者的分区都由同一个线程处理,使用于少量主题和顺序处理的场景。
batch
:多线程模式,消费者使用多个线程来批量处理消息,每个线程负责处理一批分区的消息。适用于大量主题和需要高吞吐量的场景。
spring.json.trusted.packages
:配置信任的java包。设置消费者和生产者在序列化和反序列化JSON消息时,哪些包受信任,可以被使用。如果消息中的Java包不在受信任的列表中,kafka会抛出com.fasterxml.jackson.databind.exc.InvalidTypeIdException
异常,防止潜在的安全风险。多个包之间用逗号分隔。
spring.deserializer.[key|value].delegate.class
:配置kay/value的反序列化器的委托类。
委托类必须实现org.apache.kafka.common.serialization.Deserializer
接口,并提供反序列化的实现。因此我们可以自定义实现一个委托类,或者使用kafka提供的默认实现类。
[key|value]-deserializer:
通常设置为org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
,用来在反序列化时处理可能出现的异常。默认情况下,ErrorHandlingDeserializer
会使用org.springframework.kafka.listener.SeekToCurrentErrorHandler
作为异常处理器。该异常处理会重新定位到当前的位置,并尝试重新消费下一个消息。以确保消费者不会因为单个异常而中断,增强了消费者的健壮性。
auto-offset-reset
:自定义重定位偏移量。属性值:earliest:从最早的可用消息开始消费。latest:从最新的消息开始消费。none:如果不存在消费偏移,则抛出一个异常。exception:抛出一个异常。
需要注意的是,如果kafka服务器记录有消费者消费到的offset,那么无论设置的是什么,消费者都会按照服务器上的offset继续消费数据
如果服务器上消费者的offset记录丢失了,此时auto-offset-rest就会起作用。
enable-auto-commit
:是否自动提交偏移量,消费者的偏移量并非自动更新的,只有消费者提交之后,kafka上存储的消费组的offset才会被更新。
需要注意的是,kafka更推荐通过ack-mode
进行偏移量提交。
ack-mode
提供以下几种模式:
1.RECORD
当每一条记录被消费者监听器处理后提交。
2.BATCH
当每一批poll()的数据被消费者监听器处理后提交
3.TIME
当每一批poll()的数据被消费者监听器处理后,距离上次提交时间大于ACK-TIME时提交
4.COUNT
当每一批poll()的数据被消费者监听器处理之后,被处理的record数量大于等于ACK-COUNT时提交
5.COUNT_TIME
TIME COUNT设置有一个条件满足时提交
6.MANUAL
当每一批poll()的数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()
后提交。
7.MANUAL_IMMEDIATE
⼿动调⽤Acknowledgment.acknowledge()后⽴即提交
kafka为什么不推荐使用enable-auto-commit进行偏移量提交
1.无法控制提交的时机,可能会导致偏移量提交不准确或丢失。
2.无法保证消费者偏移量的完整性:当消费者在处理消息的过程中发送错误时,无法保证偏移量完整性,可能会造成消息重复消费或者丢失。
3.没有处理消费者偏移量的反馈信息,无法获取到提交是否成功的反馈。
kafka为什么推荐使用ack-mode进行偏移量提交
1.确保偏移量完整性:使用ack-mode可以确保消费者提交偏移量的请求被服务器正确处理并写入到日志中。当消费者提交偏移量时,它会等待服务器的确认,确保偏移量已经被记录下来,避免偏移量的丢失或提交不准确的问题。
2.提供更可靠的偏移量提交:使用ack-mode可以确保偏移量的提交是可靠的。消费者会等待分区的leader接收并写入偏移量,然后才会继续消费下一批消息。如果提交失败,消费者会重试,直到成功为止。减少偏移量提交失败的情况,提高可靠性
。
3.支持精确的控制:使用ack-mode可以精确地控制何时提交偏移量。根据业务需要,也可以在消息处理完成后手动提交偏移量,而不是像enable-auto-commit在固定的时间间隔或再均衡发生时自动提交。
spring: kafka: producer: bootstrap-servers: 192.168.2.130:9092 # acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。 # acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。 # acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。 acks: -1 #多少数据发送一次,默认16k batch-size: 16384 #客户端缓冲区大小,默认32M ,满了也会触发消息发送 buffer-memory: 33554432 #异常自动重试次数 retries: 3 properties: spring: json: trusted: packages: com.example.kafak2.user max: block: ms: 3000 #批量发送等待时间,防止数据一直达不到发送容量标准 linger: ms: 500 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
关键属性解释:
acks
:设置消息的复制策略。
acks=0:生产者发送消息后,不需要等待Broker的响应,消息会立即返回,这种情况下消息可能会丢失。
acks=1:生产者发送消息后需要等待Broker的确认消息,只要消息被成功写入Leader分区就会返回ack,这种情况下如果Leader分区发生故障,消息可能会丢失。
ack=all(或ack=-1):生产者发生消息后需要等待所有的ISR(同步副本)都确认成功后才会返回ack,这种情况下可以保证消息不丢失。
batch-size
:生产者端,一次批量发送的消息数量。默认16k。
buffer-memory
:用于控制生产者缓冲区的总内存大小的。它指定了生产者可以使用的总内存量,用于存储待发送消息的缓冲区。默认32M ,满了也会触发消息发送。
linger.ms
:生产者发送消息的延迟时间;也可以理解为批量发送消息的等待时间,防止数据一直达不到发送的容量标准。
max.block.ms
:当生产者发送消息时,如果Kafka的发送缓冲区已满或者无法连接到Kafka集群。生产者将等待的最长时间。该参数的默认值为60000毫秒(即60秒)。如果在等待时间内,无法发送消息成功,将抛出TimeoutException
异常。
retries
:生产者在发送消息时的重试次数。当生产者发送消息到Kafka集群时,如果发送失败或者出现可恢复的错误,生产者会尝试重新发送消息。默认值为0
@KafkaListener(topicPattern="java.*",groupId = "consumerGroup1")
public void essage(ConsumerRecord consumerRecord){
System.out.printf("consumerGroup1:分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.key(),
consumerRecord.value().toString(),
consumerRecord.timestamp()
);
}
@Resource
KafkaTemplate kafkaTemplate;
public void sendMassage(String massage){
Message<User> message = MessageBuilder
.withPayload(new User("测试json","23"))
.setHeader(KafkaHeaders.TOPIC,"javaTopic")
.build();
kafkaTemplate.send(message);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。