当前位置:   article > 正文

Flink1.13 cumulate window 累加窗口的使用_flink cumulate window

flink cumulate window

原理分析:

原始订单数据 》 Flink CDC(其实可以做简单的维表Join) 》 Kafka(ODS) 本身存储30h
消费 Kafka ODS 的数据:
累加窗口:(1 MINUTE,1 DAY) 按照1分钟划分窗口,每分钟计算当前分钟的数据 merge 当前分钟的前一分钟的数据结果
按照 订单数据事件时间+水位线 进行窗口触发执行

得到的结果其实就是当天的累计值

cumulate window 是一个窗口,其窗口计算的触发也是完全由 watermark 推动的。与 tumble window 一样。

cumulate window 维护了一个 slice state 和 merged state,slice state 就是每一分钟内窗口数据(叫做切片),merged state 的作用是当 watermark 推动到下一分钟时,这一分钟的 slice state 就会被 merge 到 merged stated 中,因此 merged state 中的值就是当天零点到当前这一分钟的累计值,我们的输出结果就是从 merged state 得到的。(摘自https://zhuanlan.zhihu.com/p/435936541)


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class CumulateWindowExample2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<String> socketStream = env.socketTextStream("192.168.137.32",9999);
//        socketStream.print("SocketStream >>>>>>>");


//        socketStream.map(new MapFunction<String, Event>() {
//            @Override
//            public Event map(String line) throws Exception {
//                String[] splits = line.split(",");
//                String user = splits[0].trim();
//                String url = splits[1].trim();
//                Long timestamp = Long.parseLong(splits[2].trim());
//                return new Event(user, url, timestamp);
//            }
//        }).print("EventStream>>>>>>");


        socketStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String line) throws Exception {
                String[] splits = line.split(",");
                String user = splits[0].trim();
                String url = splits[1].trim();
                Long timestamp = Long.parseLong(splits[2].trim());
                return new Event(user, url, timestamp);
            }
        }).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                   @Override
                                   public long extractTimestamp(Event element, long
                                           recordTimestamp) {
                                       return element.timestamp;
                                   }
                               })
          ).print("EventWaterMarker >>>>>> ");


        SingleOutputStreamOperator<Event> eventWaterMarkerStream = socketStream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String line) throws Exception {
                String[] splits = line.split(",");
                String user = splits[0].trim();
                String url = splits[1].trim();
                Long timestamp = Long.parseLong(splits[2].trim());
                return new Event(user, url, timestamp);
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long
                            recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventWaterMarkerStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")
        );
        // 为方便在 SQL 中引用,在环境中注册表 EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);
        // 设置累积窗口,执行 SQL 统计查询
        Table result = tableEnv
                .sqlQuery(
                        "SELECT " +
                                "user, " +
                                "window_end AS endT, " +
                               --   "sum(if(url='1', 1, if(url='2', -1, 0))) AS state_cnt, " +
                                "COUNT(url) AS cnt " +
                                "FROM TABLE( " +
                                "CUMULATE( TABLE EventTable, " + // 定义累积窗口
                                "DESCRIPTOR(ts), " +
                                "INTERVAL '1' MINUTE, " +
                                "INTERVAL '1' DAY)) " +
                                "GROUP BY user, window_start, window_end "
                );

        // TODO ... 核心是:划分的每一个窗口都是会被执行一次的,
        /**
         * 即:就算某一个窗口没有数据,只要后面的其他窗口数据来了,那么前面空数据的窗口都是会被触发执行的
         */
        tableEnv.toDataStream(result).print("result>>>>>>>");
        env.execute("CumulateWindowExample2");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

测试数据:

Alice,张三,1000
Bob,李四,1000
Alice,张三,59000

Alice,张三,60000 -- 1min 

Alice,张三,120000 -- 2min 

Alice,张三,240000 -- 4 min 

--迟到的数据
Alice,张三,181000


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

输出:

在这里插入图片描述
这里会发现对于迟到的数据,由于迟到数据所属窗口已经触发过了,所以不会再次触发该窗口的计算,这一点都是一致的。所以这块在实际业务中需要重点考虑怎么处理。

累加窗口原理分析总结:

 * 1.提前会根据你的时间 把窗口全都划分好
 * 按照你定义的窗口划分规则,
 * 比如 (1 MINUTE,1 DAY) 表示:每分钟计算当前分钟的数据 merge 当前分钟的前一分钟的数据结果
 *2.累加窗口维护了两个主要状态:
 *slice state 和 merged state
 * 
 * 3.比如一旦 (1 MINUTE,1 HOUR)  里面 60个窗口全都跑完,那么就会重新划分新的大窗口 ,该窗口状态数据会被清除 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里分析正常的情况:

     *  1.每分钟数据先不会延迟
     *  2.每分钟都有数据来,都在自己的窗口内计算
     *  3.计算的结果就是每分钟的值,这个值需要和Redis缓存中值(记录的是累计值)进行匹配,
     *      3.1 能匹配上,按照规则+1处理,并更新缓存中的值
     *      3.2 不能匹配上,新写入缓存
     *      注意:缓存中的初始累计值需要考虑怎么刷进去
     *  4.得到最终的结果之后,要把这个值 upsert 到 MySQL /HBase 等
     *  
     *  这里面涉及到的数据一致性问题 需要重点分析!!!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/417480
推荐阅读
相关标签
  

闽ICP备14008679号