当前位置:   article > 正文

Flink(3)时间、窗口、水印_flink sql 和 flink api 的设置调水印的方式

flink sql 和 flink api 的设置调水印的方式

1 StreamTableEnvironment对象介绍

分层API:
最顶层:SQL/Table API
核心层:DataStream Api
最底层:State Process
功能:管理catalog(Catalog是FlinkSQL的元数据库)、管理数据库、管理表、管理视图、函数等
创建方式

//1.StreamExecutionEnvironment来创建,推荐使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2.TableEnvironment来创建
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Flink中的表分为如下三种:临时表、永久表、外部表

临时表永久表外部表
在会话中创建的可以永久存储连接外部数据源
和外部表结合使用,用的最多基本不用和临时结合使用,用的最多

2 FlinkSQL客户端

FlinkSQL客户端,是Flink自带的SQL客户端工具。我们可以使用该客户端进行SQL代码编写,任务提交等。

#1.启动Flink集群
/FLINK_HOME/bin/start-cluster.sh
#2.进入SQL-Client客户端
bin/sql-client.sh
  • 1
  • 2
  • 3
  • 4

2.1 显示模式

table模式:默认的模式。额外开启新页面显示。

#1.设置显示模式
set 'sql-client.execution.result-mode' = 'table';
  • 1
  • 2

changelog模式:变更日志的模式。额外开启新页面显示

#1.设置显示模式
set 'sql-client.execution.result-mode' = 'changelog';
  • 1
  • 2

tableau模式:和传统关系型数据库类似,在当前页面显示

#1.设置显示模式
set 'sql-client.execution.result-mode' = 'tableau';
  • 1
  • 2

2.2 综合案例

Table 的基本概念及常用 API

Flink SQL> create table source_random(
> sku_id string,price bigint ) with(
> 'connector'='datagen',
> 'rows-per-second'='1',
> 'fields.sku_id.length'='2',
> 'fields.price.kind'='random',
> 'fields.price.min'='1',
> 'fields.price.max'='1000'
> );
[INFO] Execute statement succeed.

