<!-- bridge between the Stream API and the Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.2</version>
<!-- provided scope is not packaged into the jar -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.14.2</version>
<scope>provided</scope>
</dependency>
// Minimal child application started inside Flink tasks: auto-configuration only, no component scanning
@EnableAutoConfiguration
public class ChildApplication {
}
public interface Task {
void run(String... args) throws Exception;
}
@Slf4j
public abstract class AbstractTask implements Task {

    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // parse Spring-style arguments
        DefaultApplicationArguments arguments = new DefaultApplicationArguments(args);
        // parse Flink-style arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // merge both kinds of arguments into one Configuration
        Configuration configuration = new Configuration();
        Map<String, String> map = parameterTool.toMap();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (Objects.equals(entry.getValue(), "__NO_VALUE_KEY")) {
                continue;
            }
            configuration.setString(entry.getKey(), entry.getValue());
        }
        Set<String> optionNames = arguments.getOptionNames();
        for (String optionName : optionNames) {
            List<String> optionValues = arguments.getOptionValues(optionName);
            if (CollectionUtils.isEmpty(optionValues)) {
                continue;
            }
            configuration.setString(optionName, String.join(",", optionValues));
        }
        // make the merged parameters visible to all operators
        env.getConfig().setGlobalJobParameters(configuration);
        // let the concrete task build its topology
        configTask(env, parameterTool);
        // submit the job
        JobClient jobClient = env.executeAsync(getClass().getName());
        // when the jar is submitted through the Flink web UI, the result cannot be fetched from the client
        if (jobClient instanceof WebSubmissionJobClient) {
            return;
        }
        jobClient.getJobExecutionResult().whenComplete((jobExecutionResult, throwable) -> {
            if (throwable != null) {
                log.error("job failed", throwable);
                return;
            }
            log.info("net runtime {}s", jobExecutionResult.getNetRuntime(TimeUnit.SECONDS));
        });
    }

    public abstract void configTask(StreamExecutionEnvironment env, ParameterTool tool);
}
@Slf4j
@Service
public class TaskManager implements CommandLineRunner {

    @Resource
    List<Task> taskList;

    @Override
    public void run(String... args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        log.info("program arguments {}", parameterTool.toMap());
        // the task to run is selected by its fully qualified class name
        String runTaskName = parameterTool.get("task");
        if (CollectionUtils.isEmpty(taskList) || StringUtils.isBlank(runTaskName)) {
            return;
        }
        for (Task task : taskList) {
            if (Objects.equals(runTaskName, task.getClass().getName())) {
                task.run(args);
            }
        }
    }
}
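With this dispatcher in place, a job is chosen by passing its fully qualified class name through the --task argument. As an illustration only (the package name is hypothetical, not taken from the project), the program arguments for the TimeTask shown further below might look like:

--task com.example.demo.task.TimeTask --spring.profiles.active=dev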
@Slf4j
@Service
public class TimeSource extends RichSourceFunction<Date> {

    volatile boolean running = true;

    private transient JdbcTemplate jdbcTemplate;

    @Override
    public void open(Configuration parameters) throws Exception {
        // start a child Spring context inside the task and fetch the beans we need
        List<String> args = new LinkedList<>();
        // use unique JMX names so several child contexts can coexist in one TaskManager JVM
        args.add(String.format("--spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s",
                getClass().getName() + UUID.randomUUID()));
        args.add(String.format("--spring.jmx.default-domain=%s", getClass().getName() + UUID.randomUUID()));
        // read the merged parameters that AbstractTask stored as global job parameters
        Configuration globalJobParameters =
                (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String activeKey = "spring.profiles.active";
        String active = globalJobParameters.getString(ConfigOptions.key(activeKey).stringType().noDefaultValue());
        if (StringUtils.isNotEmpty(active)) {
            args.add(String.format("--%s=%s", activeKey, active));
        }
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(ChildApplication.class, args.toArray(new String[0]));
        jdbcTemplate = applicationContext.getBean(JdbcTemplate.class);
    }

    @Override
    public void run(SourceContext<Date> ctx) throws Exception {
        while (running) {
            Date date = DataAccessUtils.uniqueResult(jdbcTemplate.queryForList("select now()", Date.class));
            ctx.collect(date);
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
Writing this source class took a long time and produced a lot of errors before it behaved as expected. The task that wires it into a job:
@Slf4j
@Service
public class TimeTask extends AbstractTask {

    @Resource
    private TimeSource timeSource;

    @Override
    public void configTask(StreamExecutionEnvironment env, ParameterTool tool) {
        env.getConfig().setAutoWatermarkInterval(0);
        env.addSource(timeSource)
                .setParallelism(1)
                .print()
                .setParallelism(1);
    }
}
@SpringBootApplication
public class Demo2Application {
public static void main(String[] args) {
SpringApplication.run(Demo2Application.class, args);
}
}
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
Because spring-boot-maven-plugin's repackaging distinguishes between main-class and start-class, the Main-Class of the packaged jar is the launcher class org.springframework.boot.loader.JarLauncher, and running the jar uploaded to the Flink web UI fails.
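For reference, the manifest of the repackaged jar contains roughly the following (the Start-Class value is an assumed example for this project, not copied from an actual jar). Flink uses the jar's Main-Class as the program entry point when no entry class is given, so it ends up launching JarLauncher rather than the job itself:

Main-Class: org.springframework.boot.loader.JarLauncher
Start-Class: com.example.demo2.Demo2Application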
Following method 2 of the article "SpringBoot超详细讲解集成Flink的部署与打包方法", I wrote this version:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.3.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <artifactSet>
                    <excludes>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>module-info.class</exclude>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                        <resource>reference.conf</resource>
                    </transformer>
                    <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                        <resource>META-INF/spring.factories</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>${start-class}</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
It failed with:
Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer
I puzzled over it for quite a while without finding the cause, so I switched to maven-assembly-plugin instead:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>${start-class}</mainClass>
            </manifest>
        </archive>
        <!-- bundle all dependencies into the jar -->
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
This packaged fine and ran locally, but uploading the jar to the Flink web UI produced an error:
LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar)
Clearly, the logging jars conflict. So the question becomes: how do you configure maven-assembly-plugin to exclude org.apache.logging.log4j or ch.qos.logback when packaging? That is also fairly awkward: it requires a custom assembly.xml descriptor, which is a relatively high cost.
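For reference, such a descriptor would look roughly like the sketch below (untested; the id and exclude pattern are assumptions), and the plugin would then have to reference it via <descriptors> instead of <descriptorRefs>:

<assembly>
    <id>jar-with-dependencies-no-logback</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <!-- unpack dependencies so their classes land directly in the fat jar -->
            <unpack>true</unpack>
            <scope>runtime</scope>
            <excludes>
                <!-- drop the conflicting logging implementation -->
                <exclude>ch.qos.logback:*</exclude>
            </excludes>
        </dependencySet>
    </dependencySets>
</assembly>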
After going through a lot of material, including Flink's own documentation, whose recommended Maven packaging also uses maven-shade-plugin, I decided to stick with maven-shade-plugin.
So how do we solve the Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer problem?
As it happens, I had recently been reading up on Maven POM files and stumbled into the pluginManagement section of spring-boot-starter-parent, which declares quite a few plugins, maven-shade-plugin among them.
Following normal POM inheritance, it should be enough to declare maven-shade-plugin under build -> plugins:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
mvn clean package
The build succeeded!
Looking more carefully at the maven-shade-plugin declared by spring-boot-starter-parent, I found that the content of executions -> execution -> configuration -> transformers differs between Spring Boot versions. No wonder 'resource' could not be found: presumably the transformer list I had copied was being merged element by element with the parent's list, attaching a <resource> element to a transformer class that has no such property.
The next build, once uploaded to the Flink web UI, again reported the logging jar conflict, but excluding dependencies with maven-shade-plugin is much simpler than with maven-assembly-plugin. Since the Flink runtime already ships /opt/flink/lib/log4j-slf4j-impl-2.16.0.jar, I simply excluded Logback. The complete plugin configuration:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<artifactSet>
<excludes>
<!-- also excluded, following the Flink documentation's packaging example -->
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
</configuration>
</plugin>
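As a quick sanity check (the jar name below is a hypothetical example), listing the shaded jar should show no Logback classes:

jar tf target/demo2-1.0-SNAPSHOT.jar | grep -i logback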