赞
踩
Kafka 和 Spring Boot
兼容版本:https://spring.io/projects/spring-kafka/
依赖版本需要匹配Spring Boot版本,这里使用的 <spring-boot.version>3.1.5</spring-boot.version> 版本
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.6.0</version>
</dependency>
mvnrepository:https://mvnrepository.com/
一个 Maven 仓库的在线查找工具,用于查找和浏览 Java 开发中使用的依赖库(dependencies)的信息
Zookeeper
Apache ZooKeeper 项目的存档目录:https://archive.apache.org/dist/zookeeper/
在这个目录下,可以找到 Apache ZooKeeper 发布的历史版本以及与这些版本相关的二进制文件、源代码和其他相关文档。
Kafka 依赖于 Zookeeper,所以首先需要启动 Zookeeper 服务器,这里使用的 apache-zookeeper-3.5.5-bin 版本
kafka
Apache Kafka 官方网站下载:https://kafka.apache.org/downloads
这里使用的 kafka_2.12-3.5.1 版本
环境配置(可选操作)
可以将 Kafka 的 bin
目录添加到系统的 PATH
环境变量中,方便可以在任何地方运行 Kafka 相关的命令。
apache-zookeeper-3.5.5-bin\conf\zoo.cfg
# ZooKeeper 基本时间单元,用于计算时间的基本单位(毫秒)
tickTime=2000
# 存储 ZooKeeper 数据的目录
dataDir=D:/myApp/zookeeper/apache-zookeeper-3.5.5-bin/data
# 用于接受客户端连接的端口号
clientPort=2181
# ZooKeeper AdminServer 的端口号(默认端口8080)
admin.serverPort=8081
Zookeeper:bin目录
zkServer.cmd
Kafka:kafka_2.12-3.5.1目录
.\bin\windows\kafka-server-start.bat .\config\server.properties
package com.xueyi.sample.kafka.producer; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // 配置Kafka生产者 Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 创建Kafka生产者 Producer<String, String> producer = new KafkaProducer<>(properties); // 发送消息 ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key", "Hello , Kafka!"); producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.println("Message sent successfully! Topic: " + metadata.topic() + ", Partition: " + metadata.partition() + ", Offset: " + metadata.offset()); } else { exception.printStackTrace(); } }); // 关闭生产者 producer.close(); } }
消费者
package com.xueyi.sample.kafka.consumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // 配置Kafka消费者 Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("group.id", "your_group_id"); properties.put("key.deserializer", StringDeserializer.class.getName()); properties.put("value.deserializer", StringDeserializer.class.getName()); // 创建Kafka消费者 Consumer<String, String> consumer = new KafkaConsumer<>(properties); // 订阅主题 consumer.subscribe(Collections.singletonList("your_topic")); // 拉取消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.println("Received message: Key = " + record.key() + ", Value = " + record.value() + ", Topic = " + record.topic() + ", Partition = " + record.partition() + ", Offset = " + record.offset()); }); } } }
发送消息
Message sent successfully! Topic: your_topic, Partition: 0, Offset: 7
接收消息
Received message: Key = key, Value = Hello, Kafka!, Topic = your_topic, Partition = 0, Offset = 7
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。