当前位置:   article > 正文

01-Flink安装部署及入门案例(仅供学习)

flink安装

概述

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
在这里插入图片描述
Flink 1.13.1版本重要功能:让流处理应用的使用和普通应用一样简单和自然。https://developer.aliyun.com/article/780123
在这里插入图片描述

第一部分:Flink框架概述

01-Flink概述之官方定义

Apache Flink是一个开源的、基于流的、有状态的计算框架。他是分布式的执行的,具备低延迟,高吞吐的优秀性能,并给其非常擅长处理有状态的复杂计算逻辑场景。 Apache Flink 官网:https://flink.apache.org/
官方定义:Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

02-Flink概述之流式计算思想

Flink流式计算程序,来一条数据处理一条数据,每一次处理一条数据,真正流计算。
在这里插入图片描述Flink框架进行流式计算时,整个流程分为:数据源Source、数据转换Transformation和数据接收器Sink。

第一步、从数据源获取数据时,将数据封装到数据流
       DataStream
       实际项目,主要从Kafka消息队列中消费数据
第二步、数据处理时,调用DataStream方法
      DataStream#transformation
      类似RDD中转换算子,比如map、flatMap、filter等等
第三步、将分析数据输出到外部存储
      DataStream#sink
      类似RDD中触发/输出算子,比如foreach......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Flink应用程序进行分布式流式计算时,如何做到并行计算,如下图:
在这里插入图片描述对于Flink框架来说,每个Job运行时,在处理第一条数据之前,首先需要获取资源,运行Task任务,准备数据到达,当数据到达时,一个个数据处理。
·算子Operator:无论从数据源Source加载数据,还是调用方法转换Transformation处理数据,到最后数据终端Sink输出看,都称为Operator,分为:Source Operator、Transformation Operator和Sink Operator
·流Stream:数据从一个Opreator流向另一个Operator
在这里插入图片描述
每个算子Operator可以设置并行度(Parallelism),假设【Source】、【map()】及【keyBy()/window()/apply()】三个Operator并行度设置为2,【Sink】Operator并行的设置为1,形成如下示意图:
在这里插入图片描述
Flink流式计算引擎的特点:
在这里插入图片描述

03-Flink概述之应用场景

Apache Flink在阿里巴巴主要应用场景如下四类:[实时数仓、实时监控、实时报表、流数据分析]
在这里插入图片描述
官方提出三个方面,Flink框架应用场景:
在这里插入图片描述
1)、事件驱动型应用:Event-driven Applications
2)、数据分析型应用:Data Analytics Applications
3)、数据管道型应用 (ETL),Data Pipeline Applications

第二部分:

04-安装部署之Flink Cluster架构

Flink Running运行时架构组成:JobManager(主节点)和TaskManagers(从节点)
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/concepts/flink-architecture/
在这里插入图片描述
1)、JobManager:主节点Master,为每个Fink Job分配资源,管理和监控Job运行。
·主要负责调度Flink Job并协调Task做checkpoion; ·
·从clink出接受到Job和JAR包等资源后,会生成优化后的执行计划,并以task为单元调度到各个taskManager去执行;
2)、TaskManager:从节点workers,调度每个Job中task任务执行,及负责task监控和容错等。
·在启动的时候设置:Slot槽位数(资源槽),每个slot能启动Task,期中Task为线程。
在这里插入图片描述
Flinkclent提交应用程序,给主节点jobmanager
在这里插入图片描述
Flink支持多种安装模式
在这里插入图片描述
· 第一:local模式
[在Windows系统上IDEA集成开发环境编写Flink 代码程序,直接运行测试即为本地测试]

  • 适用于本地开发和测试环境,占用的资源较少,部署简单
  • 本地模式LocalMode:JobManager和TaskManager运行在同一个JVM进程
    在这里插入图片描述
    ·第二:standalone模式
    [将JobManager和TaskManagers直接运行机器上,称为Standalone集群,Flink框架自己集群]
  • 可以在测试环境功能验证完毕到版本发布的时候使用,进行性能验证
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/overview/
  • 高可用HA:Standalone Cluster HA
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/zookeeper_ha/
    ·第三:Flink on Yarn模式
  • [将JobManager和TaskManagers运行在NodeManage的Container容器中,称为Flink on YARN。]
  • Flink使用YARN进行调度
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/yarn/
    ·第四:k8s模式
    [将JobManager和TaskManagers运行在K8s容器Container中。]
  • 由于Flink使用的无状态模式,只需要kubernetes提供计算资源即可。会是Flink以后运行的主流方式,可以起到节约硬件资源和便于管理的效果。
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes

05-安装部署之本地集群

