赞
踩
Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下:
<properties> <scala.version>2.11</scala.version> <slf4j.version>1.7.21</slf4j.version> <kafka.version>2.0.0</kafka.version> <lombok.version>1.18.8</lombok.version> <junit.version>4.11</junit.version> <gson.version>2.2.4</gson.version> <protobuff.version>1.5.4</protobuff.version> <spark.version>2.3.1</spark.version> </properties> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
package com.demo.kafkademo.ch1; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Kafka 消息生产者 */ public class ProducerFastStart { // Kafka集群地址 private static final String brokerList = "192.168.33.129:9092"; // 主题名称-之前已经创建 private static final String topic = "topicone"; public static void main(String[] args) { Properties properties = new Properties(); // 设置key序列化器 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //另外一种写法 //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 10); // 设置值序列化器 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 设置集群地址 properties.put("bootstrap.servers", brokerList); // KafkaProducer 线程安全 KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!"); try { producer.send(record); //RecordMetadata recordMetadata = producer.send(record).get(); //System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic()); } catch (Exception e) { e.printStackTrace(); } producer.close(); } }
package com.demo.kafkademo.ch1; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; /** * Kafka 消息消费者 */ public class ConsumerFastStart { // Kafka集群地址 private static final String brokerList = "192.168.33.129:9092"; // 主题名称-之前已经创建 private static final String topic = "topicone"; // 消费组 private static final String groupId = "group.demo"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("bootstrap.servers", brokerList); properties.put("group.id", groupId); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } } } }
先启动消费端,再启动生产端进行消息的发送
注意 : waring:使用java连接linux下kafka集群需要设置hosts绑定;
kafka 安装目录 config/server.properties 文件 其中 listeners=PLAINTEXT://:9092
改为listeners=PLAINTEXT://192.168.33.129:9092 (加上kafka服务所在虚拟机ip)
否则会出现异常: Connection to node 1 (localhost/127.0.0.1:9092) could not be established
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。