当前位置:   article > 正文

Kafka Consumer 消费 多线程vs多实例_kafka 多实例 多线程

kafka 多实例 多线程

Kafka Consumer消费性能提升

针对一下两种方案进行对比,并通过代码检验。基于Kafka2.2.1版本

1. 方案一

在同一个Consumer Group中,创建多个Consumer,增加消费者性能。
在这里插入图片描述

创建Topic

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 10 --topic mul_consumer_10

表格中展示出不同Consumer实例下的执行结果:

Consumer 实例 Partition 结果
8 10 Consumer数量小于partition数量时,某个分区数据被消费完之后,会让空闲Consumer继续消费未消费的partition中的数据。(partition数量大于Consumer数量,会将多出的partition分给同一个Consumer)在这里插入图片描述
10 10 正常消费。每个Consumer实例分配一个partition,每个Consumer下的partition数据顺序消费。
在这里插入图片描述
12 10 Consumer数量大于partition数量时,空闲的Consumer啥事也不干。

对应Java API代码:

  • Consumer Thread 实现类

    public class ConsumerThread implements Runnable {
         
        private KafkaConsumer<String,String> kafkaConsumer;
        private String topic ;
    
        public ConsumerThread(String brokers,String topicId ,  String topic) {
         
    
            Properties properties = buildKafkaProperty(brokers,topicId);
            this.kafkaConsumer = new KafkaConsumer<>(properties);
            this.topic = topic;
            this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
        }
    
        private static Properties buildKafkaProperty(String brokers,String groupId){
         
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers", brokers);
            properties.put("group.id", groupId);
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("session.timeout.ms", "30000");
            properties.put("auto.offset.reset", "earliest");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            return properties;
        }
    
        @Override
        public void run() {
         
    
            while (true){
         
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for(ConsumerRecord<String,String> record : records){
         
                    System.out.print(Threa
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/419844
推荐阅读
相关标签
  

闽ICP备14008679号