针对Flink框架来说,进程分别为JobManager(主节点,管理者)和TaskManager(从节点,干活着)
在这里插入图片描述
Local Cluster 本地集群:将不同进程运行在同一台机器上,只有一台机器
在这里插入图片描述

  1. Flink程序(比如jar包)由JobClient进行提交;
  2. JobClient将作业提交给JobManager
  3. JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager
  4. TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成;
  5. 作业执行完成后,结果将发送回客户端(JobClient);
    步骤如下:::
    1)、上传软件及解压
[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
	上传软件包:flink-1.13.1-bin-scala_2.11.tgz
	
[root@node1 software]# chmod u+x flink-1.13.1-bin-scala_2.11.tgz
[root@node1 software]# tar -zxf flink-1.13.1-bin-scala_2.11.tgz -C /export/server/	

[root@node1 ~]# cd /export/server/
[root@node1 server]# chown -R root:root flink-1.13.1
[root@node1 server]# mv flink-1.13.1 flink-local
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
# 目录结构
[root@node1 server]# cd flink-local/
[root@node1 flink-local]# ll
total 480
drwxrwxr-x  2 root root   4096 May 25 20:36 bin
drwxrwxr-x  2 root root    263 May 25 20:36 conf
drwxrwxr-x  7 root root     76 May 25 20:36 examples
drwxrwxr-x  2 root root   4096 May 25 20:36 lib
-rw-r--r--  1 root root  11357 Oct 29  2019 LICENSE
drwxrwxr-x  2 root root   4096 May 25 20:37 licenses
drwxr-xr-x  2 root root      6 Oct  5 10:25 log
-rw-rw-r--  1 root root 455180 May 25 20:37 NOTICE
drwxrwxr-x  3 root root   4096 May 25 20:36 opt
drwxrwxr-x 10 root root    210 May 25 20:36 plugins
-rw-r--r--  1 root root   1309 Jan 30  2021 README.txt
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2)、启动Flink本地集群

[root@node1 ~]# cd /export/server/flink-local/

[root@node1 flink-local]# bin/start-cluster.sh

[root@node1 flink-local]# jps
3504 TaskManagerRunner
3239 StandaloneSessionClusterEntrypoint
3559 Jps
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3)、访问Flink的Web UI:http://node1:8081/#/overview
在这里插入图片描述Slot在Flink里面可以认为是资源组,Flink是通过将任务(Task)分成子任务(SubTask)并且将这些子任务分配到slot来并行执行程序。
在这里插入图片描述
[Slot 封装Task运行资源,可以认为Contanier容器,]同一个Slot资源槽中可以运行不同类型子任务SubTask,相当于“猪槽,可以被多个PIG吃食。”
4)、测试完成以后,关闭本地集群

[root@node1 ~]# /export/server/flink-local/bin/stop-cluster.sh 
  • 1

实际案例:当本地集群启动以后,运行Flink应用程序,分别运行流计算和批处理 词频统计
例1: flink 脚本命令提交运行jar包程序,具体命令使用说明如下:

[root@node1 ~]# cd /export/server/flink-local/
[root@node1 flink-local]# bin/flink run --help

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main()" method). Only needed if the
                                          JAR file does not specify the class in
                                          its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.

  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration
                           options. The available options can be found at
                           https://ci.apache.org/projects/flink/flink-docs-stabl
                           e/ops/config.html
     -t,--target <arg>     The deployment target for the given application,
                           which is equivalent to the "execution.target" config
                           option. For the "run" action the currently available
                           targets are: "remote", "local", "kubernetes-session",
                           "yarn-per-job", "yarn-session". For the
                           "run-application" action the currently available
                           targets are: "kubernetes-application".

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution
                                      mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -D <property=value>             Allows specifying multiple generic
                                     configuration options. The available
                                     options can be found at
                                     https://ci.apache.org/projects/flink/flink-
                                     docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to
                                     connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration. Attention: This
                                     option is respected only if the
                                     high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

1)、流计算:WordCount词频统计,[运行流式计算程序,从TCP Socket 读取数据,进行词频统计。]

# 开启1个终端
nc -lk 9999

# 上传jar【StreamWordCount.jar】包至/export/server/flink-local目录
cd /export/server/flink-local
rz

# 再开启1个终端,运行流式应用
/export/server/flink-local/bin/flink run \
--class cn.ittcast.flink.StreamWordCount \
/export/server/flink-local/StreamWordCount.jar \
--host node1 --port 9999
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里插入图片描述
2)、监控页面查看日志信息数据
在这里插入图片描述在这里插入图片描述查看TaskManager日志,每条数据处理结果:
在这里插入图片描述

