当前位置:   article > 正文

SpringBoot快速集成Kafak_springboot集成kafaka

springboot集成kafaka

前言:Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。Apache Kafka与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;

  • 它同时为发布和订阅提供高吞吐量;

  • 它支持多订阅者,当失败时能自动平衡消费者;

  • 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。


一、Kafka安装

在安装kafka之前必须先安装JDK和zookeeper

1.1、安装JDK (kafka 是Scala语言开发,运行在 jvm 上)

Centos7中安装Java环境,在安装前先查看有无安装过Java环境。直接运行java 命令,会提示命令未找到,则是没有安装过java环境。

(1) 查看yum源的java包

yum list java*

(2) 安装jdk1.8

yum -y install java-1.8.0-openjdk


安装完成后会在最下面显示完成

(3) 查看版本,检测安装是否成功

java -version

若出现上述提示表示JDK已经成功安装啦!

1.2、安装 Zookeeper (kafka 依赖 zookeeper)

# 解压文件
tar ‐zxvf apache‐zookeeper‐3.5.8‐bin.tar.gz

# 复制一份配置文件, 方便修改
cp conf/zoo_sample.cfg conf/zoo.cfg

# 启动
bin/zkServer.sh start

# 连接控制台
bin/zkCli.sh # 查看zk的根目录相关节点
ls /

1.3、安装Kafka

其实上面不用安装Zookeeper也行,Kafak自带Zookeeper

Kafka官网下载地址

(1) 解压 kafka,进入kakfa目录

tar -xzf kafka_2.13-2.6.0.tgz

cd kafka_2.13-2.6.0

(2) 修改 kafka-server 的配置文件

vim config/server.config

修改其中的:

  1. # broker.id属性在kafka集群中必须要是唯一
  2. broker.id=0
  3. # kafka的消息存储文件,别放默认的temp目录下,它默认空间不够
  4. log.dir=/usr/local/data/kafka‐logs
  5. # kafka 连接 zookeeper 的地址
  6. zookeeper.connect=localhost:2181
  7. #阿里云内网的ip地址和9092端口
  8. listeners=PLAINTEXT://xxx.xx.xx:9092
  9. # kafka部署的机器ip和提供服务的端口号,阿里云外网地址
  10. advertised.listeners=PLAINTEXT://xx.xx.xx:9092

别光用这个指定 listeners=PLAINTEXT://xxx.xxx.xx.xx:9092,会出现kafka主机名解析hostname异常,解析成主机名SpringBoot启动会报错

DEBUG [org.apache.kafka.clients.NetworkClient] - Error connecting to node 1 at hs:9092: java.io.IOException: Can't resolve address: hs:9092

原因:Windows 上运行的kafka拿到的host是机器名而不是IP地址所以会导致报错,hs是 Kafka 实例所在Linux服务器的主机名。解决方案就是用advertised.listeners=PLAINTEXT://xx.xx.xx:9092。

(3)  首先启动Zookeeper

如果未启动zookeeper 需要先启动zookeeper,如果没有自己安装的zk可以使用Kafka自带的:

启动ZK:使用Kafka安装包中的脚本启动单节点 Zookeeper 实例(加守护进程启动)

  1. #在bin的上一级目录执行命令
  2. bin/zookeeper-server-start.sh config/zookeeper.properties

(4)启动kakfa

kafka如果直接启动会出现问题,就是日志信息会打印在控制台台。然后关闭窗口,kafka随之关闭,可以后台启动命令解决这个问题。

  1. # 后台不挂断启动Kakfa服务 , 运行的日志打印在 logs 目录里的server.log 里
  2. nohup bin/kafka-server-start.sh config/server.properties &
  3. #这时候不知道是否启动成功,我们可以查看日志,kafka目录下
  4. tail -500f logs/server.log

nohup这里若不指定输出日志文件,缺省地会输出到nohup.out文件中;想要不输出日志可以

nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & ,>/dev/null 2>&1表示不输出正常和错误日志。

(5)关闭kakfa

  1. # 停止kafka (先进入kafka目录)
  2. bin/kafka‐server‐stop.sh

(6)确认Linux 防火墙 kafka和Zookeeper的端口开启

如果是阿里云服务器,还得添加安全组入方向策略,因为它默认只开放远程连接的22端口

1.4、Kafka基本使用

  1. #创建topic
  2. bin/kafka-topics.sh --create --zookeeper localhost:2100 --replication-factor 1 --partitions 1 --topic test02
  3. #查看topic
  4. bin/kafka-topics.sh --list --zookeeper localhost:2100
  5. #生产者生产消息
  6. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test02
  7. 写一条this is test的信息
  8. #消费者消费消息
  9. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning

可以看到消费信息

在这里插入图片描述


 二、Kakfa使用

