The development environment here refers to the local code-development environment on your own machine.
Preparing the development environment
Once all the required dependencies are in place, you can start developing Flink applications.
Create a Maven project
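As an optional aside (not part of the original setup), Flink also ships a quickstart Maven archetype that generates a skeleton project with a similar POM; the archetype version should match the Flink version used here:
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.13.1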
The pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zenitera.bigdata</groupId>
    <artifactId>flink-0309</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The versions in the <properties> section can be adjusted to match the Flink version you need to develop against.
Contents of log4j.properties:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Create a batch job example: Flink01_Batch_Wordcount
Flink can also process data in batch mode.
package com.wangting.test01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Flink01_Batch_Wordcount {
    public static void main(String[] args) throws Exception {
        // Batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file
        DataSource<String> dataSource = env.readTextFile("input/words.txt");
        // Process the data: split lines into words, map to (word, 1), group by word and sum
        dataSource
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String word) throws Exception {
                        return Tuple2.of(word, 1L);
                    }
                })
                .groupBy(0)
                .sum(1)
                .print();
    }
}
Output:
(huahua,2)
(wangting,4)
(world,1)
(hello,6)
Process finished with exit code 0
Create a bounded-stream job example: Flink02_Streaming_wordcount
package com.wangting.test01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Flink02_Streaming_wordcount {
    public static void main(String[] args) throws Exception {
        // Streaming execution environment, single parallelism so the output is easier to follow
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read a bounded stream from a file
        DataStreamSource<String> dataStreamSource = env.readTextFile("input/words.txt");
        // Process the data: split lines into words, emit (word, 1), key by word, and sum
        dataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> tuple2) throws Exception {
                        return tuple2.f0;
                    }
                })
                .sum(1)
                .print();
        // Execute the job (main already declares throws Exception, so no try/catch is needed)
        env.execute();
    }
}
Output:
(huahua,1)
(huahua,2)
(wangting,1)
(wangting,2)
(hello,1)
(wangting,3)
(hello,2)
(hello,3)
(hello,4)
(hello,5)
(world,1)
(wangting,4)
(hello,6)
Process finished with exit code 0
Create an unbounded-stream job example: Flink03_Streaming_unbounded_wordcount
package com.wangting.test01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Flink03_Streaming_unbounded_wordcount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Unbounded source: a socket stream (on Linux: yum install -y nc ; nc -lk 6666)
        DataStreamSource<String> socketTextStream = env.socketTextStream("116.196.108.175", 6666);
        // Split lines into words, map to (word, 1), key by word, and keep a running sum
        socketTextStream
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String word) throws Exception {
                        return Tuple2.of(word, 1L);
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> tuple2) throws Exception {
                        return tuple2.f0;
                    }
                })
                .sum(1)
                .print();
        env.execute();
    }
}
Stream input:
[root@wangting ~]# nc -lk 6666
20230312
20230312
wang wang wang wang
huahua
qiqi
Output:
(20230312,1)
(20230312,2)
(wang,1)
(wang,2)
(wang,3)
(wang,4)
(huahua,1)
(qiqi,1)
Process finished with exit code -1
Package the project with the build tool; the flink-0309-1.0-SNAPSHOT.jar produced under the target directory can then be run on a server.
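For reference, the packaging step is just a standard Maven build; the shade plugin configured above is bound to the package phase, so the shaded jar lands in target/ automatically:
mvn clean package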
local-cluster mode (local cluster)
The single-machine version needs no configuration: just extract it and use it, much like the local development environment.
Installation and deployment:
[wangting@wangt-flink01 software]$ wget http://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.12.tgz
[wangting@wangt-flink01 software]$ tar -zxvf flink-1.13.1-bin-scala_2.12.tgz -C /opt/module
[wangting@wangt-flink01 software]$ cd /opt/module
[wangting@wangt-flink01 module]$ cp -r flink-1.13.1 flink-local
[wangting@wangt-flink01 module]$ cd /opt/module/flink-local
[wangting@wangt-flink01 flink-local]$ mkdir job
# Upload flink-0309-1.0-SNAPSHOT.jar to the job directory
[wangting@wangt-flink01 flink-local]$ ls job/
flink-0309-1.0-SNAPSHOT.jar
Run a test job:
[wangting@wangt-flink01 flink-local]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host wangt-flink01.
Starting taskexecutor daemon on host wangt-flink01.
[wangting@wangt-flink01 flink-local]$ bin/flink run -m wangt-flink01:8081 -c com.wangting.test01.Flink03_Streaming_unbounded_wordcount job/flink-0309-1.0-SNAPSHOT.jar
Stream input:
[wangting@wangt-flink03 module]$ nc -lk 6666
wang wang wang
aaa aaa bbb
aaaaaaaa
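Before stopping the cluster, you can list the running jobs from the CLI (a quick check, pointing bin/flink list at the same JobManager address used for submission; the output will vary):
[wangting@wangt-flink01 flink-local]$ bin/flink list -m wangt-flink01:8081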
[wangting@wangt-flink01 flink-local]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 31132) on host wangt-flink01.
Stopping standalonesession daemon (pid: 30852) on host wangt-flink01.
In standalone-cluster mode, the Flink service is deployed and runs as a cluster, with Flink itself providing the compute resources; no other framework is needed, hence the name standalone mode.
Installation and deployment:
[wangting@wangt-flink01 flink-local]$ cd /opt/module/
[wangting@wangt-flink01 module]$ cp -r flink-1.13.1 flink-standalone
[wangting@wangt-flink01 module]$ vim /opt/module/flink-standalone/conf/flink-conf.yaml
# Modify this setting
jobmanager.rpc.address: wangt-flink01
# Configure the workers file
[wangting@wangt-flink01 module]$ vim /opt/module/flink-standalone/conf/workers
wangt-flink02
wangt-flink03
[wangting@wangt-flink01 module]$
# Distribute the flink-standalone directory to each worker node
[wangting@wangt-flink01 module]$ scp -r flink-standalone wangt-flink02:/opt/module/
[wangting@wangt-flink01 module]$ scp -r flink-standalone wangt-flink03:/opt/module/
Run the example in standalone-cluster mode:
# Start the cluster
[wangting@wangt-flink01 module]$ cd flink-standalone/
[wangting@wangt-flink01 flink-standalone]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host wangt-flink01.
Starting taskexecutor daemon on host wangt-flink02.
Starting taskexecutor daemon on host wangt-flink03.
[wangting@wangt-flink01 flink-standalone]$
Stream input:
[wangting@wangt-flink03 module]$ nc -lk 6666
aaaaa
aa aa aa
zxc
zxc
Shut down the cluster:
[wangting@wangt-flink01 flink-standalone]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 25520) on host wangt-flink02.
Stopping taskexecutor daemon (pid: 25405) on host wangt-flink03.
Stopping standalonesession daemon (pid: 2006) on host wangt-flink01.
In standalone mode, Flink provides its own compute resources, which removes the coupling to third-party resource frameworks and makes it highly independent. But Flink is first and foremost a compute framework, not a resource scheduler, so resource scheduling is not its strength. Production environments therefore usually pair it with a dedicated resource-scheduling framework; in China, running Flink on Yarn is especially common.
The Flink application is submitted to Yarn's ResourceManager, which requests containers from the NodeManagers. Flink then creates the JobManager and TaskManagers inside those containers, and allocates TaskManager resources dynamically based on the number of slots needed by the job running on the JobManager.
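As a sketch of how those container sizes are steered (the memory and slot values below are illustrative assumptions, not recommendations), a per-job submission, covered below, can pass Flink's memory and slot settings as -D dynamic properties:
bin/flink run -t yarn-per-job \
  -Djobmanager.memory.process.size=1600m \
  -Dtaskmanager.memory.process.size=1728m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  -c com.wangting.test01.Flink03_Streaming_unbounded_wordcount job/flink-0309-1.0-SNAPSHOT.jar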
Prerequisite: a Hadoop cluster must already be available.
Add to /etc/profile: export HADOOP_CLASSPATH=`hadoop classpath` (the backticks matter: the variable must hold the output of the hadoop classpath command), then source /etc/profile.
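The Yarn examples below run from /opt/module/flink-yarn, a dedicated copy of the distribution created the same way as the local and standalone copies, with the job jar uploaded into a job directory:
[wangting@wangt-flink01 module]$ cp -r flink-1.13.1 flink-yarn
[wangting@wangt-flink01 module]$ mkdir flink-yarn/job
# Upload flink-0309-1.0-SNAPSHOT.jar to /opt/module/flink-yarn/job/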
Session-Cluster mode starts a Flink cluster first, which requests resources from Yarn; all subsequent jobs are submitted to this same cluster. The Flink cluster stays resident on the Yarn cluster until it is stopped manually.
When submitting a job to this Flink cluster, if its resources are already used up, new jobs cannot be submitted normally.
Drawback: if a submitted job runs for a long time and occupies all of the Flink cluster's resources, no further jobs can be submitted.
Session-Cluster is therefore suited to frequent submissions of many small jobs with short execution times.
In Per-Job-Cluster mode, each job gets its own Flink cluster. Every submitted job requests resources from Yarn separately according to its own needs, holds them until the job finishes, and one job's failure does not affect the submission or execution of the next. Each job has its own Dispatcher and ResourceManager, which accept resource requests on demand; this mode suits large, long-running jobs.
Each submission creates a new Flink cluster, so jobs are independent of one another and easy to manage, and the cluster disappears once its job completes.
Application Mode also starts a cluster on Yarn, and the application jar's main() function (the user class's main()) executes on the JobManager. As soon as the application finishes, the Flink cluster is shut down immediately; it can also be stopped manually.
Difference from Per-Job-Cluster: in Application Mode, the user's main() function executes inside the cluster (on the JobManager).
Run the unbounded-stream WordCount in Per-Job-Cluster mode:
[wangting@wangt-flink01 ~]$ cd /opt/module/flink-yarn/
[wangting@wangt-flink01 flink-yarn]$ bin/flink run -d -t yarn-per-job -c com.wangting.test01.Flink03_Streaming_unbounded_wordcount job/flink-0309-1.0-SNAPSHOT.jar
After submitting, open the Yarn ResourceManager web UI, find the application, and follow its Tracking UI link to jump to the Flink dashboard.
Stream input:
[wangting@wangt-flink03 module]$ nc -lk 6666
zzz zzzzzz
zzz zzz
zzz
asd
asd
www
Stopping the job:
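The job can be cancelled from the Flink dashboard's Cancel Job button, or from the command line; as a sketch (the <applicationId> comes from the Yarn UI or yarn application -list, the <jobId> from bin/flink list):
[wangting@wangt-flink01 flink-yarn]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=<applicationId> <jobId>
# or kill the entire Yarn application:
[wangting@wangt-flink01 flink-yarn]$ yarn application -kill <applicationId>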
Run the unbounded-stream WordCount in Session-Cluster mode:
# 1. Start a Flink session
[wangting@wangt-flink01 flink-yarn]$ bin/yarn-session.sh -d
applicationID:application_1678589491194_0013
Submit the job:
[wangting@wangt-flink01 flink-yarn]$ bin/flink run -t yarn-session -Dyarn.application.id=application_1678589491194_0013 -c com.wangting.test01.Flink03_Streaming_unbounded_wordcount job/flink-0309-1.0-SNAPSHOT.jar
Note: application_1678589491194_0013 refers to the Yarn application started on the Yarn cluster.
You can also omit the ID and let it be chosen automatically, but production clusters often run more than one session, and automatic assignment may not land on the session you intended:
[wangting@wangt-flink01 flink-yarn]$ bin/flink run -c com.wangting.test01.Flink03_Streaming_unbounded_wordcount job/flink-0309-1.0-SNAPSHOT.jar
Stream input:
[wangting@wangt-flink03 module]$ nc -lk 6666
lll
lll
lll
asd
ttttttttttttttttttt
Run the unbounded-stream WordCount in Application Mode:
[wangting@wangt-flink01 flink-yarn]$ bin/flink run-application -t yarn-application -c com.wangting.test01.Flink03_Streaming_unbounded_wordcount job/flink-0309-1.0-SNAPSHOT.jar
Stream input:
[wangting@wangt-flink03 module]$ nc -lk 6666
qqqq
qqq
wangting
wangg
wangg
wokao
aa aa aa aa
Yarn high availability runs only one JobManager at a time; when it dies, Yarn starts a new one. In essence, HA is implemented through Yarn's application retry count.
1. Configure in yarn-site.xml:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
This parameter sets the maximum number of ApplicationMaster execution attempts.
Distribute yarn-site.xml to all nodes and restart Hadoop's Yarn service, as sketched below.
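Assuming the same three hosts and a standard Hadoop layout under $HADOOP_HOME (an assumption; adjust paths to your install):
[wangting@wangt-flink01 ~]$ scp $HADOOP_HOME/etc/hadoop/yarn-site.xml wangt-flink02:$HADOOP_HOME/etc/hadoop/
[wangting@wangt-flink01 ~]$ scp $HADOOP_HOME/etc/hadoop/yarn-site.xml wangt-flink03:$HADOOP_HOME/etc/hadoop/
[wangting@wangt-flink01 ~]$ $HADOOP_HOME/sbin/stop-yarn.sh && $HADOOP_HOME/sbin/start-yarn.sh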
2. Configure in flink-conf.yaml:
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://wangt-flink01:8020/flink/yarn/ha
high-availability.zookeeper.quorum: wangt-flink01:2181,wangt-flink02:2181,wangt-flink03:2181
high-availability.zookeeper.path.root: /flink-yarn
With this configuration in place, Yarn high-availability mode is enabled.
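To sanity-check the failover (a sketch under the assumption of a per-job submission; the process name is from Flink 1.13), kill the JobManager and watch Yarn start a new attempt:
# On the node hosting the JobManager container:
jps | grep YarnJobClusterEntrypoint    # find the per-job JobManager pid
kill -9 <pid>                          # simulate a JobManager crash
# Yarn should launch a new application attempt; the attempt count is visible in the Yarn web UI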