例2:执行官方示例Example,读取文本文件数据,进行词频统计WordCount,将结果打印控制台或文件
在这里插入图片描述
1)、准备文件/root/words.txt

[root@node1 ~]# vim /root/words.txt
添加数据
spark python spark hive spark hive
python spark hive spark python
mapreduce spark hadoop hdfs hadoop spark
hive mapreduce
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2)、批处理,执行如下命令

# 指定处理数据文件,通过参数 --input 传递
/export/server/flink-local/bin/flink run \
/export/server/flink-local/examples/batch/WordCount.jar \
--input /root/words.txt
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

# 指定处理数据文件和输出数据目录,分别通过--input 和 --output 传递参数值
/export/server/flink-local/bin/flink run \
/export/server/flink-local/examples/batch/WordCount.jar \
--input /root/words.txt --output /root/out.txt
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

06-安装部署之Standalone集群

Flink Standalone集群,类似Hadoop YARN集群,管理集群资源和分配资源给Flink Job运行任务Task
在这里插入图片描述

  1. Client客户端提交任务给JobManager;
  2. JobManager负责申请任务运行所需要的资源并管理任务和资源;
  3. JobManager分发任务给TaskManager执行;
  4. TaskManager定期向JobManager汇报状态;

0)、集群规划:
在这里插入图片描述
1)、上传软件及解压

[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
	上传软件包:flink-1.13.1-bin-scala_2.11.tgz
	
[root@node1 software]# chmod u+x flink-1.13.1-bin-scala_2.11.tgz
[root@node1 software]# tar -zxf flink-1.13.1-bin-scala_2.11.tgz -C /export/server/	

[root@node1 ~]# cd /export/server/
[root@node1 server]# chown -R root:root flink-1.13.1
[root@node1 server]# mv flink-1.13.1 flink-standalone
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2)、修改flink-conf.yaml

vim /export/server/flink-standalone/conf/flink-conf.yaml
修改内容:33行内容
	jobmanager.rpc.address: node1
  • 1
  • 2
  • 3

3)、修改masters

vim /export/server/flink-standalone/conf/masters
修改内容:	
	node1:8081
  • 1
  • 2
  • 3

4)、修改workers

vim /export/server/flink-standalone/conf/workers
修改内容:	
    node1
    node2
    node3
  • 1
  • 2
  • 3
  • 4
  • 5

5)、添加HADOOP_CONF_DIR环境变量(集群所有机器)

vim /etc/profile
	添加内容:
	export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
# 执行生效
source /etc/profile
  • 1
  • 2
  • 3
  • 4
  • 5

6)、将Flink依赖Hadoop 框架JAR包上传至/export/server/flink-standalone/lib目录
在这里插入图片描述

[root@node1 ~]# cd /export/server/flink-standalone/lib/

[root@node1 lib]# rz
	commons-cli-1.4.jar
	flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
  • 1
  • 2
  • 3
  • 4
  • 5

7)、分发到集群其他机器

scp -r /export/server/flink-standalone root@node2:/export/server

scp -r /export/server/flink-standalone root@node3:/export/server
  • 1
  • 2
  • 3

。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
接下来,启动服务进程,运行批处理程序:词频统计WordCount。
1)、启动HDFS集群,在node1上执行如下命令

start-dfs.sh
  • 1

2)、启动集群,执行如下命令

# 一键启动所有服务JobManager和TaskManagers
[root@node1 ~]# /export/server/flink-standalone/bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述3)、访问Flink UI界面:http://node1:8081/#/overview
在这里插入图片描述在这里插入图片描述4)、执行官方测试案例

# 准备测试数据
[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input/
[root@node1 ~]# hdfs dfs -put /root/words.txt /wordcount/input/
  • 1
  • 2
  • 3

在这里插入图片描述

运行程序,使用--input指定处理数据文件路径
/export/server/flink-standalone/bin/flink run \
/export/server/flink-standalone/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

# 使用--output指定处理结果数据存储目录
/export/server/flink-standalone/bin/flink run \
/export/server/flink-standalone/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt \
--output hdfs://node1:8020/wordcount/output/result

[root@node1 ~]# hdfs dfs -text /wordcount/output/result
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述
5)、关闭Standalone集群服务

# 一键停止所有服务JobManager和TaskManagers
[root@node1 ~]# /export/server/flink-standalone/bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 6600) on host node1.
Stopping taskexecutor daemon (pid: 3016) on host node2.
Stopping taskexecutor daemon (pid: 3034) on host node3.
Stopping standalonesession daemon (pid: 6295) on host node1.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

补充:Flink Standalone集群启动与停止,也可以逐一服务启动

