赞
踩
1、向指定的partition 发送数据:
ProducerRecord record = new ProducerRecord<String ,User>(topic,partition,key,message);
2.从指定的partition开始消费数据:
consummer.assgin(Arrays.asList(new TopicPartition(topic,0)));
consummer.poll(1000);
3从指定的partition的指定的offset开始消费数据:
KafkaConsummer consummer.subscribe(Arrays.asList(topic));
ConsummerRecords<String,User> records = consummer.poll(1000);
consummer.seek(new TopicPartition(topic,partitionNum),offset);
consummer.commitSync(); // 向ZK提交offset
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。