当前位置:   article > 正文

Kafka超简单部署单机-zookeeper/kraft两种方式_metapropertiesensemble=metapropertiesensemble(meta

metapropertiesensemble=metapropertiesensemble(metadatalogdir=optional.empty,

1. 准备工作

        官网:Apache Kafka ,下载Kafka压缩包,本文以kafka_2.13-3.7.0.gz版本作为样例。
        解压并重命名文件夹为kafka,

  1. tar -zxvf kafka_2.13-3.7.0
  2. mv kafka_2.13-3.7.0 kafka

2. Zookeeper方式

        进入kafka目录,单节点部署,若使用kafka自带的zookeeper则不需要修改zookeeper.properties。这里我们直接修改kafka配置文件。

vi config/server.properties

        只修改两个地方,listeners 监听任意可用 IP, advertised.listeners 发布指定 IP。如果只在内网访问,则这两个参数可以都设置成内网IP地址。

  1. # 监听端口
  2. listeners=PLAINTEXT://0.0.0.0:9092
  3. # 外部调用地址
  4. advertised.listeners=PLAINTEXT://175.178.49.109:9092

        依赖zookeeper,需要先启动,启动时指定配置文件路径

  1. # 添加-daemon表示后台运行,这里我们学习过程不加这个参数,方便观察日志
  2. ./bin/zookeeper-server-start.sh [-daemon] config/zookeeper.properties

        再开一个新窗口启动kafka

./bin/kafka-server-start.sh config/server.properties 

        这样就把kafka和zookeeper 启动起来了。使用jps指令可以查看kafka进程

3. Kraft方式

        kraft模式不需要再依赖于zookeeper。因此只需要修改kraft的配置文件

        Kafka 需要一个唯一标识符(UUID)来进行存储管理。使用 kafka-storage.sh 脚本生成这个 UUID。也可以通过其它工具生成。然后用这个uuid格式化存储目录。若已经格式化过后想修改,则先要删除这个目录/tmp/kraft-combined-logs(也可能是你自己修改后的数据目录)

  1. [root@VM-20-7-centos kafka]# ./bin/kafka-storage.sh random-uuid
  2. nTtEGnZ1ThO0GiiTcbU0XA
  3. [root@VM-20-7-centos kafka]# bin/kafka-storage.sh format -t nTtEGnZ1ThO0GiiTcbU0XA -c config/kraft/server.properties
  4. metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
  5. Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.

        修改kraft的配置文件。目录: kafka/config/kraft/server.properties

  1. #指定参与控制器选举的节点(选民) 节点ID@IP:PORT
  2. controller.quorum.voters=1@localhost:9093
  3. # PLAINTEXT://:9092 表示监听所有网络接口9092端口
  4. # CONTROLLER://:9093 表示监听控制器节点之间的通信端口
  5. listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  6. # 配置外部访问IP和端口
  7. advertised.listeners=PLAINTEXT://175.178.49.109:9092

        保存配置,启动kafka

./bin/kafka-server-start.sh config/kraft/server.properties

4. 服务器命令测试

        我们先通过命令的方式创建主题,实现生产、消费。

  1. # 创建名为 test-topic 的主题名称,partitions分区为1,replication-factor副本数量为1
  2. [root@VM-20-7-centos kafka]# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  3. Created topic test-topic.
  4. #查看本地kafka内的主题列表
  5. [root@VM-20-7-centos kafka]# ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  6. test-topic

        我们再打开两个窗口。一个窗口作为生产者,一个作为消费者。

  1. # 创建一个主题为 test-topic 的生产者,下放就可以输入消息按回车发送
  2. [root@VM-20-7-centos kafka]# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
  3. >123
  4. >
  5. # 创建一个主题为 test-topic的消费者,屏幕会实时显示该主题内收到的消息,--from-beginning表示从第一条消息开始消费,若不带这个参数,则只能看到创建后产生的消息
  6. [root@VM-20-7-centos kafka]# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
  7. 123

        效果如下图所示:

5. SpringBoot集成测试

        先保证服务器防火墙配置正确,确保9092端口可以访问。springboot创建过程就不提了哈~

        pom文件添加依赖。

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. <scope>test</scope>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.kafka</groupId>
  17. <artifactId>spring-kafka-test</artifactId>
  18. <scope>test</scope>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.projectlombok</groupId>
  22. <artifactId>lombok</artifactId>
  23. </dependency>
  24. </dependencies>

        生产者 KafkaProducer.class

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @Author 996
  7. * @Date 2024/5/24
  8. */
  9. @Component
  10. @Slf4j
  11. public class KafkaProducer {
  12. @Autowired
  13. private KafkaTemplate<String, String> kafkaTemplate;
  14. public void send(String topic, String message) {
  15. log.info("send message : topic:{},message:{}",topic, message);
  16. kafkaTemplate.send(topic,message);
  17. }
  18. }

        消费者 KafkaConsumer.class

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @Author 996
  7. * @Date 2024/5/24
  8. */
  9. @Slf4j
  10. @Component
  11. public class KafkaConsumer {
  12. @KafkaListener(id = "test-consumer-group",topics = {"test-topic"})
  13. public void consume1(ConsumerRecord<Object, Object> record) {
  14. log.info("收到消息:topic={},partition={}, offset={}, key={}, value={}",record.topic(),record.partition(),
  15. record.offset(),record.key(),record.value());
  16. }
  17. }

        加一个测试接口

  1. /**
  2. * @Author 996
  3. * @Date 2024/5/24
  4. */
  5. @RestController
  6. public class TestController {
  7. @Autowired
  8. private KafkaProducer kafkaProducer;;
  9. @GetMapping("/api/v1/test")
  10. public void test(@RequestParam String msg) {
  11. kafkaProducer.send("test-topic" ,msg);
  12. }
  13. }

        在springboot配置文件里面配置kafka

  1. spring.kafka.bootstrap-servers=175.178.49.109:9092
  2. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  3. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  5. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

        启动项目,打开浏览器调用API  localhost:8080/api/v1/test?msg=thisSpringbootMsg

        日志输出生产信息和消费信息

        命令行窗口输出消费信息

6. 总结

        本文仅记录个人学习过程,同时希望提供给有需要的同学减少弯路。本文只为了尽快搭建单机平台,以及集成到SpringBoot,因此很多细节上的问题都没有提,因为这类技术文档以及太多了,写这个文章的目的就是为了快!最快!环境搭建起来了,程序跑起来了,其它东西在使用过程中自然会遇到并且去学习了解,对吧~   

        欢迎各路大神指点。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/897790
推荐阅读
相关标签
  

闽ICP备14008679号