# 每个服务单独启动
# 在node1上启动
/export/server/flink-standalone/bin/jobmanager.sh start
# 在node1、node2、node3.
/export/server/flink-standalone/bin/taskmanager.sh start  # 每台机器执行

# ===============================================================
# 每个服务单独停止
# 在node1上停止
/export/server/flink-standalone/bin/jobmanager.sh stop
# 在node1、node2、node3
/export/server/flink-standalone/bin/taskmanager.sh stop 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

07-安装部署之Standalone HA

从Standalone架构图中,可发现JobManager存在单点故障(SPOF),一旦JobManager出现意外,整个集群无法工作。为了确保集群的高可用,需要搭建Flink的Standalone HA。
在这里插入图片描述Flink Standalone HA集群,类似YARN HA 集群安装部署,可以启动多个主机点JobManager,使用Zookeeper集群监控JobManagers转态,进行选举leader,实现自动故障转移。
在这里插入图片描述 在 Zookeeper 的协助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中**只有一个处于Active工作状态,其他处于 Standby 状态。**当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。
1)、集群规划
在这里插入图片描述

# 在node1上复制一份standalone
[root@node1 ~]# cd /export/server/
[root@node1 server]# cp -r flink-standalone flink-ha

# 删除日志文件
[root@node1 ~]# rm -rf /export/server/flink-ha/log/*
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2)、启动ZooKeeper,在node1上启动

start-zk.sh
  • 1

3)、启动HDFS,在node1上启动,如果没有关闭,不用重启

start-dfs.sh
  • 1

4)、停止集群,在node1操作,进行HA高可用配置

/export/server/flink-standalone/bin/stop-cluster.sh 
  • 1

5)、修改flink-conf.yaml,在node1操作

vim /export/server/flink-ha/conf/flink-conf.yaml
	修改内容:
jobmanager.rpc.address: node1	

high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_standalone

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink/checkpoints
state.savepoints.dir: hdfs://node1:8020/flink/savepoints
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

6)、修改masters,在node1操作

vim /export/server/flink-ha/conf/masters
	修改内容:
	node1:8081
	node2:8081
  • 1
  • 2
  • 3
  • 4

7)、分发到集群其他机器,在node1操作

scp -r /export/server/flink-ha root@node2:/export/server/
scp -r /export/server/flink-ha root@node3:/export/server/
  • 1
  • 2

8)、修改node2上的flink-conf.yaml

[root@node2 ~]# vim /export/server/flink-ha/conf/flink-conf.yaml 
	修改内容:33 行
	jobmanager.rpc.address: node2
  • 1
  • 2
  • 3

9)、重新启动Flink集群

# node1和node2上执行
/export/server/flink-ha/bin/jobmanager.sh start

# node1和node2、node3执行
/export/server/flink-ha/bin/taskmanager.sh start  # 每台机器执行
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

08-Flink on YARN之运行流程

​ 在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的Workload,因此 Flink 也支持在 Yarn 集群运行。

为什么使用Flink on Yarn或Spark on Yarn?

  • 1)、Yarn的资源可以按需使用,提高集群的资源利用率

  • 2)、Yarn的任务有优先级,根据优先级运行作业

  • 3)、基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)

    当应用程序(MR、Spark、Flink)运行在YARN集群上时,可以实现容灾恢复。

09-Flink on YARN之安装部署

Flink on YARN安装配置,此处考虑高可用HA配置,集群机器安装软件框架示意图:
在这里插入图片描述1)、关闭YARN的内存检查(node1操作)

# yarn-site.xml中添加配置
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
  • 1
  • 2

添加如下内容:

<!-- 关闭yarn内存检查 -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2)、 配置Application最大的尝试次数(node1操作)

# yarn-site.xml中添加配置
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
  • 1
  • 2

添加如下内容:

<property>
	<name>yarn.resourcemanager.am.max-attempts</name>
	<value>4</value>
</property>
  • 1
  • 2
  • 3
  • 4

3)、同步yarn-site.xml配置文件(node1操作)

cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@node2:$PWD
scp -r yarn-site.xml root@node3:$PWD
  • 1
  • 2
  • 3

4)、启动HDFS集群和YARN集群(node1操作)

[root@node1 ~]# start-dfs.sh

[root@node1 ~]# start-yarn.sh
  • 1
  • 2
  • 3

5)、添加HADOOP_CONF_DIR环境变量(集群所有机器

# 添加环境变量
 vim /etc/profile
  • 1
  • 2

添加内容:

export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
  • 1

环境变量生效

source /etc/profile
  • 1

6)、上传软件及解压(node1操作)

