赞
踩
生产者和消费者往往是一对多的关系,多个消费者可以形成一个消费组来订阅主题消息,对消息进行分类。一个消费组中订阅的都是同一个主题,每个消费者接受主题一部分分区的消息。同一个消费组内的不同消费者只能订阅一个主题下不同分区的消息,不同消费组可以订阅同一个主题的统一分区消息
在同一个消费组下,根据消费者数量的不同,消费者订阅的主题数量也会变化,假设一个主题有4个分区p1,p2,p3,p4,只有唯一的消费者c1,则订阅关系为:
c1->p1
c1->p2
c1->p3
c1->p4
如果有两个消费者c1,c2,则订阅关系会发生变化,可能会变成:
c1->p1
c2->p2
c1->p3
c2->p4
如果消费者数量大于分区数,则会导致部分消费者闲置,不会接收到任何消息。如有5个消费者:
c1->p1
c2->p2
c3->p3
c4->p4
c5 闲置
从上面分析,可以通过适当地增加消费者数量来横向拓展消费能力,这是尤其当消费者需要做一些高延迟处理或发送者发送速率较大的情况下。但不要让消费者数量超过主题分区数。
一个消费组即可保证生产者生产的所有消息都被唯一消费,但如果存在多方对生产的消息感兴趣,可以初始化不同的消费组,不同消费组互不干扰,都分别能对生产者生产的所有消息进行唯一消费。示例如下所示:
对于消费组,有5个状态:
对应以下生命周期流转模型:
类似生产者,在创建消费者时,同样需要指定三个最基本的属性:
在3个必备配置外,还有一个最基本配置group.id,用来指定消费者属于哪一个消费者群组,如果不指定会分配在默认消费组,但这不太常见。
下面来看一个创建示例:
Properties properties = new Properties(); // 指定broker连接 properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); // 指定key反序列化工具类 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定value反序列化工具类 properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定消费群组id properties.setProperty("group.id", "test2"); // 初始化消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 指定订阅主题,可以同时订阅多个主题,还可以通过指定test*订阅所有test开头的主题 consumer.subscribe(Collections.singletonList("kafka-topic")); // 循环消费消息 while (true) { // 不断尝试拉取 ConsumerRecords<String, String> records = consumer.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。