当前位置:   article > 正文

10.大数据技术之Flink_大数据flink

大数据flink

文章目录

1、Flink 简介

1.1 Flink 的引入

​ 这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有Hadoop、Storm,以及后来的Spark,他们都有着各自专注的应用场景。Spark 掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其他分布式计算的系统身影。就像Flink,也就在这个时候默默的发展着。

​ 在国外一些社区,有很多人将大数据的计算引擎分成了4 代,当然,也有很多人不会认同。我们先姑且这么认为和讨论。

​ 首先第一代的计算引擎,无疑就是Hadoop 承载的MapReduce。这里大家应该都不会对MapReduce 陌生,它将计算分为两个阶段,分别为Map 和Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个Job 的串联,以完成一个完整的算法,例如迭代计算。

​ 由于这样的弊端,催生了支持DAG 框架的产生。因此,支持DAG 的框架被划分为第二代计算引擎。如Tez 以及更上层的Oozie。这里我们不去细究各种DAG 实现之间的区别,不过对于当时的Tez 和Oozie 来说,大多还是批处理的任务。

​ 接下来就是以Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是Job 内部的DAG 支持(不跨越Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的Job。

​ 随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在Flink 对流计算的支持,以及更一步的实时性上面。当然Flink 也可以支持Batch 的任务,以及DAG 的运算。

​ 首先,我们可以通过下面的性能测试初步了解两个框架的性能区别,它们都可以基于内存计算框架进行实时计算,所以都拥有非常好的计算性能。经过测试,Flink 计算性能上略好。

1.2 什么是Flink

​ Flink 起源于Stratosphere 项目,Stratosphere 是在2010~2014 年由3 所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014 年4 月Stratosphere 的代码被复制并捐赠给了Apache 软件基金会,参加这个孵化项目的初始成员是Stratosphere 系统的核心开发人员,2014 年12 月,Flink 一跃成为Apache 软件基金会的顶级项目。

​ 在德语中,Flink 一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而Flink 的松鼠logo拥有可爱的尾巴,尾巴的颜色与Apache 软件基金会的logo 颜色相呼应,也就是说,这是一只Apache 风格的松鼠。

在这里插入图片描述

​ Flink 主页在其顶部展示了该项目的理念:“Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。

​ Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

1.3 Flink 流处理特性

  1. 支持高吞吐、低延迟、高性能的流处理

  2. 支持带有事件时间的窗口(Window)操作

  3. 支持有状态计算的Exactly-once 语义

  4. 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作

  5. 支持具有Backpressure 功能的持续流模型

  6. 支持基于轻量级分布式快照(Snapshot)实现的容错

  7. 一个运行时同时支持Batch on Streaming 处理和Streaming 处理

  8. Flink 在JVM 内部实现了自己的内存管理

  9. 支持迭代计算

  10. 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

1.4 Flink 基石

​ Flink 之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

​ 首先是Checkpoint 机制,这是Flink 最重要的一个特性。Flink 基于Chandy-Lamport 算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。Chandy-Lamport 算法实际上在1985 年的时候已经被提出来,但并没有被很广泛的应用,而Flink 则把这个算法发扬光大了。Spark 最近在实现Continue streaming,Continue streaming 的目的是为了降低它处理的延时,其也需要提供这种一致性的语义,最终采用Chandy-Lamport 这个算法,说明Chandy-Lamport 算法在业界得到了一定的肯定。

​ 提供了一致性的语义之后,Flink 为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括里面的有ValueState、ListState、MapState,近期添加了BroadcastState,使用State API 能够自动享受到这种一致性的语义。除此之外,Flink 还实现了Watermark 的机制,能够支持基于事件的时间的处理,或者说基于系统时间的处理,能够容忍数据的延时、容忍数据的迟到、容忍乱序的数据。另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink 提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

在这里插入图片描述

1.5 批处理与流处理

​ 批处理的特点是有界、持久、大量,批处理非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。流处理的特点是无界、实时,流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。在Spark 生态体系中,对于批处理和流处理采用了不同的技术框架,批处理由SparkSQL 实现,流处理由Spark Streaming 实现,这也是大部分框架采用的策略,使用独立的处理器实现批处理和流处理,而Flink 可以同时实现批处理和流处理。

​ Flink 是如何同时实现批处理与流处理的呢?答案是,Flink 将批处理(即处理有限的静态数据)视作一种特殊的流处理。

​ Flink 的核心计算架构是下图中的Flink Runtime 执行引擎,它是一个分布式系统,能够接受数据流程序并在一台或多台机器上以容错方式执行。

​ Flink Runtime 执行引擎可以作为YARN(Yet Another Resource Negotiator)的应用程序在集群上运行,也可以在Mesos 集群上运行,还可以在单机上运行(这对于调试Flink 应用程序来说非常有用)。

在这里插入图片描述

​ 上图为Flink 技术栈的核心组成部分,值得一提的是,Flink 分别提供了面向流式处理的接口(DataStream API)和面向批处理的接口(DataSet API)。因此,Flink 既可以完成流处理,也可以完成批处理。Flink 支持的拓展库涉及机器学习(FlinkML)、复杂事件处理(CEP)、以及图计算(Gelly),还有分别针对流处理和批处理的Table API。能被Flink Runtime 执行引擎接受的程序很强大,但是这样的程序有着冗长的代码,编写起来也很费力,基于这个原因,Flink 提供了封装在Runtime 执行引擎之上的API,以帮助用户方便地生成流式计算程序。Flink 提供了用于流处理的DataStream API 和用于批处理的DataSetAPI。值得注意的是,尽管Flink Runtime 执行引擎是基于流处理的,但是DataSet API 先于DataStream API 被开发出来,这是因为工业界对无限流处理的需求在Flink 诞生之初并不大。

​ DataStream API 可以流畅地分析无限数据流,并且可以用Java 或者Scala 来实现。开发人员需要基于一个叫DataStream 的数据结构来开发,这个数据结构用于表示永不停止的分布式数据流。

​ Flink 的分布式特点体现在它能够在成百上千台机器上运行,它将大型的计算任务分成许多小的部分,每个机器执行一部分。Flink 能够自动地确保发生机器故障或者其他错误时计算能够持续进行,或者在修复bug 或进行版本升级后有计划地再执行一次。这种能力使得开发人员不需要担心运行失败。Flink 本质上使用容错性数据流,这使得开发人员可以分析持续生成且永远不结束的数据(即流处理)。

2、Flink 架构体系

2.1 Flink 中的重要角⾊

在这里插入图片描述

JobManager 处理器:

​ 也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink 运行时至少存在一个master 处理器,如果配置高可用模式则会存在多个master 处理器,它们其中有一个是leader,而其他的都是standby。

TaskManager 处理器:

​ 也称之为Worker,用于执行一个dataflow 的task(或者特殊的subtask)、数据缓冲和datastream 的交换,Flink 运行时至少会存在一个worker 处理器。

2.2 无界数据流与有界数据流

无界数据流:

​ 无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。

有界数据流:

​ 有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。

在这里插入图片描述

​ Apache Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink 运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们要实现的目标是完全不相同的:流处理一般需要支持低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。

​ Flink 在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink 运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API 也是实现上层面向流处理、批处理类型应用框架的基础。

2.3 Flink 数据流编程模型

​ Flink 提供了不同的抽象级别以开发流式或批处理应用。

在这里插入图片描述

​ 最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API 中。底层过程函数(Process Function) 与DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。

​ 实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API 处理的数据类型以类(classes)的形式由各自的编程语言所表示。

​ Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API 遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API 提供可比较的操作,例如select、project、join、group-by、aggregate 等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何。尽管Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API 更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API 程序在执行之前会经过内置优化器进行优化。

​ 你可以在表与DataStream/DataSet 之间无缝切换, 以允许程序将Table API 与DataStream 以及DataSet 混合使用。

​ Flink 提供的最高层级的抽象是SQL 。这一层抽象在语法与表达能力上与Table API 类似,但是是以SQL 查询表达式的形式表现程序。SQL 抽象与Table API 交互密切,同时SQL 查询可以直接在Table API 定义的表上执行。

SparkFlink
RDD/DataFrame/DStreamDataSet/Table/DataStream
TransformationTransformation
ActionSink
TasksubTask
PipelineOprator chains
DAGDataFlow Graph
Master + DriverJobManager
Worker + ExecutorTaskManager

2.4 Libraries 支持

支持机器学习(FlinkML)

支持图分析(Gelly)

支持关系数据处理(Table)

支持复杂事件处理(CEP)

3、Flink 集群搭建

Flink 支持多种安装模式。

  1. local(本地)——单机模式,一般不使用
  2. standalone——独立模式,Flink 自带集群,开发测试环境使用
  3. yarn——计算资源统一由Hadoop YARN 管理,生产环境测试

3.1 standalone 集群环境

3.1.1 准备工作

  1. jdk1.8 及以上【配置JAVA_HOME 环境变量】
  2. ssh 免密码登录【集群内节点之间免密登录】

3.1.2 下载安装包

http://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.1-bin-scala_2.12.tgz

3.1.3 集群规划

服务器配置如下:

master(JobManager)+slave/worker(TaskManager)

hadoop01(master+slave) hadoop02(slave) hadoop03(slave)

3.1.4 步骤

  1. 解压Flink 压缩包到指定目录

  2. 配置Flink

  3. 配置Slaves 节点

  4. 分发Flink 到各个节点

  5. 启动集群

  6. 递交wordcount 程序测试

  7. 查看Flink WebUI

3.1.5 具体操作

  1. 上传Flink 压缩包到指定目录
  2. 解压缩flink 到/opt/servers 目录
cd /opt/servers
tar -xvzf flink-1.10.1-bin-scala_2.12.tgz -C ../servers/
  • 1
  • 2
  1. 修改安装目录下conf 文件夹内的flink-conf.yaml 配置文件,指定JobManager
cd /opt/servers/flink-1.10.1/conf/
  • 1

创建目录

mkdir -p /opt/servers/flink-1.10.1/tmp
  • 1

修改配置文件:flink-conf.yaml

#配置Master 的机器名(IP 地址)

jobmanager.rpc.address: hadoop01

#配置每个taskmanager 生成的临时文件夹

taskmanager.tmp.dirs: /opt/servers/flink-1.10.1/tmp
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 修改安装目录下conf 文件夹内的slave 配置文件,指定TaskManager
hadoop01
hadoop02
hadoop03
  • 1
  • 2
  • 3
  1. 使用vi 修改/etc/profile 系统环境变量配置文件,添加HADOOP_CONF_DIR 目录
export HADOOP_CONF_DIR=/opt/servers/hadoop-2.7.7/etc/hadoop
  • 1

YARN_CONF_DIR 或者HADOOP_CONF_DIR 必须将环境变量设置为读取YARN 和HDFS 配置

新版本需要增加hadoop的附加组件,下载一个jar包放在Flink的lib目录下

下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IGg12yvt-1644828219134)(…/…/%E5%A4%A7%E6%95%B0%E6%8D%AE6%E9%98%B6%E6%AE%B5/%E6%95%99%E5%AD%A6%E7%AC%94%E8%AE%B0/day10_Flink01.assets/1627891293772.png)]

  1. 分发/etc/profile 到其他两个节点
