当前位置:   article > 正文

Flink入门到实战-阶段一(集群安装&使用)_flink 1.17.1

flink 1.17.1

hello world

引入pom.xml

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-compiler-plugin</artifactId>
  6. <configuration>
  7. <source>8</source>
  8. <target>8</target>
  9. </configuration>
  10. </plugin>
  11. </plugins>
  12. </build>
  13. <properties>
  14. <flink.version>1.13.0</flink.version>
  15. <java.version>1.8</java.version>
  16. <scala.binary.version>2.12</scala.binary.version>
  17. <slf4j.version>1.7.30</slf4j.version>
  18. </properties>
  19. <dependencies>
  20. <!-- 引入 Flink 相关依赖-->
  21. <dependency>
  22. <groupId>org.apache.flink</groupId>
  23. <artifactId>flink-java</artifactId>
  24. <version>${flink.version}</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.flink</groupId>
  28. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  29. <version>${flink.version}</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.apache.flink</groupId>
  33. <artifactId>flink-cep_${scala.binary.version}</artifactId>
  34. <version>${flink.version}</version>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.apache.flink</groupId>
  38. <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  39. <version>${flink.version}</version>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.apache.flink</groupId>
  43. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  44. <version>${flink.version}</version>
  45. </dependency>
  46. <!-- 引入日志管理相关依赖-->
  47. <dependency>
  48. <groupId>org.slf4j</groupId>
  49. <artifactId>slf4j-api</artifactId>
  50. <version>${slf4j.version}</version>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.slf4j</groupId>
  54. <artifactId>slf4j-log4j12</artifactId>
  55. <version>${slf4j.version}</version>
  56. </dependency>
  57. <dependency>
  58. <groupId>org.apache.flink</groupId>
  59. <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  60. <version>${flink.version}</version>
  61. </dependency>
  62. <dependency>
  63. <groupId>org.apache.flink</groupId>
  64. <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  65. <version>${flink.version}</version>
  66. </dependency>
  67. <dependency>
  68. <groupId>org.apache.logging.log4j</groupId>
  69. <artifactId>log4j-to-slf4j</artifactId>
  70. <version>2.14.0</version>
  71. </dependency>
  72. </dependencies>

log4j.properties

  1. log4j.rootLogger=ERROR, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

DataSet(早期的批处理)实现

数据准备

  1. nihao nihao
  2. jiushi nihao

处理程序 

  1. public class FlinkSoctet {
  2. public static void main(String[] args) throws Exception {
  3. //得到执行环境对象
  4. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  5. DataSource<String> initData = env.readTextFile("input/wc.txt");
  6. //使用内部内的优点是不用考虑类型擦除的问题
  7. MapOperator<String, Tuple2<String, Integer>> mapValue = initData.flatMap(new FlatMapFunction<String, String>() {
  8. @Override
  9. public void flatMap(String textItem, Collector<String> out) throws Exception {
  10. String[] resItem = textItem.split(" ");
  11. for (String s : resItem) {
  12. out.collect(s);
  13. }
  14. }
  15. }).map(new MapFunction<String, Tuple2<String, Integer>>() {
  16. @Override
  17. public Tuple2<String, Integer> map(String item) throws Exception {
  18. return Tuple2.of(item, 1);
  19. }
  20. });
  21. //mapValue得到的数据是(key,value)元组类型,0表示key的位置,1表示value的位置
  22. //下面就是用key进行分组,用value进行求和
  23. mapValue.groupBy(0).sum(1).print();
  24. }
  25. }

得到的结果

  1. (nihao,3)
  2. (jiushi,1)

DataStream(流处理Api实现)

数据准备

  1. nihao nihao
  2. jiushi nihao

处理程序

  1. public class FlinkSoctet {
  2. public static void main(String[] args) throws Exception {
  3. //得到执行环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> initData = env.readTextFile("input/wc.txt");
  6. SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
  7. @Override
  8. public void flatMap(String item, Collector<String> out) throws Exception {
  9. String[] resItem = item.split(" ");
  10. for (String s : resItem) {
  11. out.collect(s);
  12. }
  13. }
  14. }).map(new MapFunction<String, Tuple2<String, Integer>>() {
  15. @Override
  16. public Tuple2<String, Integer> map(String item) throws Exception {
  17. return Tuple2.of(item, 1);
  18. }
  19. });
  20. //对于得到的元组的流数据,进行分组聚合
  21. map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  22. @Override
  23. public String getKey(Tuple2<String, Integer> value) throws Exception {
  24. return value.f0;
  25. }
  26. }).sum(1).print();
  27. //由于是流处理程序,所以这里要不断的执行
  28. env.execute();
  29. }
  30. }

得到的结果

前面的数字是分配到的Task的编号,可以看到key相同的数据到了一个Task里面执行,比如nihao都在1线程里面处理

  1. 2> (jiushi,1)
  2. 1> (nihao,1)
  3. 1> (nihao,2)
  4. 1> (nihao,3)

DataStream(流处理Api处理Socket)

在发送数据的linux上面执行

 nc -lk 9997

处理程序 

  1. public class FlinkSoctet {
  2. public static void main(String[] args) throws Exception {
  3. //得到执行环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> initData = env.socketTextStream("master",9997);
  6. SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
  7. @Override
  8. public void flatMap(String item, Collector<String> out) throws Exception {
  9. String[] resItem = item.split(" ");
  10. for (String s : resItem) {
  11. out.collect(s);
  12. }
  13. }
  14. }).map(new MapFunction<String, Tuple2<String, Integer>>() {
  15. @Override
  16. public Tuple2<String, Integer> map(String item) throws Exception {
  17. return Tuple2.of(item, 1);
  18. }
  19. });
  20. //对于得到的元组的流数据,进行分组聚合
  21. map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  22. @Override
  23. public String getKey(Tuple2<String, Integer> value) throws Exception {
  24. return value.f0;
  25. }
  26. }).sum(1).print();
  27. //由于是流处理程序,所以这里要不断的执行
  28. env.execute();
  29. }
  30. }