Flink SQL> create table sink_tb(
> sku_id string primary key not enforced,
> count_result bigint,
> sum_result bigint,
> avg_result double,
> min_result bigint,
> max_result bigint
> )with(
> 'connector'='upsert-kafka',
> 'topic'='test02',
> 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
> 'key.format'='json',
> 'value.format'='json'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into sink_tb
> select sku_id ,
> count(*) as count_result,
> sum(price) as sum_result,
> avg(price) as avg_result,
> min(price) as min_result,
> max(price) as max_result
> from source_random
> group by sku_id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7a6f1c3b58103978e35f288705834cb1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

3 数据类型

Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。总共包含2部分:
1、原子数据类型

1.字符&字符串类型:string、varchar --不需要指定字符的长度
2.二进制类型:binary
3.数值类型:decimal、int、bigint、smallint
4.精度类型:float、double
5.null类型:null
6.时间&日期类型:date、time、datetime、timestamp

2、复合数据类型

1、数组类型:array
2、map类型:map
3、集合类型:multiset
4、Row类型:Row

4 动态表 & 连续查询

动态表:实时动态变化的表。表数据不是固定不变的。
连续查询:持续不断地查询,从未间断。并不是一次性查询。

4.1 动态表

数据输入数据处理数据输出
静态表,数据是界的,是固定的一次性处理数据是固定的,是有界的
动态表,数据是源源不断产生的,是动态变化的持续不断地查询是动态变化的,是无界的

5 时间

时间是为窗口服务的。

5.1 摄入时间(Processing Time)

数据被Flink程序的Source算子加载的时间,几乎不用

5.2 处理时间(Processing Time)

数据被Flink处理的时间,一般可以用处理完成的时间来表示,很少用
eg:创建带有Flink处理时间pt的表,使用函数proctime()

Flink SQL> create table source_with_proctime(
> word string,
> `date` string,
> pt as proctime()
> )with (
> 'connector'='filesystem',
> 'path'='file:///root/wordcount.txt',
> 'format'='csv'
> );
[INFO] Execute statement succeed.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

设置显示模式为tableau输出,和传统关系型数据库类似,在当前页面显示。

Flink SQL> set 'sql-client.execution.result-mode'='tableau';
[INFO] Execute statement succeed.
  • 1
  • 2

查询表数据

Flink SQL> select * from source_with_proctime;
+----+--------------------------------+--------------------------------+-------------------------+
| op |                           word |                           date |                      pt |
+----+--------------------------------+--------------------------------+-------------------------+
| +I |                          hello |                      2020/2/13 | 2023-08-27 16:38:54.436 |
| +I |                         python |                      2020/2/13 | 2023-08-27 16:38:54.436 |
| +I |                           hive |                      2020/2/14 | 2023-08-27 16:38:54.436 |
| +I |                            hue |                      2020/2/14 | 2023-08-27 16:38:54.436 |
| +I |                         hadoop |                      2020/2/15 | 2023-08-27 16:38:54.436 |
| +I |                          spark |                      2020/2/15 | 2023-08-27 16:38:54.436 |
| +I |                           hive |                      2020/2/15 | 2023-08-27 16:38:54.436 |
| +I |                          flink |                      2020/2/16 | 2023-08-27 16:38:54.436 |
+----+--------------------------------+--------------------------------+-------------------------+
Received a total of 8 rows
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

查看表结构

Flink SQL> desc source_with_proctime;
+------+-----------------------------+-------+-----+---------------+-----------+
| name |                        type |  null | key |        extras | watermark |
+------+-----------------------------+-------+-----+---------------+-----------+
| word |                      STRING |  TRUE |     |               |           |
| date |                      STRING |  TRUE |     |               |           |
|   pt | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
+------+-----------------------------+-------+-----+---------------+-----------+
3 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5.3 事件时间(Event Time)

时间产生的时间,Flink没有关系,用的最多

Flink SQL> create table source_table_with_eventtime(
> word string,
> `date` string,
> row_time timestamp(3),
> watermark for row_time as row_time - interval '0' second
> )with(
> 'connector'='filesystem',
> 'path'='file:///root/wordcount.txt',
> 'format'='csv'
> );
[INFO] Execute statement succeed.

Flink SQL> desc source_table_with_eventtime;
+----------+------------------------+------+-----+--------+----------------------------------+
|     name |                   type | null | key | extras |                        watermark |
+----------+------------------------+------+-----+--------+----------------------------------+
|     word |                 STRING | TRUE |     |        |                                  |
|     date |                 STRING | TRUE |     |        |                                  |
| row_time | TIMESTAMP(3) *ROWTIME* | TRUE |     |        | `row_time` - INTERVAL '0' SECOND |
+----------+------------------------+------+-----+--------+----------------------------------+
3 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

6 窗口

在Flink中,窗口有如下这些:

滚动窗口(Tumble)
滑动窗口(hop、slide)
会话窗口(session)
渐进式窗口(cumulate)
聚合窗口(over)

窗口的特点:固定大小、边界、起始、结束

窗口的划分: 计算公式:第一条数据的事件时间-(第一条数据的事件时间%窗口大小)

前提:第一条数据的事件时间为1,窗口大小为5
窗口划分:1 - (1 % 5) = 1 - (1) = 0
由于窗口大小是5秒,所以,第一个窗口就固定了:[0,5)
第一个窗口确定好了后,后续的窗口也就排布好了。依次如下:[0,5),[5,10),[10,15)…

6.1 滚动窗口

窗口往前滑动的距离 = 窗口大小;数据不重复不丢失;应用场景较多,如统计早高峰出行人数

#1.创建source表
CREATE TABLE tumble_table ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1',        
  'port' = '9999',
  'format' = 'csv'
);
#2.执行查询语句
select user_id,count(*) as pv,sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
from tumble_table
group by user_id, tumble(row_time, interval '5' second);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

在这里插入图片描述
在这里插入图片描述
eg:滚动窗口DataStream API

public class Demo01_TumbleWindow {
    public static void main(String[] args) throws Exception {
//        1.创建流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        2.获取数据源
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
//        3.数据源处理
//        3.1将数据转换成Tuple3类型(id,price,ts)
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapDS = socketDS.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) throws Exception {
                String[] words = value.split(",");
                return Tuple3.of(words[0], Integer.valueOf(words[1]), Long.valueOf(words[2]));
            }
        });
//        3.2对数据添加水印
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarkDS = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, Integer, Long> element, long recordTimestamp) {
                        return element.f2 * 1000L;
                    }
                }));
        watermarkDS.print("数据源:");
//        3.3分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarkDS.keyBy(value -> value.f0)
                //对数据进行窗口的划分,这里指定为Tumble窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
//                对Tumble窗口内的price字段精选sum操作
                .sum(1)
//                把计算结果进行map转换操作,转成Tuple2<String,Integer>类型
                .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple3<String, Integer, Long> value) throws Exception {
                        return Tuple2.of(value.f0, value.f1);
                    }
                });
//        4.输出聚合结果
        result.print("聚合后的结果为:");
//        执行流式环境
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

6.2 滑动窗口

滑动距离 != 窗口大小;
滑动距离< 窗口大小:数据被计算多次;
滑动距离> 窗口大小,数据丢失,不讨论
应用场景:每隔一分钟统计,如双十一成交额

