1. Project structure
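The original structure screenshot is not reproduced here; based on the package names used in the code below, the layout is roughly:

```
src/main/java/com/ljf/spring/boot/demo/
 ├── producerspt/ProducerSpt.java    (REST producer, section 4)
 ├── consumerspt/KafkaConsumer.java  (Kafka listener, section 5)
 └── utils/DateUtils.java            (date formatting helper)
src/main/resources/application.properties
pom.xml
```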
2. pom file
```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.1</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <!-- Spring Boot core starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- SLF4J Log4j 1.x binding -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
    <!-- Web starter: Spring MVC + embedded server, needed for the /send endpoint -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Lombok (optional, compile-time only) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
```
3. Resource configuration file (application.properties)
```properties
spring.application.name=kf-demo
# Kafka broker addresses
spring.kafka.bootstrap-servers=192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
# Producer key and value serializers
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# ========= Consumer configuration =========
# Kafka broker addresses (same cluster as above)
#spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Consumer key and value deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=test2
```
4. Producer
```java
package com.ljf.spring.boot.demo.producerspt;

import com.ljf.spring.boot.demo.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: ProducerSpt
 * @Description: REST endpoint that publishes messages to Kafka
 * @Author: liujianfu
 * @Date: 2022/04/12 08:16:35
 * @Version: V1.0
 **/
@RestController
public class ProducerSpt {
    // KafkaTemplate is used to send data to Kafka
    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/send")
    public String data(String msg) {
        Map<String, String> map = new HashMap<>();
        map.put("name", "beijing");
        map.put("time", DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss"));
        map.put("msg", msg);
        kafka.send("kafka-ljf", map.toString());
        return "ok";
    }
}
```
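The producer references a project-local DateUtils helper that the post does not show. A minimal sketch of what it presumably looks like (the actual implementation may differ) is:

```java
package com.ljf.spring.boot.demo.utils;

import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical reconstruction of the DateUtils helper referenced by ProducerSpt;
// the original implementation is not included in the post.
public class DateUtils {

    // Formats a Date with the given pattern, e.g. "yyyy-MM-dd HH:mm:ss".
    public static String dateToStr(Date date, String pattern) {
        return new SimpleDateFormat(pattern).format(date);
    }
}
```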
5. Configure the consumer
```java
package com.ljf.spring.boot.demo.consumerspt;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @ClassName: KafkaConsumer
 * @Description: Listens on the Kafka topic and prints each received message
 * @Author: liujianfu
 * @Date: 2022/04/12 08:21:59
 * @Version: V1.0
 **/
@Configuration
public class KafkaConsumer {
    // Topic to listen on
    @KafkaListener(topics = "kafka-ljf")
    public void consumeTopic(String msg) { // parameter: the received value
        System.out.println("Received message: " + msg);
    }
}
```
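If topic, partition, and offset metadata are also needed, spring-kafka allows the listener method to take the full ConsumerRecord instead of only the value. A minimal sketch of that variant (not part of the original post):

```java
package com.ljf.spring.boot.demo.consumerspt;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

// Alternative listener that receives the whole record, exposing
// topic, partition, and offset in addition to the message value.
@Configuration
public class KafkaRecordConsumer {

    @KafkaListener(topics = "kafka-ljf")
    public void consumeRecord(ConsumerRecord<String, String> record) {
        System.out.println("topic=" + record.topic()
                + " partition=" + record.partition()
                + " offset=" + record.offset()
                + " value=" + record.value());
    }
}
```

Treat this as a replacement for the listener above rather than an addition: both use the same configured group-id, so running them together would split the topic's partitions between them.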
6. Start the ZooKeeper and Kafka clusters, then start the application
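The post does not show the Spring Boot entry point. A minimal sketch, assuming the base package com.ljf.spring.boot.demo and a hypothetical class name KfDemoApplication:

```java
package com.ljf.spring.boot.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical application entry point; the original post does not include it.
@SpringBootApplication
public class KfDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KfDemoApplication.class, args);
    }
}
```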
7. Testing
Producer side: send a request to the /send endpoint.
Consumer side: the listener prints the received message.
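The original result screenshots are not reproduced. Assuming the application runs on the default port 8080 and a request such as http://localhost:8080/send?msg=hello is issued, the consumer console output would look roughly like this (HashMap key order may vary; the timestamp is illustrative):

```
Received message: {name=beijing, time=2022-04-12 08:30:00, msg=hello}
```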
1. As a producer
2. As a consumer
See the Baidu Netdisk PDF for the full code implementation.