赞
踩
一 重要的字段 String clientId:Consumer唯一标识 ConsumerCoordinator coordinator: 控制Consumer与服务器端GroupCoordinator之间的通信逻辑 Fetcher<K, V> fetcher: 负责从服务器端获取消息的组件,并且更新partition的offset ConsumerNetworkClient client: 负责和服务器端通信 SubscriptionState subscriptions: 便于快速获取topic partition等状态,维护了消费者消费状态 Metadata metadata: 集群元数据信息 AtomicLong currentThread: 当前使用KafkaConsumer的线程id AtomicInteger refcount: 重入次数 二 核心的方法2.1 subscribe 订阅主题 订阅给定的主题列表,以获得动态分配的分区 主题的订阅不是增量的,这个列表将会代替当前的分配。注意,不可能将主题订阅与组管理与手动分区分配相结合 作为组管理的一部分,消费者将会跟踪属于某一个特殊组的消费者列表,如果满足在下列条件,将会触发再平衡操作:1 订阅的主题列表的那些分区数量的改变2 主题创建或者删除3 消费者组的成员挂了4 通过join api将一个新的消费者添加到一个存在的消费者组public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { // 取得一把锁 acquire(); try { if (topics == null) { // 主题列表为null,抛出异常 throw new IllegalArgumentException("Topiccollection to subscribe to cannot be null"); } else if (topics.isEmpty()) {// 主题列表为空,取消订阅 this.unsubscribe(); } else { for (String topic : topics) { if (topic == null || topic.trim().isEmpty()) throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or emptytopic"); } log.debug("Subscribed to topic(s):{}", Utils.join(topics, ", ")); this.subscriptions.subscribe(new HashSet<>(topics), listener); // 用新提供的topic集合替换当前的topic集合,如果启用了主题过期,主题的过期时间将在下一次更新中重新设置。 metadata.setTopics(subscriptions.groupSubscription()); } } finally { // 释放锁 release(); } }2.2 assign 手动分配分区 对于用户手动指定topic的订阅模式,通过此方法可以分配分区列表给一个消费者:public void assign(Collection<TopicPartition> partitions) { acquire(); try { if (
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。