--1.创建表
CREATE TABLE source_table ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1',        
  'port' = '9999',
  'format' = 'csv'
);
--2.查询的SQL
SELECT user_id,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(hop_end(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_end, sum(price) as sum_price
FROM source_table
GROUP BY user_id, hop(row_time, interval '2' SECOND, interval '5' SECOND);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

在这里插入图片描述

[root@node1 ~]# nc -lk 9999
1,1,0
1,1,1
1,1,3
1,1,4
1,1,5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

eg:滑动窗口DataStream API

public class Demo02_SlideWindow {
    public static void main(String[] args) throws Exception {
//        1.构建流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        2.数据源
        DataStreamSource<String> souceDS = env.socketTextStream("node1", 9999);
//        3.数据处理
//        3.1 将数据转换成Tuple3(id,price,ts)
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapDS = souceDS.map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String value) throws Exception {
                String[] words = value.split(",");
                return Tuple3.of(words[0], Integer.valueOf(words[1]), Long.valueOf(words[2]));
            }
        });
//        3.2对数据添加水印
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarkDS = mapDS
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, Integer, Long> element, long recordTimestamp) {
                        return element.f2 * 1000L;
                    }
                }));
        watermarkDS.print("数据源:");
//        3.3分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarkDS.keyBy(value -> value.f0)
                //对数据进行滑动窗口输出,滑动范围是2s,窗口范围是5s
                .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
                .sum(1)
//                对数据进行格式转换,转换为Tuple2<>输出
                .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple3<String, Integer, Long> value) throws Exception {
                        return Tuple2.of(value.f0, value.f1);
                    }
                });
//        4.数据输出
        result.printToErr("聚合后的结果为:");
//        5.启动流式任务
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

6.3 会话窗口

相邻的两条数据的事件时间没有超过会话间隔,则会落入同一个窗口,反之不落入窗口
特点:窗口大小不固定,窗户的session gap(会话间隔)是固定的
应用场景,如在线人工客服

--1.创建表
CREATE TABLE source_table ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1',        
  'port' = '9999',
  'format' = 'csv'
);
---2.执行SQL
SELECT user_id,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(session_end(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_end, 
    sum(price) as sum_price
FROM source_table
GROUP BY user_id, session(row_time, interval '5' SECOND);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

eg:会话窗口DataStream API

public class SessionWindow {
    public static void main(String[] args) throws Exception {
//        1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        2.源数据
        DataStreamSource<String> sourceDS = env.socketTextStream("node1", 9999);
//        3.1数据格式化,转换成Tuple3<id,price,ts>,并加上水印
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapDS = sourceDS.map((String value)->{
            String[] words = value.split(",");
            return Tuple3.of(words[0], Integer.valueOf(words[1]), Long.valueOf(words[2]));
        }).returns(Types.TUPLE(Types.STRING,Types.INT,Types.LONG))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((Tuple3<String, Integer, Long> element, long recordTimestamp) -> element.f2 * 1000L));
//        打印输出源数据
        mapDS.print("数据源:");
//        3.3 数据会话窗口分组聚合,按照value.f0(id)分组,窗口大小为5s,每隔5s触发一次聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(value -> value.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(1).map((Tuple3<String, Integer, Long> value)->Tuple2.of(value.f0, value.f1))
                        .returns(Types.TUPLE(Types.STRING,Types.INT));
//        4.结果输出
        result.print("聚合后的结果为:");
//        5.创建执行环境
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

6.4 渐进式窗口

滚在一定的时间范围内,结果程序线性递增的趋势,应用场景不多,大多可以滚动窗口替代
在一定时间范围内,结果呈现单调递增趋势

--1.创建表
CREATE TABLE source_table (
    user_id BIGINT,
    money BIGINT,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    -- watermark 设置
    WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.money.min' = '1',
  'fields.money.max' = '100000'
);
--2.执行的SQL
SELECT FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(window_start AS STRING)))  as window_start,
    FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(window_end AS STRING))) as window_end, 
    sum(money) as sum_money,
    count(distinct user_id) as count_distinct_id
FROM TABLE(CUMULATE(
       TABLE source_table
       , DESCRIPTOR(row_time)
       , INTERVAL '5' SECOND
       , INTERVAL '30' SECOND))
GROUP BY window_start, window_end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

6.5 聚合函数

1、时间区间聚合
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例1h区间,最新输出的一条数据的sum聚合结果就是最近一小时数据的amount之和。

--2.执行SQL
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 30秒的数据
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2、行数聚合
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近100行数据的 amount 之和。

7 Watermark水印

如果窗口加上了Watermark,则窗口的触发计算由Watermark的时间来决定。水印定义延时计算的时间