[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
	上传软件包:flink-1.13.1-bin-scala_2.11.tgz
	
[root@node1 software]# chmod u+x flink-1.13.1-bin-scala_2.11.tgz
[root@node1 software]# tar -zxf flink-1.13.1-bin-scala_2.11.tgz -C /export/server/	

[root@node1 ~]# cd /export/server/
[root@node1 server]# chown -R root:root flink-1.13.1
[root@node1 server]# mv flink-1.13.1 flink-yarn
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

7)、将Flink依赖Hadoop 框架JAR包上传至/export/server/flink-yarn/lib目录
在这里插入图片描述

[root@node1 ~]# cd /export/server/flink-yarn/lib/
[root@node1 lib]# rz
	commons-cli-1.4.jar
	flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
  • 1
  • 2
  • 3
  • 4

8)、配置HA高可用,依赖Zookeeper及重试次数(node1操作)

# 修改配置文件
vim /export/server/flink-yarn/conf/flink-conf.yaml
  • 1
  • 2

添加如下内容:

high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/yarn-ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink-yarn-ha
high-availability.cluster-id: /cluster_yarn

yarn.application-attempts: 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

9)、集群所有机器,同步分发Flink 安装包,便于任意机器提交运行Flink Job。

scp -r /export/server/flink-yarn root@node2:/export/server/

scp -r /export/server/flink-yarn root@node3:/export/server/
  • 1
  • 2
  • 3

10)、启动Zookeeper集群(node1操作)

start-zk.sh
  • 1

在Flink中执行应用有如下三种部署模式(Deployment Mode):
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/28d9a859154f9ba2e2fd90d2110735.png#pic_center

10-Flink on YARN之Session模式运行

Flink on YARN :Session 模式,表示多个Flink Job运行共享Standalone集群资源。

​ 先向Hadoop YARN申请资源,启动运行服务JobManager和TaskManagers,再提交多个Job到Flink 集群上执行。
在这里插入图片描述

  • 无论JobManager还是TaskManager,都是运行NodeManager Contanier容器中,以JVM 进程方式运行;
  • 提交每个Flink Job执行时,找的就是JobManager(AppMaster),找运行在YARN上应用ID;

Session 会话模式:arn-session.sh(开辟资源) + flink run(提交任务)

  • 第一、Hadoop YARN 运行Flink 集群,开辟资源,使用:yarn-session.sh
    • 在NodeManager上,启动容器Container运行JobManager和TaskManagers
  • 第二、提交Flink Job执行,使用:flink run

准备测试数据,测试运行批处理词频统计WordCount程序

[root@node1 ~]# vim /root/words.txt
  • 1

添加数据

spark python spark hive spark hive
python spark hive spark python
mapreduce spark hadoop hdfs hadoop spark
hive mapreduce
  • 1
  • 2
  • 3
  • 4

数据文件上传