scp -r /etc/profile hadoop02:/etc
scp -r /etc/profile hadoop03:/etc
  • 1
  • 2
  1. 每个节点重新加载环境变量
source /etc/profile
  • 1
  1. 将配置好的Flink 目录分发给其他的两台节点
cd /opt/servers
scp -r flink-1.10.1/ hadoop02:$PWD
scp -r flink-1.10.1/ hadoop03:$PWD

  • 1
  • 2
  • 3
  • 4
  1. 启动Flink 集群
 cd /opt/servers/flink-1.10.1
 bin/start-cluster.sh

  • 1
  • 2
  • 3
  1. 通过jps 查看进程信息
--------------------- hadoop01 ----------------
86583 Jps
85963 StandaloneSessionClusterEntrypoint
86446 TaskManagerRunner
--------------------- hadoop03 ----------------
44099 Jps
43819 TaskManagerRunner
--------------------- hadoop03 ----------------
29461 TaskManagerRunner
29678 Jps

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 启动HDFS 集群
start-all.sh

  • 1
  • 2
  1. 在HDFS 中创建/test/input 目录
 hadoop fs -mkdir -p /test/input

  • 1
  • 2
  1. 上传wordcount.txt 文件到HDFS /test/input 目录
 hadoop fs -put /root/wordcount.txt /test/input

  • 1
  • 2
  1. 并运行测试任务
