赞
踩
针对一下两种方案进行对比,并通过代码检验。基于Kafka2.2.1版本
在同一个Consumer Group中,创建多个Consumer,增加消费者性能。
创建Topic
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 10 --topic mul_consumer_10
表格中展示出不同Consumer实例下的执行结果:
Consumer 实例 | Partition | 结果 |
---|---|---|
8 | 10 | Consumer数量小于partition数量时,某个分区数据被消费完之后,会让空闲Consumer继续消费未消费的partition中的数据。(partition数量大于Consumer数量,会将多出的partition分给同一个Consumer) |
10 | 10 | 正常消费。每个Consumer实例分配一个partition,每个Consumer下的partition数据顺序消费。 |
12 | 10 | Consumer数量大于partition数量时,空闲的Consumer啥事也不干。 |
对应Java API代码:
Consumer Thread 实现类
public class ConsumerThread implements Runnable { private KafkaConsumer<String,String> kafkaConsumer; private String topic ; public ConsumerThread(String brokers,String topicId , String topic) { Properties properties = buildKafkaProperty(brokers,topicId); this.kafkaConsumer = new KafkaConsumer<>(properties); this.topic = topic; this.kafkaConsumer.subscribe(Arrays.asList(this.topic)); } private static Properties buildKafkaProperty(String brokers,String groupId){ Properties properties = new Properties(); properties.put("bootstrap.servers", brokers); properties.put("group.id", groupId); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return properties; } @Override public void run() { while (true){ ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for(ConsumerRecord<String,String> record : records){ System.out.print(Threa
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。