当前位置:   article > 正文

大数据之Flink简介和算子介绍_flink算子是什么

flink算子是什么

一、Flink简介

Flink是Apache旗下的一个分布式处理引擎框架,用于对无界和有界数据流进行有状态的计算。这里涉及到批处理和流处理,批处理针对的是有界数据集,非常适合需要访问海量的全部数据才能完成的计算工作,一般用于离线统计。流处理主要针对的是数据流,特点是无界、实时, 对系统传输的每个数据依次执行操作, 一般用于实时统计。简单来说,大量批处理的数据使用Spark框架来处理,虽然Spark现在也支持流处理,但属于是将批次降低到类似于流处理的形式。实时流处理一般采用Flink框架来处理,虽然Flink也能支持批处理操作,但是相当于有界的数据流来操作的。

1、Flink的特点

Flink 区别与传统数据处理框架的特性如下。
(1)高吞吐和低延迟:
每秒处理数百万个事件,毫秒级延迟。
(2)检查点的一致性:
Flink的故障恢复机制是通过建立分布式应用服务状态一致性检查点实现的。当有故障产生时,应用服务重启后,再重新加载上一次成功备份的状态检查点信息。结合可重放的数据源,该特性可保证精确一次(exactly-once)的状态一致性。
(3)高效的检查点:
如果一个应用要维护一个TB级的状态信息,对此应用的状态建立检查点服务的资源开销是很高的,为了减小因检查点服务对应用的延迟性(SLAs服务等级协议)的影响,Flink采用异步及增量的方式构建检查点服务。
(4)集成多种集群管理服务:
Flink已与多种集群管理服务紧密集成,如 Hadoop YARN, Mesos, 以及 Kubernetes。当集群中某个流程任务失败后,一个新的流程服务会自动启动并替代它继续执行。
(5)内置高可用服务:
Flink内置了为解决单点故障问题的高可用性服务模块,此模块是基于Apache ZooKeeper 技术实现的,Apache ZooKeeper是一种可靠的、交互式的、分布式协调服务组件。

2、Flink的安装

本地启动

(1)下载安装包
进入 Flink 官网,下载 1.13.0 版本安装包 flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对应 scala 版本为 scala 2.12 的安装包。
(2)解压
在hadoop102节点服务器上创建安装目录/opt/module,将 Flink 安装包放在该目录下,并执行解压命令,解压至当前目录。

tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/ 
  • 1

(3)启动Flink 服务
进入解压后的目录,执行启动命令,并查看进程。在hadoop没有启动的情况下也可以启动,启动的将是单机模式

cd flink-1.13.0/
bin/start-cluster.sh		# 启动
jps							# 查看进程
  • 1
  • 2
  • 3

(4)访问 Web UI
启动成功后,访问 http://hadoop102:8081,可以对 Flink 集群和任务进行监控管理。
(5)关闭Flink 服务

bin/stop-cluster.sh
  • 1

集群启动

(1)进入 conf 目录下,修改 flink-conf.yaml 文件,修改 jobmanager.rpc.address 参数为 hadoop102,如下所示:

$ cd conf/
$ vim flink-conf.yaml # JobManager 节点地址.
jobmanager.rpc.address: hadoop102
  • 1
  • 2
  • 3

这就指定了 hadoop102 节点服务器作为 JobManager 的节点。
(2)修改 workers 文件,将另外两台节点服务器添加为本 Flink 集群的TaskManager 节点,具体修改如下:

$ vim workers 
hadoop103
hadoop104
  • 1
  • 2
  • 3

这样就指定了hadoop103 和 hadoop104 作为TaskManager 节点。
(3)另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置,主要配置项如下:

  • jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行配置, 包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
  • taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
  • taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置, 默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓Slot 就是TaskManager 中具体运行一个任务所分配的计算资源。
  • parallelism.default:Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
    (4)配置修改完毕后,将 Flink 安装目录发给另外两个节点服务器
$ scp -r ./flink-1.13.0 mrlin@hadoop103:/opt/module
$ scp -r ./flink-1.13.0 mrlin@hadoop104:/opt/module
  • 1
  • 2

(5)在 hadoop102 节点服务器上执行 start-cluster.sh 启动 Flink 集群:

# 启动集群
$ bin/start-cluster.sh 
# 各节点查看进程情况
jps
  • 1
  • 2
  • 3
  • 4

5.访问 Web UI
启动成功后,同样可以访问 http://hadoop102:8081 对 Flink集群和任务进行监控管理。

3、Flink提交作业

(1)在 Web UI 上提交作业
可以将程序在本地打包后,在Web UI上面直接提交,点击 submin new job 那里提交作业,然后点击按钮“+ Add New”,选择要提交的jar包,会出现需要填写的参数。
(2)命令行提交作业

## 启动集群 
bin/start-cluster.sh
## 进入到Flink 的安装路径下,在命令行使用 flink run 命令提交作业。
bin/flink run -m hadoop102:8081 -c com.atguigu.wc.StreamWordCount ./FilnkTutorial-1.0-SNAPSHOT.jar
## 在log日志中可以查看执行结果。
  • 1
  • 2
  • 3
  • 4
  • 5

这里的参数 –m 指定了提交到的 JobManager,-c 指定了入口类。

4、Flink的部署模式

Flink 运行的架构有两大组件,分别是作业管理器(JobManger)和任务管理器(TaskManager),作业管理器是提交作业的主要管理者,任务管理器是提交作业的实际执行者。根据客户端提交作业到作业管理器执行的方式可以将Flink的部署模式分为以下几种。
(1)会话模式(Session Mode)
启动Flink后,再启动一个JobManger,在这个模式中通过客户端向JobManager提交作业。所有作业都共享一个JobManager的资源,由JobManager具体分配作业的具体执行到 TaskManager 中执行,作业执行完就释放资源。一旦提交过大需要资源较大的作业,会导致JobManager 资源不够,作业便无法再提交。同时,其中一个作业导致JobManager 宕机会影响其他所有在该 JobManager 执行的作业。因此会话模式适合单个规模小,执行时间短的大量作业。
简单说就是一个JobManager对应多个作业,作业由客户端提交。
(2)单作业模式(Per-Job Mode)
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个JobManger,这就是所谓的单作业(Per-Job)模式。单作业模式也很好理解,就是严格的一对一,一个 JobManger只为这个作业而生。同样由客户端运行应用程序,然后启动JobManger,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业作业完成后,JobManger就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager 管理,占用独享的资源,即使发生故障,它的 JobManger宕机也不会影响其他作业。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes等。
简单说就是一个JobManager对应一个作业,作业由客户端提交。
(3)应用模式(Application Mode)
之前两个模式都是先提交到客户端,再由客户端做一定的处理后,提交到 JobManager 运行。这里不要客户端,先为每一个提交的应用单独启动一个JobManager,再直接将作业提交到JobManager上运行。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式。
简单说就是一个JobManager对应一个作业,作业由 JobManager 来提交。

