当前位置:   article > 正文

Kafka_kafkalistener

kafkalistener

Kafka

消息持久化到磁盘,因此可用于批量消费

支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输

消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。

同时支持离线数据处理和实时数据处理。

参考文章消息队列之 Kafka

1 Kafka中的基本概念

  • Broker:Kafka 集群中的一台或多台服务器统称为 Broker
  • Topic:每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不同 Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
  • Partition:Topic 物理上的分区,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)
  • Producer:消息和数据的生产者,可以理解为往 Kafka 发消息的客户端
  • Consumer:消息和数据的消费者,可以理解为从 Kafka 取消息的客户端
  • Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group)。 这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer如果要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。

2 spring-kafka

2.1 集群消费(Clustering

// Demo01Producer.java

@Component
public class Demo01Producer {

    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;

    public SendResult syncSend(Integer id) throws ExecutionException, InterruptedException {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 同步发送消息
        return kafkaTemplate.send(Demo01Message.TOPIC, message).get();
    }

    public ListenableFuture<SendResult<Object, Object>> asyncSend(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 异步发送消息
        return kafkaTemplate.send(Demo01Message.TOPIC, message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
// Demo01Consumer.java

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = Demo01Message.TOPIC,
            groupId = "demo01-consumer-group-" + Demo01Message.TOPIC)
    public void onMessage(Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
// Demo01AConsumer.java

@Component
public class Demo01AConsumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = Demo01Message.TOPIC,
            groupId = "demo01-A-consumer-group-" + Demo01Message.TOPIC)
    public void onMessage(ConsumerRecord<Integer, String> record) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 差异一,在方法上,添加了 @KafkaListener 注解,声明消费的 Topic 还是 "DEMO_01" ,消费者分组修改成"demo01-A-consumer-group-DEMO_01" 。这样,我们就可以测试 Kafka 集群消费的特性。

    集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

    也就是说,如果我们发送一条 Topic 为 "DEMO_01" 的消息,可以分别被 "demo01-A-consumer-group-DEMO_01""demo01-consumer-group-DEMO_01" 都消费一次。

    但是,如果我们启动两个该示例的实例,则消费者分组 "demo01-A-consumer-group-DEMO_01""demo01-consumer-group-DEMO_01" 都会有多个 Consumer 示例。此时,我们再发送一条 Topic 为 "DEMO_01" 的消息,只会被 "demo01-A-consumer-group-DEMO_01" 的一个 Consumer 消费一次,也同样只会被 "demo01-A-consumer-group-DEMO_01" 的一个 Consumer 消费一次。

    通过集群消费的机制,我们可以实现针对相同 Topic不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

    • 积分模块:判断如果是手机注册,给用户增加 20 积分。
    • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
    • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
    • … 等等

    这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

  • 差异二,方法参数,设置消费的消息对应的类不是 Demo01Message 类,而是 Kafka 内置的 ConsumerRecord 类。通过 ConsumerRecord 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(value)就需要自己去反序列化。当然,一般情况下,我们不会使用 ConsumerRecord 类。

2.2 @KafkaListener

public @interface KafkaListener {

	/** 
	 * id 唯一标识的前缀
	 * The unique identifier of the container managing for this endpoint.
	 * SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
	 */
	String id() default "";

	/**
	 * org.springframework.kafka.config.KafkaListenerContainerFactory的 bean 名称,
	 * 用于创建负责为该端点提供服务的消息侦听器容器。 如果未指定,则使用默认容器工厂(如果有)
	 */
	String containerFactory() default "";

	/**
	 * 监听的 Topic 数组
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 */
	String[] topics() default {};

	/**
	 * 监听的 Topic 表达式
	 * The entries can be 'topic pattern', a 'property-placeholder key' or an 'expression'. 
	 */
	String topicPattern() default "";

	/**
	 * @TopicPartition 注解的数组。每个 @TopicPartition 注解,
	 * 可配置监听的 Topic、队列、消费的开始位置
	 */
	TopicPartition[] topicPartitions() default {};

	/**
	 * 所属 MessageListenerContainer Bean 的名字。
	 */
	String containerGroup() default "";

	/**
	 * 使用消费异常处理器 KafkaListenerErrorHandler 的 Bean 名字
	 */
	String errorHandler() default "";

	/**
	 * 消费者分组
	 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
	 */
	String groupId() default "";

	/**
	 * 当 groupId 未设置时,是否使用 id 作为 groupId
	 */
	boolean idIsGroup() default true;

	/**
	 * id 唯一标识的前缀
	 */
	String clientIdPrefix() default "";

	/**
	 * 真实监听容器的 Bean 名字,需要在名字前加 "__" 。
	 */
	String beanRef() default "__listener";

	/**
	 * 自定义消费者监听器的并发数
	 */
	String concurrency() default "";

	/**
	 * 是否自动启动监听器。默认情况下,为 true 自动启动。
	 */
	String autoStartup() default "";

	/**
	 * Kafka Consumer 拓展属性。
	 */
	String[] properties() default {};

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

2.3 批量发送消息

application.properties

spring.kafka.producer.batch-size=16384 # 每次批量发送消息的最大数量
spring.kafka.producer.buffer-memory=33554432 # 每次批量发送消息的最大内存
spring.kafka.producer.properties.linger.ms=30000 # 批处理延迟时间上限。不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求。
  • 1
  • 2
  • 3

批量发送消息的producer看起来没有什么特别的区别

2.4 批量消费消息

application.properties

spring.kafka.listener.type=BATCH # 监听器类型,默认为 SINGLE ,只监听单条消息。配置 BATCH ,监听多条消息,批量消费
spring.kafka.consumer.max-poll-records=100 # poll 一次消息拉取的最大数量
spring.kafka.consumer.fetch-min-size= 10 # poll 一次消息拉取的最小数据量,单位:字节
spring.kafka.consumer.fetch-max-wait=10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
  • 1
  • 2
  • 3
  • 4
// Demo02Consumer.java

@Component
public class Demo02Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = Demo02Message.TOPIC,
            groupId = "demo02-consumer-group-" + Demo02Message.TOPIC)
    public void onMessage(List<Demo02Message> messages) {
        logger.info("[onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getId(), messages.size());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.5 消费重试

Spring-Kafka 提供消费重试的机制。在消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费

KafkaConfiguration配置类,增加消费异常的 ErrorHandler 处理器

// KafkaConfiguration.java

@Configuration
public class KafkaConfiguration {

    @Bean
    @Primary
    public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
        // <1> 创建 DeadLetterPublishingRecoverer 对象
        // 负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。
        ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // <2> 创建 FixedBackOff 对象
        // 我们配置了重试 3 次,每次固定间隔 30 秒
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        // <3> 创建 SeekToCurrentErrorHandler 对象
        // 处理异常,串联整个消费重试的整个过程
        return new SeekToCurrentErrorHandler(recoverer, backOff);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 消息消费失败时,SeekToCurrentErrorHandler 会将 调用 Kafka Consumer 的 #seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。
  • 同时,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/484000
推荐阅读
相关标签