当前位置:   article > 正文

(8)Flink-windows_flink window 函数 结果如何输出

flink window 函数 结果如何输出

目录

Count Window

Time Window

Session Window

窗口聚合 

增量聚合

全量聚合


Flink有3个内置Window

  • Count Window 以事件数量驱动
  • Time Window  以时间驱动
  • Session Window 以会话间隔驱动

Count Window

计数窗口,采用事件数量作为窗口处理依据。计数窗口分为滚动滑动两类,使用keyedStream.countWindow实现计数窗口定义。

  • Tumbling Count Window 滚动计数窗口 例子:以用户分组,当每位用户有3次付款事件时计算一次该用户付款总金额。下图中“消息A、B、C、D”代表4位不同用户,我们以A、B、C、D分组并计算金额。
  1. /**3个事件,计算窗口内数据 */
  2. keyedStream.countWindow(3);

以上满足3个才会计算一次,没有则不计算 。

  • Sliding Count Window 滑动计数窗口 例子:一位用户每3次付款事件计算最近4次付款事件总金额。
  1. /**3个事件,计算最近4个事件消息 */
  2. keyedStream.countWindow(4,3);

链接:countwindow的案例

Time Window

时间窗口,采用时间作为窗口处理依据。时间窗分为滚动和滑动两类,使用keyedStream.timeWindow实现时间窗定义。

  • Tumbling Time Window 滚动时间窗口:
  1. /**1分钟,计算窗口数据 */
  2. keyedStream.timeWindow( Time.minutes(1));

  • Sliding Time Window 滑动时间窗口:
  1. /** 每半分钟,计算最近1分钟窗口数据 */
  2. keyedStream.timeWindow( Time.minutes(1), Time.seconds(30));

参考案例:案例一

功能需求:对每天(00:00:00-23:59:59)、每小时(00:00-59:59)这个两个区间段内的数据进行统计。
发现滑动时间窗口不满足这个功能,查找资料后发现具有这个function:TumblingEventTimeWindows

对每小时的数据count:

  1. import static flink.GetTime.dateToTimestamp;
  2. import static stranger.PropertyLoader.getPropertiesConfig;
  3. /**
  4. * @author
  5. * @description 对每小时的数据进行统计
  6. * @date 2019/6/6
  7. */
  8. public class EventTimeStreamWindowAll {
  9. public static void main(String[] args) {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  12. final String configPath = "config.properties";
  13. final Properties pro = getPropertiesConfig(configPath);
  14. final String topic = "stranger";
  15. final String groupId = "mainStranger";
  16. String bootstrapServers = pro.getProperty("bootstrap.servers");
  17. Properties properties = new Properties();
  18. properties.setProperty("bootstrap.servers", bootstrapServers);//kafka的节点的IP或者hostName,多个使用逗号分隔
  19. properties.setProperty("group.id", groupId);//flink consumer flink的消费者的group.id
  20. FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
  21. kafkaSource.setStartFromLatest();
  22. SingleOutputStreamOperator<String> mainStream = env.addSource(kafkaSource).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
  23. @Nullable
  24. @Override
  25. public Watermark getCurrentWatermark() {
  26. return new Watermark(System.currentTimeMillis() - 5000);
  27. }
  28. @Override
  29. public long extractTimestamp(String s, long l) {
  30. String[] split = s.split("\\t");
  31. long timestamp = dateToTimestamp(split[0]);
  32. return timestamp;
  33. }
  34. });
  35. //2019-06-05 15:13:32,people6,进入,place3
  36. DataStream<Tuple5<String, String, String, String, Long>> mainCount = mainStream.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
  37. @Override
  38. public Tuple4<String, String, String, String> map(String s) throws Exception {
  39. String[] split = s.split("\\t");
  40. return new Tuple4<>(split[0], split[1], split[2], split[3]);
  41. }
  42. }).keyBy(2)
  43. //以整点为窗口,每小时的窗口,offset 默认为0
  44. .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
  45. //以每天为窗口,进行统计
  46. // .windowAll(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8)))
  47. //5秒触发一次
  48. .trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)))
  49. //每多少条触发一次
  50. // .trigger(CountTrigger.of(1))
  51. .process(new ProcessAllWindowFunction<Tuple4<String, String, String, String>, Tuple5<String, String, String, String, Long>, TimeWindow>() {
  52. @Override
  53. public void process(Context context, Iterable<Tuple4<String, String, String, String>> iterables, Collector<Tuple5<String, String, String, String, Long>> collector) throws Exception {
  54. Long sum = 0L;
  55. String time = null;
  56. String people = null;
  57. String behavior = null;
  58. String place = null;
  59. for (Tuple4<String, String, String, String> iterable : iterables) {
  60. sum += 1;
  61. time = iterable.f0;
  62. people = iterable.f1;
  63. behavior = iterable.f2;
  64. place = iterable.f3;
  65. }
  66. collector.collect(new Tuple5<>(time, people, behavior, place, sum));
  67. }
  68. });
  69. mainCount.print();
  70. try {
  71. env.execute("test count");
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. }

Session Window

会话窗口,采用会话持续时长作为窗口处理依据。设置指定的会话持续时长时间,在这段时间中不再出现会话则认为超出会话时长。例子:每只股票超过2秒没有交易事件时计算窗口内交易总金额。下图中“消息A、消息B”代表两只不同的股票。

  1. /** 会话持续2秒。当超过2秒不再出现会话认为会话结束 */
  2. keyedStream.window( ProcessingTimeSessionWindows.withGap( Time.seconds(2)))


窗口聚合 

聚合分为两类,一类是增量聚合,另一类是全量聚合。

增量聚合

窗口每进入一条数据,就计算一次:

常见的增量聚合函数有:

reduce(reduceFuction)

aggregate(aggregateFunction)

sum()

min()

max()

全量聚合

窗口触发的时候对窗口内的数据进行计算一次:

常见的全量聚合函数有:

apply(windowFunction)

process(processWindowFunction)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1019182
推荐阅读
相关标签
  

闽ICP备14008679号