[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input/
[root@node1 ~]# hdfs dfs -put /root/words.txt /wordcount/input/
  • 1
  • 2

在这里插入图片描述

  • 第一步、在yarn上启动一个Flink会话,node1上执行以下命令
export HADOOP_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 2

# 参数说明
-d:后台执行
-s:	每个TaskManager的slot数量
-jm:JobManager的内存(单位MB)
-tm:每个TaskManager容器的内存(默认值:MB)

# 提交flink 集群运行yarn后,提示信息
JobManager Web Interface: http://node1:44263
..................................................................
$ echo "stop" | ./bin/yarn-session.sh -id application_1633441564219_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1633441564219_0001
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 第二步、查看UI界面,http://node1:8088/cluster/apps
    在这里插入图片描述 JobManager提供WEB UI:http://node1:8088/proxy/application_1614756061094_0002/#/overview

在这里插入图片描述
此时,没有任何TaskManager运行在容器Container中,需要等待有Flink Job提交执行时,才运行TaskManager。

  • 第三步、使用flink run提交任务
/export/server/flink-yarn/bin/flink run \
-t yarn-session \
-Dyarn.application.id=application_1652168669227_0001 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

  • 第四步、通过上方的ApplicationMaster可以进入Flink的管理界面
    在这里插入图片描述

  • 第五步、关闭yarn-session

# 优雅 停止应用,如果设置重启次数,即使停止应用,也会重启,一直到超过次数以后,才能真正停止应用
echo "stop" | /export/server/flink-yarn/bin/yarn-session.sh -id application_1633441564219_0001

# kill 命令,直接将运行在yarn应用杀死,毫不留情
yarn application -kill application_1633441564219_0001
  • 1
  • 2
  • 3
  • 4
  • 5

11-Flink on YARN之PerJob模式运行

每个Flink Job提交运行到Hadoop YARN集群时,根据自身的情况,单独向YARN申请资源,直到作业执行完成

在这里插入图片描述

​ 在Hadoop YARN中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。

采用Job分离模式,每个Flink Job运行,都会申请资源,运行属于自己的Flink 集群

  • 第一步、直接提交job
export HADOOP_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/flink run \
-t yarn-per-job -m yarn-cluster \
-yjm 1024 -ytm 1024 -ys 1 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input

# 参数说明
-m:指定需要连接的jobmanager(主节点)地址,指定为 yarn-cluster,启动一个新的yarn-session
-yjm:JobManager可用内存,单位兆
-ytm:每个TM所在的Container可申请多少内存,单位兆
-ys:每个TM会有多少个Slot
-yd:分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在这里插入图片描述

  • 第二步、查看UI界面:http://node1:8088/cluster
    在这里插入图片描述
    提交Flink Job在Hadoop YARN执行时,最后给出如下错误警告:
    在这里插入图片描述
解决办法: 在 flink 配置文件里 flink-conf.yaml设置
	classloader.check-leaked-classloader: false
  • 1
  • 2

12-Flink on YARN之Application模式运行

Flink 1.11 引入了一种新的部署模式,即 Application 模式,目前可以支持基于 Hadoop YARN 和 Kubernetes 的 Application 模式。

# 1、Session 模式:
	所有作业Job共享1个集群资源,隔离性差,JM 负载瓶颈,每个Job中main 方法在客户端执行。

# 2、Per-Job 模式:
	每个作业单独启动1个集群,隔离性好,JM 负载均衡,Job作业main 方法在客户端执行。
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述 以上两种模式,main方法都是在客户端执行,需要获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。此外,提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

Application 模式下,用户程序的 main 方法将在集群中运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。
在这里插入图片描述

Application 模式为每个提交的应用程序创建一个集群,并在应用程序完成时终止。Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main 可以[节省所需的 CPU 周期],还可以[节省本地下载依赖项所需的带宽]。
Application 模式==使用 bin/flink run-application提交作业,本质上是Session和Per-Job模式的折衷。

  • 通过 -t 指定部署环境,目前支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);
  • 通过 -D 参数指定通用的运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
export HADOOP_CLASSPATH=`hadoop classpath`

/export/server/flink-yarn/bin/flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=1 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

由于MAIN方法在JobManager(也就是NodeManager的容器Container)中执行,当Flink Job执行完成以后,启动MRJobHistoryServer历史服务器,查看AppMaster日志信息。

# node1 上启动历史服务
[root@node1 ~]# mr-jobhistory-daemon.sh start historyserver 
  • 1
  • 2

第二步、查看UI界面:http://node1:8088/cluster
在这里插入图片描述

测试Flink Job不同运行模式时,注意事项如下
在这里插入图片描述

第三部分:Flink入门案例

13-Flink入门案例之编程模型

基于Flink计算引擎,分别实现批处理(Batch)和流计算(Streaming )中:词频统计WordCount

第一点:Flink API== ,提供四个层次API,越在下面API,越复杂和灵活;越在上面API,使用越简单和抽象
在这里插入图片描述
第二点:编程模型==,无论编写批处理还是流计算程序,分为三个部分:Data Source、Transformations和Data Sink

# 第一步、从数据源DataSource获取数据
	流计算:DataStream
	批处理:DataSet

# 第二步、对数据进行转换处理
	
# 第三步、结果数据输出DataSink
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

无论批处理Batch,还是流计算Stream,首先需要创建执行环境ExecutionEnvironment对象,类似Spark中SparkSession或者SparkContext

在这里插入图片描述
创建整个Flink基础课程Maven Project,设置MAVEN Repository仓库目录及Maven安装目录
在这里插入图片描述
约定:每天创建一个Maven Module](),创建第1天Maven Module,模块结构:
在这里插入图片描述
POM文件添加如下内容:

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
        <repository>
            <id>central_maven</id>
            <name>central maven</name>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                 
                               </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

日志配置文件:log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

14-Flink入门案例之WordCount【批处理】

首先,基于Flink计算引擎,[实现离线批处理Batch:从文本文件读取数据,词频统计]。
在这里插入图片描述
批处理时词频统计思路如下伪代码所示:

					spark flink flink flink spark
								|
								| flatMap
								|
			 3-1. 分割单词 spark, flink, flink, flink, spark
			 					|
			                    | map
			                    |
			 3-2. 转换二元组 (spark, 1) (flink, 1) (flink, 1) (flink, 1) (spark, 1)
			 					|
			                    | groupBy(0)
			                    |
			 3-3. 按照单词分组
			        spark -> [(spark, 1) (spark, 1)]
			        flink -> [(flink, 1) (flink, 1) (flink, 1) ]
			        			|
			                    |sum(1)
			                    |
			 3-4. 组内数据求和,第二元素值累加
			        spark -> 1 + 1 = 2
			        flink -> 1 + 1 + 1 =3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

