Layered APIs:
Top layer: SQL / Table API
Core layer: DataStream API
Lowest layer: Stateful Stream Processing (ProcessFunction)
Purpose: managing catalogs (a catalog is Flink SQL's metadata store), databases, tables, views, functions, and so on.
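For example, these management capabilities map onto SQL statements that can be run through tEnv.executeSql() or the SQL client. A minimal sketch (the database, view, and source-table names below are made up for illustration):
-- my_db, my_view and some_table are hypothetical names
SHOW CATALOGS;
USE CATALOG default_catalog;
CREATE DATABASE my_db;
USE my_db;
SHOW TABLES;
CREATE VIEW my_view AS SELECT * FROM some_table;
SHOW FUNCTIONS;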
How to create one
//1. Create it from a StreamExecutionEnvironment (recommended)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2. Create it from a TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
Tables in Flink fall into three kinds: temporary tables, permanent tables, and external tables.
Temporary table | Permanent table | External table |
---|---|---|
Created within a session | Can be stored permanently | Connects to an external data source |
Used together with external tables; the most common | Rarely used | Used together with temporary tables; the most common |
A temporary-table example follows below.
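A minimal sketch of a temporary table declared over an external connector (the table name is made up; the file path reuses the wordcount.txt file from the later examples). The table exists only for the current session and is not persisted to any catalog:
-- tmp_words is a hypothetical name
CREATE TEMPORARY TABLE tmp_words (
    word STRING,
    `date` STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///root/wordcount.txt',
    'format' = 'csv'
);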
The Flink SQL Client is Flink's built-in SQL client tool. We can use it to write SQL, submit jobs, and so on.
#1. Start the Flink cluster
$FLINK_HOME/bin/start-cluster.sh
#2. Enter the SQL Client
bin/sql-client.sh
table mode: the default mode. Results are displayed on a separate, dedicated screen.
#1. Set the display mode
set 'sql-client.execution.result-mode' = 'table';
changelog mode: shows the change log. Results are also displayed on a separate screen.
#1. Set the display mode
set 'sql-client.execution.result-mode' = 'changelog';
tableau mode: similar to a traditional relational database; results are displayed inline on the current screen.
#1. Set the display mode
set 'sql-client.execution.result-mode' = 'tableau';
Basic concepts of Table and common APIs
Flink SQL> create table source_random(
>     sku_id string,
>     price bigint
> ) with(
>     'connector'='datagen',
>     'rows-per-second'='1',
>     'fields.sku_id.length'='2',
>     'fields.price.kind'='random',
>     'fields.price.min'='1',
>     'fields.price.max'='1000'
> );
[INFO] Execute statement succeed.

Flink SQL> create table sink_tb(
>     sku_id string primary key not enforced,
>     count_result bigint,
>     sum_result bigint,
>     avg_result double,
>     min_result bigint,
>     max_result bigint
> )with(
>     'connector'='upsert-kafka',
>     'topic'='test02',
>     'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
>     'key.format'='json',
>     'value.format'='json'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into sink_tb
> select sku_id,
>     count(*) as count_result,
>     sum(price) as sum_result,
>     avg(price) as avg_result,
>     min(price) as min_result,
>     max(price) as max_result
> from source_random
> group by sku_id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7a6f1c3b58103978e35f288705834cb1
Flink SQL ships with many common built-in data types and also gives users the ability to define their own. They fall into two groups (an example covering both groups follows the list):
1. Atomic data types
1. Character & string types: string, varchar -- for string there is no need to specify a length
2. Binary type: binary
3. Numeric types: decimal, int, bigint, smallint
4. Floating-point types: float, double
5. Null type: null
6. Time & date types: date, time, timestamp, timestamp_ltz
2. Composite data types
1. Array type: array
2. Map type: map
3. Multiset type: multiset
4. Row type: row
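A minimal sketch exercising both groups of types (the table and column names are made up, and this assumes a Flink version whose datagen connector can generate the complex types shown):
-- type_demo and all column names are hypothetical
CREATE TABLE type_demo (
    name STRING,
    age INT,
    score DECIMAL(10, 2),
    birthday DATE,
    login_time TIMESTAMP(3),
    tags ARRAY<STRING>,
    props MAP<STRING, STRING>,
    visits MULTISET<STRING>,
    address ROW<city STRING, zip STRING>
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);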
Dynamic table: a table that changes dynamically in real time; its data is not fixed.
Continuous query: a query that runs continuously and never stops; it is not a one-off query.
Data input | Data processing | Data output |
---|---|---|
Static table: the data is bounded and fixed | Processed once | The result is fixed and bounded |
Dynamic table: data arrives continuously and keeps changing | Queried continuously, without end | The result keeps changing and is unbounded |
Time exists to serve windows.
Ingestion time: the time when data is loaded by the Flink program's source operator; almost never used.
Processing time: the time when data is processed by Flink, usually represented by the time processing finishes; rarely used.
e.g.: create a table with a Flink processing-time column pt, using the proctime() function
Flink SQL> create table source_with_proctime(
> word string,
> `date` string,
> pt as proctime()
> )with (
> 'connector'='filesystem',
> 'path'='file:///root/wordcount.txt',
> 'format'='csv'
> );
[INFO] Execute statement succeed.
Set the display mode to tableau, which is similar to a traditional relational database and displays results on the current screen.
Flink SQL> set 'sql-client.execution.result-mode'='tableau';
[INFO] Execute statement succeed.
Query the table data
Flink SQL> select * from source_with_proctime;
+----+--------------------------------+--------------------------------+-------------------------+
| op | word | date | pt |
+----+--------------------------------+--------------------------------+-------------------------+
| +I | hello | 2020/2/13 | 2023-08-27 16:38:54.436 |
| +I | python | 2020/2/13 | 2023-08-27 16:38:54.436 |
| +I | hive | 2020/2/14 | 2023-08-27 16:38:54.436 |
| +I | hue | 2020/2/14 | 2023-08-27 16:38:54.436 |
| +I | hadoop | 2020/2/15 | 2023-08-27 16:38:54.436 |
| +I | spark | 2020/2/15 | 2023-08-27 16:38:54.436 |
| +I | hive | 2020/2/15 | 2023-08-27 16:38:54.436 |
| +I | flink | 2020/2/16 | 2023-08-27 16:38:54.436 |
+----+--------------------------------+--------------------------------+-------------------------+
Received a total of 8 rows
View the table schema
Flink SQL> desc source_with_proctime;
+------+-----------------------------+-------+-----+---------------+-----------+
| name | type | null | key | extras | watermark |
+------+-----------------------------+-------+-----+---------------+-----------+
| word | STRING | TRUE | | | |
| date | STRING | TRUE | | | |
| pt | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
+------+-----------------------------+-------+-----+---------------+-----------+
3 rows in set
Event time: the time when the data itself was produced; it has nothing to do with Flink; the most commonly used.
Flink SQL> create table source_table_with_eventtime(
>     word string,
>     `date` string,
>     row_time timestamp(3),
>     watermark for row_time as row_time - interval '0' second
> )with(
>     'connector'='filesystem',
>     'path'='file:///root/wordcount.txt',
>     'format'='csv'
> );
[INFO] Execute statement succeed.

Flink SQL> desc source_table_with_eventtime;
+----------+------------------------+------+-----+--------+----------------------------------+
|     name |                   type | null | key | extras |                        watermark |
+----------+------------------------+------+-----+--------+----------------------------------+
|     word |                 STRING | TRUE |     |        |                                  |
|     date |                 STRING | TRUE |     |        |                                  |
| row_time | TIMESTAMP(3) *ROWTIME* | TRUE |     |        | `row_time` - INTERVAL '0' SECOND |
+----------+------------------------+------+-----+--------+----------------------------------+
3 rows in set
Flink provides the following windows:
Tumbling windows (tumble)
Sliding windows (hop, slide)
Session windows (session)
Cumulative windows (cumulate)
Over aggregation windows (over)
Window characteristics: fixed size, boundaries, a start, and an end.
Window assignment formula: event time of the first record - (event time of the first record % window size)
Assumption: the first record's event time is 1 and the window size is 5
Window assignment: 1 - (1 % 5) = 1 - 1 = 0
Since the window size is 5 seconds, the first window is therefore fixed: [0, 5)
Once the first window is determined, the following windows are laid out in order: [0,5), [5,10), [10,15)…
Tumbling window: the distance the window slides forward = the window size; data is neither duplicated nor lost; many use cases, such as counting the number of trips during the morning rush hour
#1. Create the source table
CREATE TABLE tumble_table (
    user_id STRING,
    price BIGINT,
    `timestamp` bigint,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    watermark for row_time as row_time - interval '0' second
) WITH (
    'connector' = 'socket',
    'hostname' = 'node1',
    'port' = '9999',
    'format' = 'csv'
);

#2. Run the query
select user_id,
    count(*) as pv,
    sum(price) as sum_price,
    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start,
    UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000 as window_end
from tumble_table
group by user_id,
    tumble(row_time, interval '5' second);
e.g.: tumbling window with the DataStream API
public class Demo01_TumbleWindow {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Get the data source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);

        // 3. Process the data
        // 3.1 Convert each line into a Tuple3 (id, price, ts)
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapDS = socketDS.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) throws Exception {
                String[] words = value.split(",");
                return Tuple3.of(words[0], Integer.valueOf(words[1]), Long.valueOf(words[2]));
            }
        });

        // 3.2 Assign watermarks to the data
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarkDS = mapDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Long> element, long recordTimestamp) {
                                return element.f2 * 1000L;
                            }
                        }));
        watermarkDS.print("Source:");

        // 3.3 Group, window and aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarkDS.keyBy(value -> value.f0)
                // assign the data to windows, here a tumbling event-time window of 5 seconds
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // sum the price field (index 1) within each tumbling window
                .sum(1)
                // convert the result to Tuple2<String, Integer>
                .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple3<String, Integer, Long> value) throws Exception {
                        return Tuple2.of(value.f0, value.f1);
                    }
                });

        // 4. Output the aggregated result
        result.print("Aggregated result:");

        // Execute the streaming job
        env.execute();
    }
}
Sliding window: slide distance != window size;
slide distance < window size: data is computed more than once;
slide distance > window size: data is lost; not discussed further.
Use case: computing a statistic at a fixed interval, e.g. every minute, such as Double 11 turnover.
--1. Create the table
CREATE TABLE source_table (
    user_id STRING,
    price BIGINT,
    `timestamp` bigint,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    watermark for row_time as row_time - interval '0' second
) WITH (
    'connector' = 'socket',
    'hostname' = 'node1',
    'port' = '9999',
    'format' = 'csv'
);

--2. Query SQL
SELECT user_id,
    UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_start,
    UNIX_TIMESTAMP(CAST(hop_end(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_end,
    sum(price) as sum_price
FROM source_table
GROUP BY user_id,
    hop(row_time, interval '2' SECOND, interval '5' SECOND);
[root@node1 ~]# nc -lk 9999
1,1,0
1,1,1
1,1,3
1,1,4
1,1,5
e.g.: sliding window with the DataStream API
public class Demo02_SlideWindow {
    public static void main(String[] args) throws Exception {
        // 1. Build the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Data source
        DataStreamSource<String> sourceDS = env.socketTextStream("node1", 9999);

        // 3. Process the data
        // 3.1 Convert each line into a Tuple3 (id, price, ts)
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapDS = sourceDS.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) throws Exception {
                String[] words = value.split(",");
                return Tuple3.of(words[0], Integer.valueOf(words[1]), Long.valueOf(words[2]));
            }
        });

        // 3.2 Assign watermarks to the data
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarkDS = mapDS
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Long> element, long recordTimestamp) {
                                return element.f2 * 1000L;
                            }
                        }));
        watermarkDS.print("Source:");

        // 3.3 Group, window and aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarkDS.keyBy(value -> value.f0)
                // sliding window: window size is 5s, sliding every 2s
                .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
                .sum(1)
                // convert the result to Tuple2<String, Integer>
                .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple3<String, Integer, Long> value) throws Exception {
                        return Tuple2.of(value.f0, value.f1);
                    }
                });

        // 4. Output the result
        result.printToErr("Aggregated result:");

        // 5. Start the streaming job
        env.execute();
    }
}
If the event times of two adjacent records are within the session gap, they fall into the same window; otherwise they do not.
Characteristics: the window size is not fixed, but the window's session gap is fixed.
Use case: e.g. online (human) customer service.
--1. Create the table
CREATE TABLE source_table (
    user_id STRING,
    price BIGINT,
    `timestamp` bigint,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    watermark for row_time as row_time - interval '0' second
) WITH (
    'connector' = 'socket',
    'hostname' = 'node1',
    'port' = '9999',
    'format' = 'csv'
);

--2. Query SQL
SELECT user_id,
    UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_start,
    UNIX_TIMESTAMP(CAST(session_end(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_end,
    sum(price) as sum_price
FROM source_table
GROUP BY user_id,
    session(row_time, interval '5' SECOND);
e.g.: session window with the DataStream API
public class SessionWindow {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source data
        DataStreamSource<String> sourceDS = env.socketTextStream("node1", 9999);

        // 3.1 Format the data as Tuple3<id, price, ts> and assign watermarks
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapDS = sourceDS.map((String value) -> {
                    String[] words = value.split(",");
                    return Tuple3.of(words[0], Integer.valueOf(words[1]), Long.valueOf(words[2]));
                }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((Tuple3<String, Integer, Long> element, long recordTimestamp) -> element.f2 * 1000L));
        // Print the source data
        mapDS.print("Source:");

        // 3.3 Group by value.f0 (id) and aggregate with a session window whose gap is 5s
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(value -> value.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(1)
                .map((Tuple3<String, Integer, Long> value) -> Tuple2.of(value.f0, value.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 4. Output the result
        result.print("Aggregated result:");

        // 5. Execute the job
        env.execute();
    }
}
Cumulative window: within a fixed overall time range, the result shows a monotonically increasing trend. There are not many use cases, and a tumbling window can often be used instead.
--1. Create the table
CREATE TABLE source_table (
    user_id BIGINT,
    money BIGINT,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark definition
    WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '100000',
    'fields.money.min' = '1',
    'fields.money.max' = '100000'
);

--2. Query SQL
SELECT FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(window_start AS STRING))) as window_start,
    FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(window_end AS STRING))) as window_end,
    sum(money) as sum_money,
    count(distinct user_id) as count_distinct_id
FROM TABLE(CUMULATE(
    TABLE source_table,
    DESCRIPTOR(row_time),
    INTERVAL '5' SECOND,
    INTERVAL '30' SECOND))
GROUP BY window_start,
    window_end;
1. Aggregation over a time range
Aggregating by time range is a sliding window over time. For example, in the 1-hour case below, the sum in the latest output row is the sum of amount over the most recent hour.
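The original only includes step 2 below; the matching step 1 that creates source_table is missing. A minimal sketch of what it could look like, assuming a datagen source with the product, order_time, and amount columns the query uses:
--1. Create the table (assumed sketch, not from the original)
CREATE TABLE source_table (
    product STRING,
    amount BIGINT,
    order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.product.length' = '2',
    'fields.amount.min' = '1',
    'fields.amount.max' = '1000'
);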
--2. Query SQL
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- the statistics range is the most recent 1 hour of data for one product
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table;
2. Aggregation over a number of rows
Aggregating by row count is a sliding window over rows. For example, in the sketch below, the sum in the latest output row is the sum of amount over the most recent 100 rows.
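A minimal sketch of the row-count version, modeled on the time-range query above and reusing the same assumed source_table columns; ROWS BETWEEN 100 PRECEDING AND CURRENT ROW covers the current row plus the 100 rows before it:
SELECT product, order_time, amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- the statistics range is the most recent rows of one product: 100 preceding rows plus the current row
        ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
    ) AS amount_sum_last_100_rows
FROM source_table;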
If a watermark is attached to the window, then when the window fires is decided by the watermark time. The watermark defines how long computation is delayed.
--1. Create the table
CREATE TABLE source_table (
    user_id STRING,
    price BIGINT,
    `timestamp` bigint,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    watermark for row_time as row_time - interval '2' second -- data may arrive up to two seconds late
) WITH (
    'connector' = 'socket',
    'hostname' = 'node1',
    'port' = '9999',
    'format' = 'csv'
);

--2. Query SQL
select user_id,
    count(*) as pv,
    sum(price) as sum_price,
    UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start,
    UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000 as window_end
from source_table
group by user_id,
    tumble(row_time, interval '5' second);
In Flink SQL, once data arrives later than the watermark delay allows, it is lost. Such data is called severely late data.
1. Window with a watermark: with the watermark delay set to 2, data up to 2 seconds late can still be processed, as set in step 3.2 of the code below.
forBoundedOutOfOrderness: fixed-delay watermark; normally-late data is processed correctly, severely late data is lost.
forMonotonousTimestamps: monotonically increasing watermark, equivalent to a delay of 0; normally-late data is lost.
2. AllowedLateness: allowed lateness. On top of the fixed-delay watermark, extra time during which data may still arrive late, as set in step 3.3.1 of the code below.
3. SideOutput: side output stream. It can collect any severely late data so that no data is lost, as set in step 3.3.2 of the code below.
// Requirement: with multiple parallelism, test whether data is lost, how the data is processed,
// delayed window destruction (allowed lateness), and side output.
public class Demo06_MultiParallelism {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(2);

        // 2. Get the data stream
        DataStreamSource<String> sourceDS = env.socketTextStream("node1", 9999);

        // 3. Process the data
        // 3.1 Map each line to a WaterSensor object
        SingleOutputStreamOperator<WaterSensor> mapDS = sourceDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] strs = value.split(",");
                return new WaterSensor(strs[0], Integer.valueOf(strs[1]), Long.valueOf(strs[2]));
            }
        });

        // 3.2 Set the watermark, here a fixed-delay (bounded out-of-orderness) watermark
        // forMonotonousTimestamps: monotonically increasing watermark
        // forBoundedOutOfOrderness: fixed-delay watermark, the most commonly used
        // forGenerator: custom watermark, rarely used
        // noWatermarks: no watermark, basically never used
        SingleOutputStreamOperator<WaterSensor> watermarkDS = mapDS
                // allow the data to be up to 2s late
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // set the maximum idle time to 30 seconds (wait time when multiple parallel subtasks process data)
                        .withIdleness(Duration.ofSeconds(30))
                        // extract the event timestamp
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                System.out.println("Thread " + Thread.currentThread().getId()
                                        + " processed a record with event time " + element.getTs());
                                return element.getTs() * 1000;
                            }
                        }));

        // 3.3 Group and aggregate
        // The side-output tag for severely late data
        OutputTag<WaterSensor> outputTag = new OutputTag<>("lateData", Types.GENERIC(WaterSensor.class));
        SingleOutputStreamOperator<String> result = watermarkDS.keyBy(value -> value.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 3.3.1 Delayed destruction: keep the window alive 1 extra second; the window closes after the 8th second
                .allowedLateness(Time.seconds(1))
                // 3.3.2 Side output: route severely late data to the side output
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        out.collect("Group key: " + s
                                + "\nWindow data: " + elements
                                + "\nNumber of records in the window: " + elements.spliterator().estimateSize()
                                + "\nWindow range: [" + context.window().getStart() + "," + context.window().getEnd() + ")"
                                + "\nCurrent watermark: " + context.currentWatermark());
                    }
                });

        // 4.1 Normal output
        result.print("Normal data:");
        // 4.2 Side output
        result.getSideOutput(outputTag).printToErr("Severely late data");

        // 5. Execute the job
        env.execute();
    }
}
Annotations:
@Data: generates getters, setters, toString, and other methods
@AllArgsConstructor: generates an all-args constructor
@NoArgsConstructor: generates a no-args constructor
e.g.: the custom water-level sensor class
@AllArgsConstructor
@NoArgsConstructor
@Data
private static class WaterSensor {
//sensor id
private String id;
//vc: value count, the sensor's water-level reading
private Integer vc;
//ts: timestamp, the event time
private Long ts;
}
How does Flink guarantee that data is not lost?
At the Flink SQL level, setting the watermark delay lets normally-late data be handled. Flink SQL has no way to handle severely late data; it is simply discarded.
In the DataStream API we can likewise configure a fixed-delay watermark so that normally-late data is processed correctly. If data exceeds the fixed watermark delay, we can additionally set AllowedLateness, which allows data to be late for some extra time beyond the watermark delay. If data is later still than AllowedLateness allows, we can use the side-output (side output stream) mechanism to collect the severely late data, so that no data is lost.
With multiple parallelism, we can also set an idle timeout so that data from the parallel subtasks is still processed correctly.