
Integrating Flink with Spring Boot and Submitting the Job to a Flink Cluster


Background: a recent project needed Flink. After studying it and working through one pitfall after another, the job now runs standalone and can also be deployed to a cluster; this post records the pitfalls. Development environment: IDEA + Spring Boot (2.3.5.RELEASE) + Kafka (2.8.1) + MySQL (8.0.26). Without further ado, here is the working code.

The code below implements the logic of marking a device as offline when it has not uploaded any data within a given time interval.

I. Creating the application class

/**
 * Application that submits the Flink job
 *
 * @author wangfenglei
 */
@SpringBootApplication(scanBasePackages = {"com.wfl.firefighting.flink", "com.wfl.firefighting.util"})
public class DataAnalysisFlinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataAnalysisFlinkApplication.class, args);
    }
}

II. The main device-status job: read data from Kafka, run it through a KeyedProcessFunction, and send offline devices to a MySQL sink that updates their status

/**
 * Read data from Kafka, compute which devices are offline, and write the result to MySQL
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceDataKafkaSource {
    private static final Logger log = LoggerFactory.getLogger(DeviceDataKafkaSource.class);

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String loginConfig;
    @Value("${customer.flink.cal-device-status-topic}")
    private String topic;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Build and submit the job
     *
     * @throws Exception on failure
     */
    @PostConstruct
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(1);

        Properties properties = new Properties();
        // Kafka broker IPs or host names, comma separated
        properties.setProperty("bootstrap.servers", kafkaServer);
        // Kafka consumer group.id
        properties.setProperty("group.id", "data-nanlysis-flink-devicestatus");
        // Kafka SASL mechanism: PLAIN
        properties.setProperty("sasl.mechanism", "PLAIN");
        // Kafka security protocol: SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Kafka login user name and password
        properties.setProperty("sasl.jaas.config", loginConfig);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);

        DataStream<String> deviceStatus = stream
                // extract only the device serial number
                .map(data -> CommonConstant.GSON.fromJson(data, MsgData.class).getDevSn())
                // key by device serial number
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // emit a device serial number when no new data arrived within the interval
                .process((CalDeviceOfflineFunction) applicationContext.getBean("calDeviceOfflineFunction"));

        // write to the database
        deviceStatus.addSink((SinkFunction) applicationContext.getBean("deviceStatusSink"));

        // submit the job
        new Thread(() -> {
            try {
                env.execute("deviceStatusFlinkJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}

Notes:

1. The @ConditionalOnProperty switch controls whether the job runs at all; this module can later host several Flink jobs, and each required job is submitted by turning on its own switch, as in the sketch below.
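A minimal sketch of such a second job; the property name customer.flink.cal-device-alarm and the class below are hypothetical, not part of this project:

import javax.annotation.PostConstruct;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
 * Hypothetical second Flink job, guarded by its own switch (illustration only).
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-alarm", havingValue = "true", matchIfMissing = false)
public class DeviceAlarmKafkaSource {

    @PostConstruct
    public void execute() throws Exception {
        // build and submit another StreamExecutionEnvironment here,
        // exactly as DeviceDataKafkaSource does for the device-status job
    }
}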

2. Spring Boot's @PostConstruct annotation makes the job submit itself automatically when the application starts.

3. The job is submitted from a separate Thread; otherwise env.execute() blocks and application startup would hang for as long as the Flink job keeps running.
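On the Flink version used here (1.13), env.executeAsync() is a non-blocking alternative to the hand-rolled thread; a minimal sketch of how the end of execute() could look instead, using the same names as the code above:

// instead of new Thread(...).start():
// import org.apache.flink.core.execution.JobClient;
JobClient jobClient = env.executeAsync("deviceStatusFlinkJob");
log.info("submitted Flink job, jobId={}", jobClient.getJobID());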

4. Logging must go through slf4j, the same logging facade that Flink's own dependencies use, so that logs also show up when the job runs on the Flink cluster:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

5. The Kafka connection requires SASL login; the client configuration is in the application.yml below. The broker-side setup for Kafka authentication is described in the official documentation; I may write it up in a separate post.
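For reference, with the PLAIN mechanism set in the source above, the client JAAS entry normally uses PlainLoginModule; the user name and password below are placeholders, not this project's credentials:

spring:
  kafka:
    properties:
      sasl:
        jaas:
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<password>";'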

III. Computing the device offline status

/**
 * KeyedProcessFunction that maintains state per device serial number and emits the serial
 * numbers that were not updated within the interval:
 * for every record, CalDeviceOfflineFunction updates the last-seen timestamp
 * and registers a timer callback for the interval.
 * When the callback fires, the stored last-modified timestamp is compared with the callback's
 * timestamp; if they match, the device serial number is emitted (no data was uploaded within
 * the interval, so the device is considered offline).
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class CalDeviceOfflineFunction extends KeyedProcessFunction<String, String, String> {
    private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

    /**
     * State maintained by the ProcessFunction
     */
    private ValueState<DeviceLastDataTimestamp> deviceState;
    /**
     * Trigger time of the registered timer
     */
    private ValueState<Long> timerState;

    @Autowired
    private DeviceService deviceService;

    @Override
    public void open(Configuration parameters) throws Exception {
        deviceState = getRuntimeContext().getState(new ValueStateDescriptor<>("deviceState", DeviceLastDataTimestamp.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
    }

    /**
     * Called for every element
     *
     * @param value input record
     * @param ctx   context
     * @param out   output collector
     * @throws Exception on failure
     */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        log.info("++++++++++++++flink received deviceSn={}", value);
        // read the current state
        DeviceLastDataTimestamp current = deviceState.value();
        if (current == null) {
            current = new DeviceLastDataTimestamp();
            current.key = value;
            current.lastDataTime = ctx.timestamp();
        }
        Long currentTimerState = timerState.value();
        if (null == currentTimerState) {
            // initialize with -1
            timerState.update(-1L);
        }
        if (-1 != timerState.value()) {
            // delete the previously registered timer before registering a new one
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
        }
        long interval = deviceService.getDeviceOfflineInterval(value);
        // store the record's timestamp in the state
        current.lastDataTime = ctx.timestamp();
        // store the offline-detection interval
        current.interval = interval;
        // write the state back
        deviceState.update(current);
        // remember the trigger time of the new timer
        timerState.update(current.lastDataTime + interval);
        // register the new timer
        ctx.timerService().registerProcessingTimeTimer(current.lastDataTime + interval);
    }

    /**
     * Called when a timer fires
     *
     * @param timestamp trigger time of this timer
     * @param ctx       timer context
     * @param out       output collector
     * @throws Exception on failure
     */
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // read the state of this device
        DeviceLastDataTimestamp result = deviceState.value();
        // timestamp is the timer's trigger time; if it equals the last update time plus the
        // offline interval, no message was received from this device during the interval
        if (timestamp == result.lastDataTime + result.interval) {
            // emit the device serial number
            out.collect(result.key);
            // log for verification
            log.info("==================" + result.key + " is offline");
        }
    }

    /**
     * Data class holding the timestamp of the device's last upload
     */
    class DeviceLastDataTimestamp {
        public String key;
        public long lastDataTime;
        public long interval;
    }
}

IV. Updating the device offline status

/**
 * Write data to MySQL
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceStatusSink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(DeviceStatusSink.class);

    @Value("${spring.datasource.dynamic.datasource.master.url}")
    private String datasoureUrl;
    @Value("${spring.datasource.dynamic.datasource.master.username}")
    private String userName;
    @Value("${spring.datasource.dynamic.datasource.master.password}")
    private String password;
    @Value("${spring.datasource.dynamic.datasource.master.driver-class-name}")
    private String driverClass;

    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the driver and open the connection
        try {
            Class.forName(driverClass);
            conn = DriverManager.getConnection(datasoureUrl, userName, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(String deviceSn, Context context) {
        try {
            String sql = "update biz_device t set t.status=2 where t.dev_sn=?";
            ps = conn.prepareStatement(sql);
            ps.setString(1, deviceSn);
            ps.executeUpdate();
            log.info("update biz_device t set t.status=2 where t.dev_sn={}", deviceSn);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Close the statement and the connection when the task finishes
     *
     * @throws Exception on failure
     */
    @Override
    public void close() throws Exception {
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}

V. application.yml configuration

server:
  port: 8099
spring:
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        loginUsername: admin
        loginPassword: 123456
        allow:
      web-stat-filter:
        enabled: true
    dynamic:
      druid: # global druid settings; most values keep the defaults (do not change parameters you do not understand)
        # connection pool settings
        # initial, minimum and maximum size
        initial-size: 5
        min-idle: 5
        maxActive: 20
        # maximum wait time when acquiring a connection
        maxWait: 60000
        # interval between eviction runs that close idle connections, in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        # minimum time a connection stays in the pool, in milliseconds
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # enable PSCache and set its size per connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # filters for monitoring statistics; without them the SQL monitor shows nothing, 'wall' is the firewall
        filters: stat,wall,slf4j
        # enable mergeSql and slow-SQL logging via connectProperties
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3306/fire?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka broker addresses, multiple allowed
    producer: # producer
      retries: 1 # values > 0 make the client resend records that failed to send
      # number of messages per batch
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # raise the maximum size of messages pushed to Kafka
      properties:
        max.request.size: 52428800
    consumer:
      group-id: data-analysis-flink
      # commit offsets manually so that every record is guaranteed to be consumed
      enable-auto-commit: false
      # where to start consuming (earliest/latest)
      auto-offset-reset: latest
      # consumer group
      # group-id: dev
    properties:
      # session timeout for missing heartbeats; set generously so debugging pauses do not trip it
      session:
        timeout:
          ms: 60000
      heartbeat:
        interval:
          ms: 30000
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="root" password="root";'
# custom settings
customer:
  # Flink-related settings
  flink:
    # whether to enable the device-status job
    cal-device-status: true
    cal-device-status-topic: device-upload-data

VI. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.wfl.firefighting</groupId>
        <artifactId>data-analysis</artifactId>
        <version>1.0.0</version>
    </parent>
    <groupId>com.wfl.firefighting</groupId>
    <artifactId>data-analysis-flink</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.wfl.firefighting</groupId>
            <artifactId>data-analysis-service</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.wfl.firefighting</groupId>
            <artifactId>data-analysis-model</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign</groupId>
            <artifactId>feign-httpclient</artifactId>
            <version>10.10.1</version>
        </dependency>
        <!-- druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>
        <!-- dynamic datasource -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>2.5.4</version>
        </dependency>
        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>
        <!-- Flink dependencies: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- Flink JSON format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- Flink JDBC (MySQL) connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- Flink dependencies: end -->
    </dependencies>
    <build>
        <finalName>data-analysis-flink</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>module-info.class</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Notes:

1. If the job only runs in local mode and never needs to be submitted to a Flink cluster, spring-boot-maven-plugin is sufficient and the jar can be started directly with java -jar, as follows:

<build>
    <finalName>data-analysis-flink</finalName>
    <plugins>
         <plugin>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-maven-plugin</artifactId>
             <!-- main class to start -->
             <configuration>
                 <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
             </configuration>
             <executions>
                 <execution>
                     <goals>
                         <!-- repackage bundles all dependencies into the generated jar -->
                         <goal>repackage</goal>
                     </goals>
                 </execution>
             </executions>
         </plugin>
     </plugins>
 </build>

A jar built with spring-boot-maven-plugin fails with class-not-found errors when submitted to the Flink cluster, because Spring Boot packages application classes under the BOOT-INF directory, which the Flink server does not look into. A jar built with maven-shade-plugin can both be run with java -jar and be submitted to the Flink cluster.
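Assuming the Flink CLI is available on a cluster node and the shaded jar keeps the finalName from the pom above, submitting it would look roughly like this (the jar path is a placeholder):

# submit the shaded jar to the cluster; adjust the path to wherever the jar actually lives
./bin/flink run -c com.wfl.firefighting.flink.DataAnalysisFlinkApplication /path/to/data-analysis-flink.jar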

2. If the shaded jar is submitted to the cluster, Spring Boot's default logback dependency must be excluded, otherwise the server-side run fails with Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath, as follows:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-logging</artifactId>
      </exclusion>
   </exclusions>
</dependency>

When running locally with java -jar, the following excludes must be commented out in the build section, otherwise startup fails with java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory:

<!--<exclude>org.slf4j:*</exclude>-->
<!--<exclude>log4j:*</exclude>-->

3. When packaging with maven-shade-plugin, the following transformers must be added, otherwise the build fails with Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ManifestResourceTransformer:

<transformers>
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
         <resource>META-INF/spring.handlers</resource>
         <resource>reference.conf</resource>
      </transformer>
      <transformer
            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
         <resource>META-INF/spring.factories</resource>
      </transformer>
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
         <resource>META-INF/spring.schemas</resource>
      </transformer>
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
         <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
      </transformer>
</transformers>

VII. Execution results:

1. Running locally

2. Running on the Flink cluster

VIII. Other pitfalls

1. Error: The RemoteEnvironment cannot be instantiated when running in a pre-defined context

Fix: create the environment with StreamExecutionEnvironment.getExecutionEnvironment(), which returns whatever execution environment the job is actually running in:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. Error: Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 2048 of 32768 bytes each.

Fix: reduce the job parallelism:

env.setParallelism(1);
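If lowering the parallelism is not an option, an alternative (not what this project does) is to enlarge the TaskManager network buffer pool in flink-conf.yaml; the values below are only an illustrative starting point:

# enlarge the memory reserved for network buffers (illustrative values)
taskmanager.memory.network.fraction: 0.15
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb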

3. Error: Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'

Fix: set the following in the Flink configuration file flink-conf.yaml:
classloader.check-leaked-classloader: false
