当前位置:   article > 正文

KafkaConsumer分析_kafka consumer clientid

kafka consumer clientid

 

一 重要的字段 String clientId:Consumer唯一标识 ConsumerCoordinator coordinator: 控制Consumer与服务器端GroupCoordinator之间的通信逻辑 Fetcher<K, V> fetcher: 负责从服务器端获取消息的组件,并且更新partition的offset ConsumerNetworkClient client:  负责和服务器端通信 SubscriptionState subscriptions: 便于快速获取topic partition等状态,维护了消费者消费状态 Metadata metadata: 集群元数据信息 AtomicLong currentThread: 当前使用KafkaConsumer的线程id AtomicInteger refcount: 重入次数 二 核心的方法2.1 subscribe 订阅主题 订阅给定的主题列表,以获得动态分配的分区 主题的订阅不是增量的,这个列表将会代替当前的分配。注意,不可能将主题订阅与组管理与手动分区分配相结合 作为组管理的一部分,消费者将会跟踪属于某一个特殊组的消费者列表,如果满足在下列条件,将会触发再平衡操作:1 订阅的主题列表的那些分区数量的改变2 主题创建或者删除3 消费者组的成员挂了4 通过join api将一个新的消费者添加到一个存在的消费者组public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {     // 取得一把锁    acquire();     try {         if (topics == null) { // 主题列表为null,抛出异常            throw new IllegalArgumentException("Topiccollection to subscribe to cannot be null");         } else if (topics.isEmpty()) {// 主题列表为空,取消订阅            this.unsubscribe();         } else {             for (String topic : topics) {                 if (topic == null || topic.trim().isEmpty())                     throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or emptytopic");             }             log.debug("Subscribed to topic(s):{}", Utils.join(topics, ", "));             this.subscriptions.subscribe(new HashSet<>(topics), listener);             // 用新提供的topic集合替换当前的topic集合,如果启用了主题过期,主题的过期时间将在下一次更新中重新设置。            metadata.setTopics(subscriptions.groupSubscription());         }     } finally {         // 释放锁        release();     } }2.2 assign 手动分配分区 对于用户手动指定topic的订阅模式,通过此方法可以分配分区列表给一个消费者:public void assign(Collection<TopicPartition> partitions) {     acquire();     try {         if (

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/142464
推荐阅读
相关标签
  

闽ICP备14008679号