bin/flink run examples/batch/WordCount.jar --input hdfs://hadoop01:8020/test/input/wordcount --output hdfs://hadoop01:8020/test/output/001

  • 1
  • 2
  1. 浏览Flink Web UI 界面
http://hadoop01:8081
  • 1

在这里插入图片描述

16)启动/停止flink 集群

启动:bin/start-cluster.sh
停止:bin/stop-cluster.sh

  • 1
  • 2
  • 3

17) Flink 集群的重启或扩容

启动/停止jobmanager
如果集群中的jobmanager 进程挂了,执行下面命令启动

bin/jobmanager.sh start
bin/jobmanager.sh stop

  • 1
  • 2
  • 3

启动/停止taskmanager
添加新的taskmanager 节点或者重启taskmanager 节点

bin/taskmanager.sh start
bin/taskmanager.sh stop

  • 1
  • 2
  • 3

3.1.6Standalone 集群架构

在这里插入图片描述

  • client 客户端提交任务给JobManager
  • JobManager 负责Flink 集群计算资源管理,并分发任务给TaskManager 执行
  • TaskManager 定期向JobManager 汇报状态

3.2 高可用HA 模式

​ 从上述架构图中,可发现JobManager 存在单点故障,一旦JobManager 出现意外,整个集群无法工作。所以,为了确保集群的高可用,需要搭建Flink 的HA。(如果是部署在YARN 上,部署YARN 的HA),我们这里演示如何搭建Standalone 模式HA。

3.2.1 HA 架构图

在这里插入图片描述

3.2.2 集群规划

master(JobManager)+slave/worker(TaskManager)
hadoop01(master+slave) hadoop02(master+slave) hadoop03(slave)

3.2.3 步骤

  1. 在flink-conf.yaml 中添加zookeeper 配置

  2. 将配置过的HA 的flink-conf.yaml 分发到另外两个节点

  3. 分别到另外两个节点中修改flink-conf.yaml 中的配置

  4. 在masters 配置文件中添加多个节点

  5. 分发masters 配置文件到另外两个节点

  6. 启动zookeeper 集群

  7. 启动flink 集群

3.2.4 具体操作

  1. 在flink-conf.yaml 中添加zookeeper 配置
