当前位置:   article > 正文

Kafka生产者消费者实例_messagestreams.get(topic).get(0)

messagestreams.get(topic).get(0)

主要实现Kafka消费者和生产者最基础功能。

消费者实例:

public class MyKafkaConsumer implements Runnable {
private String topic;
public MyKafkaConsumer(String topic) {
super();
this.topic = topic;
}
// 加載kafka配置信息
public Properties createKafkaProperties() {
InputStream in = MyKafkaConsumer.class.getResourceAsStream("/kafka.properties");
Properties prop = new Properties();
try {
prop.load(in);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return prop;
}
@Override
public void run() {
// 獲取kafka連接
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(createKafkaProperties()));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[],byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[],byte[]> iterator = stream.iterator();
while(iterator.hasNext()) {
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
}

}
public static void main(String[] args) {
new Thread(new MyKafkaConsumer("test")).start();
}
}

生产者实例:

public class MyKafkaProducer implements Runnable {
private String topic;
private LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
public MyKafkaProducer(String topic,LinkedBlockingQueue<String> queue) {
super();
this.topic = topic;
this.queue = queue;
}
public Properties createProperties() {
InputStream in = MyKafkaProducer.class.getResourceAsStream("/kafkaProducer");
Properties prop = new Properties();
try {
prop.load(in);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return prop;
}

@Override
public void run() {
Producer<String, String> producer = new Producer<>(new ProducerConfig(createProperties()));

while(true) {

try {

//此处可以为发送的消息指定Key和Value,进而能够根据指定key进行相应分区

producer.send(new KeyedMessage<String, String>(topic, queue.take()));
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

}


注:添加prop.put("serializer.class", StringEncoder.class.getName());语句,否则报序列化错误




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/846050
推荐阅读
相关标签
  

闽ICP备14008679号