当前位置:   article > 正文

java多实例消费kafka_kafka消费者高级实例

java 多实例 处理 kafka

为什么使用高级消费者(High Level Consumer)

有时,我们消费Kafka的消息,并不关心偏移量,我们仅仅关心数据能被消费就行。High Level Consumer(高级消费者)提供了消费信息的方法而屏蔽了大量的底层细节。

首先要知道的是,高级消费者在zookeeper的特定分区存储最后的偏离。这个偏移当kafka启动时准备完毕。这一般是指消费者组(Consumer group)。

请小心,对于kafka集群消费群体的名字是全局的,任何的“老”逻辑的消费者应该被关闭,然后运行新的代码。当一个新的进程拥有相同的消费者群的名字,kafka将会增加进程的线程消费topic并且引发的“重新平衡(reblannce)”。在这个重新平衡中,kafka将分配现有分区到所有可用线程,可能移动一个分区到另一个进程的消费分区。如果此时同时拥有旧的的新的代码逻辑,将会有一部分逻辑进入旧得Consumer而另一部分进入新的Consumer中的情况.

设计一个高级消费者(Designing a High Level Consumer)

了解使用高层次消费者的第一件事是,它可以(而且应该!)是一个多线程的应用。线程围绕在你的主题分区的数量,有一些非常具体的规则:

如果你提供比在topic分区多的线程数量,一些线程将永远不会看到消息。

如果你提供的分区比你拥有的线程多,线程将从多个分区接收数据。

如果你每个线程上有多个分区,对于你以何种顺序收到消息是没有保证的。举个例子,你可能从分区10上获取5条消息和分区11上的6条消息,然后你可能一直从10上获取消息,即使11上也拥有数据。

添加更多的进程线程将使kafka重新平衡,可能改变一个分区到线程的分配。

这里是一个简单的消费者例子:

package com.test.groups;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;

public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {

m_threadNumber = a_threadNumber;

m_stream = a_stream;

}

public void run()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/781333
推荐阅读
相关标签
  

闽ICP备14008679号