当前位置:   article > 正文

Window下搭建kafka运行环境_windows搭建kafka环境

windows搭建kafka环境

项目场景:

互联网项目中经常用到MQ,由于本地项目开发连接测试环境kafka很不方便,所有在本机搭建一个kafka,方便开发测试。


前置准备

提示:Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper

下载Zookeeper地址:https://zookeeper.apache.org/releases.html

下载kafka地址:http://kafka.apache.org/downloads.html


配置Zookeeper

1.将下载好的文件解压到本地,如图:

复制zoo_sample.cfg文件,并将新复制的文件命名为zoo.cfg,修改文件zoo.cfg内容如下:

  1. dataDir=F:\mq\apache-zookeeper-3.6.3\dataDir
  2. dataLogDir=F:\mq\apache-zookeeper-3.6.3\dataLogDir

2.配置Window环境变量

3.启动Zookeeper

进入Zookeeper安装目录,cmd 输入命令zkserver,如图

 启动成功!!


配置kafka

1.解压下载文件到本地

进入F:\mq\kafka_2.13-2.8.0\config文件内,修改文件server.properties

log.dirs=F:\mq\kafka_2.13-2.8.0\logs

2.启动kafka服务

在安装目录cmd输入命令:

  .\bin\windows\kafka-server-start.bat .\config\server.properties

 无报错则正常启动,本地启动窗口不要关闭。

3.创建topic名称为syn_user的命令:

 .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syn_user

4.查看创建的topic

 .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

本地kafka环境测试:

启动生产者

 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic syn_user

启动消防者监听消息

 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic syn_user


springboot 集成:

1.引入pom依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2.yml配置

  1. kafka:
  2. bootstrap-servers: 127.0.0.1:9092
  3. producer:
  4. acks: -1
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  7. consumer:
  8. enable-auto-commit: false
  9. key-serializer: org.apache.kafka.common.serialization.StringDeserializer
  10. value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer
  11. group-id: test-consumer-group
  12. listener:
  13. ack-mode: MANUAL

3.创建消息生产者

  1. @RestController
  2. @Api(value = "mq消息", tags = "Fh-mq消息")
  3. @RequestMapping("/wkafka")
  4. public class ProducerController {
  5. private static final String KAFKA_TOPIC_NAME = "wlhydemo";
  6. @Autowired
  7. KafkaTemplate<String, String> kafka;
  8. @PostMapping("/send")
  9. public String register(@RequestBody User user) {
  10. try {
  11. String message = JSONUtil.toJsonStr(user);
  12. System.out.println("注册用户信息:" + message);
  13. kafka.send(KAFKA_TOPIC_NAME, message);
  14. return "OK";
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. return "消息同步失败";
  19. }
  20. }

4.监听topic消息类

  1. @Slf4j
  2. @Component
  3. public class KaUserConsumer {
  4. @KafkaListener(topics = "wlhydemo")
  5. public void listenFlowStart(@Payload String businessStr,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
  6. @Header(KafkaHeaders.OFFSET) int offset )
  7. {
  8. try{
  9. // 模拟业务处理...
  10. log.info("当前消费分区:{}", partition);
  11. log.info("当前消费位置:{}", offset);
  12. log.info("接收到的消息:{}", businessStr);
  13. User user= JSONUtil.toBean(businessStr, User.class);
  14. user.getNickName();
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/614975
推荐阅读
相关标签
  

闽ICP备14008679号