#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#默认为none,用于指定checkpoint 的data files 和meta data 存储的目录
state.checkpoints.dir: hdfs://hadoop01:8020/flink-checkpoints
#默认为none,用于指定savepoints 的默认目录
state.savepoints.dir: hdfs://hadoop01:8020/flink-checkpoints
#使用zookeeper 搭建高可用
high-availability: zookeeper
# 存储JobManager 的元数据到HDFS,用来恢复JobManager 所需的所有元数据
high-availability.storageDir: hdfs://hadoop01:8020/flink/ha/
high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 将配置过的HA 的flink-conf.yaml 分发到另外两个节点
cd /opt/servers/flink-1.10.1/conf/
scp -r flink-conf.yaml hadoop02:$PWD
scp -r flink-conf.yaml hadoop03:$PWD

  • 1
  • 2
  • 3
  • 4
  1. 到节点2 中修改flink-conf.yaml 中的配置,将JobManager 设置为自己节点的名称
jobmanager.rpc.address: hadoop02

  • 1
  • 2
  1. 在masters 配置文件中添加多个节点
hadoop01:8081
hadoop02:8081

  • 1
  • 2
  • 3
  1. 分发masters 配置文件到另外两个节点
cd /opt/servers/flink-1.10.1/conf/
scp -r masters hadoop02:$PWD
scp -r masters hadoop03:$PWD

  • 1
  • 2
  • 3
  • 4
  1. 启动zookeeper 集群

三个节点启动zookeeper集群

bin/zkServer.sh start

  • 1
  • 2
  1. 启动HDFS 集群
start-all.sh

  • 1
  • 2
  1. 启动flink 集群
 bin/start-cluster.sh

  • 1
  • 2
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host hadoop01.
Starting standalonesession daemon on host hadoop02.
Starting taskexecutor daemon on host hadoop01.
Starting taskexecutor daemon on host hadoop02.
Starting taskexecutor daemon on host hadoop03.

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 分别查看两个节点的Flink Web UI
http://hadoop01:8081
http://hadoop02:8081

  • 1
  • 2
  • 3
  1. kill 掉一个节点,查看另外的一个节点的Web UI
    注意事项
切记搭建HA,需要将第二个节点的jobmanager.rpc.address 修改为hadoop02

  • 1
  • 2

3.3 yarn 集群环境

​ 在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的Workload。因此Flink 也支持在Yarn 上面运行;
​ flink on yarn 的前提是:hdfs、yarn 均启动

3.3.1 准备工作

  1. jdk1.8 及以上【配置JAVA_HOME 环境变量】

  2. ssh 免密码登录【集群内节点之间免密登录】

  3. 至少hadoop2.3

  4. hdfs & yarn

3.3.2 集群规划

3.3.3 修改hadoop 的配置参数

vim etc/hadoop/yarn-site.xml

  • 1
  • 2

添加:

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

  • 1
  • 2
  • 3
  • 4
  • 5

​ 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
​ 在这里面我们需要关闭,因为对于flink 使用yarn 模式下,很容易内存超标,这个时候yarn会自动杀掉job

3.3.4 修改全局变量/etc/profile

添加:

export HADOOP_CONF_DIR=/export/servers/hadoop-2.7.7/etc/Hadoop

  • 1
  • 2

YARN_CONF_DIR 或者HADOOP_CONF_DIR 必须将环境变量设置为读取YARN 和HDFS 配置

新版本需要增加hadoop的附加组件,下载一个jar包放在Flink的lib目录下

下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/

3.3.5 Flink on Yarn 的运行机制
在这里插入图片描述

​ 从图中可以看出,Yarn 的客户端需要获取hadoop 的配置信息,连接Yarn 的ResourceManager。
​ 所以要有设置有YARN_CONF_DIR 或者HADOOP_CONF_DIR 或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home 的环境变量,都区成功将会尝试加载$HADOOP_HOME/etc/hadoop 的配置文件。
​ 1、当启动一个Flink Yarn 会话时,客户端首先会检查本次请求的资源是否足够。资源足将会上传包含HDFS 配置信息和Flink 的jar 包到HDFS。
​ 2、随后客户端会向Yarn 发起请求,启动applicationMaster,随后NodeManager 将会加载有配置信息和jar 包,一旦完成,ApplicationMaster(AM)便启动。
​ 3、当JobManager and AM 成功启动时,他们都属于同一个container,从而AM 就能检索到JobManager 的地址。此时会生成新的Flink 配置信息以便TaskManagers 能够连接到JobManager。
​ 同时,AM 也提供Flink 的WEB 接口。用户可并行执行多个Flink 会话。
​ 4、随后,AM 将会开始为分发从HDFS 中下载的jar 以及配置文件的container 给TaskMangers.完成后Fink 就完全启动并等待接收提交的job.

3.3.6 Flink on Yarn 的两种使用方式

yarn-session 提供两种模式

  1. 会话模式
    使用Flink 中的yarn-session ( yarn 客户端) , 会启动两个必要服务JobManager 和TaskManagers

客户端通过yarn-session 提交作业

yarn-session 会一直启动,不停地接收客户端提交的作用

有大量的小作业,适合使用这种方式

在这里插入图片描述

  1. 分离模式

直接提交任务给YARN

大作业,适合使用这种方式

在这里插入图片描述

3.3.6.1 第一种方式:YARN session

yarn-session.sh(开辟资源)+flink run(提交任务)