5、Flink的本地模式

Flink只是一个分布式计算框架,类似于hadoop中的MapReduce,所以Flink本身不擅长资源的管理和分配,但是本身内置有一定的资源的管理和分配功能。这种由Flink本身负责分布式计算,并且由本身分配资源和管理的方式,称为Flink的本地模式,也叫Flink的独立模式。
上面Flink的安装和启动,启动的就是Flink的会话模式部署,Flink本身不支持单作业模式部署,以下为Flink的应用模式部署:
(1)进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。

cp ./FlinkTutorial-1.0-SNAPSHOT.jar lib/
  • 1

(2)执行以下命令,启动 JobManager。

./bin/standalone-job.sh start --job-classname com.mrlin.wc.StreamWordCount
  • 1

这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。
(3)同样是使用bin 目录下的脚本,启动 TaskManager。

./bin/taskmanager.sh start
  • 1

(4)如果希望停掉集群,同样可以使用脚本,命令如下。

./bin/standalone-job.sh stop
./bin/taskmanager.sh stop
  • 1
  • 2

6、Flink的Yarn模式

Flink只是一个大数据计算框架,不是资源调度框架,让Flink跟Yarn配合,Flink负责计算,Yarn负责调度,才是比较符合实际应用的。Yarn上部署的过程是:客户端把 Flink 应用提交给 Yarn 的ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,
Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。
在将Flink任务部署至Yarn集群前,需要添加如下配置:
(1)配置环境变量,增加环境变量配置如下:

sudo vim /etc/profile.d/my_env.sh 
HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
  • 1
  • 2
  • 3
  • 4
  • 5

(2)启动Hadoop 集群,包括 HDFS 和 YARN:

start-dfs.sh
start-yarn.sh
  • 1
  • 2

(3)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置:

cd /opt/module/flink-1.13.0-yarn/conf/
vim flink-conf.yaml 
jobmanager.memory.process.size: 1600m 
taskmanager.memory.process.size: 1728m 
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Yarn会话模式

(1)启动集群
启动 hadoop 集群(HDFS, YARN)
(2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。

bin/yarn-session.sh -nm test
  • 1

可用参数解读:

⚫ -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,否则关掉前台yarn会话模式也会关闭即使关掉当前对话窗口,
YARN session 也可以后台运行。
⚫ -jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
⚫ -nm(--name):配置在 YARN UI 界面上显示的任务名。
⚫ -qu(--queue):指定 YARN 队列名。
⚫ -tm(--taskManager):配置每个 TaskManager 所使用内存。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Yarn会话模式没有指定TaskManager和slot的参数数量,是因为yarn模式可以动态分配资源,无需指定,这样效率更高。
(3)提交作业

  • 1)通过Web UI 提交作业,这种方式比较简单,与上文所述 Standalone 部署模式基本相同。
  • 2)通过命令行提交作业
    • 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群
    • 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。
bin/flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
  • 1

Yarn单作业模式

在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。
(1)执行命令提交作业。

bin/flink run	-d -t yarn-per-job -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
  • 1

(2)可以使用命令行查看或取消作业,命令如下。

./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
  • 1
  • 2

这里的 application_XXXX_YY 是当前应用的 ID,是作业的 ID。注意如果取消作业,整个 Flink 集群也会停掉。

Yarn应用模式

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。
(1)执行命令提交作业。

bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCountFlinkTutorial-1.0-SNAPSHOT.jar
  • 1

(2)在命令行中查看或取消作业。

./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
./bin/flink	cancel	-t	yarn-application-Dyarn.application.id=application_XXXX_YY <jobId>
  • 1
  • 2

(3)也可以通过yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。

$ ./bin/flink run-application -t yarn-application-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
hdfs://myhdfs/jars/my-application.jar
  • 1
  • 2

这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

Yarn高可用模式

YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。独立模式中, 同时启动多个 JobManager, 一个为主用,其他的为后备。 当主用挂了, 其他的才会有一个成为主用。而 Yarn 的高可用是只启动一个Jobmanager, 当这个Jobmanager挂了之后, Yarn 会再次启动一个Jobmanager, 所以其实是利用的 Yarn 的重试次数来实现的高可用的。
Yarn的高可用和独立模式的高可用都是使用Zookeeper来实现的,所以需要提前先按照Zookeeper。
高可用的具体配置如下:
(1)在 yarn-site.xml 中配置。

<property>
	<name>yarn.resourcemanager.am.max-attempts</name>
	<value>4</value>
	<description>
		The maximum number of application master execution attempts.
	</description>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

注意:配置完需要分发。
(2)在 flink-conf.yaml 中配置。

yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha 
high-availability.zookeeper.quorum: 
hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(3)启动 yarn-session。
(4)杀死 JobManager, 查看复活情况。
注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该小于这个值。

7、Flink的分层API

Flink的API可以大致的分层三层,分别为最顶层SQL/Table层API,中间层DataStream/DataSet层API,还有最底层的有状态流处理(ProceseFuncton)API。
其中越顶层越抽象,表达含义越简明,使用越方便;越底层越具体,表达能力越丰富,使用越灵活。

二、Flink运行时架构

系统架构

对于分布式系统来说,都是多个系统协调合作的效果。如数据存储交给HDFS,资源调度交给Yarn,高可用配置交给ZooKeeper,Flink本身只需做好流式数据处理即可。
Flink运行架构主要是两大组件,分别是作业管理器(JobManager)和任务管理器(TaskManager),其中JobManager是整个作业的管理者,TaskManager是单个服务器上所有TaskSlot的管理者,TaskSlot里面其实包装的就是服务器节点的计算资源(包含CPU,内存,磁盘和网络等)。一个JobManager对应多个TaskManager,一个TaskManager对应多个TaskSlot。设置TaskSlot的数量一般根据服务器上面CPU的核数设置,这样才能做到真正的并行操作,而不是并发操作。

