赞
踩
先在服务器端启动消费者监听,
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
再运行下边的生产者代码,我们在上边的服务器端就会收到 Java 客户端发送的数据。
添加pom依赖
<!-- Kafka Java client; version should match (or be compatible with) the broker version. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
生产者代码
package kafaka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Minimal Kafka producer example: asynchronously sends one record to the
 * {@code test} topic with a delivery callback, then closes the producer.
 */
public class ProducerHelloworld {
    public static void main(String[] args) {
        // 1. Connection properties for the Kafka cluster.
        String topic = "test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.20.101:9092");
        // acks=all: wait for the full in-sync replica set to acknowledge each record.
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer. try-with-resources guarantees close() — which
        //    also flushes any buffered records — even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Asynchronous send with a completion callback. Use the `topic`
            //    variable instead of repeating the "test" literal.
            producer.send(new ProducerRecord<>(topic, null, "wjm"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // Include the cause instead of swallowing it.
                        System.out.println("发送消息出现异常:" + exception);
                    } else {
                        System.out.println("发送消息到Kafka中的名字为" + metadata.topic()
                                + "的主题,第" + metadata.partition()
                                + "分区,第" + metadata.offset() + "条数据成功!");
                    }
                }
            });
        }
    }
}

二、使用 Java 编写消费者
建立消费者代码
package kafaka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;

/**
 * Demo that runs a Kafka producer and a consumer against the same topic,
 * each on its own thread, so the consumer prints what the producer sends.
 */
public class KafaTest {

    public static String topic = "test";

    public static void main(String[] args) {
        new Thread(() -> new Producer().execute()).start();
        new Thread(() -> new Consumer().execute()).start();
    }

    /** Polls the topic forever and prints every record received. */
    public static class Consumer {

        private void execute() {
            Properties p = new Properties();
            p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.101:9092");
            p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // NOTE(review): group id reuses the topic name — works, but a
            // dedicated group name would be clearer.
            p.put(ConsumerConfig.GROUP_ID_CONFIG, topic);

            // try-with-resources closes the consumer if the loop exits abnormally.
            try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p)) {
                // Subscribe to the single demo topic.
                kafkaConsumer.subscribe(Collections.singletonList(topic));

                while (true) {
                    // poll(Duration) replaces the deprecated poll(long) overload.
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("topic:%s,offset:%d,消息:%s",
                                record.topic(), record.offset(), record.value()));
                    }
                }
            }
        }
    }

    /** Sends a random greeting message every 500 ms until interrupted. */
    public static class Producer {

        private void execute() {
            Properties p = new Properties();
            // Kafka broker address; separate multiple addresses with commas.
            p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.101:9092");
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

            // Reuse one Random instead of allocating a new one per message.
            Random random = new Random();
            try {
                while (true) {
                    String msg = "Hello," + random.nextInt(100);
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                    kafkaProducer.send(record);
                    System.out.println("消息发送成功:" + msg);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the interruption.
                Thread.currentThread().interrupt();
            } finally {
                // Always release the producer (close() also flushes buffered records).
                kafkaProducer.close();
            }
        }
    }
}

首先我们运行消费者代码,接着在服务器上使用生产者命令生产数据:
kafka-console-producer.sh --broker-list localhost:9092 --topic test
发送数据
最后在代码的console端可以看到在服务器上发送的数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。