这种模式下会启动yarn session,并且会启动Flink 的两个必要服务:JobManager 和Task-managers,然后你可以向集群提交作业。同一个Session 中可以提交多个Flink 作业。需要注意的是,这种模式下Hadoop 的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS 上提交相关的jar 文件和配置文件)

通过./bin/yarn-session.sh 脚本启动YARN Session

脚本可以携带的参数:

Usage:
Required
-n,--container <arg> 分配多少个yarn 容器(=taskmanager 的数量)
Optional
-D <arg> 动态属性
-d,--detached 独立运行(以分离模式运行作业,不启动客户端进程,不打印YARN返回信息)
-id,--applicationId <arg> YARN 集群上的任务id,附着到一个后台运行的yarn
session 中
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> JobManager 的内存[in MB]
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址
使用这个参数可以指定一个不同于配置文件中的jobmanager
-n,--container <arg> 分配多少个yarn 容器(=taskmanager 的数量)
-nm,--name <arg> 在YARN 上为一个自定义的应用设置一个名字
-q,--query 显示yarn 中可用的资源(内存, cpu 核数)
-qu,--queue <arg> 指定YARN 队列
-s,--slots <arg> 每个TaskManager 使用的slots 数量
-st,--streaming 在流模式下启动Flink
-tm,--taskManagerMemory <arg> 每个TaskManager 的内存[in MB]
-z,--zookeeperNamespace <arg> 针对HA 模式在zookeeper 上创建NameSpace

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

注意:
如果不想让Flink YARN 客户端始终运行,那么也可以启动分离的YARN 会话。该参数被称为-d 或–detached。

启动:
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

  • 1
  • 2

​ 上面的命令的意思是, 同时向Yarn 申请3 个container( 即便只申请了两个, 因为ApplicationMaster 和Job Manager 有一个额外的容器。一旦将Flink 部署到YARN 群集中,它就会显示Job Manager 的连接详细信息),其中2 个Container 启动TaskManager(-n 2),每个TaskManager 拥有两个Task Slot(-s 2),并且向每个TaskManager 的Container 申请800M 的内存,以及一个ApplicationMaster(Job Manager)。

启动成功之后,控制台显示:

在这里插入图片描述

去yarn 页面:ip:8088 可以查看当前提交的flink session

在这里插入图片描述

点击ApplicationMaster 进入任务页面:

在这里插入图片描述

上面的页面就是使用:yarn-session.sh 提交后的任务页面;

使用flink 提交任务
bin/flink run examples/batch/WordCount.jar

  • 1
  • 2

在控制台中可以看到wordCount.jar 计算出来的任务结果;

在这里插入图片描述

在yarn-session.sh 提交后的任务页面中也可以观察到当前提交的任务:

在这里插入图片描述

停止当前任务:
yarn application -kill application_1527077715040_0007

  • 1
  • 2
3.3.6.2 第二种方式:在YARN 上运行一个Flink 作业

​ 上面的YARN session 是在Hadoop YARN 环境下启动一个Flink cluster 集群,里面的资源是可以共享给其他的Flink 作业。我们还可以在YARN 上启动一个Flink 作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

使用flink 直接提交任务
bin/flink run -m yarn-cluster examples/batch/WordCount.jar

  • 1
  • 2

在8088 页面观察:

在这里插入图片描述

停止yarn-cluster
yarn application -kill application 的ID

  • 1
  • 2

4、Flink 运行架构

4.1 任务提交流程

在这里插入图片描述

​ Flink 任务提交后,Client 向 HDFS 上传 Flink 的Jar 包和配置,之后向Yarn ResourceManager 提交任务, ResourceManager 分配Container 资源并通知对应的 NodeManager 启动 ApplicationMaster,ApplicationMaster 启动后加载 Flink 的 Jar 包和配置构建环境,然后启动 JobManager , 之后ApplicationMaster 向ResourceManager 申请资源启动TaskManager ,ResourceManager 分配Container 资源后, 由ApplicationMaster 通知资源所在节点的NodeManager 启动TaskManager, NodeManager 加载Flink 的Jar 包和配置构建环境并启动TaskManager,TaskManager 启动后向JobManager 发送心跳包,并等待JobManager 向其分配任务。

4.2 任务调度原理

在这里插入图片描述

​ 客户端不是运行时和程序执行的一部分,但它用于准备并发送dataflow 给Master,然后,客户端断开连接或者维持连接以等待接收计算结果,客户端可以以两种方式运行:要么作为Java/Scala 程序的一部分被程序触发执行,要么以命令行./bin/flink run 的方式执行。

4.3 Worker 与Slots

​ 每一个worker(TaskManager)是一个JVM 进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker 能接收多少个task,worker 通过task slot 来进行控制(一个worker 至少有一个task slot)。每个task slot 表示TaskManager 拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot 化意味着一个subtask将不需要跟来自其他job 的subtask 竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU 的隔离,slot 目前仅仅用来隔离task 的受管理的内存。
​ 通过调整task slot 的数量, 允许用户定义subtask 之间如何互相隔离。如果一个TaskManager 一个slot,那将意味着每个task group 运行在独立的JVM 中(该JVM 可能是通过一个特定的容器启动的),而一TaskManager 多个slot 意味着更多的subtask 可以共享同一个JVM。而在同一个JVM 进程中的task 将共享TCP 连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task 的负载。
在这里插入图片描述

