赞
踩
最近遇到一个问题,由于kafka接收数据进行处理所花费的时间较长,导致kafka队列中有堆积,然后就想开启很多个consumer但是不怎么会用,报了一些错误,通过一天的学习了解,终于可以让多个consumer共同消费topic中的数据了。
使用3个producer同时对一个topic写入数据,其中使用2个group组来对数据进行读取,其中topic中的partitions定为2。在每个group下又创建2个consumer进行消费数据。
在项目刚开始,我只在topic中设置了一个partitions就是只有一个消费者来消费我传入的数据,但是由于项目的变化,消费的太慢写入的太快,给kafka带来了数据堆积,于是我又加了一个consumer来进行数据的消费,由于刚开始没有给group创建ID使用默认ID,但是发现我每个consumer消费的数据是相同的,并没有达到我的需求。kafka的一条数据会被我的2个consumer同时消费,消费2次,并没有增加效率,而且还给系统带来负担,后台查询官网API发现group中是有ID的,如果没有创建就自动使用默认ID这个一定要注意。其次是一个partition对应一个consumer,如果consumer的数量大于Topic中partition的数量就会有的consumer接不到数据(设置ID不使用默认ID的情况下)。
为了满足的我业务需求我做了一下调整:
增加topic中partition中的数量。
相应增加consumer的数量 consumer的数量<=partition的数量
这里需要强调的是不同的group组之间不做任何影响,就如同我一个group做python机器学习。另一个做Spark计算,这2个group的数据都是相互不影响的,这也是kafka很好用的东西。
下面java的代码如下需要对给group添加一个name:
consumer1
public static Properties props;
static {
props = new Properties();
props.put("zookeeper.connect", "node1:2181/kafka");
props.put("serializer.class", StringEncoder.class.getName());
props.put(
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。