赞
踩
消息持久化到磁盘,因此可用于批量消费
支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。
同时支持离线数据处理和实时数据处理。
参考文章消息队列之 Kafka
// Demo01Producer.java @Component public class Demo01Producer { @Resource private KafkaTemplate<Object, Object> kafkaTemplate; public SendResult syncSend(Integer id) throws ExecutionException, InterruptedException { // 创建 Demo01Message 消息 Demo01Message message = new Demo01Message(); message.setId(id); // 同步发送消息 return kafkaTemplate.send(Demo01Message.TOPIC, message).get(); } public ListenableFuture<SendResult<Object, Object>> asyncSend(Integer id) { // 创建 Demo01Message 消息 Demo01Message message = new Demo01Message(); message.setId(id); // 异步发送消息 return kafkaTemplate.send(Demo01Message.TOPIC, message); } }
// Demo01Consumer.java
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-consumer-group-" + Demo01Message.TOPIC)
public void onMessage(Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
// Demo01AConsumer.java
@Component
public class Demo01AConsumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-A-consumer-group-" + Demo01Message.TOPIC)
public void onMessage(ConsumerRecord<Integer, String> record) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
}
}
差异一,在方法上,添加了 @KafkaListener
注解,声明消费的 Topic 还是 "DEMO_01"
,消费者分组修改成了 "demo01-A-consumer-group-DEMO_01"
。这样,我们就可以测试 Kafka 集群消费的特性。
集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
也就是说,如果我们发送一条 Topic 为
"DEMO_01"
的消息,可以分别被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消费一次。
但是,如果我们启动两个该示例的实例,则消费者分组
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都会有多个 Consumer 示例。此时,我们再发送一条 Topic 为"DEMO_01"
的消息,只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次,也同样只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次。
通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER"
的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
差异二,方法参数,设置消费的消息对应的类不是 Demo01Message 类,而是 Kafka 内置的 ConsumerRecord 类。通过 ConsumerRecord 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(value
)就需要自己去反序列化。当然,一般情况下,我们不会使用 ConsumerRecord 类。
public @interface KafkaListener { /** * id 唯一标识的前缀 * The unique identifier of the container managing for this endpoint. * SpEL {@code #{...}} and property place holders {@code ${...}} are supported. */ String id() default ""; /** * org.springframework.kafka.config.KafkaListenerContainerFactory的 bean 名称, * 用于创建负责为该端点提供服务的消息侦听器容器。 如果未指定,则使用默认容器工厂(如果有) */ String containerFactory() default ""; /** * 监听的 Topic 数组 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. */ String[] topics() default {}; /** * 监听的 Topic 表达式 * The entries can be 'topic pattern', a 'property-placeholder key' or an 'expression'. */ String topicPattern() default ""; /** * @TopicPartition 注解的数组。每个 @TopicPartition 注解, * 可配置监听的 Topic、队列、消费的开始位置 */ TopicPartition[] topicPartitions() default {}; /** * 所属 MessageListenerContainer Bean 的名字。 */ String containerGroup() default ""; /** * 使用消费异常处理器 KafkaListenerErrorHandler 的 Bean 名字 */ String errorHandler() default ""; /** * 消费者分组 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. */ String groupId() default ""; /** * 当 groupId 未设置时,是否使用 id 作为 groupId */ boolean idIsGroup() default true; /** * id 唯一标识的前缀 */ String clientIdPrefix() default ""; /** * 真实监听容器的 Bean 名字,需要在名字前加 "__" 。 */ String beanRef() default "__listener"; /** * 自定义消费者监听器的并发数 */ String concurrency() default ""; /** * 是否自动启动监听器。默认情况下,为 true 自动启动。 */ String autoStartup() default ""; /** * Kafka Consumer 拓展属性。 */ String[] properties() default {}; }
application.properties
spring.kafka.producer.batch-size=16384 # 每次批量发送消息的最大数量
spring.kafka.producer.buffer-memory=33554432 # 每次批量发送消息的最大内存
spring.kafka.producer.properties.linger.ms=30000 # 批处理延迟时间上限。不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求。
批量发送消息的producer看起来没有什么特别的区别
application.properties
spring.kafka.listener.type=BATCH # 监听器类型,默认为 SINGLE ,只监听单条消息。配置 BATCH ,监听多条消息,批量消费
spring.kafka.consumer.max-poll-records=100 # poll 一次消息拉取的最大数量
spring.kafka.consumer.fetch-min-size= 10 # poll 一次消息拉取的最小数据量,单位:字节
spring.kafka.consumer.fetch-max-wait=10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
// Demo02Consumer.java
@Component
public class Demo02Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo02Message.TOPIC,
groupId = "demo02-consumer-group-" + Demo02Message.TOPIC)
public void onMessage(List<Demo02Message> messages) {
logger.info("[onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getId(), messages.size());
}
}
Spring-Kafka 提供消费重试的机制。在消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费
KafkaConfiguration
配置类,增加消费异常的 ErrorHandler 处理器
// KafkaConfiguration.java @Configuration public class KafkaConfiguration { @Bean @Primary public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) { // <1> 创建 DeadLetterPublishingRecoverer 对象 // 负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template); // <2> 创建 FixedBackOff 对象 // 我们配置了重试 3 次,每次固定间隔 30 秒 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); // <3> 创建 SeekToCurrentErrorHandler 对象 // 处理异常,串联整个消费重试的整个过程 return new SeekToCurrentErrorHandler(recoverer, backOff); } }
#seek(TopicPartition partition, long offset)
方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。