Application Mode starts a dedicated Flink cluster on YARN, and the application jar's main() method runs on the JobManager inside YARN. The cluster shuts down as soon as the application finishes. You can also stop it manually with `yarn application -kill <ApplicationId>` or by cancelling the Flink job.
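For reference, that same kill can be issued from Java instead of the CLI. A minimal sketch, assuming a plain Hadoop `YarnClient` that picks up `yarn-site.xml` from the classpath (`ConverterUtils` is used here because it also exists on the Hadoop 2.7 line this demo builds against):

```java
// Kill a YARN application programmatically: the equivalent of
// `yarn application -kill <ApplicationId>`.
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class KillApp {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        // e.g. "application_1620000000000_0001", as printed by the submitter below.
        yarnClient.killApplication(ConverterUtils.toApplicationId(args[0]));
        yarnClient.stop();
    }
}
```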
Because the required Flink jars and the application jar are fetched from a configured remote location instead of being shipped to the cluster by the client, submission becomes very lightweight. That means we can submit applications from plain Java code, which is exactly what a data platform wants. Submission was possible without this mode too, of course, but this way it is much simpler and more convenient.
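The jars therefore have to be staged on HDFS once, up front. A minimal sketch of doing that with the Hadoop FileSystem API; the local paths (`/opt/flink-1.11.2/lib`, the target jar name) are assumptions, while the HDFS destinations match what the submitter below expects:

```java
// Stage the Flink lib directory and the user fat jar on HDFS once, so that
// YARN can fetch them instead of the client shipping them on every submit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class UploadLibs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumption: node01:9000 is the HDFS NameNode used throughout this demo.
        FileSystem fs = FileSystem.get(URI.create("hdfs://node01:9000"), conf, "root");
        // The lib directory of the unpacked Flink distribution (local path is an assumption).
        fs.copyFromLocalFile(new Path("/opt/flink-1.11.2/lib"), new Path("/data/flink/libs"));
        // The fat jar built by `mvn package` (see the pom.xml below).
        fs.copyFromLocalFile(new Path("target/test_flink_mode-1.0-SNAPSHOT.jar"),
                new Path("/data/flink/user-lib/7.jar"));
        fs.close();
    }
}
```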
Why do this at all? Our company does 2B business and runs a real-time data platform on which users assemble Flink jobs by drag and drop. Deployment was a constant source of trouble: the delivery engineers' skill levels vary, unexpected problems kept appearing, and the environments were never quite consistent. So why not control deployment ourselves? Let the platform's own backend deploy the job with the configuration we choose; then we no longer depend on the Flink client at all, we can manage the configuration inside our own program, and the delivery engineers' job becomes far less painful.
Most importantly of all, we can submit jobs to YARN straight from a Java/Spring Boot service, which is what makes the platform friendly to its users. Ours is a drag-and-drop platform, so this is a hard requirement: after a user finishes dragging and dropping, we can hardly hand them a jar and ask them to upload it somewhere themselves. (We had already implemented submission before Flink 1.11 introduced this new mode; what we urgently needed was to streamline the configuration and the process.) That is the background at my company; now let me walk through how to do it.
1. Unpack a downloaded Flink distribution of version 1.11.0 or later; we need its `conf` and `lib` directories.
2. Put the `conf` directory into `resources` in your IDEA project.
3. Put `core-site.xml`, `hdfs-site.xml` and `yarn-site.xml` under `resources` as well (this is a demo, so we simply copy them in rather than fetching them programmatically).
As for the dependencies (the ones highlighted in red in the original screenshot), you can look them up on Maven Central; the full pom.xml used here is included further below.
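Before writing any Flink code, it is worth checking that the Hadoop XMLs really are on the classpath. A small sanity-check sketch (not required for the demo): `YarnConfiguration` automatically loads `core-site.xml` and `yarn-site.xml` when they are visible on the classpath.

```java
// Verify that the Hadoop config files in resources/ are picked up: these
// should print your cluster's values, not the built-in defaults
// (fs.defaultFS=file:/// and yarn.resourcemanager.address=0.0.0.0:8032).
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ConfCheck {
    public static void main(String[] args) {
        YarnConfiguration yarnConf = new YarnConfiguration();
        System.out.println(yarnConf.get("fs.defaultFS"));
        System.out.println(yarnConf.get("yarn.resourcemanager.address"));
    }
}
```

With that in place, here is the demo job that will be submitted: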
```java
package com.bigdata;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author :qingzhi.wu
 * @date :2021/5/12 3:38 PM
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Unbounded demo source: emits one random word per second until cancelled.
        DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};
                Random random = new Random();
                while (true) {
                    ctx.collect(words[random.nextInt(words.length)]);
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
            }
        });
        source.print();
        env.execute();
    }
}
```

Next, the pom.xml for the project:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>test_flink_mode</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <hadoop.version>2.7.7</hadoop.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-yarn_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-yarn-common</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Logging framework, to produce console output when running in the IDE. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.bigdata.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
```
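Build the project with `mvn clean package`; the shade plugin is bound to the package phase and produces a fat jar whose manifest entry point is `com.bigdata.WordCount`. That fat jar is what gets uploaded to HDFS as the user jar (the `7.jar` referenced by the submitter below).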
Finally, the submitter itself, the Java program that deploys the application cluster:

```java
package com.bigdata;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

/**
 * @author :qingzhi.wu
 * @date :2021/5/12 7:16 PM
 */
public class App {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        String configurationDirectory =
                "/Users/wuqingzhi/java_work_space/test_flink_mode/src/main/resources/conf/";
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        // Remote locations of the Flink lib directory, the user jar and the flink-yarn dist jar.
        String flinkLibs = "hdfs://node01:9000/data/flink/libs";
        String userJarPath = "hdfs://node01:9000/data/flink/user-lib/7.jar";
        String flinkDistJar = "hdfs://node01:9000/data/flink/libs/flink-yarn_2.12-1.11.2.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        YarnClusterInformationRetriever clusterInformationRetriever =
                YarnClientYarnClusterInformationRetriever.create(yarnClient);

        // Load the Flink configuration from the conf directory.
        Configuration flinkConfiguration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);
        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString()));
        flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);

        // Use application mode.
        flinkConfiguration.set(
                DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        // YARN application name.
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");
        // Any other option can be set the same way.
        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,
                MemorySize.parse("1024", MEGA_BYTES));
        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
                MemorySize.parse("1024", MEGA_BYTES));

        ClusterSpecification clusterSpecification =
                new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();

        // Program arguments and the main class of the user jar.
        ApplicationConfiguration appConfig =
                new ApplicationConfiguration(args, "com.bigdata.WordCount");

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true);

        try {
            ClusterClientProvider<ApplicationId> clusterClientProvider =
                    yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);
            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
            ApplicationId applicationId = clusterClient.getClusterId();
            System.out.println(applicationId);
        } catch (ClusterDeploymentException e) {
            e.printStackTrace();
        }
    }
}
```
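Once `deployApplicationCluster` returns, the `ClusterClient` is useful for more than printing the application id. A minimal sketch of a hypothetical helper (the calls used are the standard `ClusterClient` API in 1.11; `shutDownCluster()` is the programmatic counterpart of `yarn application -kill` mentioned at the start):

```java
// Hypothetical helper: inspect and then stop the deployed application cluster.
import org.apache.flink.client.program.ClusterClient;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public class ClusterOps {
    public static void inspectAndStop(ClusterClient<ApplicationId> clusterClient) throws Exception {
        // Where the Flink web UI of this application lives.
        System.out.println(clusterClient.getWebInterfaceURL());
        // List the jobs running in the application cluster.
        clusterClient.listJobs().get().forEach(job ->
                System.out.println(job.getJobId() + " -> " + job.getJobState()));
        // Gracefully shut the whole cluster down.
        clusterClient.shutDownCluster();
        clusterClient.close();
    }
}
```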
That's it. A lot of this could still be optimized, but since this is demo code rather than production code, I kept it simple. Thanks for reading.
Reference:
https://blog.csdn.net/zhangjun5965/article/details/107511615