1: Create a single consumer and let multiple worker threads do the consuming. The number of workers is determined by the new Integer(a_numThreads) argument in the code below.

    // Ask for a_numThreads streams for this topic (one stream per worker thread).
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    // Build the single consumer connector from the consumer properties.
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    // The connector returns the requested streams, keyed by topic.
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
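
The snippet above stops once the list of streams has been obtained. A minimal sketch of the next step, handing each stream to its own worker thread, might look like the following. To stay runnable without Kafka on the classpath, it uses a BlockingQueue<String> as a stand-in for each KafkaStream; the class name StreamWorkerSketch, the consumeAll method, and the POISON end marker are hypothetical names introduced only for this illustration. With the real API, each worker would instead loop over the stream's iterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StreamWorkerSketch {
    // Hypothetical end-of-stream marker so the demo can terminate.
    // A real KafkaStream iterator simply blocks while waiting for messages.
    static final String POISON = "__END__";

    /** Starts one worker per stream and returns the number of messages processed. */
    static int consumeAll(List<BlockingQueue<String>> streams) {
        // One thread per stream, mirroring the a_numThreads value in the text.
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        AtomicInteger processed = new AtomicInteger();
        int workerId = 0;
        for (BlockingQueue<String> stream : streams) {
            final int id = workerId++;
            executor.submit(() -> {
                try {
                    while (true) {
                        String msg = stream.take(); // blocks until a message arrives
                        if (POISON.equals(msg)) {
                            break;
                        }
                        System.out.println("worker " + id + ": " + msg);
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Stop accepting new tasks and wait for the workers to drain their streams.
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        int numThreads = 3; // plays the role of a_numThreads
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            BlockingQueue<String> q = new LinkedBlockingQueue<>();
            q.add("message-" + i + "-a");
            q.add("message-" + i + "-b");
            q.add(POISON);
            streams.add(q);
        }
        System.out.println("processed = " + consumeAll(streams));
    }
}
```

Sizing the thread pool to the number of streams keeps the mapping at one worker per stream, so no stream is left without a thread reading it.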