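To set up the Kafka environment, see the CSDN blog post "Kafka:安装和配置" (Kafka: installation and configuration) by moreCalm.
1. Add the Kafka dependencies to the Spring Boot project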
- <dependencies>
-     <dependency>
-         <groupId>org.springframework.boot</groupId>
-         <artifactId>spring-boot-starter-web</artifactId>
-     </dependency>
-     <!-- Kafka: exclude the transitive kafka-clients and declare it explicitly below -->
-     <dependency>
-         <groupId>org.springframework.kafka</groupId>
-         <artifactId>spring-kafka</artifactId>
-         <exclusions>
-             <exclusion>
-                 <groupId>org.apache.kafka</groupId>
-                 <artifactId>kafka-clients</artifactId>
-             </exclusion>
-         </exclusions>
-     </dependency>
-     <dependency>
-         <groupId>org.apache.kafka</groupId>
-         <artifactId>kafka-clients</artifactId>
-     </dependency>
-     <!-- fastjson is not managed by Spring Boot; its version must come from the project's own dependencyManagement -->
-     <dependency>
-         <groupId>com.alibaba</groupId>
-         <artifactId>fastjson</artifactId>
-     </dependency>
- </dependencies>
2. Configure application.yml
- server:
-   port: 9991
- spring:
-   application:
-     name: kafka-demo
-   kafka:
-     bootstrap-servers: 192.168.200.130:9092
-     producer:
-       retries: 10
-       key-serializer: org.apache.kafka.common.serialization.StringSerializer
-       value-serializer: org.apache.kafka.common.serialization.StringSerializer
-     consumer:
-       group-id: ${spring.application.name}-test
-       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
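The StringSerializer and StringDeserializer configured above convert between a Java String and bytes, using UTF-8 by default. The following is only a plain-Java illustration of that round trip; the class name StringCodecSketch and its two methods are hypothetical and are not part of Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: mimics the String <-> UTF-8 bytes conversion
// that Kafka's StringSerializer/StringDeserializer perform by default.
public class StringCodecSketch {

    // Producer side: the String becomes the bytes that travel to the broker.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: the bytes are decoded back into a String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("湖人总冠军!");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
    }
}
```

Because both sides agree on the encoding, non-ASCII payloads such as the Chinese message used later in this post survive the trip unchanged.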
Sending String messages
3. Implement the message-sending endpoint in a controller
- package com.heima.kafkademo.controller;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class HelloController {
-
-     // Auto-configured by Spring Boot from the spring.kafka.* settings
-     @Autowired
-     private KafkaTemplate<String, String> kafkaTemplate;
-
-     // GET /hello publishes one String message to the "lakers-topic" topic
-     @GetMapping("/hello")
-     public String addMessage() {
-         // Payload means "Lakers are the champions!"
-         kafkaTemplate.send("lakers-topic", "湖人总冠军!");
-         return "ok";
-     }
- }
4. Implement the receiving class HelloListener in the component package
- package com.heima.kafkademo.component;
-
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class HelloListener {
-
-     // Called for every record on "lakers-topic"; the consumer group comes from
-     // spring.kafka.consumer.group-id in application.yml
-     @KafkaListener(topics = "lakers-topic")
-     public void onMessage(String msg) {
-         System.out.println(msg);
-     }
-
- }
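5. Test
Open the /hello endpoint in a browser (port 9991, as set in application.yml) and check the application console.
The message is received successfully.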
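Instead of a browser, the endpoint can also be called from code. The sketch below is a minimal example under assumptions: it uses the JDK's built-in HttpClient (Java 11 or later), assumes the application runs on localhost, and the class name HelloEndpointCheck is hypothetical. Only the port 9991 and the /hello path come from the configuration and controller above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: calls the /hello endpoint so the listener output can be checked.
public class HelloEndpointCheck {

    // Assumption: the application runs on this machine; 9991 is server.port from application.yml.
    static final String BASE_URL = "http://localhost:9991";

    // Builds a GET request for the /hello endpoint exposed by HelloController.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/hello")).GET().build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response =
                    client.send(buildRequest(BASE_URL), HttpResponse.BodyHandlers.ofString());
            // Print the HTTP status and the response body returned by the controller.
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically a connection error when the application is not running.
            System.out.println("Request failed (is the application running?): " + e);
        }
    }
}
```

After a successful call, the application console should show the line printed by HelloListener.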
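Sending object messages
Approach: serialize the object to a JSON string before sending it, then parse the string back into an object on the receiving side.
1. Implement sending in the controller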
- package com.heima.kafkademo.controller;
-
- import com.alibaba.fastjson.JSON;
- import com.heima.kafkademo.model.User;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class HelloController {
-
-     @Autowired
-     private KafkaTemplate<String, String> kafkaTemplate;
-
-     @GetMapping("/hello")
-     public String addMessage() {
-         User user = new User();
-         user.setName("勒布朗");
-         user.setAge(37);
-         user.setGender("男");
-         user.setJob("NBA球员");
-         // The template is still <String, String>: the object travels as a JSON string
-         kafkaTemplate.send("lakers-topic", JSON.toJSONString(user));
-         return "ok";
-     }
- }
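The controller imports com.heima.kafkademo.model.User, which the post does not list. A minimal sketch of such a class follows; the field names and types are assumptions inferred from the setters called above, and the toString format is purely illustrative.

```java
// Hypothetical model class; in the demo project it would start with
// "package com.heima.kafkademo.model;" to match the import in the controller.
public class User {

    // Assumed fields, inferred from setName/setAge/setGender/setJob in the controller.
    private String name;
    private int age;
    private String gender;
    private String job;

    // Public no-argument constructor plus getters/setters, as JSON libraries
    // commonly expect for serialization and deserialization.
    public User() {
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }

    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }

    // Readable output for System.out.println on the receiving side.
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age
                + ", gender='" + gender + "', job='" + job + "'}";
    }
}
```

Without a toString override, printing the parsed object would show only the default class name and hash code rather than the field values.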
2. Implement the receiving class in the component package
- package com.heima.kafkademo.component;
-
- import com.alibaba.fastjson.JSON;
- import com.heima.kafkademo.model.User;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class HelloListener {
-
-     @KafkaListener(topics = "lakers-topic")
-     public void onMessage(String msg) {
-         // Parse the JSON string back into a User before printing it
-         System.out.println(JSON.parseObject(msg, User.class));
-     }
-
- }
3. Check the printed test result