赞
踩
官网:Apache Kafka ,下载Kafka压缩包,本文以kafka_2.13-3.7.0.gz版本作为样例。
解压并重命名文件夹为kafka,
- tar -zxvf kafka_2.13-3.7.0
- mv kafka_2.13-3.7.0 kafka
进入kafka目录,单节点部署,若使用kafka自带的zookeeper则不需要修改zookeeper.properties。这里我们直接修改kafka配置文件。
vi config/server.properties
只修改两个地方,listeners 监听任意可用 IP, advertised.listeners 发布指定 IP。如果只在内网访问,则这两个参数可以都设置成内网IP地址。
- # 监听端口
- listeners=PLAINTEXT://0.0.0.0:9092
- # 外部调用地址
- advertised.listeners=PLAINTEXT://175.178.49.109:9092
依赖zookeeper,需要先启动,启动时指定配置文件路径
- # 添加-daemon表示后台运行,这里我们学习过程不加这个参数,方便观察日志
- ./bin/zookeeper-server-start.sh [-daemon] config/zookeeper.properties
再开一个新窗口启动kafka
./bin/kafka-server-start.sh config/server.properties
这样就把kafka和zookeeper 启动起来了。使用jps指令可以查看kafka进程
kraft模式不需要再依赖于zookeeper。因此只需要修改kraft的配置文件
Kafka 需要一个唯一标识符(UUID)来进行存储管理。使用 kafka-storage.sh
脚本生成这个 UUID。也可以通过其它工具生成。然后用这个uuid格式化存储目录。若已经格式化过后想修改,则先要删除这个目录/tmp/kraft-combined-logs(也可能是你自己修改后的数据目录)
- [root@VM-20-7-centos kafka]# ./bin/kafka-storage.sh random-uuid
- nTtEGnZ1ThO0GiiTcbU0XA
- [root@VM-20-7-centos kafka]# bin/kafka-storage.sh format -t nTtEGnZ1ThO0GiiTcbU0XA -c config/kraft/server.properties
- metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
- Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.
修改kraft的配置文件。目录: kafka/config/kraft/server.properties
- #指定参与控制器选举的节点(选民) 节点ID@IP:PORT
- controller.quorum.voters=1@localhost:9093
-
-
- # PLAINTEXT://:9092 表示监听所有网络接口9092端口
- # CONTROLLER://:9093 表示监听控制器节点之间的通信端口
- listeners=PLAINTEXT://:9092,CONTROLLER://:9093
-
- # 配置外部访问IP和端口
- advertised.listeners=PLAINTEXT://175.178.49.109:9092
保存配置,启动kafka
./bin/kafka-server-start.sh config/kraft/server.properties
我们先通过命令的方式创建主题,实现生产、消费。
- # 创建名为 test-topic 的主题名称,partitions分区为1,replication-factor副本数量为1
- [root@VM-20-7-centos kafka]# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- Created topic test-topic.
-
- #查看本地kafka内的主题列表
- [root@VM-20-7-centos kafka]# ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- test-topic
我们再打开两个窗口。一个窗口作为生产者,一个作为消费者。
- # 创建一个主题为 test-topic 的生产者,下放就可以输入消息按回车发送
- [root@VM-20-7-centos kafka]# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
- >123
- >
-
-
- # 创建一个主题为 test-topic的消费者,屏幕会实时显示该主题内收到的消息,--from-beginning表示从第一条消息开始消费,若不带这个参数,则只能看到创建后产生的消息
- [root@VM-20-7-centos kafka]# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
- 123
效果如下图所示:
先保证服务器防火墙配置正确,确保9092端口可以访问。springboot创建过程就不提了哈~
pom文件添加依赖。
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- </dependencies>
生产者 KafkaProducer.class
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
-
- /**
- * @Author 996
- * @Date 2024/5/24
- */
- @Component
- @Slf4j
- public class KafkaProducer {
-
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- public void send(String topic, String message) {
- log.info("send message : topic:{},message:{}",topic, message);
- kafkaTemplate.send(topic,message);
- }
- }
消费者 KafkaConsumer.class
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @Author 996
- * @Date 2024/5/24
- */
- @Slf4j
- @Component
- public class KafkaConsumer {
-
- @KafkaListener(id = "test-consumer-group",topics = {"test-topic"})
- public void consume1(ConsumerRecord<Object, Object> record) {
- log.info("收到消息:topic={},partition={}, offset={}, key={}, value={}",record.topic(),record.partition(),
- record.offset(),record.key(),record.value());
- }
-
- }
加一个测试接口
- /**
- * @Author 996
- * @Date 2024/5/24
- */
- @RestController
- public class TestController {
-
- @Autowired
- private KafkaProducer kafkaProducer;;
-
- @GetMapping("/api/v1/test")
- public void test(@RequestParam String msg) {
- kafkaProducer.send("test-topic" ,msg);
- }
- }
在springboot配置文件里面配置kafka
- spring.kafka.bootstrap-servers=175.178.49.109:9092
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
启动项目,打开浏览器调用API localhost:8080/api/v1/test?msg=thisSpringbootMsg
日志输出生产信息和消费信息
命令行窗口输出消费信息
本文仅记录个人学习过程,同时希望提供给有需要的同学减少弯路。本文只为了尽快搭建单机平台,以及集成到SpringBoot,因此很多细节上的问题都没有提,因为这类技术文档以及太多了,写这个文章的目的就是为了快!最快!环境搭建起来了,程序跑起来了,其它东西在使用过程中自然会遇到并且去学习了解,对吧~
欢迎各路大神指点。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。