赞
踩
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
System Level
在 flink-conf.yaml 中通过 parallelism.default 配置项给所有 execution environments 指定系统级的默认 parallelism;在 ExecutionEnvironment 里头可以 通过 setParallelism 来给 operators、data sources、data sinks 设置默认的 parallelism;如 果 operators 、 data sources 、 data sinks 自 己 有 设 置 parallelism 则 会 覆 盖 ExecutionEnvironment 设置的 parallelism。
需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。
固定延迟重启策略(Fixed Delay Restart Strategy)
故障率重启策略(Failure Rate Restart Strategy)
无重启策略(No Restart Strategy)
Fallback 重启策略(Fallback Restart Strategy)
Flink 提供了一个分布式缓存,类似于 hadoop,可以使用户在并行函数中很方便的读取本地 文件,并把它放在 taskmanager 节点中,防止 task 重复拉取。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如 hdfs 或 者 s3),通过 ExecutionEnvironment 注册缓存文件并为它起一个名称。 当程序执行,Flink 自动将文件或者目录复制到所有 taskmanager 节点的本地文件系统,仅 会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从 taskmanager 节点的本地 文件系统访问它。
在 Flink 中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个 Slot 中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像 Java 数组之间一样互相 访问,而广播变量 Broadcast 便是解决这种情况的。我们可以把广播变量理解为是一个公共的共 享变量,我们可以把一个 dataset 数据集广播出去,然后不同的 task 在节点上都能够获取到, 这个数据在每个节点上只会存在一份。
1.Client上传jar包和配置文件到HDFS集群上
2.Client向Yarn ResourceManager提交任务并申请资源
3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
JobManager和ApplicationMaster运行在同一个container上。
一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。
它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。
这个配置文件也被上传到HDFS上。
此外,AppMaster容器也提供了Flink的web服务接口。
YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink
4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
Flink 中的时间与现实世界中的时间是不一致的,在 flink 中被划分为事件时间,摄入时间, 处理时间三种。
如果以 EventTime 为基准来定义时间窗口将形成 EventTimeWindow,要求消息本身 就 应 该 携 带 EventTime
如 果 以 IngesingtTime 为 基 准 来 定 义 时 间 窗 口 将 形 成 IngestingTimeWindow,以 source 的 systemTime 为准。
如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的 systemTime 为准。
面到这里,面试官已经很满意你对Flink的掌握,那么更近一步让面试官折服:
Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制,本质上也是一种 时间戳。watermark 是用于处理乱序事件的,处理乱序事件通常用 watermark 机制结合 window 来实现。
Flink 基于分布式快照与可部分重发的数据源实现了容错。
用户可自定义对整个 Job 进行快 照的时间间隔,当任务失败时,Flink 会将整个 Job 恢复到最近一次快照,并从数据源重发快照 之后的数据。
注意:这里 window 产生的数据倾斜指的是不同的窗口内积攒的数据量不同,主要是由源头 数据的产生速度导致的差异。核心思路:1.重新设计 key 2.在窗口计算前做预聚合
flink没学过调优,被问到了,我们总不能说俺不知道,洒家不会之类的吧٩(๑❛ᴗ❛๑)۶下面展示一种回答
首先要确定问题产生的原因,找到最耗时的点,确定性能瓶颈点。比如任务频繁反压,找到 反压点。主要通过:资源调优、作业参数调优。资源调优即是对作业中的 Operator 的并发数 (parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State 的设置,checkpoint 的设置。
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块 上。此外,Flink 大量的使用了堆外内存。如果需要处理的数据超出了内存限制, 则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序 列化框架。
这道题问的比较开阔,如果知道 Flink 底层原理,可以详细说说,如果不是很了 解,就直接简单一句话:Flink 的开发者认为批处理是流处理的一种特殊情况。 批处理是有限的流处理。Flink 使用一个引擎支持了 DataSet API 和 DataStream API。
Flink 在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。 选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。Flink 提 供了三种状态存储方式:MemoryStateBackend、FsStateBackend、 RocksDBStateBackend。
Flink 通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几 个步骤: 开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件 夹里面 预提交(preCommit)将内存中缓存的数据写入文件并关闭 正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的 数据会有一些延迟 丢弃(abort)丢弃临时文件 若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据, 也可删除预提交的数据。
Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink 的反压设计 也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用 的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。
非常经典的wordcount题,类似的用scala,spark,MapReduce手写wc你能写出来吗?
新建文件为 words. txt,文件路径在/ export/ server/data下面,内容如下
Spark Flink flume hadoop
Flink spark flume hadoop
以下使用Flink 计算引擎实现流式数据处理:从Socket接收数据,实时进行词频统计WordCount
Java版:
// 1.准备环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.准备数据-source // DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999); DataStreamSource<String> inputDataStream = env.readTextFile("D:\\0615\\bigdata-flink\\datas\\wordcount.data"); // 3.处理数据-transformation // TODO: 流计算词频统计WordCount与处理思路基本一致 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream // 分割单词 .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { for (String word : line.trim().split("\\s+")) { out.collect(word); } } }) // 转换二元组 .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String word) throws Exception { return new Tuple2<>(word, 1); } }) // 分组聚合 .keyBy(0).sum(1); // 4.输出结果-sink resultDataStream.print(); // 5.触发执行-execute env.execute(_02StreamWordCount.class.getSimpleName());
思考一下Scala版,Python版该怎么写??
{“user_id”: “1”, “page_id”:“1”, “status”: “success”}
{“user_id”: “1”, “page_id”:“1”, “status”: “success”}
{“user_id”: “1”, “page_id”:“1”, “status”: “success”}
{“user_id”: “1”, “page_id”:“1”, “status”: “success”}
{“user_id”: “1”, “page_id”:“1”, “status”: “fail”}
官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
代码实现:
//1.准备环境 流执行环境和流表 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.执行SQL,创建 input\_kafka 表 TableResult inputTable = tEnv.executeSql( "CREATE TABLE input\_kafka (\n" + " `user\_id` BIGINT,\n" + " `page\_id` BIGINT,\n" + " `status` STRING\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'input\_kafka',\n" + " 'properties.bootstrap.servers' = 'node1:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'format' = 'json'\n" + ")" ); // 创建 output\_kafka TableResult outputTable = tEnv.executeSql( "CREATE TABLE output\_kafka (\n" + " `user\_id` BIGINT,\n" + " `page\_id` BIGINT,\n" + " `status` STRING\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'output\_kafka',\n" + " 'properties.bootstrap.servers' = 'node1:9092',\n" + " 'format' = 'json',\n" + " 'sink.partitioner' = 'round-robin'\n" + ")" ); // 根据 status 是否为 success 条件筛选出来值 String sql = "select " + "user\_id," + "page\_id," + "status " + "from input\_kafka " + "where status = 'success'"; Table ResultTable = tEnv.sqlQuery(sql); //3.toRetractStream DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class); //4.打印输出 resultDS.print(); //5.执行sql 将筛选出来success的数据表插入到 output\_kafka tEnv.executeSql("insert into output\_kafka select \* from "+ResultTable); //6.excute env.execute();
再来一题,小试牛刀:
使用java或 scala语言编程实现消费 kafka中的数据并在数据处理阶段过滤掉
country Code不为cN的内容并打印输出
假设:集群主机 hostname为 node
Kafka的 topic为data
Kafka的消费组为 default Group
示例数据
{dt";“2020-08-05
10: 11: 09”,“country Code”: “CN”,“data”: [{“type” s1,score":0.8),“type”: 52,score": 0. 3}}}
{“dt”:“202008-05
10: 13: 12”,“country Code”: KW",“data”: [{“type”: “s2”,“score”: 0. 41.“type”: “s1”,“score”: 0.3}}}
{“dt”:“202008-05
10: 12: 15”, “country Code”: “US”, “data” [{“type”: “s4”,“score”: 0.3).“type”: 52","score: 0.5}}}
A D A D D
ABCD BCD AB BC ABCD
F F F T T
以上便是大数据Flink面试考题——Flink高频考点,万字超全整理,
题目部分整理自网络
主要是为了准备不久后的考试,及为同笔者一样的萌新复习Flink
看完是不是觉得Flink跟没学的一样,笔者已贴心的为您准备好2021最新的Flink系列教程:
2021年最新最全Flink系列教程笔记_Flink
还有初学Flink必看的Flink思维导图
2021最新Flink思维导图__萌新制作(钜详细)
愿你读过之后有自己的收获,如果有收获不妨一键三连一下~
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
32149913.png)
[外链图片转存中…(img-R2r1ZkPH-1715739615526)]
[外链图片转存中…(img-HeHGnDJL-1715739615526)]
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。