输入数据

q v d s fd a q

得到的结果

  1. 9> (d,1)
  2. 8> (fd,1)
  3. 4> (q,1)
  4. 9> (s,1)
  5. 4> (q,2)
  6. 11> (a,1)
  7. 3> (v,1)

集群搭建和使用

下载安装

Apache Flink: Downloads

集群规划

masternode1node2
Jobmanager,TaskManagerTaskManagerTaskManager 

解压

tar -zxvf flink-1.13.2-bin-scala_2.12.tgz

修改配置文件

flink-conf.yaml

jobmanager.rpc.address: master

masters

master:8081

workers

  1. master
  2. node1
  3. node2

分发到集群其他机器

./xsync /home/bigdata/congxueflink

启动集群(在bin目录里面)

./start-cluster.sh

访问http://master:8081/

 

 关闭

./stop-cluster.sh

WebUI提交任务

加入打包插件

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-compiler-plugin</artifactId>
  6. <configuration>
  7. <source>8</source>
  8. <target>8</target>
  9. </configuration>
  10. </plugin>
  11. <plugin>
  12. <groupId>org.apache.maven.plugins</groupId>
  13. <artifactId>maven-assembly-plugin</artifactId>
  14. <version>3.0.0</version>
  15. <configuration>
  16. <descriptorRefs>
  17. <descriptorRef>jar-with-dependencies</descriptorRef>
  18. </descriptorRefs>
  19. </configuration>
  20. <executions>
  21. <execution>
  22. <id>make-assembly</id>
  23. <phase>package</phase>
  24. <goals>
  25. <goal>single</goal>
  26. </goals>
  27. </execution>
  28. </executions>
  29. </plugin>
  30. </plugins>
  31. </build>

编写一个监听socket的程序

  1. public class FlinkSoctet {
  2. public static void main(String[] args) throws Exception {
  3. //得到执行环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> initData = env.socketTextStream("master",9997);
  6. SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
  7. @Override
  8. public void flatMap(String item, Collector<String> out) throws Exception {
  9. String[] resItem = item.split(" ");
  10. for (String s : resItem) {
  11. out.collect(s);
  12. }
  13. }
  14. }).map(new MapFunction<String, Tuple2<String, Integer>>() {
  15. @Override
  16. public Tuple2<String, Integer> map(String item) throws Exception {
  17. return Tuple2.of(item, 1);
  18. }
  19. });
  20. //对于得到的元组的流数据,进行分组聚合
  21. map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  22. @Override
  23. public String getKey(Tuple2<String, Integer> value) throws Exception {
  24. return value.f0;
  25. }
  26. }).sum(1).print();
  27. //由于是流处理程序,所以这里要不断的执行
  28. env.execute();
  29. }
  30. }

然后打包

 

提交成功并执行

 查看处理的数据

命令提交任务

 -m是jobmanager开放提交的地址

-c是提交jar的启动类

-p是并行度

最后的jar就是自己打包的jar

./flink run -m master:8081 -c com.chongxue.flink.FlinkSoctet -p 2 flink-1.0-SNAPSHOT.jar

停止任务(后面的是job的id)

./flink cancel d9e4f4dcb0516551d6611675c16113bb

部署模式介绍

  • 会话模式:上面我们搭建的就是会话模式,集群的什么周期大于一切.适合任务小而多的场景
  • 单作业模式:提交一个任务就启动一个集群
  • 应用模式:和单作业模式不同的是,单作业模式一个应用里面每执行一个job提交,那么就启动一个集群,如果是应用模式那么就是只启动一个集群,还有应用模式提价job的jar在jobmanager上面,缓解了单作业模式由客户端提交到jobmanager的网络开销压力

Yarn模式部署

环境准备

配置环境变量

sudo vi  /etc/profile.d/my_env.sh
  1. HADOOP_HOME=/home/bigdata/hadoop/hadoop
  2. export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
  3. export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
  4. export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile.d/my_env.sh

可选配置:更具自己的需求配置启动的资源分配

 vim flink-conf.yaml
  1. jobmanager.memory.process.size: 1600m
  2. taskmanager.memory.process.size: 1728m
  3. taskmanager.numberOfTaskSlots: 8
  4. parallelism.default: 1

 yarn会话模式

执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群
./yarn-session.sh -nm test

可用参数解读:
⚫ -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,
即使关掉当前对话窗口,YARN session 也可以后台运行。
⚫ -jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
⚫ -nm(--name):配置在 YARN UI 界面上显示的任务名。
⚫ -qu(--queue):指定 YARN 队列名。
⚫ -tm(--taskManager):配置每个 TaskManager 所使用内存。
注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,
YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也
不会把集群资源固定,同样是动态分配的。
YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID,
用户可以通过 web UI 或者命令行两种方式提交作业。

访问http://node1:34982/#/overview 

如下图

提交一个job(node1:34982是刚才访问WebUI的地址和端口)

 ./flink run -m node1:34982 -c com.chongxue.flink.FlinkSoctet -p 2 flink-1.0-SNAPSHOT.jar

单作业模式部署

(每一次提交启动一个完整的集群)

启动

./flink run -d -t yarn-per-job -c com.chongxue.flink.FlinkSoctet flink-1.0-SNAPSHOT.jar

 

停止job

$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
<jobId>
这里的 application_XXXX_YY 是当前应用的 ID <jobId> 是作业的 ID 。注意如果取消作
业,整个 Flink 集群也会停掉。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/548786
推荐阅读
相关标签
  

闽ICP备14008679号