当前位置:   article > 正文

【大数据】Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT_flinksql with

flinksql with

1.WITH 子句

应用场景(支持 Batch / Streaming):With 语句和离线 Hive SQL With 语句一样的,语法糖 +1,使用它可以让你的代码逻辑更加清晰。

-- 语法糖 +1
WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.SELECT & WHERE 子句

应用场景(支持 Batch / Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,常用作 ETL,过滤,字段清洗标准化。

INSERT INTO target_table
SELECT * FROM Orders

INSERT INTO target_table
SELECT order_id, price + tax FROM Orders

INSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10

-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了。

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3
  • 1
  • 2
  • 3

这个 SQL 对应的实时任务,假设 Orders 为 Kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

可以看到这个实时任务的所有算子是以一种 Pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

在这里插入图片描述

⭐ 关于看如何看一段 Flink SQL 最终的执行计划:最好的方法就如上图,看 Flink Web UI 的算子图,算子图上详细的标记清楚了每一个算子做的事情。

以上图来说,我们可以看到主要有三个算子:

  • Source 算子。Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
    • 其中 Source 表名称为 table=[[default_catalog, default_database, Orders]
    • 字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
    • Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  • 过滤算子。Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id])
    • 其中过滤条件为 where=[(order_id > 3)]
    • 结果字段为 select=[order_id, name, row_time]
  • Sink 算子。Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time])
    • 其中最终产出的表名称为 table=[default_catalog.default_database.target_table]
    • 表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 Debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了。
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(Stage)进行运行的,每一个 Stage 运行结束之后,然后下一个 Stage 开始运行,全部的 Stage 运行完成之后,这个离线任务就跑结束了。

注意:很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释:

  • 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  • 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

3.SELECT DISTINCT 子句

应用场景(支持 Batch / Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重。

INSERT into target_table
SELECT DISTINCT id 
FROM Orders
  • 1
  • 2
  • 3

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

在这里插入图片描述
对于实时任务,计算时的状态可能会无限增长。状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了。
  • 去重算子DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了。
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/article/detail/56793
推荐阅读
  • Flink 输出至 Elasticsearch
    Flink输出至Elasticsearch。Flink输出至Elasticsearch【1】引入pom.xml依赖<dependency><groupId>org.apache.flink</groupId&g... [详细]

  • 都是从中scan出来的。而这个myTable又是我们注册进去的。问题就是有哪些方式可以注册Table。类似于上述的WordCount,指定一个文件系统fs,也可以是kafka等,还需要一些格式和Schema等。//将source注册到env... [详细]

  • Flink CDC、OGG、Debezium等基于日志开源CDC方案对比_flink cdc 2.4不同
    CDC的全称是,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为CDC。我们目前通常描述的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。_flinkcdc2.4不同flinkcdc2.4不同先上一张图,后面再... [详细]

  • 状态精准一次是Flink流式计算引擎的一大特色,本章节从状态、状态后端、checkpoint算法逐步为读者展示了Flink状态机制设计的绝妙之处。深入理解Flink(二)FlinkStateBackendCheckpoint容错深入分析... [详细]

  • 2-1启动gateway之前先启动一个flinksession./bin/yarn-session.sh-d2-2启动命令:2-3查看日志观察是否启动成功:查看日志出现这个条信息就证明已经找到了flinksessionapplication... [详细]

  • Flink 的时间属性及原理解析
    FlinkAPI大体上可以划分为三个层次:处于,这三层中每一层都非常依赖于时间属性时间FlinkAPI。在这一层中因为封装方面原因,我们能够接触到时间地方不是很多,所以我们将重点放在底层和最上层Flink时间属性及原理解... [详细]

  • flink读取文件数据写到elasticsearch_flink写入esflink写入es前言es是大数据存储的必备中间件之一,通过flink可以读取来自日志文件,kafka等外部数据源的数据,然后写入es中,本篇将通过实例演示下完整的操... [详细]

  • 文章目录ElasticSearch是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch有着简洁的REST风格的API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。Flink为Elasti... [详细]

  • Flink数据下沉到Elasticsearch示例简介  当初做课程设计的时候,找到的flink接入elasticsearch的文章除了flink的文档示例之外版本都挺老的,所以自己按照flink的文档把原来的改造了一下。现再更新最新版本... [详细]

  • Elasticsearch连接器的使用与JDBC连接器非常相似,写入数据的模式同样是由创建表的DDL中是否有主键定义决定的。​除了所有ROW类型的字段(对应着HBase中的family),表中还应有一个原子类型的字段,它就会被识别为HBas... [详细]

  • 模式【1】;生成,然后转化为JobGraph;【2】依次启动三者都服从分布式协同一致的策略;将JobGraph转化为,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向请求slot,如果有直接deploy到对... [详细]

  • 除了使用已有的所定义的数据格式类型之外,用户也可以自定义实现,来满足的不同的数据类型定义需求。Flink提供了可插拔的让用户将自定义的注册到Flink类型系统中。如下代码所示只需要通过实现接口,返回相应的类型信息。通过@TypeInfo注解... [详细]

  • 在分布式系统中,消息的丢失、错乱不可避免,这些问题会在分布式系统的组件中引入不一致状态,如果没有定消息,那么组件无法从这些不一致状态中恢复。作为分布式数据处理框架,Flink提供了支撑流计算和批计算的接口,同在此基础之上抽象出不同的... [详细]

  • 大家都应该清楚Task和StreamTask两个概念,Task是直接受TaskManager管理和调度的,而Task又会调用StreamTask,而StreamTask中真正封装了算子的处理逻辑。在run()方法中,首先将反序列化后的数据封... [详细]

  • 这里目录标题11、TableAPI与SQL11.1需要引入的pom依赖11.2、两种planner区别(old/blink)11.3、基本程序结构11.4.TableEnvironment11.5、表(Table)11.5.1创建Table... [详细]

  • 在定义数据处理管道时,TableAPI和DataStreamAPI同样重要。DataStreamAPI在一个相对较低级别的命令式编程API中提供了流处理的原语(即时间、状态和数据流管理)。TableAPI抽象了许多内部结构,并提供了结构化和... [详细]

  • KafkaTable&SQL连接Kafka读写数据的样例代码:首先是依赖,maven项目中需指定:org.apache.flinkflink-stream... [详细]

  • 当事件流流进Partition时会判断新事件流的WM是否大于当前的PartitionWM,当大于时就更新Partition的时间戳WM为新流入的WM(取最大值),如下1->2象限PartitionWM的变化。同时,如下Task也维护了一个全... [详细]

  • 例如一个加法算子,第一次输入2+3=5那么以后我多次数据2+3时候得到结果都是5。得出结论就是,相同输入都会得到相同结果,与次数无关。访问量统计,我们都知道Nginx访问日志一个请求一条日志,基于此我们就可以统计访问量。如下,... [详细]

  • 现实世界中,所有的数据都是以流式的形态产生的,不管是哪里产生的数据,在产生的过程中都是一条条地生成,最后经过了存储和转换处理,形成了各种类型的数据集。如下图所示,根据现实的数据产生方式和数据产生是否含有边界(具有起始点和终止点)角度,将数据... [详细]

相关标签
  

闽ICP备14008679号