赞
踩
- -- 创建kafka 表
- CREATE TABLE bid (
- bidtime TIMESTAMP(3),
- price DECIMAL(10, 2) ,
- item STRING,
- WATERMARK FOR bidtime AS bidtime
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'bid', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
-
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
- 2020-04-15 08:05:00,4.00,C
- 2020-04-15 08:07:00,2.00,A
- 2020-04-15 08:09:00,5.00,D
- 2020-04-15 08:11:00,3.00,B
- 2020-04-15 08:13:00,1.00,E
- 2020-04-15 08:17:00,6.00,F
- -- TUMBLE: 滚动窗口函数,函数的作用时在原表的基础上增加[窗口开始时间,窗口结束时间,窗口时间]
- -- TABLE;表函数,将里面函数的结果转换成动态表
- SELECT * FROM
- TABLE(
- TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
- );
-
-
- -- 在基于窗口函数提供的字段进行聚合计算
- -- 实时统计每隔商品的总的金额,每隔10分钟统计一次
- SELECT
- item,
- window_start,
- window_end,
- sum(price) as sum_price
- FROM
- TABLE(
- -- 滚动的事件时间窗口
- TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
- )
- group by item,window_start,window_end;
- CREATE TABLE words (
- word STRING,
- proctime as PROCTIME() -- 定义处理时间,PROCTIME:获取处理时间的函数
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'words', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
-
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
- java
- spark
-
- -- 在flink SQL中处理时间和事件时间的sql语法没有区别
- SELECT * FROM
- TABLE(
- TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
- );
-
-
- SELECT
- word,window_start,window_end,
- count(1) as c
- FROM
- TABLE(
- TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
- )
- group by
- word,window_start,window_end
- -- HOP: 滑动窗口函数
- -- 滑动窗口一条数据可能会落到多个窗口中
-
- SELECT * FROM
- TABLE(
- HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
- );
-
-
- -- 每隔5分钟计算最近10分钟所有商品总的金额
- SELECT
- window_start,
- window_end,
- sum(price) as sum_price
- FROM
- TABLE(
- HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
- )
- group by
- window_start,window_end
- CREATE TABLE words (
- word STRING,
- proctime as PROCTIME() -- 定义处理时间,PROCTIME:获取处理时间的函数
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'words', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
-
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
- java
- spark
-
- select
- word,
- SESSION_START(proctime,INTERVAL '5' SECOND) as window_start,
- SESSION_END(proctime,INTERVAL '5' SECOND) as window_end,
- count(1) as c
- from
- words
- group by
- word,SESSION(proctime,INTERVAL '5' SECOND);
在Flink中的批处理的模式,over函数和hive是一致的。
- SET 'execution.runtime-mode' = 'batch';
- -- 有界流
- CREATE TABLE students_hdfs_batch (
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING
- )WITH (
- 'connector' = 'filesystem', -- 必选:指定连接器类型
- 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
- 'format' = 'csv' -- 必选:文件系统连接器指定 format
- );
-
- -- row_number,sum,count,avg,lag,lead,max,min
- -- 需要注意的是sum,sum在有排序的是聚合,在没有排序的是全局聚合。
- -- 获取每隔班级年龄最大的前两个学生
-
- select *
- from(
- select
- *,
- row_number() over(partition by clazz order by age desc) as r
- from
- students_hdfs_batch
- ) as a
- where r <=2
flink流处理中over聚合使用限制
1、order by 字段必须是时间字段升序排序或者使用over_number时可以增加条件过滤
2、在流处理里面,Flink中目前只支持按照时间属性升序定义的over的窗口。因为在批处理中,数据量的大小是固定的,不会有新的数据产生,所以在做排序的时候,只需要一次排序,所以排序字段可以随便指定,但是在流处理中,数据量是源源不断的产生,当每做一次排序的时候,就需要将之前的数据都取出来存储,随着时间的推移,数据量会不断的增加,在做排序时计算量非常大。但是按照时间的顺序,时间是有顺序的,可以减少计算的代价。
3、也可以选择top N 也可以减少计算量。
4、在Flink中做排序时,需要考虑计算代价的问题,一般使用的排序的字段是时间字段。
- SET 'execution.runtime-mode' = 'streaming';
- -- 创建kafka 表
- CREATE TABLE students_kafka (
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING,
- proctime as PROCTIME()
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
- -- 在流处理模式下,flink只能按照时间字段进行升序排序
-
- -- 如果按照一个普通字段进行排序,在流处理模式下,每来一条新的数据都需重新计算之前的顺序,计算代价太大
- -- 在row_number基础上增加条件,可以限制计算的代价不断增加
-
- select * from (
- select
- *,
- row_number() over(partition by clazz order by age desc) as r
- from
- students_kafka
- )
- where r <= 2;
-
-
- -- 在流处理模式下,flink只能按照时间字段进行升序排序
- select
- *,
- sum(age) over(partition by clazz order by proctime)
- from
- students_kafka
-
-
- -- 时间边界
- -- RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
- select
- *,
- sum(age) over(
- partition by clazz
- order by proctime
- -- 统计最近10秒的数据
- RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
- )
- from
- students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
-
-
- -- 数据边界
- --ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
- select
- *,
- sum(age) over(
- partition by clazz
- order by proctime
- -- 统计最近10秒的数据
- ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
- )
- from
- students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
-
-
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
-
- 1500100003,tom,22,女,理科六班
在使用order by进行排序的时候,排序的字段中必须使用到时间字段:
- -- 排序字段必须带上时间升序排序,使用到时间字段:proctime
- select * from
- students_kafka
- order by proctime,age;
-
- -- 限制排序的计算代价,避免全局排序,在使用限制的时候,在做排序的时候,就只需要对限制的进行排序,减少了计算的代价。
-
- select *
- from
- students_kafka
- order by age
- limit 10;
- CREATE TABLE students_kafka (
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING,
- proctime as PROCTIME()
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
- 1500100003,tom,22,女,理科六班
-
- select * from (
- select
- sid,name,age,
- row_number() over(partition by sid order by proctime) as r
- from students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */
- )
- where r = 1;
Regular Joins: 主要用于批处理,如果在流处理上使用,状态会越来越大
Interval Join: 主要用于双流join
Temporal Joins:用于流表关联时态表(不同时间状态不一样,比如汇率表)
Lookup Join:用于流表关联维表(不怎么变化的表)
- CREATE TABLE students_hdfs_batch (
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING
- )WITH (
- 'connector' = 'filesystem', -- 必选:指定连接器类型
- 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
- 'format' = 'csv' -- 必选:文件系统连接器指定 format
- );
-
- CREATE TABLE score_hdfs_batch (
- sid STRING,
- cid STRING,
- score INT
- )WITH (
- 'connector' = 'filesystem', -- 必选:指定连接器类型
- 'path' = 'hdfs://master:9000/data/score', -- 必选:指定路径
- 'format' = 'csv' -- 必选:文件系统连接器指定 format
- );
-
- SET 'execution.runtime-mode' = 'batch';
-
- -- inner join
- select a.sid,a.name,b.score from
- students_hdfs_batch as a
- inner join
- score_hdfs_batch as b
- on a.sid=b.sid;
-
- -- left join
- select a.sid,a.name,b.score from
- students_hdfs_batch as a
- left join
- score_hdfs_batch as b
- on a.sid=b.sid;
-
- -- full join
- select a.sid,a.name,b.score from
- students_hdfs_batch as a
- full join
- score_hdfs_batch as b
- on a.sid=b.sid;
-
- CREATE TABLE students_kafka (
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING
- )WITH (
- 'connector' = 'kafka',
- 'topic' = 'students', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv', -- 读取数据的格式
- 'csv.ignore-parse-errors' = 'true' -- 如果数据解析异常自动跳过当前行
- );
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
- 1500100001,tom,22,女,文科六班
- 1500100002,tom1,24,男,文科六班
- 1500100003,tom2,22,女,理科六班
-
- CREATE TABLE score_kafka (
- sid STRING,
- cid STRING,
- score INT
- )WITH (
- 'connector' = 'kafka',
- 'topic' = 'scores', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv', -- 读取数据的格式
- 'csv.ignore-parse-errors' = 'true'
- );
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
- 1500100001,1000001,98
- 1500100001,1000002,5
- 1500100001,1000003,137
-
-
- SET 'execution.runtime-mode' = 'streaming';
-
- -- 使用常规关联方式做流处理,flink会将两个表的数据一直保存在状态中,状态会越来越大
- -- 可以设置状态有效期避免状态无限增大
- SET 'table.exec.state.ttl' = '5000';
-
- -- full join
- select a.sid,b.sid,a.name,b.score from
- students_kafka as a
- full join
- score_kafka as b
- on a.sid=b.sid;
假设:前提是流处理模式,需要将两张实时的表中的姓名和成绩关联在一起,此时使用到join,当过了很长一段时间假设是一年,依旧可以将学生姓名和成绩关联在一起,原因就是之前的数据都会存储在状态中,但是也会产生问题,随着时间的推移,状态中的数据会越来越多。可能会导致任务失败。
可以通过参数指定保存状态的时间,时间一过,状态就会消失,数据就不存在:
- -- 使用常规关联方式做流处理,flink会将两个表的数据一直保存在状态中,状态会越来越大
- -- 可以设置状态有效期避免状态无限增大
- SET 'table.exec.state.ttl' = '5000';
-
-
-
- 'csv.ignore-parse-errors' = 'true'
- -- 如果数据解析异常自动跳过当前行
两个表在join时只关联一段时间内的数据,之前的数据就不需要保存在状态中,可以避免状态无限增大
- CREATE TABLE students_kafka_time (
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING,
- ts TIMESTAMP(3),
- WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
- )WITH (
- 'connector' = 'kafka',
- 'topic' = 'students', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv', -- 读取数据的格式
- 'csv.ignore-parse-errors' = 'true' -- 如果数据解析异常自动跳过当前行
- );
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
- 1500100001,tom,22,女,文科六班,2023-11-10 17:10:10
- 1500100001,tom1,24,男,文科六班,2023-11-10 17:10:11
- 1500100001,tom2,22,女,理科六班,2023-11-10 17:10:12
-
- CREATE TABLE score_kafka_time (
- sid STRING,
- cid STRING,
- score INT,
- ts TIMESTAMP(3),
- WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
- )WITH (
- 'connector' = 'kafka',
- 'topic' = 'scores', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv', -- 读取数据的格式
- 'csv.ignore-parse-errors' = 'true'
- );
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
- 1500100001,1000001,98,2023-11-10 17:10:09
- 1500100001,1000002,5,2023-11-10 17:10:11
- 1500100001,1000003,137,2023-11-10 17:10:12
-
- -- a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
- -- a表数据的时间需要在b表数据的时间减去5秒到b表数据时间的范围内
- SELECT a.sid,b.sid,a.name,b.score
- FROM students_kafka_time a, score_kafka_time b
- WHERE a.sid = b.sid
- AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
1、用于流表关联时态表,比如订单表和汇率表的关联
2、每一个时间数据都会存在不同的状态,如果只是用普通的关联,之恶能关联到最新的数
- -- 订单表
- CREATE TABLE orders (
- order_id STRING, -- 订单编号
- price DECIMAL(32,2), --订单金额
- currency STRING, -- 汇率编号
- order_time TIMESTAMP(3), -- 订单时间
- WATERMARK FOR order_time AS order_time -- 水位线
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'orders', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
-
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
- 001,100,CN,2023-11-11 09:48:10
- 002,200,CN,2023-11-11 09:48:11
- 003,300,CN,2023-11-11 09:48:14
- 004,400,CN,2023-11-11 09:48:16
- 005,500,CN,2023-11-11 09:48:18
-
- -- 汇率表
- CREATE TABLE currency_rates (
- currency STRING, -- 汇率编号
- conversion_rate DECIMAL(32, 2), -- 汇率
- update_time TIMESTAMP(3), -- 汇率更新时间
- WATERMARK FOR update_time AS update_time, -- 水位线
- PRIMARY KEY(currency) NOT ENFORCED -- 主键
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'currency_rates', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'canal-json' -- 读取数据的格式
- );
-
- insert into currency_rates
- values
- ('CN',7.2,TIMESTAMP'2023-11-11 09:48:05'),
- ('CN',7.1,TIMESTAMP'2023-11-11 09:48:10'),
- ('CN',6.9,TIMESTAMP'2023-11-11 09:48:15'),
- ('CN',7.4,TIMESTAMP'2023-11-11 09:48:20');
-
- kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates
-
- -- 如果使用常规关联方式,取的时最新的汇率,不是对应时间的汇率
- select a.order_id,b.* from
- orders as a
- left join
- currency_rates as b
- on a.currency=b.currency;
-
-
- -- 时态表join
- -- FOR SYSTEM_TIME AS OF orders.order_time: 使用订单表的时间到汇率表中查询对应时间的数据
- SELECT
- order_id,
- price,
- conversion_rate,
- order_time
- FROM orders
- LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
- ON orders.currency = currency_rates.currency;
1、传统的方式是将数据库中的数据都读取到流表中,当来一条数据就会取关联一条数据。如果数据库中学生表更新了,flink不知道,关联不到最新的数据。
2、Look Join使用的原理:是当流表中的数据发生改变的时候,就会使用关联字段维表的数据源中查询数据。
优化:
在使用的时候可以使用缓存,将数据进行缓存,但是随着时间的推移,缓存的数量就会越来大,此时就可以对缓存设置一个过期时间。可以在建表的时候设置参数:
- 'lookup.cache.max-rows' = '1000', -- 缓存的最大行数
- 'lookup.cache.ttl' = '20000' -- 缓存过期时间
- -- 学生表
- CREATE TABLE students_jdbc (
- id BIGINT,
- name STRING,
- age BIGINT,
- gender STRING,
- clazz STRING,
- PRIMARY KEY (id) NOT ENFORCED -- 主键
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://master:3306/student',
- 'table-name' = 'students',
- 'username' ='root',
- 'password' ='123456',
- 'lookup.cache.max-rows' = '1000', -- 缓存的最大行数
- 'lookup.cache.ttl' = '20000' -- 缓存过期时间
- );
-
- -- 分数表
- CREATE TABLE score_kafka (
- sid BIGINT,
- cid STRING,
- score INT,
- proc_time as PROCTIME()
- )WITH (
- 'connector' = 'kafka',
- 'topic' = 'scores', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv', -- 读取数据的格式
- 'csv.ignore-parse-errors' = 'true'
- );
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
- 1500100001,1000001,98
- 1500100001,1000002,5
- 1500100001,1000003,137
-
-
- -- 使用常规关联方式,关联维度表
- -- 1、任务在启动的时候会将维表加载到flink 的状态中,如果数据库中学生表更新了,flink不知道,关联不到最新的数据
- select
- b.id,b.name,a.score
- from
- score_kafka as a
- left join
- students_jdbc as b
- on a.sid=b.id;
-
-
- -- lookup join
- -- FOR SYSTEM_TIME AS OF a.proc_time : 使用关联字段到维表中查询最新的数据
- -- 优点: 流表每来一条数据都会去mysql中查询,可以关联到最新的数据
- -- 每次查询mysql会降低性能
- select
- b.id,b.name,a.score
- from
- score_kafka as a
- left join
- students_jdbc FOR SYSTEM_TIME AS OF a.proc_time as b
- on a.sid=b.id;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。