赞
踩
KafkaConsumer是非线程安全的。它和KafkaProducer不同,后者是线程安全的,因此用户可以在多个线程中放心地使用同一个KafkaProducer实例,事实上这也是社区推荐的producer使用方法,因为通常它比每个线程维护一个KafkaProducer实例效率要高。
但是对于consumer而言,用户无法直接在多个线程中共享一个KafkaConsumer实例,下面给出两种多线程消费的方法以及各自的实例。
在这个方法中,用户创建多个线程来消费topic数据。每个线程都会创建专属于该线程的KafkaConsumer实例,如下图所示:
consumer group由多个线程的KafkaConsumer组成,每个线程负责消费固定数目的分区。下面给出一个完整的样例,该样例中包含3个类:
ConsumerRunnable.java
public class ConsumerRunnable implements Runnable { //每个线程维护私有的KafkaConsumer实例 private final KafkaConsumer<String, String> consumer; public ConsumerRunnable(String brokerList, String groupId, String topic) { Properties properties = new Properties(); properties.put("bootstrap.servers", brokerList); properties.put("group.id", groupId); properties.put("enable.auto.commit", "true");//自动提交offset properties.put("auto.commit.interval.ms", "1000"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (ConsumerRecord<String, String> record : records) { System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() + "th message with offset: " + record.offset() + " value:" + record.value()); } } } }
ConsumerGroup.java
/**
 * Holds a fixed number of {@link ConsumerRunnable} tasks built with the same broker list,
 * group id and topic, and starts each of them on its own thread.
 */
public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    /**
     * Builds {@code consumerNum} consumer tasks.
     *
     * @param consumerNum number of consumer tasks to create
     * @param groupId     group id passed to every task
     * @param topic       topic passed to every task
     * @param brokerList  broker list passed to every task
     */
    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        int remaining = consumerNum;
        while (remaining-- > 0) {
            consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
        }
    }

    /** Starts one new thread per consumer task, in creation order. */
    public void execute() {
        for (int idx = 0; idx < consumers.size(); idx++) {
            Thread worker = new Thread(consumers.get(idx));
            worker.start();
        }
    }
}
ConsumerMain.java
public class ConsumerMain {
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup";
String topic = "testTopic";
int consumerNum = 3;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
此方法与第一种方法的区别在于,我们将消息的获取与消息的处理解耦,把后者放入单独的工作者线程中,即所谓的worker线程中。同时在全局维护一个或若干个consumer实例执行消息获取任务,如图所示。
本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作。之后worker线程完成处理后上报位移状态,由全局consumer提交位移。代码中共有如下3个类。
ConsumerThreadHandler类:consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。另外consumer位移提交也在该类中完成。
ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler。
Main类:测试主方法类。
ConsumerThreadHandler.java
public class ConsumerThreadHandler<K, V> { private final KafkaConsumer<K, V> consumer; private ExecutorService executors; private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); public ConsumerThreadHandler(String brokerList, String groupId, String topic) { Properties properties = new Properties(); properties.put("bootstrap.servers", brokerList); properties.put("group.id", groupId); properties.put("enable.auto.commit", "false");//手动提交offset properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<K, V>(properties); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { consumer.commitSync(offsets);//提交位移 } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { offsets.clear(); } }); } /** * 消费方法 * @param threadNumber 线程池中的线程数 */ public void consume(int threadNumber) { executors = new ThreadPoolExecutor( threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); try { while (true) { ConsumerRecords<K, V> records = consumer.poll(1000L); if (!records.isEmpty()) { executors.submit(new ConsumerWorker<>(records, offsets)); } commitOffsets(); } } catch (WakeupException e) { } finally { commitOffsets(); consumer.close(); } } private void commitOffsets() { //尽量降低synchronized块对offsets锁定的时间 Map<TopicPartition, OffsetAndMetadata> unmodfiedMap; synchronized (offsets) { if (offsets.isEmpty()) { return; } unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets)); offsets.clear(); } consumer.commitSync(unmodfiedMap); } public void close() { consumer.wakeup(); executors.shutdown(); } }
ConsumerWorker.java
public class ConsumerWorker<K, V> implements Runnable { private final ConsumerRecords<K, V> records; private final Map<TopicPartition, OffsetAndMetadata> offsets; public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) { this.records = records; this.offsets = offsets; } @Override public void run() { for (TopicPartition partition : records.partitions()) { //获取指定分区的记录 List<ConsumerRecord<K, V>> partitionRecords = this.records.records(partition); for (ConsumerRecord<K, V> record : partitionRecords) { //处理记录 System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() + "th message with offset: " + record.offset() + " value:" + record.value()); } //提交offset long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); synchronized (offsets) { if (!offsets.containsKey(partition)) { offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); } else { long curr = offsets.get(partition).offset(); if (curr <= lastOffset + 1) { offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); } } } } } }
Main.java
/**
 * Test driver for the worker-pool example: runs the handler's poll loop on a background thread,
 * lets it consume for 20 seconds, then asks it to shut down.
 */
public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String topic = "testTopic";
        String groupId = "testGroup";

        // The handler configures StringDeserializer for both key and value, so the type
        // arguments must be String rather than byte[].
        final ConsumerThreadHandler<String, String> handler =
                new ConsumerThreadHandler<>(brokerList, groupId, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                handler.consume(cpuCount);
            }
        };
        new Thread(runnable).start();

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and go straight to shutdown.
            Thread.currentThread().interrupt();
        }
        System.out.println("Starting to close the consumer....");
        handler.close();
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。