当前位置:   article > 正文

(17) flink时间语义_assignascendingtimestamps

assignascendingtimestamps

时间(Time)语义

在这里插入图片描述
Event Time:事件创建的时间
Ingestion Time:数据进入Flink的时间
Processing Time:执行操作算子的本地系统时间,与机器相关

一般设置时间戳为Event Time,默认是Processing Time
在代码中设置 Event Time
在这里插入图片描述
具体的时间,还需要从数据中提取时间戳(timestamp),分配时间戳越数据源越好,比如样例类处理数据后
在这里插入图片描述
assignAscendingTimestamps 设置一个升序的时间戳

水位线(Watermark)

当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子,由于网络、分布式等原因,会导致乱序数据的产生,造成计算不准确.
Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发,Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现;数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。watermark 用来让程序自己平衡延迟和结果正确性

特点

在这里插入图片描述
watermark 是一条特殊的数据记录,watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退,watermark 与数据的时间戳相关

watermark 的传递

在这里插入图片描述

引入

Event Time 的使用一定要指定数据源中的时间戳,对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发
在这里插入图片描述
Event Time 的使用一定要指定数据源中的时间戳,调用 assignTimestampAndWatermarks 方法,传入一个 BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark,对于排好序的数据,不需要延迟触发,可以只指定时间戳就行了在这里插入图片描述
Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成watermark
在这里插入图片描述
MyAssigner 可以有两种类型,都继承自 TimestampAssigner
定义了抽取时间戳,以及生成 watermark 的方法,有两种类型
AssignerWithPeriodicWatermarks

周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性 watermark 的。

AssignerWithPunctuatedWatermarks

没有时间周期规律,可打断的生成 watermark,间断性生成watermark机制,根据源数据

代码详解

直接设置延迟时间
package study

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/*
样例数据
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718206, 35.1
sensor_1, 1547718299, 31.0
 */
object WindowTEst {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //设置时间为事件发生时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("note01", 7777)

   stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignAscendingTimestamps(_.timeStamp * 1000) //延迟为当前时间 * 1000
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) // 开时间窗口
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))).print() // 用reduce做增量聚合
    env.execute();
  }
}

case class SensorReading(id: String, timeStamp: Long, temperature: Double)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

自定义周期性生成waterMark

package study

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

/*
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718206, 35.1
sensor_1, 1547718299, 31.0
 */
object WindowTEst {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //设置时间为事件发生时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("note01", 7777)

    stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

      //      .assignAscendingTimestamps(_.timeStamp * 1000) //延迟为当前时间 * 1000
      .assignTimestampsAndWatermarks(new MyAssigner()) //自定义的周期性watermark
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) // 开时间窗口
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))).print() // 用reduce做增量聚合
    env.execute();
  }
}

case class SensorReading(id: String, timeStamp: Long, temperature: Double)

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading] {
  //保存当前最大的事件戳
  var maxTs = Long.MinValue
  //定义最大的乱序时间
  val bound = 1000L


  //周期间隔:默认200ms,将watermark注入到当前流中
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.timeStamp * 1000L)
    element.timeStamp * 1000
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

自定义间断生成waterMark

类比上面

class MyAssigner() extends AssignerWithPunctuatedWatermarks[SensorReading]{
  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp)

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = element.timestamp * 1000
}
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/861838
推荐阅读
相关标签
  

闽ICP备14008679号