
Flink Programming in Practice - Building a Program Framework


Background

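          The Flink documentation explains windows, watermarks, triggers and related features in rather theoretical terms, and after reading it I was still hazy on how to actually program with them, so I wrote the exercise below. It is a basic Flink program skeleton; not every Flink program is structured this way.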

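         To follow the code below you need a correct understanding of Event Time and Watermark; the two posts "Advantages of Combining Event Time and WaterMark in Flink" and "Introduction to and Use of Flink Windows" cover both.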
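As a warm-up, here is a minimal plain-Java sketch (no Flink dependency) of the bounded-out-of-orderness rule that the program's MyTimestampExtractor implements: keep the largest event time seen so far, emit that value minus the allowed out-of-orderness as the watermark, and never let the watermark move backwards. The class WatermarkSketch and its methods onEvent and currentWatermark are hypothetical names chosen for illustration. One simplification to keep in mind: Flink polls the watermark every autoWatermarkInterval, whereas this sketch polls it after every record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch (no Flink dependency) of a bounded-out-of-orderness
// watermark: watermark = largest event time seen so far - maxOutOfOrderness, never decreasing.
public class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public WatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start low enough that "max - maxOutOfOrderness" cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Per record: remember the largest event time seen so far.
    public void onEvent(long eventTimeMillis) {
        if (eventTimeMillis > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimeMillis;
        }
    }

    // Periodic poll: compute the candidate watermark and never move backwards.
    public long currentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmittedWatermark) {
            lastEmittedWatermark = potential;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        WatermarkSketch assigner = new WatermarkSketch(10_000L); // 10 s, as in the program
        long[] offsetsSec = {0, 13, 5, 20, 8};                   // like key1@0, key1@13, ...
        List<Long> watermarks = new ArrayList<>();
        for (long s : offsetsSec) {
            assigner.onEvent(s * 1000);
            // Polled after every record here for illustration only.
            watermarks.add(assigner.currentWatermark());
        }
        System.out.println(watermarks);
    }
}
```

Running main prints [-10000, 3000, 3000, 10000, 10000]: the out-of-order records at 5 s and 8 s do not pull the watermark back, which is why a record can arrive late yet still land in its correct event-time window.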

Main Text

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // 2018-03-03 03:30:00 (UTC+8)
        Long baseTimestamp = 1520019000000L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time lets Flink put out-of-order or delayed records into the correct window;
        // see the post "Advantages of Combining Event Time and WaterMark in Flink" for details
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Poll the periodic assigner for a new watermark every 2000 ms
        env.getConfig().setAutoWatermarkInterval(2000);
        env.setParallelism(1);
        DataStream<Tuple2<String, Long>> raw = env.socketTextStream("localhost", 9999, "\n").map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                // Simulated data; every record must carry its event time.
                // Each input line looks like key1@0 or key1@13: the number after "@" is the
                // offset in seconds added to baseTimestamp to form the record's event time.
                String[] tmp = value.split("@");
                Long ts = baseTimestamp + Long.parseLong(tmp[1]) * 1000;
                return Tuple2.of(tmp[0], ts);
            }
        }).assignTimestampsAndWatermarks(
                // This assigner:
                // - extracts the event-time field from each record
                // - tolerates 10 s of out-of-orderness: the watermark is the largest
                //   event timestamp received so far minus 10 s
                new MyTimestampExtractor(Time.seconds(10))
        );
        // Event time and watermarks are now in place; next, a window assigner
        // distributes the records into windows.
        // TumblingEventTimeWindows: non-overlapping 3 s windows with 5 s of allowed lateness;
        // lateness is measured against the watermark, not against wall-clock time.
        DataStream<String> window = raw.keyBy(0)
                // Windows are aligned to natural (epoch) time, not to the first record's timestamp:
                // 3 s windows are [0,3), [3,6), ..., [57,60) within each minute;
                // 10 s windows are [0,10), [10,20), ...
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                // Allow 5 s of lateness.
                // Take the window [2018-03-03 03:30:00, 2018-03-03 03:30:03). Without allowed lateness,
                // the window function fires and the window is purged once the watermark reaches
                // 03:30:03; records for this window arriving after that are dropped.
                // With 5 s of allowed lateness the window is kept until the watermark reaches
                // 03:30:08; before then, a record stamped 03:30:00 still triggers another
                // computation of [2018-03-03 03:30:00, 2018-03-03 03:30:03).
                .allowedLateness(Time.seconds(5))
                // Window function written as an exercise; replace it with your own logic.
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        // Copy the window's records and sort them by event time
                        LinkedList<Tuple2<String, Long>> data = new LinkedList<>();
                        for (Tuple2<String, Long> tuple2 : input) {
                            data.add(tuple2);
                        }
                        data.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return o1.f1.compareTo(o2.f1);
                            }
                        });
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        // Report key, window bounds, record count and earliest/latest event time
                        String msg = String.format("key:%s, window:[ %s , %s ), elements count:%d, elements time range:[ %s , %s ]", tuple.getField(0)
                                , format.format(new Date(window.getStart()))
                                , format.format(new Date(window.getEnd()))
                                , data.size()
                                , format.format(new Date(data.getFirst().f1))
                                , format.format(new Date(data.getLast().f1))
                        );
                        out.collect(msg);
                    }
                });
        window.print();
        env.execute();
    }

    // Custom assigner: extracts the event-time field and generates periodic watermarks
    public static class MyTimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        private static final long serialVersionUID = 1L;
        /**
         * The current maximum timestamp seen so far.
         */
        private long currentMaxTimestamp;
        /**
         * The timestamp of the last emitted watermark.
         */
        private long lastEmittedWatermark = Long.MIN_VALUE;
        /**
         * The (fixed) interval between the maximum timestamp seen in the records
         * and that of the watermark to be emitted.
         */
        private final long maxOutOfOrderness;
        private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Constructor; the allowed out-of-orderness must not be negative
        public MyTimestampExtractor(Time maxOutOfOrderness) {
            if (maxOutOfOrderness.toMilliseconds() < 0) {
                throw new RuntimeException("Tried to set the maximum allowed " +
                        "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
            }
            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
        }

        public long getMaxOutOfOrdernessInMillis() {
            return maxOutOfOrderness;
        }

        // Watermark = largest event time seen so far minus maxOutOfOrderness (10 s here)
        @Override
        public final Watermark getCurrentWatermark() {
            // this guarantees that the watermark never goes backwards.
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            System.out.println(String.format("call getCurrentWatermark======currentMaxTimestamp:%s , lastEmittedWatermark:%s", format.format(new Date(currentMaxTimestamp)), format.format(new Date(lastEmittedWatermark))));
            return new Watermark(lastEmittedWatermark);
        }

        // Extracts the event time and tracks the largest timestamp seen so far
        @Override
        public final long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            long timestamp = element.f1;
            if (timestamp > currentMaxTimestamp) {
                currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }
    }
}
```
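The window-alignment and allowed-lateness rules described in the program's comments can be checked without a cluster. The plain-Java sketch below is an approximation written for illustration, not Flink source; TumblingWindowSketch, windowStart, classify and Outcome are hypothetical names. windowStart should match the arithmetic of Flink's TimeWindow.getWindowStartWithOffset with offset 0, and classify assumes the default EventTimeTrigger behavior: a window fires once the watermark passes its last millisecond, and is purged once the watermark reaches that last millisecond plus the allowed lateness.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical plain-Java sketch of an epoch-aligned tumbling event-time window with
// allowed lateness. It mirrors the rules described in the program's comments; it is not Flink code.
public class TumblingWindowSketch {

    enum Outcome { ON_TIME, LATE_FIRING, DROPPED }

    // Epoch-aligned window start (offset 0); "+ size" keeps the result right for negative timestamps.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    // What happens to a record with event time `timestamp` when the current watermark is `watermark`.
    static Outcome classify(long timestamp, long size, long allowedLateness, long watermark) {
        long maxTimestamp = windowStart(timestamp, size) + size - 1; // last ms of [start, end)
        if (maxTimestamp + allowedLateness <= watermark) {
            return Outcome.DROPPED;      // window already purged
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_FIRING;  // window already fired; this record makes it fire again
        }
        return Outcome.ON_TIME;          // buffered until the watermark passes the window end
    }

    public static void main(String[] args) {
        long base = 1520019000000L;      // 2018-03-03 03:30:00 (UTC+8), as in the program
        long size = 3_000L;              // TumblingEventTimeWindows.of(Time.seconds(3))
        long lateness = 5_000L;          // allowedLateness(Time.seconds(5))
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT+8"));

        long event = base;               // input line "key1@0"
        System.out.println("window start: " + fmt.format(new Date(windowStart(event, size))));
        // The same record arriving at different watermark positions (offsets from base).
        long[] watermarkOffsets = {2_000L, 3_000L, 7_000L, 8_000L};
        for (long w : watermarkOffsets) {
            System.out.println("watermark " + fmt.format(new Date(base + w)) + " -> "
                    + classify(event, size, lateness, base + w));
        }
    }
}
```

Running main should print:

window start: 2018-03-03 03:30:00
watermark 2018-03-03 03:30:02 -> ON_TIME
watermark 2018-03-03 03:30:03 -> LATE_FIRING
watermark 2018-03-03 03:30:07 -> LATE_FIRING
watermark 2018-03-03 03:30:08 -> DROPPED

This matches the comments in the program: the record stamped 03:30:00 still re-triggers [03:30:00, 03:30:03) until the watermark reaches 03:30:08. Setting lateness to 0 turns every non-ON_TIME case into DROPPED.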

References

1. https://blog.csdn.net/xiao_jun_0820/article/details/79786517

 

 
