当前位置:   article > 正文

Flink第四章:水位线和窗口_flink api 获取当前水位线

flink api 获取当前水位线

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口



前言

这次博客记录一下Flink框架中的窗口和水位线.
创建以下scala文件
在这里插入图片描述


一、水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。

水位线共有三种,以下代表做了三种水位线的创立示范
WatermarkTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

import java.time.Duration

object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(500L)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
    )

    //1. 有序流的水位线生成策略
    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]()
    .withTimestampAssigner(
      new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
      }
    ))

    //2. 无序流的水位线生成策略
    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      ))


    //3. 自定义水位线生成策略
    stream.assignTimestampsAndWatermarks( new WatermarkStrategy[Event] {
      override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Event] = {
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      }


      override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Event] = {
        new WatermarkGenerator[Event] {
          // 定义一个延迟时间
          val delay=5000L
          //定义属性保存最大时间戳
          var maxTs: Long =Long.MinValue+delay+1



          override def onEvent(t: Event, l: Long, watermarkOutput: WatermarkOutput): Unit = {
            maxTs=math.max(maxTs,t.timestamp)
          }

          override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = {
            val watermark = new Watermark(maxTs-delay-1)
            watermarkOutput.emitWatermark(watermark)
          }
        }
      }
    })
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

二、窗口

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想
要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这
就是所谓的“窗口”(Window)。在 Flink 中, 窗口就是用来处理无界流的核心。

这里分别是四种常用的窗口类型,以及一个简单的实现
WindowTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }))

    //stream.keyBy(_.user)
   //   .window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(10))) //基于事件时间的滚动窗口
  //    .window(TumblingProcessingTimeWindows.of(Time.days(1),Time.hours(-8)) ) //基于处理时间的滚动窗口
     // .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)) ) //基于事件时间的滑动窗口
     // .window(EventTimeSessionWindows.withGap(Time.seconds(10)))  //基于事件时间的会话窗口
    //  .countWindow(10) //滚动计数窗口

    stream.map(data=>(data.user,1))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce((stats,data)=>(data._1,stats._2+data._2))
      .print()

    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

在这里插入图片描述
案例是最简单的点击次数统计,每1秒发送一次数据,5秒进行统计一次,所以点击数相加都是5.

二、实际案例

1.自定义聚合函数

AggregateFunctionTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object AggregateFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 统计pv和uv,输出pv/uv
    stream.keyBy(data=>true)
      .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
      .aggregate( new PvUv)
      .print()

    env.execute()
  }

  //实现自定义函数,用一个二元组(Long,Set)表示聚合的(pv,uv)状态
  class PvUv extends AggregateFunction[Event,(Long,Set[String]),Double]{
    override def createAccumulator(): (Long, Set[String]) = (0L,Set[String]())

    //每来一条数据,调用一次
    override def add(in: Event, acc: (Long, Set[String])): (Long, Set[String]) = (acc._1+1,acc._2+in.user)

    //返回最终计算结果
    override def getResult(acc: (Long, Set[String])): Double = acc._1.toDouble/acc._2.size

    override def merge(acc: (Long, Set[String]), acc1: (Long, Set[String])): (Long, Set[String]) = ???
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在这里插入图片描述

2.全窗口函数

FullWindowFunctionTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FullWindowFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    //测试全窗口函数,统计UV
    stream.keyBy(data=>"key")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new UvCountByWindows)
      .print()

    env.execute()

    class UvCountByWindows extends ProcessWindowFunction[Event,String,String,TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
        //使用一个Set进行去重操作
        var userSet: Set[String] = Set[String]()

        // 从elements中提取所有数据,一次放入set中去重
        elements.foreach( userSet+=_.user)
        val uv: Int = userSet.size
        //提取窗口信息包装String进行输出
        val windeEnd: Long = context.window.getEnd
        val windowStart: Long = context.window.getStart

        out.collect(s"窗口 $windowStart - $windeEnd 的uv值为:$uv")
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

3.水位线+窗口

WatermarkWindowTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object WatermarkWindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(500L)

    val stream: DataStream[Event] = env.socketTextStream("127.0.0.1", 7777)
      .map(data => {
        val fields: Array[String] = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })

    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      )).keyBy(_.user)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(  new WatermarkWindowsResult)
      .print()

    env.execute()

  }


  class WatermarkWindowsResult extends ProcessWindowFunction[Event,String,String,TimeWindow] {
    override def process(user: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
      val start: Long = context.window.getStart
      val end: Long = context.window.getEnd
      val count: Int = elements.size

      //增加水位线信息
      val currentWatermark: Long = context.currentWatermark
      out.collect(s"窗口 $start - $end ,用户 $user 的活跃度 $count,水位线现在位于:$currentWatermark")
    }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

因为咱们给窗口设置的时间是10秒,等待时间是5秒,所以需要时间戳达到15秒,窗口才会处理数据.
在这里插入图片描述
因为窗口时间是[0,10000),所以他只统计了1000和3000两个数据.
在这里插入图片描述

4.统计用户点击数据

UrlViewCountExample.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// 定义统计输出的结果数据结果

case class UrlViewCount(url:String,count:Long,windowStart:Long,windowEnd:Long)

object UrlViewCountExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 结合使用增量聚合函数和全窗口函数,包装统计信息

    stream.keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
      .aggregate(new UrlViewCountAgg,new UrlViewCountResult)
      .print()

    env.execute()
  }
  //实现增量聚合函数,来一个数据就加1
  class UrlViewCountAgg extends AggregateFunction[Event,Long,Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: Event, acc: Long): Long = acc+1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = ???
  }

  //实现全窗口函数
  class UrlViewCountResult extends ProcessWindowFunction[Long,UrlViewCount,String,TimeWindow] {
    override def process(url: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      // 提取需要的数据
      val count: Long = elements.iterator.next()
      val start: Long = context.window.getStart
      val end: Long = context.window.getEnd

      //输出数据
      out.collect(UrlViewCount(url = url, count = count, windowStart = start, windowEnd = end))
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

在这里插入图片描述

5.处理迟到数据

ProcessLateExample.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.Event
import com.atguigu.chapter03.UrlViewCountExample.{UrlViewCountAgg, UrlViewCountResult}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

import java.time.Duration

object ProcessLateExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.socketTextStream("127.0.0.1", 7777)
      .map(data => {
        val fields: Array[String] = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      ))

    //定义一个测输出流标签
    val outputTag: OutputTag[Event] = OutputTag[Event]("late-data")

    val result: DataStream[UrlViewCount] = stream.keyBy(_.url)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      //指定窗口允许等待的实践
      .allowedLateness(Time.minutes(1))
      //将迟到数据输入到侧数据窗口
      .sideOutputLateData(outputTag)
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)


    result.print("result")

    stream.print("input")

    result.getSideOutput(outputTag).print("late data")

    env.execute()

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

在这里插入图片描述
可以看到当数据的时间戳达到15毫秒是 0-10秒的窗口才开始统计.我们继续添加数据.
在这里插入图片描述
我们继续添加0-10秒内的数据,窗口还是会继续计算,但是窗口最终还是会关闭,我们设置的等待时间是1分钟,所以我们将水位线推进到70秒.
在这里插入图片描述
可以看到,我们以及触发了第二个窗口计算,现在我们向关闭的0-10秒数据窗口发送数据.
在这里插入图片描述
可以看到窗口依然打开这,可以进行计算,这是因为,我们为水位线这设置了5秒的延迟,所以水位线现在到了65秒,我们发送75秒的数据,将水位线推到70秒.
在这里插入图片描述
可以看到我们将水位线推到了70秒,窗口关闭,依旧可以捕捉到迟到数据,但是无法触发窗口的计算.后续迟到结果需要我们手动加入结果中.


总结

以上就是Flink中有关窗口和水位线的操作.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/544126
推荐阅读
相关标签
  

闽ICP备14008679号