赞
踩
如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。
消费积压时,
(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。
public class ThirdMultiConsumerThreadDemo {
public static final String BROKER_LIST = "localhost:9092";
public static final String TOPIC = "myTopic1";
public static final String GROUP_ID = "group.demo";
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,
Runtime.getRuntime().availableProcessors());
consumerThread.start();
}
/***
* kafka配置
* @return
*/
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
/**
* kafka消费者线程
*/
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
private int threadNumber;
public KafkaConsumerThread(Properties props, String topic, int threadNumber) {
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
this.threadNumber = threadNumber;
executorService = new ThreadPoolExecutor(threadNumber, threadNumber,
0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordsHandler(records));
}
}
} catch (Exception e) {
log.error("run error", e);
} finally {
kafkaConsumer.close();
}
}
}
/**
* 处理消息
*/
public static class RecordsHandler extends Thread {
public final ConsumerRecords<String, String> records;
public RecordsHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run() {
//处理records.
for (ConsumerRecord<String, String> record : records) {
System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());
}
}
}
}
发送消息后,使用多线程消息,运行结果如下:
==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3
https://blog.csdn.net/sinat_32502451/category_12465196.html
《深入理解Kafka:核心设计与实践原理》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。