作业管理器(JobManager)

JobManager 是一个 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。JobManager里面又包含了三个不同的组件:
(1)JobMaster
JobMaster是JobManager的核心组件,负责接收客户端的作业Jar包,数据流图和作业图,并将其转化成执行图(ExecutionGraph),JobMaster会向 Yarn的ResourceManager申请执行任务的资源,获取到资源,就会将作业交由TaskManager执行,并负责检查点的协调和和监控TaskManager的运行状态。
(2)资源管理器-ResourceManager
ResourceManager主要负责资源的分配和管理,需要注意将Flink自带的ResourceManager和其他资源管理平台的ResourceManager区分开,Flink自带的资源管理器一般只作为测试和向其他资源管理平台申请资源使用,但yarn的资源管理器能更好的管理和分配资源。
(3)分发器(Dispatcher)
Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。

任务管理器(TaskManager)

TaskManager是Flink中的工作进程,负责一个服务器节点的资源管理,并负责执行具体的计算。每一个 TaskManager 都包含了一定数量的任务槽(task slots)。Slot是资源调度的最小单位,slot 的数量限制了TaskManager 能够并行处理的任务数量。

作业提交流程

一般作业提交流程:
(1)一般情况下,由客户端(App)通过分发器提供的 REST 接口,将作业提交给JobManager。
(2)由分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster。
(3)JobMaster 将 JobGraph 解析为可执行的ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)。
(4)资源管理器判断当前是否有足够的可用资源;如果没有,启动新的 TaskManager。
(5)TaskManager 启动之后,向ResourceManager 注册自己的可用任务槽(slots)。
(6)资源管理器通知 TaskManager 为新的作业提供 slots。
(7)TaskManager 连接到对应的 JobMaster,提供 slots。
(8)JobMaster 将需要执行的任务分发给TaskManager。
(9)TaskManager 执行任务,互相之间可以交换数据。

Yarn模式提交流程:
(1)客户端通过REST 接口,将作业提交给分发器。
(2)分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster。
(3)JobMaster 向资源管理器请求资源(slots)。
(4)资源管理器向 YARN 的资源管理器请求container 资源。
(5)YARN 启动新的TaskManager 容器。
(6)TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
(7)资源管理器通知 TaskManager 为新的作业提供 slots。
(8)TaskManager 连接到对应的 JobMaster,提供 slots。
(9)JobMaster 将需要执行的任务分发给TaskManager,执行任务。

重要概念

算子:
在 Flink 代码中,我们定义的每一个处理转换操作都叫作“算子”(Operator),所以我们的程序可以看作是一串算子构成的管道,而数据则像水流一样有序地流过。
按照功能不同可以大致分为三类,如下:
Source 表示“源算子”,负责读取数据源。
Transformation 表示“转换算子”,利用各种算子进行处理加工。
Sink 表示“下沉算子”,负责数据的输出。

并行度:
上面说的任务并行是指CPU的不同核运行不同的任务,达到真正的并行,而不是多线程共享单一核的并发执行,所以这里的并行度指的就是真正并行的数量。因为Flink处理的是流式数据,所以同一时间,同一任务,都会有不同阶段的数据正在处理。每个算子同时处理一个任务的不同阶段,称之为任务并行。一个任务的一个阶段由多个算子同时运行,称之为数据并行,也叫数据并行度(Parallelism),一个程序的并行度由所有算子中最大的并行度决定。
数据并行度设置:

parallelim.default 1   	# 在配置文件中修改,优先级最低,一般不改
./bin/flink run -p 2	# 在运行时,有参数-p来指定
env.setParallelism(2)	# 程序当中指定
  • 1
  • 2
  • 3

上面说到同一阶段由多个算子同时运行,其实就是将该算子的操作复制到多个TaskSlot中,当有大量的数据进来的时候,哪个TaskSlot空闲,就由哪个TaskSlot负责算子的运行。且多个TaskSlot之间是并行的,所有里面相同的算子也是并行的。据此,我们可以优化算子的操作。
算子链:
也叫算子合并,当算子之前的模式是oneToOne并且前后的并行度是一样的时候,可以合并算子。oneToOne是指前一个算子的结果可以直接作为下一个算子的输入,前面并行度一样,是指前一个算子和后一个算子的数据并行度是一样的,就能将这个两个算子合并,合并后当成一个算子处理。TaskSlot共享,也叫算子链,一个TaskSlot可以同时运行不同阶段的算子,这样当一个阶段的算子运行完,下一个算子可以直接使用前一个算子在内存的计算结果,减少了数据的传输和网络带宽,提高效率。
算子链设置:

// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()
  • 1
  • 2
  • 3
  • 4

一对一和重分区:
一对一就是指的是算子之间的数据,因为分布式都是数据在TaskSlot单独运行的,如果下一步操作是将里面的数据+1,那么这两个操作就是一对一的,因为跟其他TaskSlot里面的数据没有关系。
如果下一步的操作是全部排序,那么就需要将所有TaskSlot里面的数据都汇总才能排序,这个需要先所有数据汇总再处理的过程就是重分区。
执行图:
执行图 (ExecutionGraph)
Flink 中的执行图可以分成四层: StreamGraph -> JobGraph ->ExecutionGraph->物理执行图
StreamGraph: 是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
JobGraph: StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点。
ExecutionGraph: JobManager 根据JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
物理执行图: JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的”图”,并不是一个具体的数据结构。

三、Flink的API使用

Flink的程序处理,其实就是对数据的各种转换,一般来说包含以下的步骤:
(1)获取执行环境(execution environmemt)
(2)读取数据源(source)
(3)定义基于数据的转换操作(transformations)
(4)定义计算结果的输出位置(sink)
(5)触发程序执行(execute)

1、创建执行环境

编写Flink程序的第一步,就是创建Flink执行的环境。主要有以下三种:
(1) getExecutionEnvironment
最简单的方式,会根据当前运行的上下文直接得到正确的结果,不管是本地还是分布式的环境。本地就返回本地环境,分布式就返回分布式的环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1

(2) createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU 核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  • 1

