In the previous article, Kafka开发实战(二)-集群环境搭建 (Kafka Development in Practice, Part 2: Setting Up a Cluster), we built a Kafka cluster. In this post we'll show, in code, how to publish and subscribe to messages.
1. Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
The Kafka version I'm using is 0.9.0.1. Now let's look at the Kafka producer code.
2. KafkaProducer
package com.ricky.codelab.kafka;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.ricky.codelab.kafka.util.PropertyUtils;

public class KafkaProducerDemo {

    private int total = 1000000;

    public static void main(String[] args) {
        new KafkaProducerDemo().send();
    }

    public void send() {
        long start = System.currentTimeMillis();
        System.out.println("Kafka Producer send msg start, total msgs: " + total);

        // set up the producer
        Producer<String, String> producer = null;
        try {
            Properties props = PropertyUtils.load("producer_config.properties");
            producer = new KafkaProducer<>(props);

            for (int i = 0; i < total; i++) {
                producer.send(new ProducerRecord<String, String>("hello",
                        String.valueOf(i),
                        String.format("{\"type\":\"test\", \"t\":%d, \"k\":%d}", System.currentTimeMillis(), i)));

                // every so often send a marker, to a different topic as well
                if (i % 1000 == 0) {
                    producer.send(new ProducerRecord<String, String>("test",
                            String.format("{\"type\":\"marker\", \"t\":%d, \"k\":%d}", System.currentTimeMillis(), i)));
                    producer.send(new ProducerRecord<String, String>("hello",
                            String.format("{\"type\":\"marker\", \"t\":%d, \"k\":%d}", System.currentTimeMillis(), i)));

                    producer.flush();
                    System.out.println("Sent msg number " + i);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (producer != null) { // guard against construction having failed
                producer.close();
            }
        }

        System.out.println("Kafka Producer send msg over, cost time: " + (System.currentTimeMillis() - start) + "ms");
    }
}
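One thing the demo glosses over: send() is asynchronous and its result is never checked, so failed sends disappear silently. A minimal sketch of per-record error handling with the Callback overload of send() (the record shown is illustrative; it assumes the producer and loop variable from the demo above, and Callback/RecordMetadata come from the same org.apache.kafka.clients.producer package):

// inside the send loop of the demo; requires
// import org.apache.kafka.clients.producer.Callback;
// import org.apache.kafka.clients.producer.RecordMetadata;
producer.send(new ProducerRecord<String, String>("hello", String.valueOf(i), "payload"),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // the send failed after any retries
                } else {
                    System.out.println("acked " + metadata.topic()
                            + "-" + metadata.partition() + "@" + metadata.offset());
                }
            }
        });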
KafkaProducer takes a Properties object at construction time carrying its Kafka configuration. producer_config.properties looks like this:
bootstrap.servers=172.18.19.206:9092,172.18.19.207:9092,172.18.19.208:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
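For quick experiments you can skip the file and assemble the same configuration in code. The sketch below mirrors the file above (the broker addresses are the ones from this cluster) and annotates what each setting does:

Properties props = new Properties();
props.put("bootstrap.servers", "172.18.19.206:9092,172.18.19.207:9092,172.18.19.208:9092");
props.put("acks", "all");             // leader waits for the full ISR before acking
props.put("retries", 0);              // failed sends are not retried
props.put("batch.size", 16384);       // per-partition batch size, in bytes
props.put("linger.ms", 1);            // wait up to 1 ms for a batch to fill
props.put("buffer.memory", 33554432); // 32 MB total buffer for unsent records
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);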
3. KafkaConsumer
package com.ricky.codelab.kafka;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.ricky.codelab.kafka.util.PropertyUtils;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        new KafkaConsumerDemo().consume();
    }

    public void consume() {
        JsonParser jsonParser = new JsonParser();

        // set up the consumer
        KafkaConsumer<String, String> consumer = null;
        try {
            Properties props = PropertyUtils.load("consumer_config.properties");
            consumer = new KafkaConsumer<>(props);

            // subscribe to both topics the producer writes to
            consumer.subscribe(Arrays.asList("hello", "test"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {

                    // System.out.printf("offset -> %d, key -> %s, value -> %s%n",
                    //         record.offset(), record.key(), record.value());

                    switch (record.topic()) {
                        case "hello":
                            JsonObject jObj = (JsonObject) jsonParser.parse(record.value());
                            switch (jObj.get("type").getAsString()) {
                                case "test":
                                    // end-to-end latency, from the producer-side timestamp
                                    long latency = System.currentTimeMillis() - jObj.get("t").getAsLong();
                                    System.out.println(latency);
                                    break;
                                case "marker":
                                    break;
                                default:
                                    break;
                            }
                            break;
                        case "test":
                            break;
                        default:
                            throw new IllegalStateException(
                                    "Shouldn't be possible to get message on topic " + record.topic());
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
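As written, the poll loop runs forever and the finally block is only reached if poll() throws. A common shutdown pattern, sketched below on the assumption that it replaces the while loop inside consume(), is to call wakeup() from a JVM shutdown hook; the blocking poll() then throws WakeupException and the consumer can close cleanly:

// requires: import org.apache.kafka.common.errors.WakeupException;
final KafkaConsumer<String, String> c = consumer; // the consumer built above

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        c.wakeup(); // makes the blocking poll() throw WakeupException
    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = c.poll(100);
        // ... handle records exactly as in the demo ...
    }
} catch (WakeupException e) {
    // expected on shutdown; fall through to close()
} finally {
    c.close();
}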
KafkaConsumer is likewise constructed with a java.util.Properties object describing its Kafka configuration. consumer_config.properties looks like this:
bootstrap.servers=172.18.19.206:9092,172.18.19.207:9092,172.18.19.208:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
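With enable.auto.commit=true, offsets are committed in the background every auto.commit.interval.ms (1 second here), so a crash can redeliver up to a second of already-processed messages. If that matters for your use case, one option, sketched below assuming enable.auto.commit is changed to false, is to commit manually after each batch is processed:

// assumes enable.auto.commit=false in consumer_config.properties
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process the record here
    }
    if (records.count() > 0) {
        consumer.commitSync(); // commit offsets only after the batch is processed
    }
}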
Both KafkaProducerDemo and KafkaConsumerDemo use the PropertyUtils class, which turns a .properties file into a java.util.Properties object:
package com.ricky.codelab.kafka.util;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.commons.io.IOUtils;

public class PropertyUtils {

    private PropertyUtils() {
    }

    // load a .properties file from an explicit file path
    public static Properties load(File file) throws IOException {
        InputStream in = null;
        try {
            in = new FileInputStream(file);
            Properties props = new Properties();
            props.load(in);
            return props;
        } finally {
            IOUtils.closeQuietly(in);
        }
    }

    // load a .properties file from the classpath
    public static Properties load(String path) throws IOException {
        InputStream in = null;
        try {
            in = PropertyUtils.class.getClassLoader().getResourceAsStream(path);
            if (in == null) { // getResourceAsStream returns null for a missing resource
                throw new IOException("resource not found on classpath: " + path);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        } finally {
            IOUtils.closeQuietly(in);
        }
    }
}
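Assuming you're on Java 7 or later (which the 0.9 client effectively requires), the classpath variant can also be written with try-with-resources, which drops the commons-io dependency; a minimal sketch of an equivalent loader:

public static Properties load(String path) throws IOException {
    InputStream in = PropertyUtils.class.getClassLoader().getResourceAsStream(path);
    if (in == null) {
        throw new IOException("resource not found on classpath: " + path);
    }
    try (InputStream autoClosed = in) { // closed automatically, even on failure
        Properties props = new Properties();
        props.load(autoClosed);
        return props;
    }
}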
4. Package and Run
Package KafkaProducerDemo.java and KafkaConsumerDemo.java into separate runnable CLI jars (KafkaProducer.jar and KafkaConsumer.jar), then start KafkaProducer.jar and KafkaConsumer.jar; you should see the output in each console.
References:
KafkaProducer javadoc: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
KafkaConsumer javadoc: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html