当前位置:   article > 正文

【Flink】SpringBoot整合Flink并以集群方式运行,可以通过接口来动态创建执行任务,并行度可通过接口动态配置,可以和业务进行交互,灵活性极强,扩展性极高_springboot flink

springboot flink

查阅无数资料,爬了无数个坑!!!

整体思路:把SpringBoot当成一个任务放进Flink集群中运行,并且该任务会一直运行,当其他任务需要执行时只需要调用SpringBoot的接口来动态生成任务,可以把每一个接口都当成一个任务,调用接口时Flink会根据当前环境动态创建任务并执行

注意事项: 使用 ./flink run 命令以后台运行的方式去运行打好的jar包

一、引入以下依赖

  1. <properties>
  2. <maven.compiler.source>8</maven.compiler.source>
  3. <maven.compiler.target>8</maven.compiler.target>
  4. <java.version>1.8</java.version>
  5. <flink.version>1.13.0</flink.version>
  6. <scala.binary.version>2.12</scala.binary.version>
  7. <slf4j.version>1.7.30</slf4j.version>
  8. <mysql.version>5.1.47</mysql.version>
  9. <spring.boot.version>2.0.3.RELEASE</spring.boot.version>
  10. </properties>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>2.1.1.RELEASE</version>
  15. <relativePath/> <!-- lookup parent from repository -->
  16. </parent>
  17. <dependencies>
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-web</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-logging</artifactId>
  25. <exclusions>
  26. <exclusion>
  27. <groupId>*</groupId>
  28. <artifactId>*</artifactId>
  29. </exclusion>
  30. </exclusions>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.apache.flink</groupId>
  34. <artifactId>flink-java</artifactId>
  35. <version>${flink.version}</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.apache.flink</groupId>
  39. <artifactId>flink-streaming-java_2.11</artifactId>
  40. <version>${flink.version}</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.apache.flink</groupId>
  44. <artifactId>flink-clients_2.11</artifactId>
  45. <version>${flink.version}</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.apache.flink</groupId>
  49. <artifactId>flink-runtime-web_2.11</artifactId>
  50. <version>${flink.version}</version>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.springframework.boot</groupId>
  54. <artifactId>spring-boot-starter</artifactId>
  55. <version>${spring.boot.version}</version>
  56. </dependency>
  57. <dependency>
  58. <groupId>org.apache.flink</groupId>
  59. <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
  60. <version>${flink.version}</version>
  61. </dependency>
  62. <dependency>
  63. <groupId>org.projectlombok</groupId>
  64. <artifactId>lombok</artifactId>
  65. <version>1.18.8</version>
  66. <scope>provided</scope>
  67. </dependency>
  68. <dependency>
  69. <groupId>mysql</groupId>
  70. <artifactId>mysql-connector-java</artifactId>
  71. <version>${mysql.version}</version>
  72. </dependency>
  73. </dependencies>
  74. <build>
  75. <finalName>flink</finalName>
  76. <plugins>
  77. <plugin>
  78. <groupId>org.apache.maven.plugins</groupId>
  79. <artifactId>maven-shade-plugin</artifactId>
  80. <version>3.2.4</version>
  81. <executions>
  82. <execution>
  83. <phase>package</phase>
  84. <goals>
  85. <goal>shade</goal>
  86. </goals>
  87. <configuration>
  88. <createDependencyReducedPom>false</createDependencyReducedPom>
  89. <artifactSet>
  90. <excludes>
  91. <exclude>com.google.code.findbugs:jsr305</exclude>
  92. <exclude>org.slf4j:*</exclude>
  93. <exclude>log4j:*</exclude>
  94. </excludes>
  95. </artifactSet>
  96. <filters>
  97. <filter>
  98. <artifact>*:*</artifact>
  99. <excludes>
  100. <exclude>module-info.class</exclude>
  101. <exclude>META-INF/*.SF</exclude>
  102. <exclude>META-INF/*.DSA</exclude>
  103. <exclude>META-INF/*.RSA</exclude>
  104. </excludes>
  105. </filter>
  106. </filters>
  107. <transformers>
  108. <transformer
  109. implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  110. <resource>META-INF/spring.handlers</resource>
  111. <resource>reference.conf</resource>
  112. </transformer>
  113. <transformer
  114. implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
  115. <resource>META-INF/spring.factories</resource>
  116. </transformer>
  117. <transformer
  118. implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  119. <resource>META-INF/spring.schemas</resource>
  120. </transformer>
  121. <transformer
  122. implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  123. <transformer
  124. implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  125. <mainClass>cn.sdata.FlinkBootApplication</mainClass>
  126. </transformer>
  127. </transformers>
  128. </configuration>
  129. </execution>
  130. </executions>
  131. </plugin>
  132. </plugins>
  133. </build>

二、yml配置

  1. server:
  2. port: 10001

三、SpringBoot启动类

  1. @SpringBootApplication
  2. public class FlinkBootApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(FlinkBootApplication.class,args);
  5. //加一个死循环来保证main方法不会停止
  6. while (true) {
  7. Thread.sleep(30000);
  8. }
  9. }
  10. }

四、Controller层业务测试代码(一个流处理、一个批处理)

  1. @RequestMapping("test")
  2. @RestController
  3. @AllArgsConstructor
  4. public class TestController {
  5. @GetMapping("test1")
  6. public void test(Integer parallelism) throws Exception {
  7. //创建执行环境
  8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. //设置并行度
  10. env.setParallelism(parallelism);
  11. //读取文本流
  12. DataStreamSource<String> source = env.socketTextStream("192.168.1.200", 8888);
  13. SingleOutputStreamOperator<Tuple2<String, Integer>> operator = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  14. @Override
  15. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  16. String[] splits = s.split(" ");
  17. for (String split : splits) {
  18. collector.collect(new Tuple2<>(split, 1));
  19. }
  20. }
  21. }).filter(data -> StringUtils.isNotEmpty(data.f0)).keyBy(data -> data.f0).sum(1);
  22. //打印
  23. operator.print();
  24. //执行
  25. env.execute();
  26. }
  27. @GetMapping("test2")
  28. public void test2() throws Exception {
  29. //执行环境
  30. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  31. //设置并行度
  32. env.setParallelism(1);
  33. //将一列元素作为数据源
  34. DataStreamSource<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
  35. //控制台打印
  36. integerDataStream.print("int");
  37. //执行任务
  38. env.execute();
  39. }
  40. }

五、在你的服务器上开启一个nc端口

六、打包上传至你的JobManager所在的服务器

 七、进入Flink中的bin目录

八、后台方式运行jar包

 九、通过接口访问流处理方法并行度设置为4

 可以看到流处理方法占用了4个插槽

十、通过接口访问批处理方法

可以看到该方法占用1个插槽

 稍等一会再次刷新页面

 发现该任务已执行完毕,插槽不再占用

总结:完成SpringBoot和Flink的高度整合,通过调用接口的方式来进行Flink任务的创建,SpringBoot在处理一些数据量比较大且计算量也比较大的业务场景时就可以通过Flink进行数据的处理入库,并且支持Flink集群节点可无限制扩充

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/563614
推荐阅读
相关标签
  

闽ICP备14008679号