搭建kafka 和 zookeeper 环境 并且启动它们

2.1、引入Spring Kafak依赖

  1. <!--Kakfa-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <!--Json工具-->
  7. <dependency>
  8. <groupId>com.google.code.gson</groupId>
  9. <artifactId>gson</artifactId>
  10. </dependency>
  11. <!--SpringMVC-->
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-web</artifactId>
  15. </dependency>
  16. <!--LomBok使用@Data 注解-->
  17. <dependency>
  18. <groupId>org.projectlombok</groupId>
  19. <artifactId>lombok</artifactId>
  20. <version>1.18.4</version>
  21. <scope>provided</scope>
  22. </dependency>

2.2、修改application.yml,添加Kafak配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: xx.xxx.xx.x:9092 #bootstrap-servers:连接kafka的地址,多个地址用逗号分隔 (指定kafka server的地址,集群配多个,中间,逗号隔开)
  4. consumer:
  5. group-id: myGroup
  6. enable-auto-commit: true
  7. auto-commit-interval: 100ms
  8. properties:
  9. session.timeout.ms: 15000
  10. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  11. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. auto-offset-reset: earliest
  13. producer:
  14. retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
  15. batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
  16. buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
  17. key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
  18. value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类

2.3、定义消息体Message

  1. package com.hs.demo.kakfa;
  2. import lombok.Data;
  3. import java.util.Date;
  4. //@Data : 注解在类上, 为类提供读写属性, 此外还提供了 equals()、hashCode()、toString() 方法
  5. @Data
  6. public class Message {
  7. private Long id;
  8. private String msg;
  9. private Date sendTime;
  10. }

2.4、定义KafkaProducer类

主要利用 KafkaTemplate 来发送消息 ,将消息封装成Message 并且进行转化成Json串 发送到Kafka中

  1. package com.hs.demo.kakfa;
  2. import com.google.gson.Gson;
  3. import com.google.gson.GsonBuilder;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Date;
  8. //@Slf4j : 注解在类上, 为类提供一个属性名为 log 的 log4j 的日志对象
  9. @Component
  10. @Slf4j
  11. public class KafkaProducer {
  12. private final KafkaTemplate<String, String> kafkaTemplate;
  13. //构造器方式注入 kafkaTemplate
  14. public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
  15. this.kafkaTemplate = kafkaTemplate;
  16. }
  17. private Gson gson = new GsonBuilder().create();
  18. public void send(String msg) {
  19. Message message = new Message();
  20. message.setId(System.currentTimeMillis());
  21. message.setMsg(msg);
  22. message.setSendTime(new Date());
  23. log.info("【生产者发送消息:{}】", gson.toJson(message));
  24. //对 topic = hello2 的发送消息
  25. kafkaTemplate.send("hello2",gson.toJson(message));
  26. }
  27. }

2.5、定义KafkaConsumer

在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。kafka的消息在消费者接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。

  1. package com.hs.demo.kakfa;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Optional;
  7. @Component
  8. @Slf4j
  9. public class KafkaConsumer {
  10. @KafkaListener(topics = {"hello2"})
  11. public void listen(ConsumerRecord<?, ?> record) {
  12. Optional.ofNullable(record.value())
  13. .ifPresent(message -> {
  14. log.info("【+++++++++++++++++ record = {} 】", record);
  15. log.info("【消费者接收message : {}】", message);
  16. });
  17. // Optional msg = Optional.ofNullable(record.value());
  18. // if (msg.isPresent())
  19. // {
  20. // log.info("消费者消费消息:{}", msg.get());
  21. // }
  22. }
  23. }

2.6、提供Kafak测试接口

提供一个 Http接口调用 KafkaProdcuer 去发送消息

  1. package com.hs.demo.kakfa;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. @Slf4j
  9. public class TestController {
  10. @Autowired
  11. private KafkaSender kafkaSender;
  12. @GetMapping("sendMessage/{msg}")
  13. public void sendMessage(@PathVariable("msg") String msg){
  14. kafkaSender.send(msg);
  15. }
  16. }

2.7、 启动SpringBoot项目,监听8080 端口

KafkaMessageListenerContainer中有 consumer group = myGroup 有一个 监听 hello2-0 topic 的 消费者

如何在SpringBoot项目中使用Kafka

(1)调用Http接口

http://localhost:8080/sendMessage/KafkaTestMsg

(2)输出结果


 参考链接:

Linux 安装 kafka 详细步骤

如何在SpringBoot项目中使用Kafka

springboot项目中使用Kafka消息队列

阿里云安装配置kafka, 本地windows10代码访问

Springboot注解@KafkaListener实现Kafka批量消费

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/471686
推荐阅读
相关标签
  

闽ICP备14008679号