当前位置:   article > 正文

Kafka 实战 - Kafka 消费者组 subscribe 和 assign 的正确使用_消费模式 assign subscribe

消费模式 assign subscribe

在使用Kafka消费者(Consumer)时,有两种主要的订阅模式:subscribeassign,分别适用于不同的场景和需求。理解并正确使用这两种模式对于构建高效且可靠的消费者应用至关重要。以下是关于subscribeassign的正确使用方法及适用场景:

**1. **subscribe方法(自动分配分区)

使用方法:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class SubscribeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1", "topic2")); // 订阅多个主题

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
适用场景与特点:
  • 动态订阅:当订阅的主题列表发生变化(新增或删除主题)时,消费者会自动调整订阅关系。

  • 分区自动分配:加入消费者组的消费者会根据订阅的主题自动均衡地分配分区。Kafka使用内部的分区分配策略(如Range、RoundRobin等)确保分区公平分配,并在消费者增减时触发再平衡(Rebalance)过程,重新分配分区。

  • 容错与负载均衡:消费者组内任意消费者故障时,其负责的分区会被重新分配给存活的消费者,保证消息处理的持续性。同时,消费者组能够水平扩展,通过增加消费者实例来提高整体消费能力。

**2. **assign方法(手动分配分区)

使用方法:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class AssignConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<TopicPartition> partitions = Arrays.asList(
                new TopicPartition("topic1", 0),
                new TopicPartition("topic2", 1)
        );
        consumer.assign(partitions); // 手动分配分区

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
适用场景与特点:
  • 静态分配:消费者手动指定要消费的分区列表,一旦设置,除非主动调用assign方法更新,否则不会改变订阅关系。

  • 无消费者组:使用assign方法的消费者通常不指定group.id,因此它们不会参与消费者组的再平衡过程,每个消费者独立消费自己指定的分区。

  • 精确控制:适用于需要对分区消费有精细控制的场景,比如特定的顺序消费要求、固定消费者与分区的映射关系、避免频繁Rebalance影响性能等。

  • 无容错与负载均衡:由于没有消费者组,当某个消费者故障时,其负责的分区不会被其他消费者接管,可能导致消息处理中断。同时,增加消费者实例不会自动分摊负载,需要手动调整分区分配。

总结来说,subscribe方法适用于需要动态订阅主题、自动分区分配、实现容错和负载均衡的场景;而assign方法适用于需要精确控制分区消费、避免Rebalance过程、不需要消费者组特性的场景。在实际应用中,应根据业务需求选择合适的订阅模式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/638931
推荐阅读
相关标签
  

闽ICP备14008679号