基于Flink编写批处理或流计算程序步骤如下:(5个步骤)

1.执行环境-env
2.数据源-source
3.数据转换-transformation
4.数据接收器-sink
5.触发执行-execute
  • 1
  • 2
  • 3
  • 4
  • 5

编写批处理词频统计:BatchWordCount,创建Java类

package cn.itqzd.flink.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 使用Flink计算引擎实现离线批处理:词频统计WordCount
	 * 1.执行环境-env
	 * 2.数据源-source
	 * 3.数据转换-transformation
	 * 4.数据接收器-sink
	 * 5.触发执行-execute
 */
                                            public class BatchWordCount {

	public static void main(String[] args) throws Exception {
		// 1.执行环境-env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment() ;

		// 2.数据源-source
		DataSource<String> inputDataSet = env.readTextFile("datas/words.txt");

		// 3.数据转换-transformation
		/*
			spark flink spark hbase spark
						|flatMap
			分割单词: spark, flink, spark
						|map
			转换二元组:(spark, 1)  (flink, 1) (spark, 1), TODO:Flink Java API中提供元组类Tuple
						|groupBy(0)
			分组:spark -> [(spark, 1), (spark, 1)]  flink -> [(flink, 1)]
						|sum(1)
			求和:spark -> 1 + 1 = 2,   flink = 1
		 */
		// 3-1. 分割单词
		FlatMapOperator<String, String> wordDataSet = inputDataSet.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public void flatMap(String line, Collector<String> out) throws Exception {
				String[] words = line.trim().split("\\s+");
				for (String word : words) {
					out.collect(word);
				}
			}
		});

		// 3-2. 转换二元组
		MapOperator<String, Tuple2<String, Integer>> tupleDataSet = wordDataSet.map(new MapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public Tuple2<String, Integer> map(String word) throws Exception {
				return Tuple2.of(word, 1);
			}
		});

		// 3-3. 分组及求和, TODO: 当数据类型为元组时,可以使用下标指定元素,从0开始
		AggregateOperator<Tuple2<String, Integer>> resultDataSet = tupleDataSet.groupBy(0).sum(1);

		// 4.数据接收器-sink
		resultDataSet.print();

		// 5.触发执行-execute, TODO:批处理时,无需触发,流计算必须触发执行
		//env.execute("BatchWordCount") ;
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

15-Flink入门案例之WordCount【流计算】

编写Flink程序,接收TCP Socket的单词数据,并以空格进行单词拆分,分组统计单词个数
在这里插入图片描述

package cn.itqzd.flink.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。
	 * 1.执行环境-env
	 * 2.数据源-source
	 * 3.数据转换-transformation
	 * 4.数据接收器-sink
	 * 5.触发执行-execute
 */
public class StreamWordCount {

	public static void main(String[] args) throws Exception {
		// 1.执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2.数据源-source
		DataStreamSource<String> inputDataStream = env.socketTextStream("node1", 9999);

		// 3.数据转换-transformation
		SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
			// 3-1. 分割单词
			.flatMap(new FlatMapFunction<String, String>() {
				@Override
				public void flatMap(String line, Collector<String> out) throws Exception {
					for (String word : line.trim().split("\\s+")) {
						out.collect(word);
					}
				}
			})
			// 3-2. 转换二元组
			.map(new MapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map(String word) throws Exception {
					return new Tuple2<>(word, 1);
				}
			})
			// 3-3. 分组和组内求和
			.keyBy(0).sum(1);

		// 4.数据接收器-sink
		resultDataStream.print();

		// 5.触发执行-execute
		env.execute("StreamWordCount");
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

修改流计算词频统计,从本地系统文本文件加载数据,处理数据,设置执行模式为:Batch

package cn.itqzd.flink.execution;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 使用Flink计算引擎实现离线批处理:词频统计WordCount,TODO:从Flink 1.12开始,流批一体化,API统一,设置执行模式即可
	 * 1.执行环境-env
	 * 2.数据源-source
	 * 3.数据转换-transformation
	 * 4.数据接收器-sink
	 * 5.触发执行-execute
 */
public class ExecutionWordCount {

