赞
踩
Flink是一个面向流处理和批处理的分布式数据计算引擎,能够基于同一个Flink 运行,可以提供流处理和批处理两种类型的功能。 在 Flink 的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流。
Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多 Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。
Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个
TaskManager。
Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给
JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,
也可以在命令行进程 ./bin/flink run … 中运行。
可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为
standalone 集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。
TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。
JobManager:
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
TaskManagers:
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 taskslot。TaskManager 中 taskslot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子。
Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport algorithm”算法。
每个需要Checkpoint的应用在启动时,Flink的JobManager为其创建一个
CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。
文章推荐:
Flink可靠性的基石-checkpoint机制详细解析
spark streaming 的 checkpoint 仅仅是针对 driver 的故障恢复做了数据和元数据的 checkpoint。而 flink 的 checkpoint 机制 要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤:
开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面
预提交(preCommit)将内存中缓存的数据写入文件并关闭
正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟
丢弃(abort)丢弃临时文件若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。
两阶段提交协议详解:八张图搞懂Flink的Exactly-once
端到端的exactly-once对sink要求比较高,具体实现主要有幂等写入和事务性写入两种方式。
幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。
如果外部系统不支持事务,那么可以用预写日志的方式,把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统。
分两部分:
推荐阅读:一文学完Flink流计算常用算子(Flink算子大全)
在 Flink 的后台任务管理中,我们可以看到 Flink 的哪个算子和 task 出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的 Operator 的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State 的设置,checkpoint 的设置。
Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。
Flink会因为数据堆积和处理速度变慢导致checkpoint超时,而checkpoint是
Flink保证数据一致性的关键所在,最终会导致数据的不一致发生。
数据倾斜:可以在 Flink 的后台管理页面看到每个 Task 处理数据的大小。当数据倾斜出现时,通常是简单地使用类似 KeyBy 等分组聚合函数导致的,需要用户将热点 Key 进行预处理,降低或者消除热点 Key 的影。
GC:不合理的设置 TaskManager 的垃圾回收参数会导致严重的 GC 问题,我们
可以通过 -XX:+PrintGCDetails 参数查看 GC 的日志。
代码本身:开发者错误地使用 Flink 算子,没有深入了解算子的实现机制导致性能问题。我们可以通过查看运行机器节点的 CPU 和内存情况定位问题。
Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。Flink 提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
为了更高效地分布式执行,Flink 会尽可能地将 operator 的 subtask 链接
(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/ 反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。
Flink中的时间有三种类型,如下图所示:
例如,一条日志进入Flink的时间为2021-01-2210:00:00.123,到达Window的系统时间为2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。
Flink中 WaterMark 和 Window 机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink 也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存
获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取文章推荐:
Flink 中极其重要的 Time 与 Window 详细解析
window 产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:
在流式处理中,CEP 当然是要支持 EventTime 的,那么相对应的也要支持数据的迟到现象,也就是watermark的处理逻辑。CEP对未匹配成功的事件序列的处理,和迟到数据是类似的。在 Flink CEP的处理逻辑中,状态没有满足的和迟到的数据,都会存储在一个Map数据结构中,也就是说,如果我们限定判断事件序列的时长为5分钟,那么内存中就会存储5分钟的数据,这在我看来,也是对内存的极大损伤之一。
推荐阅读:一文学会Flink CEP
们在实际生产环境中可以从四个不同层面设置并行度:
在一个 Flink Job 中,数据需要在不同的 task 中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲 buffer 中收集 records,然后再发送。Records 并不是一个一个被发送的,是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。
Flink 摒弃了 Java 原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。
TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。
TypeInformation 支持以下几种类型:
构建抽象语法树的事情交给了 Calcite 去做。SQLquery 会经过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的抽象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的抽象语法树。
然后依次被转换成逻辑执行计划和物理执行计划。
在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。
第一时间获取最新大数据技术,尽在公众号:五分钟学大数据搜索公众号:五分钟学大数据,学更多大数据技术!其他大数据技术文档可下方扫码关注获取:
在数据处理的时候不丢失数据,例如sparkstreaming处理kafka数据的时候,要保证数据不丢失,这个尤为重要
前两步中,如果无法保证数据的完整性,那么就要通过离线计算进行数据的校对,这样才能保证我们能够得到期望值
你感觉数仓建设中最重要的是什么
数仓建设中,最重要的是数据准确性,数据的真正价值在于数据驱动决策,通过数据指导运营,在一个不准确的数据驱动下,得到的一定是错误的数据分析,影响的是公司的业务发展决策,最终导致公司的策略调控失败。
数据仓库建模怎么做的
数仓建设中最常用模型–Kimball维度建模详解
数据质量怎么监控
单表数据量监控
一张表的记录数在一个已知的范围内,或者上下浮动不会超过某个阈值
SQL结果:var 数据量 = select count(*)from 表 where 时间等过滤条件
报警触发条件设置:如果数据量不在[数值下限, 数值上限], 则触发报警
同比增加:如果((本周的数据量 -上周的数据量)/上周的数据量*100)不在 [比例下线,比例上限],则触发报警
环比增加:如果((今天的数据量 - 昨天的数据量)/昨天的数据量*100)不在 [比例下线,比例上限],则触发报警
报警触发条件设置一定要有。如果没有配置的阈值,不能做监控
日活、周活、月活、留存(日周月)、转化率(日、周、月)GMV(日、周、月)
复购率(日周月)
单表空值检测
某个字段为空的记录数在一个范围内,或者占总量的百分比在某个阈值范围内
目标字段:选择要监控的字段,不能选“无”
SQL结果:var 异常数据量 = select count(*) from 表 where 目标字段 is null
单次检测:如果(异常数据量)不在[数值下限, 数值上限],则触发报警
单表重复值检测
一个或多个字段是否满足某些规则
目标字段:第一步先正常统计条数;select count(*) form 表;
第二步,去重统计;select count(*) from 表 group by 某个字段
第一步的值和第二步不的值做减法,看是否在上下线阀值之内
单次检测:如果(异常数据量)不在[数值下限, 数值上限], 则触发报警
跨表数据量对比
主要针对同步流程,监控两张表的数据量是否一致
SQL结果:count(本表) - count(关联表)
阈值配置与“空值检测”相同
数据商业分析中会存在很多判断:
观察数据当前发生了什么?
比如想知道线上渠道A、B各自带来了多少流量,新上线的产品有多少用户喜欢,新注册流中注册的人数有多少。这些都需要通过数据来展示结果。
理解为什么发生?
我们需要知道渠道A为什么比渠道B好,这些是要通过数据去发现的。也许某个关键字带来的流量转化率比其他都要低,这时可以通过信息、知识、数据沉淀出发生的原因是什么。
预测未来会发生什么?
在对渠道A、B有了判断之后,根据以往的知识预测未来会发生什么。在投放渠道C、D的时候,猜测渠道C比渠道D好,当上线新的注册流、新的优化,可以知道哪一个节点比较容易出问题,这些都是通过数据进行预测的过程。
商业决策
所有工作中最有意义的还是商业决策,通过数据来判断应该做什么。这是商业分析最终的目的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。