​ Task Slot 是静态的概念, 是指TaskManager 具有的并发执行能力, 可以通过参数taskmanager.numberOfTaskSlots 进行配置,而并行度parallelism 是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default 进行配置。也就是说,假设一共有3 个TaskManager,每一个TaskManager 中的分配3 个TaskSlot,也就是每个TaskManager 可以接收3 个task , 一共9 个TaskSlot , 如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9 个TaskSlot 只用了1 个,有8 个空闲,因此,设置合适的并行度才能提高效率。

4.4 程序与数据流

​ Flink 程序的基础构建模块是流(streams) 与转换(transformations)(需要注意的是,Flink 的DataSet API 所使用的DataSets 其内部也是stream)。一个stream 可以看成一个中间结果,而一个transformations 是以一个或多个stream 作为输入的某种operation,该operation利用这些stream 进行计算从而产生一个或多个result stream。
​ 在运行时,Flink 上运行的程序会被映射成streaming dataflows,它包含了streams 和transformations operators。每一个dataflow 以一个或多个sources 开始以一个或多个sinks结束。dataflow 类似于任意的有向无环图(DAG),当然特定形式的环可以通过iteration 构建。在大部分情况下,程序中的transformations 跟dataflow 中的operator 是一一对应的关系,但有时候,一个transformation 可能对应多个operator。

在这里插入图片描述

4.5 并行数据流

​ Flink 程序的执行具有并行、分布式的特性。在执行过程中,一个stream 包含一个或多个stream partition ,而每一个operator 包含一个或多个operator subtask,这些operator subtasks 在不同的线程、不同的物理机或不同的容器中彼此互不依赖得执行。
​ 一个特定operator 的subtask 的个数被称之为其parallelism(并行度)。一个stream 的并行度总是等同于其producing operator 的并行度。一个程序中,不同的operator 可能具有不同的并行度。
在这里插入图片描述

​ Stream 在operator 之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing 的模式,具体是哪一种形式,取决于operator 的种类。

​ One-to-one:stream(比如在source 和map operator 之间)维护着分区以及元素的顺序。那意味着map operator 的subtask 看到的元素的个数以及顺序跟source operator 的subtask 生产的元素的个数、顺序相同,map、fliter、flatMap 等算子都是one-to-one 的对应关系。

​ Redistributing:stream(map()跟keyBy/window 之间或者keyBy/window 跟sink 之间)的分区会发生改变。每一个operator subtask 依据所选择的transformation 发送数据到不同的目标subtask。例如,keyBy() 基于hashCode 重分区、broadcast 和rebalance 会随机重新分区,这些算子都会引起redistribute 过程,而redistribute 过程就类似于Spark 中的shuffle 过程。

4.6 task 与operator chains

​ 出于分布式执行的目的,Flink 将operator 的subtask 链接在一起形成task,每个task 在一个线程中执行。将operators 链接成task 是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API 中进行指定。

​ 下面这幅图,展示了5 个subtask 以5 个并行的线程来执行:

在这里插入图片描述

在这里插入图片描述

5、DataSet API 开发

5.1 案例

5.1.1 Flink 批处理程序的一般流程

  1. 获取Flink 批处理执行环境

  2. 构建source

  3. 数据处理

  4. 构建sink

5.1.2 JAVA示例

导入Flink 所需的Maven 依赖

		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
         <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

使用java编写Flink 程序,用来统计单词的数量。

 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
//批处理指的是离线数据的处理
public class BatchWordCount {

