赞
踩
消费者客户端提供的消费方式
- 订阅模式:消费者指定订阅主题,由协调者为消费者分配动态的分区
- 分配模式:消费者指定消费特定的分区,但是这个模式会失去协调者为消费者动态分配分区的功能
Consumer
public class Consumer extends ShutdownableThread { private final KafkaConsumer<Integer, String> consumer; private final String topic; public Consumer(String topic) { super("KafkaConsumerExample", false); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 反序列化 key value props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 就是初始化几个核心组件 consumer = new KafkaConsumer<>(props); this.topic = topic; } @Override public void doWork() { // 订阅主题 consumer.subscribe(Collections.singletonList(this.topic)); // 拉取消息,0.10.1.0 官方案例,更高版本该传参已作废,新版传参 Duration.ofMillis(timeout) ConsumerRecords<Integer, String> records = consumer.poll(1000); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } } @Override public String name() { return null; } @Override public boolean isInterruptible() { return false; } }
ShutdownableThread
abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true) extends Thread(name) with Logging { this.setDaemon(false) this.logIdent = "[" + name + "], " val isRunning: AtomicBoolean = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) def shutdown() = { initiateShutdown() awaitShutdown() } def initiateShutdown(): Boolean = { if(isRunning.compareAndSet(true, false)) { info("Shutting down") isRunning.set(false) if (isInterruptible) interrupt() true } else false } /** * After calling initiateShutdown(), use this API to wait until the shutdown is complete */ def awaitShutdown(): Unit = { shutdownLatch.await() info("Shutdown completed") } /** * This method is repeatedly invoked until the thread shuts down or this method throws an exception * 此方法被反复调用,直到线程关闭或该方法抛出异常为止 */ def doWork(): Unit override def run(): Unit = { info("Starting ") try{ while(isRunning.get()){ doWork() } } catch{ case e: Throwable => if(isRunning.get()) error("Error due to ", e) } shutdownLatch.countDown() info("Stopped ") } }
订阅模式调用的方法:subscribe()
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
...
if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
// 将订阅空主题列表视为与取消订阅相同
this.unsubscribe();
} else {
// 更新订阅状态对象
this.subscriptions.subscribe(new HashSet<>(topics), listener);
// 为元数据设置最新的主题
metadata.setTopics(subscriptions.groupSubscription());
}
}
分配模式调用的方法:assign()
public void assign(Collection<TopicPartition> partitions) {
...
this.subscriptions.assignFromUser(new HashSet<>(partitions));
// 为元数据设置最新的主题
metadata.setTopics(topics);
}
private static class TopicPartitionState { // 拉取偏移量 private Long position; // last consumed position 最后消费位置 // 消费偏移量,提交偏移量 private OffsetAndMetadata committed; // last committed position 最后提交位置 // 分区是否被暂停拉取 private boolean paused; // whether this partition has been paused by the user 该分区是否已被用户暂停 // 重置策略 private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting 需要重置偏移量时使用的策略 public TopicPartitionState() { this.paused = false; this.position = null; this.committed = null; this.resetStrategy = null; } // 重置拉取偏移量(第一次分配给消费者时调用) private void awaitReset(OffsetResetStrategy strategy) { // 设置重置策略 this.resetStrategy = strategy; // 清空 position this.position = null; } public boolean awaitingReset() { return resetStrategy != null; } // 开始重置 private void seek(long offset) { // 设置 position this.position = offset; // 清空重置策略 this.resetStrategy = null; } // 更新拉取偏移量(拉取线程在拉取到消息后调用) private void position(long offset) { // 当前 position 必须有效,才可以更新 position if (!hasValidPosition()) throw new IllegalStateException("Cannot set a new position without a valid current position"); this.position = offset; } public boolean hasValidPosition() { return position != null; } // 更新提交偏移量(定时提交任务调用) private void committed(OffsetAndMetadata offset) { this.committed = offset; } private boolean isFetchable() { // 没有暂停,且 position 有效才可以拉取 return !paused && hasValidPosition(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。