当前位置:   article > 正文

微服务集成Windows版kafka_kafka windows运行

kafka windows运行

微服务集成Windows版kafka

1-兼容

Kafka 和 Spring Boot

兼容版本https://spring.io/projects/spring-kafka/
在这里插入图片描述

2-雷点

依赖版本需要匹配Spring Boot版本,这里使用的 <spring-boot.version>3.1.5</spring-boot.version> 版本

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.6.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

mvnrepositoryhttps://mvnrepository.com/

一个 Maven 仓库的在线查找工具,用于查找和浏览 Java 开发中使用的依赖库(dependencies)的信息

3-安装

  1. Zookeeper

    Apache ZooKeeper 项目的存档目录https://archive.apache.org/dist/zookeeper/

    在这个目录下,可以找到 Apache ZooKeeper 发布的历史版本以及与这些版本相关的二进制文件、源代码和其他相关文档。

    Kafka 依赖于 Zookeeper,所以首先需要启动 Zookeeper 服务器,这里使用的 apache-zookeeper-3.5.5-bin 版本

  2. kafka

    Apache Kafka 官方网站下载https://kafka.apache.org/downloads

    这里使用的 kafka_2.12-3.5.1 版本

4-配置

  1. 环境配置(可选操作)
    可以将 Kafka 的 bin 目录添加到系统的 PATH 环境变量中,方便可以在任何地方运行 Kafka 相关的命令。

  2. apache-zookeeper-3.5.5-bin\conf\zoo.cfg

    # ZooKeeper 基本时间单元,用于计算时间的基本单位(毫秒)
    tickTime=2000
    
    # 存储 ZooKeeper 数据的目录
    dataDir=D:/myApp/zookeeper/apache-zookeeper-3.5.5-bin/data
    
    # 用于接受客户端连接的端口号
    clientPort=2181
    
    # ZooKeeper AdminServer 的端口号(默认端口8080)
    admin.serverPort=8081
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

5-启动

  1. Zookeeper:bin目录

    zkServer.cmd
    
    • 1
  2. Kafka:kafka_2.12-3.5.1目录

    .\bin\windows\kafka-server-start.bat .\config\server.properties
    
    • 1

6-实现

  1. 生产者
package com.xueyi.sample.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置Kafka生产者
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        Producer<String, String> producer = new KafkaProducer<>(properties);
        // 发送消息
        ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key", "Hello , Kafka!");
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent successfully! Topic: " + metadata.topic() +
                        ", Partition: " + metadata.partition() + ", Offset: " + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });

        // 关闭生产者
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  1. 消费者

    package com.xueyi.sample.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            // 配置Kafka消费者
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", "your_group_id");
            properties.put("key.deserializer", StringDeserializer.class.getName());
            properties.put("value.deserializer", StringDeserializer.class.getName());
    
            // 创建Kafka消费者
            Consumer<String, String> consumer = new KafkaConsumer<>(properties);
    
            // 订阅主题
            consumer.subscribe(Collections.singletonList("your_topic"));
    
            // 拉取消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.println("Received message: Key = " + record.key() +
                            ", Value = " + record.value() +
                            ", Topic = " + record.topic() +
                            ", Partition = " + record.partition() +
                            ", Offset = " + record.offset());
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
  2. 发送消息

    Message sent successfully! Topic: your_topic, Partition: 0, Offset: 7
    在这里插入图片描述

  3. 接收消息

    Received message: Key = key, Value = Hello, Kafka!, Topic = your_topic, Partition = 0, Offset = 7

    在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/354152
推荐阅读
相关标签
  

闽ICP备14008679号