赞
踩
此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用
如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
在application.properties文件中添加以下配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group
这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。
在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者
package com.zhanfu.kafkademo.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("test\_topic", message); } }
这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。
在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。
package com.zhanfu.kafkademo.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaLis {
@KafkaListener(topics = "test\_topic", groupId = "test\_group")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。
package com.zhanfu.kafkademo.controller; import com.zhanfu.kafkademo.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired private KafkaService kafkaService; @GetMapping("/send/{message}") public String sendMessage(@PathVariable String message) { kafkaService.sendMessage(message); return "Message sent successfully"; } }
在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:
首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目
然后在浏览器中输入
http://127.0.0.1:8080/send/hello
最后检查我们的项目日志:
可以看到,整个发送和接收的流程都走通了
不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate
, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。
它有三个主要属性:
producerFactory
:生产者工厂类,用于创建 KafkaProducer 实例。defaultTopic
:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。messageConverter
:消息转换器,用于将消息对象转换为 Kafka ProducerRecord它的主要方法:
除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()、sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。
需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。
当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate
为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:
如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClass
、 ConditionalOnMissingBean
应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内
今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!*
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-dGdZj4ez-1712849706300)]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。