    public static void main(String[] args) throws Exception {

        /**
         * 1.读取外部文件,word.txt,词频统计
         */
        //1.创建批处理运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.创建数据源 source
        //readTextFile用于读取外部文件,既可以是本地文件,也可以是hdfs
        DataSource<String> source = env.readTextFile("hdfs://hadoop01:8020/test/input/word.txt");

        //3.数据处理过程, 处理过程中会调用dataset的方法,这些方法称为算子,也叫作operator
        //hello world hadoop  -->  hello 1   world 1   hadoop 1
        //常用算子map:完成数据一对一转换操作
        // flatMap(扁平化处理):数据压扁,数据会形成一对多的操作
        // filter :数据的过滤
        // reduce: 数据的聚合
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = source.flatMap(
                //泛型表示,接收的数据类型和返回的数据类型
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    /**
                     * 会一行一行数据处理
                     * @param value:获取一行一行的数据
                     * @param out:收集器,将数据收集输出
                     * @throws Exception
                     */
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // value :hello world hadoop
                        String[] words = value.split(" ");
                        //将每一个词循环输出
                        for (String word : words) {
                            //将数据收集输出
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }

                    }
                });

        //先分组,再聚合,0表示按照第一个字段分组
        UnsortedGrouping<Tuple2<String, Integer>> groupByOperator = wordAndOne.groupBy(0);

        //数据聚合,1表示第二个字段
        AggregateOperator<Tuple2<String, Integer>> result = groupByOperator.sum(1);

        //4.数据下沉 sink
        //print是sink的方法,在下沉中使用
        //result.print();
        result.writeAsText("hdfs://hadoop01:8020/test/output111", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        //flink中是惰性加载,必须加上execute
        env.execute();
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

6、DataStream API 开发

6.1 案例

6.1.1 Flink 流处理程序的一般流程

  1. 获取Flink 流处理执行环境

  2. 构建source

  3. 数据处理

  4. 构建sink

6.1.2 示例

编写Flink 程序,用来统计单词的数量。

6.1.3 步骤

  1. 获取Flink 批处理运行环境

  2. 构建一个socket 源

  3. 使用flink 操作进行单词统计

  4. 打印

6.1.4 参考代码

 public static void main(String[] args) throws Exception {

        //1.创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.指定数据源 socket数据源中输入一行一行的数据:hello world
        DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999);
        //3.数据处理
        //hello world --》 hello  1
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapOperator = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //value拆分
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        });
        //分流,聚合
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = flatMapOperator.keyBy(0);
        //集合的是无界的数据流
        //SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);

        //有界的数据流
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowStream = keyedStream.timeWindow(Time.seconds(10));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowStream.sum(1);

        //4.数据下沉
        result.print();
        env.execute();


    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

注:安装netcat

yum -y install nc
  • 1

6.2 基于kafka 的source 操作

addSource - 添加一个新的source function 。例如, 你可以addSource(new FlinkKafkaConsumer<>(…)) 以从Apache Kafka 读取数据。

上面几种的特点:

  1. 基于集合:有界数据集,更偏向于本地测试用

  2. 基于文件:适合监听文件修改并读取其内容

  3. 基于Socket:监听主机的host port,从Socket 中获取数据

  4. 自定义addSource:大多数的场景数据都是无界的,会源源不断的过来。比如去消费Kafka某个topic 上的数据,这时候就需要用到这个addSource,可能因为用的比较多的原因吧,Flink 直接提供FlinkKafkaConsumer 等类可供你直接使用。你可以去看看FlinkKafkaConsumerBase 这个基础类,它是Flink Kafka 消费的最根本的类。Flink 目前支持如下图里面常见的Source:

在这里插入图片描述

​ Flink 提供的Kafka 连接器,用于向Kafka 主题读取或写入数据。Flink Kafka Consumer集成了Flink 的检查点机制,可提供一次性处理语义。为实现这一目标,Flink 并不完全依赖kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。

6.2.1 不同版本兼容介绍

Flink版本Kafka版本
1.12.X2.4.1
1.11.X2.4.1
1.10.X2.2.0
1.9.X2.2.0
1.8.X2.0.1
1.7.X2.0.1
0.10.x0.8.2.0
0.9.x0.8.2.0

​ 在Maven仓库网站https://mvnrepository.com/中,找到flink-connector-kafka的详情页面:https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

6.2.2 构造函数参数说明

主题名称/主题名称列表

DeserializationSchema / KeyedDeserializationSchema 用于反序列化来自Kafka 的数据

Kafka 消费者的属性。需要以下属性:

“bootstrap.servers”(以逗号分隔的Kafka 经纪人名单)

“zookeeper.connect”(逗号分隔的Zookeeper 服务器列表)(仅Kafka 0.8 需要)

“group.id”消费者群组的ID

6.2.3 反序列化Schema 类型

作用:对kafka 里获取的二进制数据进行反序列化

FlinkKafkaConsumer 需要知道如何将kafka 中的二进制数据转换成Java/Scala 对象

DeserialzationSchema 定义了该转换模式,通过T deserialize(byte[] message)

FlinkKafkaConsumer 从kafka 获取的每条消息都会通过DeserialzationSchema 的T deserialize(byte[] message)反序列化处理

反序列化Schema 类型(接口)

DeserialzationSchema(只反序列化value)

KeyedDeserializationSchema(反序列化key 和value)

6.2.4 常用的反序列化shema

Schema描述
SimpleStringSchema可以将消息反序列化为字符串。 当我们接收到消息并且反序列化失败的时候, 会出现以下两种情况:
1) Flink 从 deserialize(…)方法中抛出异常, 这会导致 job 的失败, 然后 job 会重启; (没有开启 容错)
2) 在 deserialize(…) 方法出现失败的时候返回 null, 这会让 Flink Kafka consumer 默默的忽略 这条消息。 请注意, 如果配置了 checkpoint 为 enable, 由于 consumer 的失败容忍机制, 失败的消 息会被继续消费, 因此还会继续失败, 这就会导致 job 被不断自动重启
JSONDeserializationSchema JSONKeyValueDeserializationSchema可以把序列化后的 Json 反序列化成 ObjectNode, ObjectNode 可以通过 objectNode.get(“field” ).as(Int/String/…)() 来访问指定的字段
TypeInformationSerializationSchema TypeInformationKeyValueSerializationSchema(适合读写均是 flink 的场景)他们会基于 Flink 的 TypeInformation 来创建 schema。 这对于那些从 Flink 写入, 又从 Flink 读出的数据是很有用的。 这种 Flink-specific 的反序列化会比其他通用的序 列化方式带来更高的性能。