--1.创建表
CREATE TABLE source_table ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '2' second --数据允许两秒延迟到达
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1', 
  'port' = '9999',
  'format' = 'csv');
--2.查询SQL
select user_id,count(*) as pv,sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
from source_table
group by user_id,tumble(row_time, interval '5' second);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

FlinkSQL中,数据一旦超过Watermark的延迟时间,会丢失。这种数据称之为严重迟到的数据。
1、窗口加水印:watermark设置为2,可以允许2秒的延迟,能够处理2秒内延迟到达的数据,如下代码中3.2设置。

OutofOrdernessTimestamp固定延迟水印,正常迟到数据会被正常处理,严重迟到的丢失
MonotonousTimetimestamp(普通):单调递增水印,类似于延时时间为0,正常迟到数据会丢失

2、AllowLateness:允许延迟。就是在固定延迟水印的基础之上,还能迟到的时间。如下代码中3.3.1设置。
3、SideOutput:侧道输出,侧输出流。可以收集任何严重迟到的数据,不会允许数据丢失。如下代码中3.3.2设置。

// 需求:测试多并行度时,是否会有数据丢失,数据的执行情况,延时销毁窗口、侧道输出
public class Demo06_MultiParallelism {
    public static void main(String[] args) throws Exception {
//        1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        设置并行度
        env.setParallelism(2);
//        2.获取数据流
        DataStreamSource<String> sourceDS = env.socketTextStream("node1", 9999);
//        3.处理数据 3.1将数据map转换
        SingleOutputStreamOperator<WaterSensor> mapDS = sourceDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor02 map(String value) throws Exception {
                String[] strs = value.split(",");
                return new WaterSensor02(strs[0], Integer.valueOf(strs[1]), Long.valueOf(strs[2]));
            }
        });
//        3.2设置水位线,这里是固定延时水印
//        forMonotonousTimestamps:单调递增水印
//        forBoundedOutOfOrderness:固定延时水印,用的最多
//        forGenerator:自定义水印,基本不用;
//        noWatermarks:没有水印,基本上不用
        SingleOutputStreamOperator<WaterSensor> watermarkDS = mapDS
//                给数据添加2s的延时处理时间
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        //设置最大空闲等待时间,设置为30秒,最多等待30s(多线程并行处理时候的等待时间)
                        .withIdleness(Duration.ofSeconds(30))
//               指定时间戳
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("线程ID为:"+Thread.currentThread().getId()+
                                "处理了时间时间为"+element.getTs()+"的数据");
                        return element.getTs() * 1000;
                    }
                }));
//        3.3 分组聚合操作
//        设置侧道输出的对象
        OutputTag<WaterSensor> outputTag = new OutputTag<WaterSensor02>("lateDate",Types.GENERIC(WaterSensor.class));
        SingleOutputStreamOperator<String> result = watermarkDS.keyBy(value -> value.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //3.3.1延时销毁:延时窗口的销毁时间,延时1秒,第八秒后窗口关闭
                .allowedLateness(Time.seconds(1))
                //3.3.2侧道输出,将严重迟到的数据侧道输出
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        out.collect("分组的key为" + s +
                                "\n窗口的数据为" + elements +
                                "\n窗口的数据量为" + elements.spliterator().estimateSize() +
                                "\n窗口的大小为[" + context.window().getStart() + "," + context.window().getEnd() + ")"
                                + "\n当前的水印为" + context.currentWatermark());
                    }
                });
//        4.1正常输出
        result.print("正常数据:");
//        4.2侧输出
        result.getSideOutput(outputTag).printToErr("严重迟到的数据");
//        5.创建执行环境
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

注解:
Data:能够生成getter、setter、toString等函数
AllArgsConstructor:全参构建器的注解
NoArgsConstructor:空参构造器的注解
eg:自定义的水位传感器类

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    private static class WaterSensor {
        //传感器的id
        private String id;

        //vc:value count,水位器的数据信息
        private Integer vc;

        //ts:timestamp,事件时间
        private Long ts;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Flink怎么保证数据不丢失!

在FlinkSQL层次,设置Watermark的延迟时间,可以处理正常迟到的数据。对于严重迟到的数据,FlinkSQL没办法处理,直接丢弃。
在DataStream API 中,代码层次中,我们同样可以设置固定延迟水印机制,保证正常迟到的数据被正确处理,如果超过了固定延迟收水印时间,可以再设置AllowedLateness(允许在固定延迟之后再延迟)=,允许在固定延迟之后再延迟一段时间,如果再超过了AllowedLateness之后,我们可以通过设置侧道输出(侧输出流)机制,把严重迟到的数据收集起来。保证数据的不丢失。
如果是多并行度下,可以设置空闲等待时间,来让多个多并行度的数据正常处理。

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号