
Integrating Kafka with external components (Spring Boot, Flume, Flink, Spark)

I. Integrating Kafka with Spring Boot

1. Project structure

2. The pom.xml file

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.1</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <!-- Spring Boot starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

3. Configuration file under resources (application.properties)

spring.application.name=kf-demo
# Kafka broker addresses
spring.kafka.bootstrap-servers=192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
# Serializers for message keys and values
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# ========= consumer configuration =========
# Kafka broker addresses
#spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Deserializers for message keys and values
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=test2

4. Producer

package com.ljf.spring.boot.demo.producerspt;

import com.ljf.spring.boot.demo.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: ProducerSpt
 * @Description: REST endpoint that publishes messages to Kafka
 * @Author: liujianfu
 * @Date: 2022/04/12 08:16:35
 * @Version: V1.0
 **/
@RestController
public class ProducerSpt {
    // KafkaTemplate is used to send data to Kafka
    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/send")
    public String data(String msg) {
        Map<String, String> map = new HashMap<>();
        map.put("name", "beijing");
        map.put("time", DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss"));
        map.put("msg", msg);
        kafka.send("kafka-ljf", map.toString());
        return "ok";
    }
}

5. Consumer configuration

package com.ljf.spring.boot.demo.consumerspt;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @ClassName: KafkaConsumer
 * @Description: Listener that consumes messages from Kafka
 * @Author: liujianfu
 * @Date: 2022/04/12 08:21:59
 * @Version: V1.0
 **/
@Configuration
public class KafkaConsumer {
    // Topic to listen to
    @KafkaListener(topics = "kafka-ljf")
    public void consumeTopic(String msg) { // parameter: the received value
        System.out.println("Received message: " + msg);
    }
}

6. Start ZooKeeper and the Kafka cluster, then start the application

 

7. Testing

Producer side: call the /send endpoint exposed by ProducerSpt to publish a message to the kafka-ljf topic.

Consumer side: the @KafkaListener method prints every received message to the application console.
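For a concrete check (assuming the application runs on Spring Boot's default port 8080): requesting http://localhost:8080/send?msg=hello in a browser or with curl should return "ok", and the consumer console should print something like "Received message: {name=beijing, time=2022-04-12 08:30:00, msg=hello}" (the exact field order depends on HashMap's toString()).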

II. Integrating Kafka with Flume

1. Flume as a producer
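The original post defers the concrete config to a PDF. As a rough stand-in, here is a minimal Flume agent sketch in which Flume plays the Kafka producer role: a netcat source feeds a Kafka sink. The agent name a1, the component names r1/c1/k1, and port 44444 are illustrative assumptions, not from the original.

# netcat source: anything typed into localhost:44444 becomes an event
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Kafka sink: Flume acts as a Kafka producer for the article's topic
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
a1.sinks.k1.kafka.topic = kafka-ljf

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

With this agent running, lines typed into `nc localhost 44444` should end up in the kafka-ljf topic.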

2. Flume as a consumer
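Conversely, a Kafka source lets Flume play the consumer role. A minimal sketch along the same lines (the logger sink and the group id flume-group are illustrative assumptions):

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Kafka source: Flume acts as a Kafka consumer of the article's topic
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
a1.sources.r1.kafka.topics = kafka-ljf
a1.sources.r1.kafka.consumer.group.id = flume-group

a1.channels.c1.type = memory

# logger sink: print consumed events to the Flume log
a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1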

The full implementation is in the PDF on Baidu netdisk.

 

III. Integrating Kafka with Flink

1. Flink as a producer
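The original code again lives in the netdisk PDF. For orientation, a minimal sketch of Flink writing to Kafka with the FlinkKafkaProducer connector from the flink-connector-kafka module; the class name, the sample data, and the Flink 1.13-era API are assumptions:

// A minimal sketch, not the article's original code (Flink 1.13-era connector API)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Arrays;
import java.util.Properties;

public class FlinkKafkaProducerDemo { // hypothetical class name
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        // broker list taken from the article's application.properties
        props.setProperty("bootstrap.servers",
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");

        // a few sample records (illustrative), written to the article's topic
        env.fromCollection(Arrays.asList("hello", "flink", "kafka"))
           .addSink(new FlinkKafkaProducer<>("kafka-ljf", new SimpleStringSchema(), props));

        env.execute("flink-kafka-producer");
    }
}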

2. Flink as a consumer
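And the mirror image, Flink reading from Kafka with FlinkKafkaConsumer; again a sketch under the same assumptions, with the group id flink-group assumed:

// A minimal sketch, not the article's original code (Flink 1.13-era connector API)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkKafkaConsumerDemo { // hypothetical class name
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        // broker list taken from the article's application.properties
        props.setProperty("bootstrap.servers",
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        props.setProperty("group.id", "flink-group"); // assumed group id

        // read string values from the article's topic and print them
        env.addSource(new FlinkKafkaConsumer<>("kafka-ljf", new SimpleStringSchema(), props))
           .print();

        env.execute("flink-kafka-consumer");
    }
}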

See Baidu netdisk.

 

IV. Integrating Kafka with Spark

Materials are on Baidu netdisk.
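For orientation, a minimal sketch of Spark Streaming consuming the kafka-ljf topic via the spark-streaming-kafka-0-10 connector; the class name, the group id spark-group, the local[*] master, and the 3-second batch interval are illustrative assumptions:

// A minimal sketch, not the article's original code
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SparkKafkaConsumerDemo { // hypothetical class name
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka-demo");
        // 3-second micro-batches (illustrative choice)
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));

        Map<String, Object> kafkaParams = new HashMap<>();
        // broker list taken from the article's application.properties
        kafkaParams.put("bootstrap.servers",
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-group"); // assumed group id

        // direct stream over the article's topic
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("kafka-ljf"), kafkaParams));

        // print the message values of each micro-batch
        stream.map(ConsumerRecord::value).print();

        ssc.start();
        ssc.awaitTermination();
    }
}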

 