(3) createRemoteEnviroment
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment("host", 		// JobManager 主机名
						 1234, 			// JobManager 进程端口号
						 "path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);
  • 1
  • 2
  • 3
  • 4
  • 5

批式处理:
因为在最新的版本中flink弃用了dataSet,改用成dataStream来同时处理流数据和批数据,当前的环境又设置成了流式数据的环境,因此想要修改成批式处理环境的时候,可以通过添加参数来指定批式处理。
流执行模式(Streaming):dataStream默认是流执行模式
批执行模式(BATCH):添加参数BATCH指定是批执行模式
自动模式(AUTOMATIC):由程序确认数据源是否有界,来自定选择执行的模式。
批处理环境配置方式:
(1)通过命令配置BATCH,批执行模式

bin/flink run -Dexecution.runtime-mode=BATCH ...
  • 1

(2)通过代码配置,不常用,不建议使用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment ();
env.setRuntimeMode (RuntimeExecutionMode.BATCH);
  • 1
  • 2

其实Streaming也能处理批数据,将批数据看成“有界流”数据来处理,但Streaming来处理批数据是不够高效的,因为Streaming每来一条数据都会有一次结果输出,所以简单的原则就是用 BATCH 模式处理批量数据,用 Streaming 模式处理流式数据。

触发程序执行:
因为Flink是由事件驱动的,只有等数据到来,才会触发真正的计算,这被称为“延迟执行”或“懒执行”(lazy execution),因为我们定义了作业的每个算子,每个操作之后,如需执行,还需告诉程序立即执行,就是调用执行环境的 execute()方法,来触发程序的执行。

2、源算子

Flink 可以从各种来源获取数据,然后构建DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。
Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:

DataStream<String> stream = env.addSource(...);
  • 1

POJO数据类型

实际使用中,我们一般会将需要读取的数据包装成一个类,如果该数据类包含以下的特点,我们就会将它称之为POJO类。
(1)类是公有(public)的
(2)有一个无参的构造方法
(3)所有属性都是公有(public)的
(4)所有属性的类型都是可以序列化的
将数据包装成POJO类更方便理解,也更方便数据的解析和序列化。
此处我们假设定义一个用户访问的POJO数据类,包含用户名,访问的URL,还有访问的时间戳三个字段。具体定义如下:

import java.sql.Timestamp;

public class Event { 
	public String user; 
	public String url; 
	public Long timestamp;

	public Event() {}

	public Event(String user, String url, Long timestamp) {
		this.user = user;
		this.url = url; this.timestamp = timestamp;
	}

	@Override
	public String toString() { 
		return "Event{" +
				"user='" + user + '\'' +
				", url='" + url + '\'' +
				", timestamp=" + new Timestamp(timestamp) 
				+ '}';
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

从集合中读取数据

最简单的数据读取方式,直接创建一个集合,然后调用执行环境的fromCollection方法读取,一般用于数据测试。

public static void main(String[] args) throws Exception {
	// 获取流式执行环境
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
	// 设置数据并行度为1
	env.setParallelism(1);
	// 创建集合
	ArrayList<Event> clicks = new ArrayList<>(); 
	// 给集合添加数据
	clicks.add(new Event("Mary","./home",1000L)); 
	clicks.add(new Event("Bob","./cart",2000L));
	// 调用执行环境的fromCollection方法读取集合的数据,返回一个DataStream类型的源算子 stream
	DataStream<Event> stream = env.fromCollection(clicks); 
	// 或者直接采用匿名函数的形式添加集合数据
	DataStreamSource<Event> stream2 = env.fromElements( 
		new Event("Mary", "./home", 1000L),
		new Event("Bob", "./cart", 2000L)
	);
	// 直接将stream 的内容打印出来
	stream.print();
	// 触发程序执行
	env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

从文件中读取数据

但一般我们更为常用的方式,是从文件中读取,这里调用的是运行环境的readTextFile方法。

DataStream<String> stream = env.readTextFile("clicks.txt");
  • 1

其中:
(1)参数可以是目录,也可以是文件,目录则读取该目录下的所有文件;
(2)路径可以是相对路径,也可以是绝对路径;
(3)相对路径是从系统属性 user.dir 获取路径: idea 下是 project 的根目录, standalone 模式
下是集群节点根目录;
(4)可以从hdfs 目录下读取, 使用路径hdfs://…, 由于 Flink 没有提供hadoop 相关依赖,
需要 pom 中添加相关依赖:

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>2.7.5</version>
	<scope>provided</scope>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

从Socket中读取数据

上面读取的数据其实都是有界的,但流处理实际处理更多的是无界的数据。一个简单的方式就是通过socket来读取文本,这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);
  • 1

从kafka中读取数据

真正的实际应用项目一般采用kafka这种来读取数据,Kafka作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。Kafka具备数据重放的功能,就是当Flink处理过程出错时,会给Kafka发送数据重放需求,Kafka根据需要将需要的数据重放,确保数据正确且只处理一次,配合使用可以实现Flink精确一次的功能。
使用Kafka首先需要导入kafka的依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
	<version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

Flink内部没有提供预实现的方法,我们采用通用的addSource的方式,通过实现一个SourceFunction来实现。

	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "hadoop102:9092"); 
	properties.setProperty("group.id", "consumer-group"); 
	properties.setProperty("key.deserializer",
	"org.apache.kafka.common.serialization.StringDeserializer"); 
	properties.setProperty("value.deserializer",
	"org.apache.kafka.common.serialization.StringDeserializer"); 
	properties.setProperty("auto.offset.reset", "latest");
	
	DataStreamSource<String> stream	= env.addSource(new FlinkKafkaConsumer<String>(
		"clicks",
		new SimpleStringSchema(), 
		properties
	)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数topic,定义了从哪些主题中读取数据。可以是一个topic,也可以是 topic 列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
  • 第二个参数是一个DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,所以我们也可以自定义反序列化逻辑。
  • 第三个参数是一个 Properties 对象,设置了Kafka 客户端的一些属性。

自定义Source读取数据

上面的能满足大部分的需求了,但还有一个更实用的就是自定义数据的读取,不管是测试还是读取外部的系统,都很实用。就是自定义SourceFunction,实现SourceFunction接口,重写以下两个关键方法:

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class ClickSource implements SourceFunction<Event> {
    private Boolean running  = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();		//在给定的数据集里面随机
        String[] users = {"Mary","Alice","Bob","Juli","Tom","Jumi","Junms","Keb","Hilo"};
        String[] urls = {"./home","./cart","./fav","./prod","product","redCart"};
        while (running){
            ctx.collect(
                    new Event(
                            users[random.nextInt(users.length)],	//随机用户
                            urls[random.nextInt(urls.length)],		//随机页面
                            Calendar.getInstance().getTimeInMillis()
                    )
            );
            Thread.sleep(500);		//每个500毫秒生成一条数据
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

自定义了数据源之后,只需调用addSource()就可以了。

//有了自定义的 source function,调用 addSource 方法
DataStreamSource<Event> stream = env.addSource(new ClickSource());
  • 1
  • 2

Flink支持的数据类型

分布式往往都涉及一个概念,那就是需要将任务分散到不同节点执行,也需要将A节点失败的任务再次交给B节点运行,这就涉及到需要保持任务的数据类型和任务的操作,所以任务支持的类型都要能序列化和反序列化,这样才能在不同服务节点之间来传输。Flink中所有类型描述的基类是TypeInformation,它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
(1)基本类型
常见的Java和Scala数据类型,所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger。
(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型

  • Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是 Java API 的一部分。最多
    25 个字段,也就是从 Tuple0~Tuple25,不支持空字段;
  • Scala 样例类及 Scala 元组:不支持空字段;
  • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段;
  • POJO:Flink 自定义的类似于 Java bean 模式的类;
    (4)辅助类型
    Option、Either、List、Map 等。
    (5)泛型类型(GENERIC)
    Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义, 就会被 Flink 当作泛型类来处理。
    Flink对POJO类型的定义如下:
  • 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
  • 类有一个公共的无参构造方法;
  • 类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范。
    实际项目中,往往会将程序处理中的元素类型定义为Flink的POJO类型,因为POJO具备灵活性,支持创建复杂的数据类型,也更容易理解和使用。
    类型提示:
    上面说到当使用的类型是泛型类型时,由于Java中存在泛型擦除的情况,在某些特殊情况下,Flink是无法获知里面数据的具体类型的,这时在转移任务的时候就会发生错误,所以需要明确指定某些数据的类型,告知Flink,才能正确的解析出完整的数据。Flink专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来。
    形式如下所示:
.returns(Types.TUPLE(Types.STRING, Types.LONG));
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
  • 1
  • 2

3、转化算子

在读取了数据之后,其实最重要的就是对数据的处理转化,我们称数据的各种处理转换为算子,算子的逻辑其实就是将一个或多个DataStream转换成新的DataStream,多个算子构成数据的处理逻辑。

基本转换算子

(1)映射(map)
元素之间一一对应,一对一的映射就使用map来处理。调用方法是传入实现MapFunction接口的函数,方法的参数为输入的参数类型和输出的参数类型。

{	// 这里省略了环境的创建
	// 从集合获取数据
	DataStreamSource<Event> stream = env.fromElements( 
		new Event("Mary", "./home", 1000L),
		new Event("Bob", "./cart", 2000L)
	);
	// 方式1,用匿名类实现MapFunction,Event是输入类型,String是输出类型,一一对应
	stream.map(new MapFunction<Event, String>() {
		// 需要重写Map方法
		@Override
		public String map(Event e) throws Exception { 
			return e.user;
		}
	});
	// 方式2,实现接口实现
	public static class UserExtractor implements MapFunction<Event, String> { 
		// 需要重写Map方法
		@Override
		public String map(Event e) throws Exception { 
			return e.user;
		}
	}
	stream.map(new UserExtractor()).print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

Map总结,元素之间的关系是一一对应(一一映射)时使用,需要实现MapFunction接口,该接口需要指定两个泛型,分别是输入数据的泛型,输出数据的泛型,还需要重写map方法,指定数据映射的逻辑。
(2)过滤(filter)
filter转换操作,就是对数据流进行一个过滤操作,也能叫做数据的清洗,通过设置一个布尔条件表达式来设置过滤的条件,对于每一个流内的元素进行判断,true则通过,false则丢弃。调用方法是实现FilterFunction接口,传入数据的泛型,并返回布尔类型的条件表达式。

	// 方式1 传入匿名类实现
	stream.filter(new FilterFunction<Event>() {
		@Override
		public boolean filter(Event e) throws Exception { 
			return e.user.equals("Mary");
		}
	});
	// 方式2 实现接口实现
	public static class UserFilter implements FilterFunction<Event> { 
		@Override
		public boolean filter(Event e) throws Exception { 
			return e.user.equals("Mary");
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

filter总结,需要针对元素进行过滤时使用,需要实现FilterFunction接口,该接口需要指定输入数据的泛型,重写filter方法,返回布尔类型。
(3)扁平映射(flatMap)
当原始数据的每一个元素都是一个集合,或者一个列表的时候,我们需要对集合里面的元素进行操作,就需要先将集合的数据取出来,再进行操作。对应到元素上,就是一个输入元素,对应0个或者多个输出元素。扁平映射也可以看成是先扁平化(flatten)再映射(map)两步操作,而flatMap就是将两步合成一步,在使用中更加的方便。
调用方法是实现FlatMapFunction接口,传入输入的数据泛型和最终的输出数据的泛型,并重写里面的flatMap方法,该方法的参数一个是输入的数据泛型,另一个因为输出是不固定的,所以只是一个收集器(Collector),当需要输出的数据时,调用收集器的.collect()方法,将输出数据存放到收集器即可,.collect()可以不调用,也可以多次调用。

// 方式1
	stream.flatMap(new FlatMapFunction<Event, String>() {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            if (value.user.equals("Mary")){
                out.collect(value.user);
            } else if (value.user.equals("Bob")){
                out.collect(value.url);
            }
        }
    });
// 方式2
    public static class MyFlatMapFunction implements FlatMapFunction<Event, String>{
        @Override
        public void flatMap(Event event, Collector<String> out) throws Exception {
            if (event.user.equals("Mary")){
                out.collect(event.user);
            } else if (event.user.equals("Bob")){
                out.collect(event.url);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

flatmap总结,当需要对数据进行扁平化操作时使用,需要实现FlatMapFunction接口,该接口需要指定输入参数的泛型,输出参数的泛型,并重写flatMap方法,输出值调用数据收集器的.collect()来添加,该方法可以不调用,也可以多次调用。因此flatmap可以实现map或者filter的功能,但不建议这样使用,方法专用能更方便维护。

聚合算子

数据的处理逻辑处理转换之外,还有一个常用的功能就是聚合,当需要求和,最大值,最小值之类的,都是需要聚合算子来处理的。但在Flink的框架中,是没有直接聚合的概念的,因为Flink处理的都是大数据,如果将所有数据都拿来聚合,就丢失了分布式处理的特点,就是数据分布处理,因为大数据中的聚合往往都是分区聚合,就是先按照不同的Key值在分区,然后再做聚合。
(1)按键分区(keyBy)
按键分区,就是将健值(key)相同的数据发送到同一个服务节点进行操作,这样下一步不管如何聚合数据都在同一个服务节点上面,更方便计算和处理。调用方法是实现KeySelector方法,传入输入数据泛型和输出数据泛型,并实现里面的getKey方法。但实际使用更多的是使用简化的方法,比如对于元组(Tuple)数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式,用于说明从数据中提取 key 的逻辑。

	// 方式1(常用),data指代Event的数据类型
	stream.keyBy(data -> data.user);
	// 方式2
	stream.keyBy(new KeySelector<Event, String>() {
          @Override
          public String getKey(Event event) throws Exception {
              return event.user;
          }
      });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

转换算子处理后的数据是DataStream类型,但聚合算子keyBy处理后的数据是keyedStream类型,这个是有区别的,因为类似于sum(),min(),max()和reduce()等聚合算法只能基于keyedStream的数据类型来处理。
keyBy聚合总结,当需要使用聚合功能时,第一步应该先按key分区聚合,再执行聚合。常用的方法不是实现KeySelector接口,而是看元素类型,如果是元组,可以按照元组的位置或对个位置指定Key;如果是POJO类型,可以直接字段名称;如果是其他,一般按照Lambda表达式指定key。
(2)简单聚合函数
按键分区之后得到一个KeyedStream流,此时就可以基于这个流,实现聚合功能,一般常用的聚合功能如下:

  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值。
  • max():在输入流上,对指定的字段求最大值。
  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
  • maxBy() :与 max() 类似, 在输入流上针对指定字段求最大值。两者区别与
    min()/minBy()完全一致。
    简单聚合的调用比较简单,只需指明需要聚合的字段即可,指定可以通过指定位置或者指定名称两种方式来指定。额外说明的是,元组中字段的名称,是以 f0、f1、f2、…来命名的,而POJO类型只能通过字段名来指定,无法通过位置来指定。
	// POJO类型通过字段来指定,先按用户分区,再聚合求最小的时间戳
   KeyedStream<Event, String> keyStream = stream.keyBy(data -> data.user);
   keyStream.minBy("timestamp").print("minBy: ");

   stream.keyBy(new KeySelector<Event, String>() {
       @Override
       public String getKey(Event event) throws Exception {
           return event.user;
       }
   }).min("timestamp").print("min: ");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

简单聚合之后返回的还是DataStream类型,所以KeyBy和聚合总是成对出现的,这样元素的数据类型保持不变,有利于下个算子的数据处理。
(3)归约聚合(reduce)
归约的含义就是把每一个新输入的数据,和当前已经归约出来的值,再做一个聚合计算。具体流程为,ReduceFunction内部会维护一个初始值为0的累加器,累加器的类型和输入元素的类型相同,当第一条元素到来时,累加器的值更新为第一条元素的值,当新的元素到来时,新元素会和累加器进行累加操作,这里的累加操作就是 reduce 函数定义的运算规则。然后将更新以后的累加器的值向下游输出。
调用方法是实现ReduceFunction 接口,传入元素的数据泛型,并实现里面的reduce方法,reduce方法的2个参数类型跟ReduceFunction 一致,表示累加器的数据类型和新数据的数据类型,reduce的返回值也是相同的数据类型。
函数之间的调用,也就是算子之间是可以使用点(.)来调用的,这里调用map().keyBy().reduce()等。

// 方式1
SingleOutputStreamOperator<Tuple2<String, Long>> clickByUser = streamFromArrayEvent2.map(
	new MapFunction<Event, Tuple2<String, Long>>() {
     @Override
     public Tuple2<String, Long> map(Event event) throws Exception {
         return Tuple2.of(event.user, 1L);
     }
 }).keyBy(data -> data.f0)
   .reduce(new ReduceFunction<Tuple2<String, Long>>() {
       @Override
       public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
           return Tuple2.of(t1.f0, t1.f1 + t2.f1);
       }
   });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

keyBy 处,因为Map的输出参数是元祖类型Tuple2<String, Long>,这里使用的是元祖的名称指定元祖的第一个元素,元祖的名称是以 f0、f1、f2、…来命名的;
ReduceFunction 处的泛型跟map的输出参数一致,传入元祖类型Tuple2<String, Long>,reduce处的t1是累加器的参数类型,t2是新元素的数据类型,每次返回就是元祖的第二个元素相加,而元祖的第一个元素保持不变。

Lambda表达式

前面需要实现的接口函数,MapFunction,filterFunction,ReduceFunction等他们都是单一抽象方法(Single Abstract Method,SAM)接口,Java8新增的Lambda表达式可以实现SAM接口,这样的话,其实简单的做法不是通过自定义函数,或者是匿名函数来实现功能,而是通过Lambda表达式来实现。

// 单一抽象方法接口源代码
public interface MapFunction<T, O> extends Function, Serializable {
    O map(T value) throws Exception;
}
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Lambda表达式示例:

	// Map 使用Lambda表达式简化
	stream.map(new MapFunction<Event, String>() {
		// 需要重写Map方法
		@Override
		public String map(Event e) throws Exception { 
			return e.user;
		}
	});
	// Lambda表达式
	stream.map(data->data.user)
	
	// Filter使用Lambda表达式简化
	stream.filter(new FilterFunction<Event>() {
		@Override
		public boolean filter(Event e) throws Exception { 
			return e.user.equals("Mary");
		}
	});
	//  Lambda表达式
	stream.filter(data->data.user.equals("Mary));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

使用Lambda表达式能更简化的编程,但不是所有情况Flink都能自动推断出类型,还记得上面说的flatMap的数据收集收集器,数据收集器的类型就无法正确推断,因此需要显示指定返回的类型,使用returns来返回:

// flatMap 使用 Lambda 表达式,必须通过 returns 明确声明返回类型
DataStream<String> stream2 = clicks.flatMap(
	(Event event, Collector<String> out) -> {
		out.collect(event.url);
	}
	).returns(Types.STRING);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

此处,(Event event, Collector out)是flatMap里面调用函数的参数,返回值为out.collect(event.url),最后需要指定返回值的参数.returns(Types.STRING);。
之后很多都会采用Lambda表达式的方式来编程,需要熟练掌握和理解,特别是算子之间的参数类型传递。

富函数类(Rich Function Classes)

所有的Flink函数类都有其Rich的版本,富函数类一般是以抽象类的形式出现的,例如:RichMapFunction、RichFilterFunction、 RichReduceFunction 等。富函数相对于之前的函数提供更多的功能,复函数可以获取运行环境的上下文,并有用一些生命周期的方法,可以实现更复杂的功能,所以当正常的函数不能实现功能时,考虑使用富函数类。
富函数典型的生命周期方法有:(每个富函数都有的,且生命周期只调用一次)

  • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。
  • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一 些清理工作。
    上面生命周期的方法如open()和close(),对于一个并行子任务只会调用一次,而对应的,Map()方法等是针对每个数据调用一次。
    使用富函数可以获取上下文信息,程序执行的并行度,任务名称,以及状态等,这对关于状态的编程或者关于上下文的调度等都有很多的方法,富函数类也是在三层API中属于最低层的API,功能非常强大,后续还会介绍。

物理分区

上面介绍的KeyBy也算是分区的一种,因为它是将数据根据Key分散到不同的服务节点运行,这个其实也可以称为软分区。真正物理上面的分区,称之为物理分区,物理分区的策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)等。
(1)随机分区(shuffle)
最简单的分区方式,将数据随机的均匀的分配到下游任务中,因为数据是随机的,所以每次执行结果都是不一样的。

// 读取数据源,并行度为 1
DataStreamSource<Event> stream = env.addSource(new ClickSource());
// 经shuffle后打印输出 , 并行度为 4 
stream.shuffle().print("shuffle").setParallelism(4);
  • 1
  • 2
  • 3
  • 4

(2)轮询分区(Round-Robin)
轮询也是一种常见的重分区方式,简单来说就是按照先后顺序将数据做依次分发。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance 使用的是Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

// 读取数据源,并行度为 1
DataStreamSource<Event> stream = env.addSource(new ClickSource());
// 经轮询重分区后打印输 出 ,并行度为4 
stream.rebalance().print("rebalance").setParallelism(4);
  • 1
  • 2
  • 3
  • 4

(3)重缩放(Rescale)
重缩放分区和轮询分区非常相似,重缩放分区是调用rescale()方法,底层也是调用Round-Robin 算法进行轮询,但rescale()只会针对下游的一部分任务,而轮询分区rebalance()针对的是全部。例如上游2个分区,下游6个分区,使用rescale()时,每个分区只会发给下游3个分区的数据;但rebalance()时,会每个分区都针对下游的6个分区轮询,因此重缩放(Rescale)的分区效率要比轮询分区(Round-Robin)稍高。

	// 这里使用了上下文获取getIndexOfThisSubtask
	env.addSource(new RichParallelSourceFunction<Integer>() { 
	@Override
	public void run(SourceContext<Integer> sourceContext) throws Exception {
		for (int i = 0; i < 8; i++) {
			// 将奇数发送到索引为 1 的并行子任务
			// 将偶数发送到索引为 0 的并行子任务
			if((i+1)%2 == getRuntimeContext().getIndexOfThisSubtask()) {
				sourceContext.collect(i + 1);
			}
		}
	}

	@Override
	public void cancel() {}
	}).setParallelism(2)
	.rescale()
	.print().setParallelism(4);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

(4)广播(BroadCast)
广播就是将输入的数据复制并发送到下游算子的所有并行任务中去,当下游的服务节点开启多个Task时,只需用广播在服务节点保留一份数据即可,不同让同个节点的多个Task都保留数据,可以用广播的方式实现。

// 读取数据源,并行度为 1
DataStreamSource<Event> stream = env.addSource(new ClickSource());
// 经广播后打印输出,并行度为 4
stream. broadcast().print("broadcast").setParallelism(4);
  • 1
  • 2
  • 3
  • 4

(5)全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。不建议使用。
(6)自定义分区(Custom)
当上面的分区不能满足需求时,可以自定义分区器,通过partitionCustom()的方法来自定义分区策略。在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定, 也可以通过字段位置索引来指定,还可以实现一个KeySelector。

// 将自然数按照奇偶分区
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
    .partitionCustom(new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % 2;
        }}, new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) throws Exception {
            return value;
        }
    })
    .print().setParallelism(2);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

KeySelector就是选择用来判断分区的字段,这里直接就是原数据,所以直接返回。partition就是分区的规则,返回分区的分区号,从0开始的分区号,这里直接采用取模的方式,因为代码里面的分区规则也是按照哈希值取模的。

4、输出算子(Sink)

上面有源算子,就是数据的输入,自然也会有输出算子,就是将处理后的数据输出到外部系统,或者直接存储起来。Flink专门提供了向外部系统写出的方法addSink(),之前我们一直在使用的print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个DataStreamSink。
调用方式就是调用addSink方法,实现里面的SinkFunction 接口,并在接口中重写invoke()方法,invoke()方法会在每条数据到来时都调用一次。
Flink官方目前支持的第三方系统连接器包括以下:

  • FileSystem (Hadoop included) - Streaming only(sink)
  • FileSystem (Hadoop included) - Streaming and Batch (sink)
  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Apache NiFi (source/sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • RabbitMQ (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)
    其中source表示文件读取,sink表示文件写入。

(1)输出到文件
之前的输出到文件都是并行度为1,无法发挥分布式的优点,并且不能保证Flink系统的精确一次性。StreamingFileSink流式文件连接器,继承自富函数类RichSinkFunction,而且集成了 Flink 的检查点checkpoint机制,用来保证精确一次exactly once的一致性语义。其主要操作是将数据写入桶Buckets中,每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。因为桶的存储都是基于时间的,且默认每隔1小时分一个桶。
StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式,调用方法也非常简单,可以直接调用 StreamingFileSink 的静态方法:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)
    调用行编码或者批量编码方法时,需要传入两个参数,用来指定存储桶的基本路径
    (basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。
DataStreamSource<Event> stream = env.fromElements(
	new Event("Mary", "./home", 1000L),
	new Event("Bob", "./cart", 2000L),
	new Event("Alice", "./prod?id=100", 3000L), 
	new Event("Alice", "./prod?id=200", 3500L), 
	new Event("Bob", "./prod?id=2", 2500L),
	new Event("Alice", "./prod?id=300", 3600L), 
	new Event("Bob", "./home", 3000L),
	new Event("Bob", "./prod?id=1", 2300L),
	new Event("Bob", "./prod?id=3", 3300L));

StreamingFileSink<String> fileSink = StreamingFileSink
	.<String>forRowFormat(new Path("./output"), 
		new SimpleStringEncoder<>("UTF-8"))
		// 指定滚动策略
		.withRollingPolicy(DefaultRollingPolicy.builder()
			// 隔多长时间滚动一次,滚动周期
			.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
			// 不活动时间周期,多久时间没有数据来
			.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
			// 最大文件大小
			.withMaxPartSize(1024 * 1024 * 1024)
			.build())
		.build();
// 将 Event 转换成 String 写入文件
stream.map(Event::toString).addSink(fileSink);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

上面直接调用StreamingFileSink的静态方法StreamingFileSink.forRowFormat,写出一个行编码的文件。forRowFormat前面有一个写入数据类型的泛型,这里是String,所以后面的编码器使用的是对应的SimpleStringEncoder(简单字符串编码器)。这之后都是build的设计模式,然后采用with的方式指定策略,上面指定的是滚动策略,然后在滚动策略里面也是build的设计模式,同样可以指定滚动策略的策略规则。

  • withRolloverInterval 滚动周期,多久滚动一次,参数是毫秒
  • withInactivityInterval 不活动周期,超过多长时间没有数据到来,也会滚动,参数也是毫秒
  • withMaxPartSize 最大文件大小,设置1G为文件滚动的大小,超过1G文件就会滚动

(2)输出到Kafka
Kafka本身是一个分布式的基于发布/订阅的消息系统,本身也是流式数据,跟Flink也是天生一对,所以实际情况中,很多会使用Kafka跟Flink的配合使用。因为Kafka跟Flink配合,能够提供端到端的精确一次性保证,这在实际项目中是最高级别的一致性保证。
输出到kafka的一般步骤如下:

  • 添加Kafka 连接器依赖
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
	<version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 启动Kafka 集群
  • 编写代码将数据插入到kafka当中
// 开启和连接Kafka
Properties properties = new Properties(); 
properties.put("bootstrap.servers", "hadoop102:9092");
// 本地新建一个input/clicks.csv 文件,写入数据
DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
// 读取文件,写入到kafka当中
stream.addSink(new FlinkKafkaProducer<String>( 
	"clicks",
	new SimpleStringSchema(), 
	properties
));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 运行代码,在linux主机启动一个消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic clicks
  • 1

这里大概了解kafka的流程即可,kafka在实际提交的过程中是两阶段提交的,为了确保Flink的精确一次性,这个在后面会再详细展开。
(3)输出到Redis
Redis 是一个开源的内存式的数据存储,提供了像字符串string、哈希表hash、列表list、集合set、排序集合sorted set、位图bitmap、地理索引和流stream等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富,在实际项目中已经成为了架构优化必不可少的一员,一般用作数据库、缓存,也可以作为消息代理。
具体的步骤跟kafaka类似:

  • 导入的Redis 连接器依赖
<dependency>
	<groupId>org.apache.bahir</groupId>
	<artifactId>flink-connector-redis_2.11</artifactId>
	<version>1.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 启动Redis 集群
  • 编写输出到Redis 的示例代码
public class SinkToRedisTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.addSource(new ClickSource());
        // 创建一个到redis连接的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .build();
        stream.addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
        env.execute();
    }
    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }
        @Override
        public String getKeyFromData(Event data) {
            return data.user;
        }
        @Override
        public String getValueFromData(Event data) {
            return data.url;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 运行代码,查看Redis是否收到数据
redis-cli
hgetall clicks
  • 1
  • 2

这里RedisSink 的构造方法需要传入两个参数:

  • JFlinkJedisConfigBase:Jedis 的连接配置
  • RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入Redis 的类型
    这里保持到Redis的数据格式为HSET,就是保存为哈希表hash,表名为clicks,保存的数据key为user,保存的值value为url,每来一条数据都会做一次转换保存一次。注意,多条相同的数据只会保存一条,其他的会覆盖前面,因为hash表的key是唯一的。

(4)输出到Elasticsearch
ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch 有着简洁的REST 风格的API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。
步骤跟上面类似:

  • 添加Elasticsearch 连接器依赖
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactI d>
	<version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 启动Elasticsearch 集群
  • 编写输出到Elasticsearch 的代码
public class SinkToEsTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200, "http"));
        // 创建一个ElasticsearchSinkFunction
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> data = new HashMap<>();
                data.put(element.user, element.url);
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type")    // Es 6 必须定义 type
                        .source(data);
                indexer.add(request);
            }
        };
        stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

Builder 的构造方法中又有两个参数:

  • httpHosts:连接到的Elasticsearch 集群主机列表
  • elasticsearchSinkFunction:准备数据向 Elasticsearch 发送请求的函数
    具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个HashMap 中,包装成 IndexRequest 向外部发送HTTP 请求。

(5)输出到MySQL(JDBC)
一些应用或业务相关的数据,需要有导出到数据库的需求,提供给其他应用使用。
步骤如下:

  • 添加依赖
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.47</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 启动 MySQL,在database 库下建表 clicks
mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);
  • 1
  • 2
  • 3
  • 编写输出到 MySQL 的代码
stream.addSink(
    JdbcSink.sink(
    	// 插入语句
         "INSERT INTO clicks (user, url) VALUES (?, ?)",
         (statement, r) -> {
             statement.setString(1, r.user);
             statement.setString(2, r.url);
         },
         new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                 .withUrl("jdbc:mysql://localhost:3306/test") //JDBC连接
                 .withDriverName("com.mysql.jdbc.Driver")	//JDBC连接驱动
                 .withUsername("root")	//	用户名
                 .withPassword("root")	//	密码
                 .build()
    )
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 运行代码,用客户端连接 MySQL,查看是否成功写入数据。
mysql> select * from clicks;
  • 1

(6)用户自定义Sink输出
Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。因为富函数类,所有可以将连接和关闭的功能分别放在 open()方法和 close()方法中。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/413143
推荐阅读
相关标签
  

闽ICP备14008679号