当前位置:   article > 正文

Flink Window

flink window

1.window 概述

  • streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集
  • Window窗口就在一个无界流中设置起始位置和终止位置,让无界流变成有界流,并且在有界流中进行
    数据处理
  • Window操作常见的业务场景:统计过去一段时间、最近一些元素的数据指标

2.window 窗口的类型

2.1 根据数据流是否keyBy划分 Keyed vs Non-Keyed Windows

  • 要指定是否是 Keyed windows 需要在window前指定(可查看下面代码),使用keyBy(…)会将无界流分组为逻辑键流。如果keyBy(…)未调用,则不会为您的流根据key来分组。
  • 对stream分流之后,可以使窗口化计算可以由多个任务并行执行(也就是说可以分流到不同slot 甚至是不同的机器上面去做并行计算),因为每个逻辑键控流都可以根据key分流独立于其余逻辑流进行处理。而引用同一键的所有元素将被发送到同一并行任务。
    对于没有keyBy的流,原始流将不会拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为1(假如前面的DataStream没有设置多并行情况下)
  • flink的设计是先分流再做窗口,Google Dataflow 是先窗口再分流
Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

-----------------------------------------------------------------------------------------------------------------------

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2.2 根据不同的Window Assigners划分

  • WindowAssigner 负责将每条输入的数据分发到正确的 window 中。这是通过 在window(…)(针对KeyedStream)或windowAll()(针对DataStream)传入不同的WindowAssigner来完成的。window() 方法接收的输入参数是一个 WindowAssigner。WindowAssigner负责将每个传入元素分配给一个或多个窗口(滑动窗口,有些元素可能需要复制到多个窗口中)。

  • Flink中根据比较常见的场景提供了一些WindowAssigner:tumbling windows, sliding windows, session windows and global windows 。也可以通过实现WindowAssigner class来自定义一些窗口分配器。

  • 所有flink定义的窗口分配器(全局窗口除外)都是基于时间将元素分配给窗口。这个时间的定义可以是 processing time,也可以是event time。在生产需求中,大部分使用event time 。关于时间的定义以及水位的定义,会在后面的文章涉及到。

  • 下面简单介绍下flink提供的常用的WindowAssigner
    下面用的图显示了每种WindowAssigner的工作情况。紫色圆圈表示流的元素,这些元素由某个键(在这种情况下为用户1,用户2和用户3)划分。x轴显示时间进度。

tumbling windows, sliding windows, session windows and global windows

2.2.1 滚动窗口(Tumbling Windows)

  • 将数据依据固定的窗口长度对数据进行切片。
  • 特点:时间对齐,窗口长度固定,没有重叠。
  • 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的window size,窗口的创建如下图所示:
    tumbling windows
  • 适用场景:适合做BI统计等(做每个时间段的聚合计算)。
  • 官网提供的代码
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

通过静态方法TumblingEventTimeWindows.of来实例化TumblingEventTimeWindows类,可以通过源码看到TumblingEventTimeWindows构造是有两个参数

 private TumblingProcessingTimeWindows(long size, long offset) {
		...
}
  • 1
  • 2
  • 3

时间间隔可以通过指定Time.milliseconds(x),Time.seconds(x), Time.minutes(x)
至于offset 参数,该参数可用于更改窗口的对齐方式。例如,如果没有偏移,则每小时滚动窗口与epoch对齐,即你将获得诸如的窗口 1:00:00.000 - 1:59:59.999,2:00:00.000 - 2:59:59.999等等。如果要更改,可以提供一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.999,2:15:00.000 - 3:14:59.999等
一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,必须指定的偏移量Time.hours(-8)。

  • 简单案例
import java.sql.DriverManager

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.functi
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/1019250
推荐阅读
相关标签
  

闽ICP备14008679号