	public static void main(String[] args) throws Exception {
		// 1.执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// TODO: 设置执行模式execute-mode为Batch批处理
		env.setRuntimeMode(RuntimeExecutionMode.BATCH) ;

		// 2.数据源-source
		DataStreamSource<String> inputDataStream = env.readTextFile("datas/words.txt") ;

		// 3.数据转换-transformation
		SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
			// 3-1. 分割单词
			.flatMap(new FlatMapFunction<String, String>() {
				@Override
				public void flatMap(String line, Collector<String> out) throws Exception {
					for (String word : line.trim().split("\\s+")) {
						out.collect(word);
					}
				}
			})
			// 3-2. 转换二元组
			.map(new MapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map(String word) throws Exception {
					return new Tuple2<>(word, 1);
				}
			})
			// 3-3. 分组和组内求和
			.keyBy(0).sum(1);

		// 4.数据接收器-sink
		resultDataStream.print();

		// 5.触发执行-execute
		env.execute("StreamWordCount");
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

修改流式程序,从应用程序传递参数:host和port,使用Flink中工具类:ParameterTool,解析参数,代码如下所示:
在这里插入图片描述

package cn.itqzd.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。
	 * 1.执行环境-env
	 * 2.数据源-source
	 * 3.数据转换-transformation
	 * 4.数据接收器-sink
	 * 5.触发执行-execute
 */
public class WordCount {

	public static void main(String[] args) throws Exception {

		// TODO: 构建参数解析工具类实例对象
		ParameterTool parameterTool = ParameterTool.fromArgs(args);
		if(parameterTool.getNumberOfParameters() != 2){
			System.out.println("Usage: WordCount --host <hostname> --port <port> .........");
			System.exit(-1);
		}
		final String host = parameterTool.get("host") ; // 直接传递参数,获取值
		final int port = parameterTool.getInt("port", 9999) ; // 如果没有参数,使用默认值

		// 1.执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(2) ; // 设置并行度

		// 2.数据源-source
		DataStreamSource<String> inputDataStream = env.socketTextStream(host, port);

		// 3.数据转换-transformation
		SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
			// 3-1. 分割单词
			.flatMap(new FlatMapFunction<String, String>() {
				@Override
				public void flatMap(String line, Collector<String> out) throws Exception {
					for (String word : line.trim().split("\\s+")) {
						out.collect(word);
					}
				}
			})
			// 3-2. 转换二元组
			.map(new MapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map(String word) throws Exception {
					return new Tuple2<>(word, 1);
				}
			})
			// 3-3. 分组和组内求和
			.keyBy(0).sum(1);

		// 4.数据接收器-sink
		resultDataStream.print();

		// 5.触发执行-execute
		env.execute("StreamWordCount");
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

在这里插入图片描述

16-Flink入门案例之打包部署运行

Flink 程序提交运行方式有两种:

  • 1)、方式一:以命令行的方式提交:flink run
  • 2)、方式二:以UI界面方式提交
命令行提交

命令行方式提交Flink应用,可以运行至Standalone集群和YARN集群,以运行YARN Job分离模式为例演示提交Flink应用程序。

  • 1)、应用程序编译打包:flink-day01-1.0.0.jar,不包含其他依赖jar包,删除log4j配置文件。

  • 2)、启动HDFS集群和YARN集群

# 在node1上启动服务
start-zk.sh

start-dfs.sh

start-yarn.sh

mr-jobhistory-daemon.sh start historyserver 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 3)、上传作业jar包到linux服务器
cd /export/server/flink-yarn/
rz
  • 1
  • 2
  • 4)、提交运行
/export/server/flink-yarn/bin/flink run \
-t yarn-per-job \
-m yarn-cluster \
-yjm 1024 -ytm 1024 -ys 1 \
--class cn.itqzd.flink.WordCount \
/export/server/flink-yarn/flink-start-1.0-SNAPSHOT.jar \
--host node1 --port 9999
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 5)、第三步、查看任务运行概述
    http://ndoe1:8088/

界面UI提交
UI 方式提交,此种方式提交应用,可以提交Flink Job在Flink Standalone集群和YARN Session会话模式下,此处以YARN Session为例演示。

  • 1)、第一步、启动HDFS集群和YARN集群
# 在node1上启动服务
start-zk.sh

start-dfs.sh

start-yarn.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 2)、第二步、启动YARN Session
export HADOOP_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 1
  • 1
  • 2
  • 3)、第三步、上传作业jar包及指定相关参数

在这里插入图片描述

选择打成jar包,然后填写参数值,截图如下:

在这里插入图片描述

参数内容:

Entry Class:cn.itqzd.flink.WordCount
Program Arguments:--host node1 --port 9999
  • 1
  • 2

在这里插入图片描述

点击显示计划【Show Plan】:

点击提交按钮【Submit】,运行Flink应用。

  • 4)、第四步、查看任务运行概述
    http://ndoe1:8088/
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/598040
推荐阅读
相关标签
  

闽ICP备14008679号