
Integrating Flink with Spring Boot

Using Spring Boot together with Flink lets you stand up the whole application quickly and keep the focus on business logic. Several problems came up during the integration; the biggest was that Flink stream operators cannot access classes in the Spring container, which leads to NullPointerExceptions. The solution is to initialize the Spring beans inside the operator itself to obtain an ApplicationContext, and then use its getBean method to fetch class instances.

Versions: Spring Boot 2.1.6 + Flink 1.6.1 + JDK 1.8

Main program:

    @SpringBootApplication
    public class HadesTmsApplication implements CommandLineRunner {

        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(HadesTmsApplication.class);
            application.setBannerMode(Banner.Mode.OFF);
            application.run(args);
        }

        @Override
        public void run(String... args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            FlinkKafkaConsumer010<String> kafkaConsumer =
                    new FlinkKafkaConsumer010<>("topic-name", new SimpleStringSchema(), getProperties());
            DataStream<String> dataStream = env.addSource(kafkaConsumer);
            // processing logic omitted here
            dataStream.addSink(new MySink());
            env.execute("hades-tms"); // without execute() the job is never submitted
        }

        private Properties getProperties() {
            // bootstrap_servers, zookeeper_connect and group_id are configuration fields (declarations omitted)
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", bootstrap_servers);
            properties.setProperty("zookeeper.connect", zookeeper_connect);
            properties.setProperty("group.id", group_id);
            // a consumer takes deserializers, not serializers
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }
    }

A note: since this is not a web project, the application implements the CommandLineRunner interface and overrides its run method, which is where the stream-processing logic goes.
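Incidentally, if the web starter ever ends up on the classpath, non-web mode can also be forced explicitly. A minimal sketch of the same main method (HadesTmsApplication is the class from the example above):

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(HadesTmsApplication.class);
        // run as a plain (non-web) application even if a servlet container is on the classpath
        application.setWebApplicationType(WebApplicationType.NONE);
        application.setBannerMode(Banner.Mode.OFF);
        application.run(args);
    }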

If MySink needs classes from the Spring container, it cannot reach them, because MySink is a plain class; the result is a NullPointerException. Some may think of the ApplicationContextAware interface: implement it to obtain the ApplicationContext, that is:

    @Component
    public class ApplicationContextUtil implements ApplicationContextAware, Serializable {

        private static final long serialVersionUID = -6454872090519042646L;

        private static ApplicationContext applicationContext = null;

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            if (ApplicationContextUtil.applicationContext == null) {
                ApplicationContextUtil.applicationContext = applicationContext;
            }
        }

        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }

        // Get a bean by name.
        public static Object getBean(String name) {
            return getApplicationContext().getBean(name);
        }

        // Get a bean by class.
        public static <T> T getBean(Class<T> clazz) {
            return getApplicationContext().getBean(clazz);
        }

        // Get a bean by name and class.
        public static <T> T getBean(String name, Class<T> clazz) {
            return getApplicationContext().getBean(name, clazz);
        }
    }

This does not work in Flink stream processing either: operators are serialized and shipped to the task managers, where no Spring context has been started, so the static ApplicationContext field is still null when the operator actually runs. In one of my earlier Flink articles, Flink读写系列之-读mysql并写入mysql (Flink read/write series: reading from and writing to MySQL), both the read and the write side have an open method that exists specifically for initialization, so that is where the Spring beans can be initialized. The reworked MySink (renamed SimpleSink below) then becomes:

    @EnableAutoConfiguration
    @MapperScan(basePackages = {"com.xxx.bigdata.xxx.mapper"})
    public class SimpleSink extends RichSinkFunction<String> {

        // initialized in open() on the task manager, so neither field should be serialized with the sink
        private transient ConfigurableApplicationContext springContext;
        private transient TeacherInfoMapper teacherInfoMapper;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            SpringApplication application = new SpringApplication(SimpleSink.class);
            application.setBannerMode(Banner.Mode.OFF);
            springContext = application.run(new String[]{});
            teacherInfoMapper = springContext.getBean(TeacherInfoMapper.class);
        }

        @Override
        public void close() throws Exception {
            if (springContext != null) {
                springContext.close(); // shut down the Spring context started in open()
            }
            super.close();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            List<TeacherInfo> teacherInfoList = teacherInfoMapper.selectByPage(0, 100);
            teacherInfoList.forEach(teacherInfo -> System.out.println("teacherinfo:"
                    + teacherInfo.getTeacherId() + "," + teacherInfo.getTimeBit() + "," + teacherInfo.getWeek()));
        }
    }

Inside invoke, the Mapper beans from the Spring container are now accessible.
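For completeness, here is a minimal sketch of what TeacherInfoMapper and TeacherInfo might look like. The original article does not show them, so the SQL, the table and column names, and the field types are all assumptions (MySQL-style paging):

    // TeacherInfoMapper.java
    import java.util.List;
    import org.apache.ibatis.annotations.Mapper;
    import org.apache.ibatis.annotations.Param;
    import org.apache.ibatis.annotations.Select;

    @Mapper
    public interface TeacherInfoMapper {
        // hypothetical table and column names; adjust to the real schema
        @Select("SELECT teacher_id, time_bit, week FROM teacher_info LIMIT #{offset}, #{pageSize}")
        List<TeacherInfo> selectByPage(@Param("offset") int offset, @Param("pageSize") int pageSize);
    }

    // TeacherInfo.java (lombok is already in the pom below)
    import lombok.Data;

    @Data
    public class TeacherInfo {
        // field types are guesses based on the getters used in invoke()
        private String teacherId;
        private String timeBit;
        private String week;
    }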

The pom is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.xxx.bigdata</groupId>
        <artifactId>flink-project</artifactId>
        <version>1.0.0</version>
        <name>flink-project</name>
        <packaging>jar</packaging>
        <description>My project for Spring Boot</description>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <flink.version>1.6.1</flink.version>
            <skipTests>true</skipTests>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>ch.qos.logback</groupId>
                        <artifactId>logback-classic</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>com.cloudera</groupId>
                <artifactId>ImpalaJDBC41</artifactId>
                <version>2.6.4</version>
            </dependency>
            <dependency>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP</artifactId>
                <version>3.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.3.1</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>application.properties</include>
                        <include>application-${package.environment}.properties</include>
                    </includes>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <skip>true</skip>
                        <mainClass>com.xxx.bigdata.xxx.Application</mainClass>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- mybatis plugin to generate mapping file and class -->
                <plugin>
                    <groupId>org.mybatis.generator</groupId>
                    <artifactId>mybatis-generator-maven-plugin</artifactId>
                    <version>1.3.5</version>
                    <configuration>
                        <configurationFile>${basedir}/src/main/resources/generatorConfig.xml</configurationFile>
                        <overwrite>true</overwrite>
                        <verbose>true</verbose>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>com.cloudera</groupId>
                            <artifactId>ImpalaJDBC41</artifactId>
                            <version>2.6.4</version>
                        </dependency>
                    </dependencies>
                </plugin>
            </plugins>
        </build>

        <profiles>
            <!-- development environment (active by default) -->
            <profile>
                <id>dev</id>
                <properties>
                    <package.environment>dev</package.environment>
                </properties>
                <activation>
                    <activeByDefault>true</activeByDefault>
                </activation>
            </profile>
            <!-- staging environment -->
            <profile>
                <id>pre</id>
                <properties>
                    <package.environment>pre</package.environment>
                </properties>
            </profile>
            <!-- production environment -->
            <profile>
                <id>pro</id>
                <properties>
                    <package.environment>pro</package.environment>
                </properties>
            </profile>
        </profiles>
    </project>

The project is packaged with the default Spring Boot plugin, with skip set to true. Without that setting, the repackaged jar gains a BOOT-INF directory, and since classes then live under BOOT-INF/classes where Flink's classloader does not look for them, all sorts of runtime failures follow: ClassNotFoundException, Kafka streaming problems, and in some cases you even have to invert Flink's classloading order from child-first to parent-first by editing the Flink configuration file.
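For reference, if you do have to flip the classloading order, that switch lives in Flink's conf/flink-conf.yaml; the classloader.resolve-order key is available from Flink 1.4 onward:

    # conf/flink-conf.yaml
    # Default is child-first; parent-first makes Flink's own lib take precedence over user code.
    classloader.resolve-order: parent-first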

Problems encountered:

1.  java.lang.NoSuchMethodError: com.google.gson.GsonBuilder.setLenient()Lcom/google/gson/GsonBuilder

The GsonBuilder class comes from a gson-xxx.jar, yet running mvn dependency:tree on the project showed no dependency on it at all. Suspecting that Flink supplies its own gson at runtime, I checked Flink's lib directory: flink-dist_2.11-1.6.1.jar does bundle a gson-xxx package, but its GsonBuilder class has no setLenient method. The fix was to create a commlib directory on the server, drop in gson-2.8.0.jar (which does contain setLenient), and point the classpath at it when submitting with flink run.
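A sketch of such a submission; the paths, jar name, and main class below are placeholders, and --classpath (-C) URLs must be reachable from every node:

    flink run \
      --classpath file:///home/hadoop/commlib/gson-2.8.0.jar \
      -c com.xxx.bigdata.xxx.Application \
      flink-project-1.0.0.jar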

2. Logging conflict

Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/opt/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory

Excluding the Logback dependency from the Spring Boot starter resolves it:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

3. When submitting a job to YARN with flink run, any classpath entry must point at a concrete jar file; pointing at a directory does not work. Assuming all dependency jars have already been collected in one directory, the classpath string can be assembled in shell like this:

    lib_classpath=""
    for jar in `ls /home/hadoop/lib`
    do
        jar_suffix=${jar##*.}
        if [ "$jar_suffix" = "jar" ]
        then
            jar_path=" --classpath file:///home/hadoop/lib/$jar "
            lib_classpath=${lib_classpath}${jar_path}
        else
            echo "the file $jar is not a jar file, skipping append"
        fi
    done

The assembled lib_classpath value looks like this:

    --classpath file:///home/hadoop/lib/accessors-smart-1.2.jar  --classpath file:///home/hadoop/lib/akka-actor_2.11-2.4.20.jar

Note: if the jars live on the local filesystem, every machine in the cluster needs its own copy.
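The variable then drops straight into the submit command; a sketch using the same placeholder jar name and main class as above:

    flink run -m yarn-cluster \
      ${lib_classpath} \
      -c com.xxx.bigdata.xxx.Application \
      flink-project-1.0.0.jar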
