4.0.0Kafka-MavenKafka-Maven0.0.1-SNAPSHOTorg...._kafka.javaapi.consumer.consumerconnector">
当前位置:   article > 正文

kafka java api示例_kafka生产者和消费者的javaAPI的示例代码

kafka.javaapi.consumer.consumerconnector

写了个kafka的java demo 顺便记录下,仅供参考

1.创建maven项目

目录如下:

e34eb56dfaa30e05f93b5afb047e4dcf.png

2.pom文件:

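<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>Kafka-Maven</groupId>
  <artifactId>Kafka-Maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>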

3.kafka生产者KafkaProduce:

package com.lijie.producer;

import java.io.File;

import java.io.FileInputStream;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**
 * Minimal Kafka producer demo that sends one byte-array record per call.
 *
 * <p>Producer settings are read once, when the class is initialized, from a
 * {@code kafka.properties} file expected at the root of the classpath.
 */
public class KafkaProduce {

    /** Producer configuration loaded from {@code kafka.properties}. */
    private static Properties properties;

    static {
        properties = new Properties();

        // Resolve the classpath root through this class (not the library's
        // KafkaProducer), matching how KafkaConsume locates the same file.
        String path = KafkaProduce.class.getResource("/").getFile().toString()
                + "kafka.properties";

        // try-with-resources guarantees the stream is closed even if load() fails.
        try (FileInputStream fis = new FileInputStream(new File(path))) {
            properties.load(fis);
        } catch (Exception e) {
            // Best effort, as before: report the problem and continue with
            // whatever properties were loaded.
            e.printStackTrace();
        }
    }

    /**
     * Sends a single message.
     *
     * <p>A new producer is created for every call and closed before the method
     * returns, including when {@code send} throws.
     *
     * @param topic topic the record is published to
     * @param key   record key bytes
     * @param value record value bytes
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // Instantiate the producer; it is closed automatically at the end of the block.
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(properties)) {

            // Wrap the payload in a record.
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>(topic, key, value);

            // Send the record; the callback reports the outcome.
            producer.send(record, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata,
                        Exception exception) {
                    if (exception != null) {
                        // Failure: per the Kafka Callback contract the metadata
                        // may be null here, so it must not be dereferenced.
                        System.out.println(exception.getMessage() + exception);
                        return;
                    }
                    // Success: report where the record was written.
                    System.out.println("记录的offset在:" + metadata.offset());
                }
            });
        }
    }
}

4.kafka消费者KafkaConsume:

package com.lijie.consumer;

import java.io.File;

import java.io.FileInputStream;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;

import com.lijie.pojo.User;

import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.serializer.StringDecoder;

import kafka.utils.VerifiableProperties;

/**
 * Minimal Kafka consumer demo built on the ZooKeeper-based high-level consumer
 * API ({@code kafka.javaapi.consumer.ConsumerConnector}).
 *
 * <p>Consumer settings are read once, when the class is initialized, from a
 * {@code kafka.properties} file expected at the root of the classpath.
 */
public class KafkaConsume {

    /** Topic this demo consumes from. */
    private final static String TOPIC = "lijietest";

    /** Consumer configuration loaded from {@code kafka.properties}. */
    private static Properties properties;

    static {
        properties = new Properties();

        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";

        // try-with-resources guarantees the stream is closed even if load() fails.
        try (FileInputStream fis = new FileInputStream(new File(path))) {
            properties.load(fis);
        } catch (Exception e) {
            // Best effort, as before: report the problem and continue with
            // whatever properties were loaded.
            e.printStackTrace();
        }
    }

    /**
     * Reads messages from {@link #TOPIC}, converts each JSON payload to a
     * {@code User} via {@code JsonUtils.JsonToObj} and prints it.
     *
     * <p>The method loops for as long as the stream iterator reports more
     * messages. The connector is shut down when the loop ends or an exception
     * escapes.
     *
     * @throws Exception if creating the consumer, reading a message or
     *                   converting it fails
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);

        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(config);

        try {
            // Request a single stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(TOPIC, Integer.valueOf(1));

            StringDecoder keyDecoder =
                    new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder =
                    new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                    .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

            // Exactly one stream was requested above, so index 0 is the only one.
            KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);

            ConsumerIterator<String, String> it = stream.iterator();

            while (it.hasNext()) {
                String json = it.next().message();
                User user = (User) JsonUtils.JsonToObj(json, User.class);
                System.out.println(user);
            }
        } finally {
            // Shut the connector down so it is not left running after this method exits.
            consumer.shutdown();
        }
    }
}

5.kafka.properties文件

##produce

bootstrap.servers=192.168.80.123:9092

producer.type=sync

request.required.acks=1

serializer.class=kafka.serializer.DefaultEncoder

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

bak.partitioner.class=kafka.producer.DefaultPartitioner

bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer

bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consume

zookeeper.connect=192.168.80.123:2181

group.id=lijiegroup

zookeeper.session.timeout.ms=4000

zookeeper.sync.time.ms=200

auto.commit.interval.ms=1000

auto.offset.reset=smallest

serializer.class=kafka.serializer.StringEncoder

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/575300
推荐阅读
相关标签
  

闽ICP备14008679号