6.2.5 Kafka Consumers 消费模式配置(影响从哪里开始消费)

消费模式说明
setStartFromEarliest从对头开始, 最早的记录内部的 consumer 递交到 kafka/zk 的偏移量将被忽略
setStartFromLatest从对尾开始, 最新的记录内部的 consumer 递交到 kafka/zk 的偏移量将被忽略
setStartFromGroupOffsets默认值, 从当前消费组记录 的偏移量开始, 接着上次的 偏移量消费以 consumer 递交到 kafka/zk 中的偏移量为起始位置开始消费, group.id 设置在 consumer 的 properties 里面; 如果没有找到记录的偏移量, 则使用 consumer 的 properties 的 auto.offset.reset 设置的策略
setStartFromSpecificOffsets(Map<TopicPart ition, Long>的参数)从指定的具体位置开始消费
setStartFromTimestamp(long)从指定的时间戳开始消费对于每个分区, 时间戳大于或者等于指定时间戳的记录将用作起始位 置, 如果一个分区的最新时间早于时间戳, 那么只需要从最新记录中 读取该分区, 在此模式下, kafka/zk 中递交的偏移量将被忽略,时间 戳指的是 kafka 中消息自带的时间戳

简单理解:

​ 如果是默认行为(setStartFromGroupOffsets),那么任务从检查点重启,按照重启前的offset 进行消费,如果直接重启不从检查点重启并且group.id 不变,程序会按照上次提交的offset 的位置继续消费。如果group.id 改变了,则程序按照auto.offset.reset 设置的属性进行消费。但是如果程序带有状态的算子,还是建议使用检查点重启。

如果是setStartFromEarliest()/ setStartFromLatest():那么任务只会从最新或最老消费。

6.2.6 Kafka 动态分区检测

​ 对于有实时处理业务需求的企业,随着业务增长数据量也会同步增长,将导致原有的Kafka分区数不满足数据写入所需的并发度,需要扩展Kafka 的分区或者增加Kafka 的topic,这时就要求实时处理程序。

​ SparkStreaming(与Kafka 0.10 版本结合支持动态分区检测)、Flink(创建一个线程,该线程会定期检测Kafka 新增分区,然后将其添加到kafkaFetcher 里) 都能动态发现新增topic 分区并消费处理新增分区的数据。

​ Spark 无需做任何配置就可动态发现Kafka 新增分区, 而Flink 需要将flink.partition-discovery.interval-millis 该属性设置为大于0 ,属性值为时间间隔单位为毫秒。

6.2.7 使用案例


public static void main(String[] args) throws Exception {

        //1.创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.指定数据源

        //创建kafka消费者
        /**
         * String topic, 主题
         * DeserializationSchema<T> valueDeserializer, 指定反序列化数据的类型
         * new SimpleStringSchema():按照字符串反序列化
         * Properties props:配置参数
         */
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092");
        properties.setProperty("group.id", "test");
		properties.setProperty("flink.partition-discovery.interval-millis", "5000");

		

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

		String topic = "test";
        HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition(topic, 0), 11111111l);
        offsets.put(new KafkaTopicPartition(topic, 1), 222222l);
        offsets.put(new KafkaTopicPartition(topic, 2), 33333333l);
 /**
     * Flink 从topic 中最初的数据开始消费
     */
    consumer.setStartFromEarliest();
    /**
     * Flink 从topic 中指定的时间点开始消费,指定时间点之前的数据忽略
     */
    consumer.setStartFromTimestamp(1559801580000l);
    /**
     * Flink 从topic 中指定的offset 开始,这个比较复杂,需要手动指定offset
     */
    consumer.setStartFromSpecificOffsets(offsets);
    /**
     * Flink 从topic 中最新的数据开始消费
     */
    consumer.setStartFromLatest();
    /**
     * Flink 从topic 中指定的group 上次消费的位置开始消费,所以必须 配置group.id 参数
     */
    consumer.setStartFromGroupOffsets();

        DataStreamSource<String> source = env.addSource(consumer);

        //3.数据处理
        //filter

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapOperator = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是一条记录
                String[] attrs = value.split("\\|");
                if (attrs.length == 5) {//etl
                    return new Tuple2<String, Integer>(attrs[4], 1);
                }

                return new Tuple2<String, Integer>("异常数据", 1);
            }
        });
        //分流,聚合
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapOperator.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);

        //4.数据输出
        result.print();
        //
        //result.addSink()
        env.execute();

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

启动kafka服务

先启动zookeeper在启动kafka

cd /opt/servers/kafka_2.12-2.2.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
  • 1
  • 2

启动kafka生产者测试数据

cd /opt/servers/kafka_2.12-2.2.0
bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic test

  • 1
  • 2
  • 3

查看偏移量

kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --group test --describe
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/480576
推荐阅读
相关标签
  

闽ICP备14008679号