
3. Extending Flink's coGroup operator so that late data can be retrieved through a side output


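Contents

1. Background

2. coGroup operator source analysis

2.1 The complete coGroup call chain

2.2 The coGroup entry point

2.3 The CoGroupedStreams object

2.4 The WithWindow inner class

2.5 The CoGroupWindowFunction function

3. Modifying the source to expose late data through a side output

3.1 Copy CoGroupedStreams

3.2 Add the WithWindow.sideOutputLateData method

3.3 Add a WithWindow constructor

3.4 Modify the apply method

3.5 Make the UnionTypeInfo class public

3.6 Build the flink-streaming-java module from the Flink source

3.7 Check that Maven has picked up the new code in the project

4. Testing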


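1. Background

Once a coGroup window has fired and closed, records that arrive late cannot be retrieved through a side output. The intervalJoin operator offers an API for this, but coGroup does not. Because the window join operator is built on top of coGroup, join has the same limitation.

Flink version: v1.17.1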

2. coGroup operator source analysis

2.1 The complete coGroup call chain

    input1.coGroup(input2)
        .where(keySelector1)
        .equalTo(keySelector2)
        .window(windowAssigner)
        .trigger(trigger)
        .evictor(evictor)
        .allowedLateness(allowedLateness)
        .apply(cgroupFunction)

As the chain above shows, there is no sideOutputLateData method for retrieving late records after the window has closed.

2.2 The coGroup entry point

The coGroup method simply creates a CoGroupedStreams object:

    /**
     * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
     * window can be specified.
     */
    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

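2.3 The CoGroupedStreams object

CoGroupedStreams can be thought of as a Builder in the builder design pattern. The where method configures the KeySelector for the first stream and returns the inner class Where. The equalTo method configures the KeySelector for the second stream and returns the inner class EqualTo. The window method sets the window assigner and returns the inner class WithWindow. The remaining calls (trigger, evictor, allowedLateness) configure the window, and the final apply call passes in the user's business function.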
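The staged-builder idea described above can be illustrated with a small, Flink-free sketch. All class and method names below (StagedBuilderSketch, describe, demo, and the String-based key selectors) are hypothetical stand-ins chosen for illustration; only the shape of the chain (where, equalTo, window, then window settings) mirrors the description.

```java
import java.util.function.Function;

/**
 * Simplified model of a staged builder in the style of CoGroupedStreams.
 * Every class and method here is a hypothetical stand-in, not Flink's API.
 */
public class StagedBuilderSketch {

    /** Stage 1: only the first key selector is known. */
    static final class Where<K> {
        private final Function<String, K> key1;

        Where(Function<String, K> key1) {
            this.key1 = key1;
        }

        EqualTo<K> equalTo(Function<String, K> key2) {
            return new EqualTo<>(key1, key2);
        }
    }

    /** Stage 2: both key selectors are known, the window is not. */
    static final class EqualTo<K> {
        private final Function<String, K> key1;
        private final Function<String, K> key2;

        EqualTo(Function<String, K> key1, Function<String, K> key2) {
            this.key1 = key1;
            this.key2 = key2;
        }

        WithWindow<K> window(long windowSizeMs) {
            return new WithWindow<>(key1, key2, windowSizeMs, 0L);
        }
    }

    /** Stage 3: holds every setting; each configuration call returns a new copy. */
    static final class WithWindow<K> {
        private final Function<String, K> key1;
        private final Function<String, K> key2;
        private final long windowSizeMs;
        private final long allowedLatenessMs;

        WithWindow(Function<String, K> key1, Function<String, K> key2,
                   long windowSizeMs, long allowedLatenessMs) {
            this.key1 = key1;
            this.key2 = key2;
            this.windowSizeMs = windowSizeMs;
            this.allowedLatenessMs = allowedLatenessMs;
        }

        WithWindow<K> allowedLateness(long latenessMs) {
            return new WithWindow<>(key1, key2, windowSizeMs, latenessMs);
        }

        String describe() {
            return "window=" + windowSizeMs + "ms, lateness=" + allowedLatenessMs + "ms";
        }
    }

    static <K> Where<K> where(Function<String, K> key1) {
        return new Where<>(key1);
    }

    /** Runs the whole chain once and returns the final configuration as text. */
    static String demo() {
        Function<String, Integer> byLength = String::length;
        return where(byLength)
                .equalTo(byLength)
                .window(10_000L)
                .allowedLateness(3_000L)
                .describe();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // window=10000ms, lateness=3000ms
    }
}
```

Because each stage exposes only the methods that make sense at that point, a caller cannot configure a window before both key selectors are set. Adding a new setting to the last stage, as section 3 does, only requires one more field and one more method that returns a copy.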

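2.4 The WithWindow inner class

WithWindow is the inner class that ends up holding all of the configuration: the two streams, the window settings and the key selectors. The co-group logic is triggered when the user calls apply. Inside apply, the two streams are unioned, turned into a KeyedStream with keyBy, and windowed with window. Finally the windowed stream's apply method is called with a WindowFunction that performs both the co-group bookkeeping and the user's logic.

The code is shown below, with comments added: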

    /**
     * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
     * well as a {@link WindowAssigner}.
     *
     * @param <T1> Type of the elements from the first input
     * @param <T2> Type of the elements from the second input
     * @param <KEY> Type of the key. This must be the same for both inputs
     * @param <W> Type of {@link Window} on which the co-group operation works.
     */
    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        // First stream
        private final DataStream<T1> input1;
        // Second stream
        private final DataStream<T2> input2;
        // Key selector for the first stream
        private final KeySelector<T1, KEY> keySelector1;
        // Key selector for the second stream
        private final KeySelector<T2, KEY> keySelector2;
        // Type of the key
        private final TypeInformation<KEY> keyType;
        // Window assigner
        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
        // Window trigger
        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
        private final Time allowedLateness;
        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

        // The constructor assigns the fields above
        protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;
            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;
            this.allowedLateness = allowedLateness;
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            // Common type info for the union of both streams; the elements of input1 and
            // input2 are mapped to this type below
            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            // Key selector that works on the unioned type
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            // Map the elements of input1 to TaggedUnion<T1, T2>
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);

            // Map the elements of input2 to TaggedUnion<T1, T2>
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);

            // Union the two tagged streams
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // keyBy and open the window
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);

            // Configure the window trigger
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            // Configure the evictor
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            // Configure allowedLateness
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            // Create the CoGroupWindowFunction and hand it the user function
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
         * TypeInformation)} method has the wrong return type and hence does not allow one to set an
         * operator-specific parallelism
         *
         * @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
         *     TypeInformation)} method is fixed in the next major version of Flink (2.0).
         */
        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }

        // Returns the underlying WindowedStream. It is package-private and marked
        // @VisibleForTesting, so user code cannot call it. If it could, the late-data side
        // output could be configured on the windowed stream obtained here.
        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }

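2.5 The CoGroupWindowFunction function

CoGroupWindowFunction is also an inner class of CoGroupedStreams. It carries out the co-group bookkeeping and then forwards the prepared data to the user function (the cgroupFunction passed to apply in 2.1).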

    private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
                throws Exception {
            // Buffer the elements of stream 1 in the current window
            List<T1> oneValues = new ArrayList<>();
            // Buffer the elements of stream 2 in the current window
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val : values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            // Pass both lists to the user function
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
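To make the tag, union, and split steps concrete without a Flink dependency, here is a minimal sketch. The Tagged class is a simplified, hypothetical stand-in for TaggedUnion, and the coGroup method below plays the role of both the window function and a user function that joins the two lists with "======"; neither is Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free model of how one window's unioned elements are split per input. */
public class TaggedUnionSketch {

    /** Hypothetical stand-in for TaggedUnion: exactly one side carries a value. */
    static final class Tagged<A, B> {
        private final A one;
        private final B two;
        private final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A value) {
            return new Tagged<>(value, null, true);
        }

        static <A, B> Tagged<A, B> two(B value) {
            return new Tagged<>(null, value, false);
        }

        boolean isOne() {
            return isOne;
        }

        A getOne() {
            return one;
        }

        B getTwo() {
            return two;
        }
    }

    /** Splits the window contents back into two lists and renders them as text. */
    static <A, B> String coGroup(List<Tagged<A, B>> windowContents) {
        List<A> oneValues = new ArrayList<>();
        List<B> twoValues = new ArrayList<>();
        for (Tagged<A, B> value : windowContents) {
            if (value.isOne()) {
                oneValues.add(value.getOne());
            } else {
                twoValues.add(value.getTwo());
            }
        }
        return oneValues + "======" + twoValues;
    }

    /** Builds a small mixed window and co-groups it. */
    static String demo() {
        List<Tagged<String, Integer>> window = new ArrayList<>();
        window.add(Tagged.one("a"));
        window.add(Tagged.two(1));
        window.add(Tagged.one("b"));
        return coGroup(window);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b]======[1]
    }
}
```

The single tagged type is what lets two differently typed streams share one keyed window. It is also why a late-data side output on that window carries tagged elements rather than T1 or T2 directly.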

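3. Modifying the source to expose late data through a side output

Approach: copy CoGroupedStreams into a new class, NewCoGroupedStreams, and add a sideOutputLateData method to its WithWindow class. The user passes in an OutputTag, which is then used to retrieve late records from a side output after the window has closed.

3.1 Copy CoGroupedStreams

3.2 Add the WithWindow.sideOutputLateData method

Add this method, which takes an OutputTag. The WithWindow constructor it calls is the one added in 3.3.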

    @PublicEvolving
    public WithWindow<T1, T2, KEY, W> sideOutputLateData(
            OutputTag<TaggedUnion<T1, T2>> outputTag) {
        return new WithWindow<>(
                input1,
                input2,
                keySelector1,
                keySelector2,
                keyType,
                windowAssigner,
                trigger,
                evictor,
                allowedLateness,
                outputTag);
    }

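3.3 Add a WithWindow constructor

Add a field lateDataOutputTag to hold the laterOutputTag passed to the new constructor. The field cannot be final, because it is assigned after the call to this(...).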

    protected WithWindow(
            DataStream<T1> input1,
            DataStream<T2> input2,
            KeySelector<T1, KEY> keySelector1,
            KeySelector<T2, KEY> keySelector2,
            TypeInformation<KEY> keyType,
            WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
            Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
            Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
            Time allowedLateness,
            OutputTag<TaggedUnion<T1, T2>> laterOutputTag) {
        this(
                input1,
                input2,
                keySelector1,
                keySelector2,
                keyType,
                windowAssigner,
                trigger,
                evictor,
                allowedLateness);
        this.lateDataOutputTag = laterOutputTag;
    }

3.4 Modify the apply method

Check whether lateDataOutputTag is null. If it is not, call sideOutputLateData on the windowedStream to set the tag for late data.

    /**
     * Completes the co-group operation with the user function that is executed for windowed
     * groups.
     *
     * <p>Note: This method's return type does not support setting an operator-specific
     * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
     * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
     * parallelism.
     */
    public <T> DataStream<T> apply(
            CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        // clean the closure
        function = input1.getExecutionEnvironment().clean(function);

        UnionTypeInfo<T1, T2> unionType =
                new UnionTypeInfo<>(input1.getType(), input2.getType());
        UnionKeySelector<T1, T2, KEY> unionKeySelector =
                new UnionKeySelector<>(keySelector1, keySelector2);

        SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                input1.map(new Input1Tagger<T1, T2>());
        taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
        taggedInput1.returns(unionType);

        SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                input2.map(new Input2Tagger<T1, T2>());
        taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
        taggedInput2.returns(unionType);

        DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

        // we explicitly create the keyed stream to manually pass the key type information in
        windowedStream =
                new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                unionStream, unionKeySelector, keyType)
                        .window(windowAssigner);

        if (trigger != null) {
            windowedStream.trigger(trigger);
        }
        if (evictor != null) {
            windowedStream.evictor(evictor);
        }
        if (allowedLateness != null) {
            windowedStream.allowedLateness(allowedLateness);
        }
        // If lateDataOutputTag is not null, pass it to windowedStream.sideOutputLateData
        // so that late records are emitted to the side output
        if (lateDataOutputTag != null) {
            windowedStream.sideOutputLateData(lateDataOutputTag);
        }

        return windowedStream.apply(
                new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
    }
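What "late" means for an event-time window can be sketched without Flink. The model below assumes a tumbling window and treats a window as expired once its last timestamp (end minus one) plus the allowed lateness is less than or equal to the current watermark. This is a simplification of the window operator's behavior, and the class, constants, and method names are hypothetical. The 10 s window and 3 s lateness mirror the values used in the test in section 4, on the assumption that timestamps are in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free model of routing records either into a window or to a late side output. */
public class LateDataRoutingSketch {

    static final long WINDOW_SIZE_MS = 10_000L;
    static final long ALLOWED_LATENESS_MS = 3_000L;

    final List<Long> windowed = new ArrayList<>();
    final List<Long> lateSideOutput = new ArrayList<>();

    /** Start of the tumbling window containing a non-negative timestamp. */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    /** True if the record's window has already expired at the given watermark. */
    static boolean isWindowLate(long timestamp, long watermark) {
        long maxTimestamp = windowStart(timestamp) + WINDOW_SIZE_MS - 1;
        return maxTimestamp + ALLOWED_LATENESS_MS <= watermark;
    }

    /** Routes one record: expired window goes to the side output, otherwise to the window. */
    void process(long timestamp, long watermark) {
        if (isWindowLate(timestamp, watermark)) {
            lateSideOutput.add(timestamp);
        } else {
            windowed.add(timestamp);
        }
    }

    public static void main(String[] args) {
        LateDataRoutingSketch router = new LateDataRoutingSketch();
        router.process(1_000L, 0L);       // window [0, 10000) is still open
        router.process(2_000L, 12_998L);  // 9999 + 3000 = 12999 > 12998: still accepted
        router.process(1_000L, 12_999L);  // 12999 <= 12999: the window has expired
        System.out.println("windowed=" + router.windowed);     // windowed=[1000, 2000]
        System.out.println("late=" + router.lateSideOutput);   // late=[1000]
    }
}
```

Without a configured side output, records in the last category are simply dropped, which is the gap the modified apply method closes by forwarding the tag to the windowed stream.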

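3.5 Make the UnionTypeInfo class public

This class is the common type of the unioned stream: oneType is the element type of the input1 stream and twoType is the element type of the input2 stream.

3.6 Build the flink-streaming-java module from the Flink source

Change to the flink-streaming-java directory on disk and build the module with the following command:

mvn clean install -DskipTests -Dfast

The build succeeds.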

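3.7 Check that Maven has picked up the new code in the project

After the build, the imported Maven artifact contains the new NewCoGroupedStreams class. Make sure the Flink version in the project's Maven dependencies matches the version of the source you built; otherwise the rebuilt artifact will not be picked up.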

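4. Testing

Create two streams and construct the object with new NewCoGroupedStreams. After allowedLateness, call sideOutputLateData to set the OutputTag, then trigger the logic with the with method. Under the hood, with calls apply, but it casts the returned stream to SingleOutputStreamOperator, which is the type that exposes side outputs. Finally, retrieve the side output with with.getSideOutput(outputTag) and map it to Tuple2<Integer, WaterSensor> for printing.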

    // ds1 and ds2 are the two DataStream<WaterSensor> inputs created earlier in the job
    OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag =
            new OutputTag<>(
                    "later",
                    new NewCoGroupedStreams.UnionTypeInfo<>(
                            Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));

    NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream =
            new NewCoGroupedStreams<>(ds1, ds2);

    SingleOutputStreamOperator<String> with =
            newCgroupStream
                    .where((x) -> x.getId())
                    .equalTo(x -> x.getId())
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .allowedLateness(Time.seconds(3))
                    .sideOutputLateData(outputTag)
                    .with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
                        @Override
                        public void coGroup(
                                Iterable<WaterSensor> first,
                                Iterable<WaterSensor> second,
                                Collector<String> out) throws Exception {
                            out.collect(first.toString() + "======" + second.toString());
                        }
                    });
    with.print();

    // Late records arrive as TaggedUnion; tag them with 1 or 2 according to their source stream
    with.getSideOutput(outputTag)
            .map(new MapFunction<
                    NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>,
                    Tuple2<Integer, WaterSensor>>() {
                @Override
                public Tuple2<Integer, WaterSensor> map(
                        NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value)
                        throws Exception {
                    return value.isOne()
                            ? Tuple2.of(1, value.getOne())
                            : Tuple2.of(2, value.getTwo());
                }
            })
            .print();

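In the output, ts is the timestamp. The first line printed comes from the RichCoGroupFunction and shows that the 1~10s window has closed. When WaterSensor{id='a', ts=1, vc=1} is entered after that, it is emitted through the side output and printed as a 2-tuple.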

This article was contributed by a site user. Please credit the source when reposting: https://www.wpsshop.cn/w/小舞很执着/article/detail/784798