
Flink Stream-Stream Joins Explained (Part 1)

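This article was tested with Flink 1.11.

Preface

The joins discussed here are joins between two or more streams. Stream-batch and batch-batch joins will be covered in a separate article.

By window type, Flink joins fall into three kinds: Tumbling Window Join, Sliding Window Join, and Session Window Join.

By join type, they fall into join and intervalJoin. The former resembles an inner join in an RDBMS. An interval join connects elements of two streams (call them A and B) on a common key, where an element of B matches an element of A only if its timestamp lies within a relative time interval around the timestamp of the A element.

[Figure: inner join diagram]

[Figure: interval join diagram]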

 


Code

Creating the two data sources used for the join

    // ForJoinSource1.java
    package it.kenn.source;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import scala.Tuple3;

    import java.util.Random;

    /** Emits (id, event time in ms, random value) about every 30 ms, with ids S0 to S99. */
    public class ForJoinSource1 implements SourceFunction<Tuple3<String, Long, Double>> {
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                int randInt = random.nextInt(100);
                ctx.collect(new Tuple3<>("S" + randInt, System.currentTimeMillis(), random.nextDouble() * 1000));
                Thread.sleep(30);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    //-----------------------------------------------------------------------------------
    // ForJoinSource2.java
    package it.kenn.source;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import scala.Tuple3;

    import java.util.Random;

    /** Emits (id, event time in ms, random value) about every 20 ms, with ids S0 to S109. */
    public class ForJoinSource2 implements SourceFunction<Tuple3<String, Long, Double>> {
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                int randInt = random.nextInt(110);
                ctx.collect(new Tuple3<>("S" + randInt, System.currentTimeMillis(), random.nextDouble() * 1000));
                Thread.sleep(20);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

Main test program

    package it.kenn.join;

    import it.kenn.source.ForJoinSource1;
    import it.kenn.source.ForJoinSource2;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    import scala.Tuple3;

    import java.time.Duration;

    /**
     * Join tests
     */
    public class StreamStreamJoinTest {

        // Main test program
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamStreamJoinTest joinTest = new StreamStreamJoinTest();
            // Use event time
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            DataStream<Tuple3<String, Long, Double>> source30 = env.addSource(new ForJoinSource1())
                    // Assign timestamps and watermarks. Both parts are needed: forBoundedOutOfOrderness sets the
                    // watermark generation strategy, withTimestampAssigner says which field is the event time.
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                            .<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofMillis(10))
                            .withTimestampAssigner((e, ts) -> e._2()));
            DataStream<Tuple3<String, Long, Double>> source20 = env.addSource(new ForJoinSource2())
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                            .<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofMillis(10))
                            .withTimestampAssigner((e, ts) -> e._2()));
            DataStream<Tuple3<String, Long, Double>> tumbJoinedStream = joinTest.tumbJoin(source20, source30);
            DataStream<Tuple3<String, Long, Double>> slidingJoinStream = joinTest.slidingJoin(source20, source30);
            DataStream<Tuple3<String, Long, Double>> intervalJoinStream = joinTest.intervalJoin(source20, source30);
            // Print whichever join is under test
            tumbJoinedStream.print();
            env.execute();
        }

        // tumbJoin, slidingJoin and intervalJoin from the following sections are members of this class.
    }
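The comment in the main program says that forBoundedOutOfOrderness fixes the watermark generation strategy. As a rough illustration of what such a strategy computes, here is a minimal plain-Java sketch with no Flink dependency. The class and method names are hypothetical, and the formula (largest timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms) reflects my understanding of the behaviour rather than a copy of Flink's code.

```java
/**
 * Plain-Java illustration of a bounded-out-of-orderness watermark.
 * Hypothetical names; a sketch, not Flink's implementation.
 */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start high enough that the first watermark computation cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Records an event timestamp; an out-of-order event never moves the maximum backwards. */
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** Watermark = largest timestamp seen - allowed out-of-orderness - 1 ms. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        // 10 ms bound, matching Duration.ofMillis(10) in the main program
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10);
        for (long ts : new long[] {100, 105, 98, 120}) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
    }
}
```

The out-of-order event at 98 leaves the watermark where it was, which is why windows in the joins that follow only close once newer data has pushed the watermark past their end.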

Inner Tumbling Window Join test

    /**
     * Inner tumbling window join
     *
     * @param source20 stream from ForJoinSource2 (one element about every 20 ms)
     * @param source30 stream from ForJoinSource1 (one element about every 30 ms)
     * @return for each joined pair, the element with the larger value
     */
    public DataStream<Tuple3<String, Long, Double>> tumbJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
        DataStream<Tuple3<String, Long, Double>> joinedStream = source20.join(source30)
                .where(e -> e._1())   // join key of the left stream: the id, e.g. "S12"
                .equalTo(e -> e._1()) // join key of the right stream
                .window(TumblingEventTimeWindows.of(Time.milliseconds(50))) // window type and size
                // The join function. Unlike a database join, the logic below does not combine fields from both
                // sides; it compares the two values and returns whichever element has the larger one.
                .apply(new JoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                    @Override
                    public Tuple3<String, Long, Double> join(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right) throws Exception {
                        return left._3() > right._3() ? left : right;
                    }
                });
        return joinedStream;
    }

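The tumbling window join diagram can be misleading, because the join result in it looks like a Cartesian product. The where/equalTo condition restricts the pairing to elements with the same key, so what is produced is every pairwise combination of same-key elements that fall into the same window, not a product over all elements of both streams.

The diagram carries one more piece of information: in the last window only the orange stream has data and the green stream has none, so the join emits nothing for that window.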
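The pairing rule described above can be sketched in plain Java without Flink. The Event type, the class name, and the sample data are hypothetical, and the nested loop is only meant to show which pairs come out; Flink evaluates the join per window and key rather than by scanning both inputs like this.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of which pairs a tumbling window inner join produces. */
final class TumblingJoinSketch {
    /** Hypothetical event type: a join key plus an event-time timestamp in ms. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Start of the tumbling window (no offset) that contains a non-negative timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** Pairs every left event with every right event that shares its key and its window. */
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                        windowStart(l.timestamp, windowSize) == windowStart(r.timestamp, windowSize);
                if (sameKey && sameWindow) {
                    out.add(l.key + "@" + l.timestamp + "," + r.key + "@" + r.timestamp);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = new ArrayList<>();
        left.add(new Event("S1", 5));
        left.add(new Event("S1", 20));
        left.add(new Event("S2", 30));
        left.add(new Event("S1", 60));
        List<Event> right = new ArrayList<>();
        right.add(new Event("S1", 10));
        right.add(new Event("S1", 40));
        right.add(new Event("S2", 70));
        // 50 ms windows: [0, 50) and [50, 100)
        join(left, right, 50).forEach(System.out::println);
    }
}
```

With this data the first window yields the four S1 combinations, while S2@30 and everything in the second window find no same-key partner in their window and are dropped, as an inner join would.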

 

Sliding Window Join test

    /**
     * Sliding window join test
     *
     * @param source20 stream from ForJoinSource2 (one element about every 20 ms)
     * @param source30 stream from ForJoinSource1 (one element about every 30 ms)
     * @return for each joined pair, the element with the larger value
     */
    public DataStream<Tuple3<String, Long, Double>> slidingJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
        DataStream<Tuple3<String, Long, Double>> joinedStream = source20.join(source30)
                .where(e -> e._1())   // join key of the left stream: the id, e.g. "S12"
                .equalTo(e -> e._1()) // join key of the right stream
                .window(SlidingEventTimeWindows.of(Time.milliseconds(50), Time.milliseconds(30))) // window size 50 ms, slide 30 ms
                .apply(new JoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                    @Override
                    public Tuple3<String, Long, Double> join(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right) throws Exception {
                        return left._3() > right._3() ? left : right;
                    }
                });
        return joinedStream;
    }

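[Figure: sliding window join diagram]

The sliding window join has one property worth noting: an event may go unjoined in one sliding window and still be joined in another. In the figure, for example, orange event 2 is not joined with the green stream in the blue window, but in the later green window it is joined with green event 3.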
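To make the overlap concrete, the following plain-Java sketch lists the sliding windows an event falls into, using the 50 ms size and 30 ms slide from the test above. The class and method names are hypothetical, window offsets are assumed to be zero, and this is an illustration of the assignment rule rather than Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of sliding window assignment (zero offset assumed). */
final class SlidingWindowSketch {
    /** Start times of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Window starts in which both events are present, i.e. where they can be joined. */
    static List<Long> sharedWindows(long leftTs, long rightTs, long size, long slide) {
        List<Long> shared = new ArrayList<>(windowStarts(leftTs, size, slide));
        shared.retainAll(windowStarts(rightTs, size, slide));
        return shared;
    }

    public static void main(String[] args) {
        System.out.println("left@40 windows:  " + windowStarts(40, 50, 30));
        System.out.println("right@70 windows: " + windowStarts(70, 50, 30));
        System.out.println("shared windows:   " + sharedWindows(40, 70, 50, 30));
    }
}
```

An event at 40 ms sits in the windows starting at 0 and 30, and an event at 70 ms in those starting at 30 and 60. The pair is missed in the window starting at 0 but joined in the one starting at 30, which is the situation described for orange event 2.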

 

Session Window Join test

This kind of join does not seem to come up very often. If I end up using it one day, I will add my notes here.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    ...
    DataStream<Integer> orangeStream = ...
    DataStream<Integer> greenStream = ...
    orangeStream.join(greenStream)
            .where(<KeySelector>)
            .equalTo(<KeySelector>)
            // session gap
            .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
            .apply(new JoinFunction<Integer, Integer, String>() {
                @Override
                public String join(Integer first, Integer second) {
                    return first + "," + second;
                }
            });

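[Figure: session window join diagram]

The figure above illustrates session windows. Windows are delimited by pauses in the data of the two streams: a window closes only once no element has arrived for the configured gap, so the number and length of windows vary with the data. If the events in the streams keep arriving less than one gap apart, the window keeps growing and never fires. In other words, compared with other window types, firing is passive: it is driven by the data rather than by a fixed time schedule.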
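The gap rule can be sketched in plain Java as follows. The class and method names are hypothetical, the timestamps stand for the combined events of both streams for one key, and the handling of a pause exactly equal to the gap (kept in the same session here) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of splitting timestamps into sessions by a gap. */
final class SessionWindowSketch {
    /** Groups timestamps into sessions; a new session starts when the pause exceeds the gap. */
    static List<List<Long>> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            boolean pauseTooLong = !current.isEmpty() && ts - current.get(current.size() - 1) > gap;
            if (pauseTooLong) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Bursty data: three sessions
        System.out.println(sessions(new long[] {1, 3, 4, 20, 22, 50}, 5));
        // Steady data with pauses below the gap: one session that keeps growing
        System.out.println(sessions(new long[] {0, 4, 8, 12, 16}, 5));
    }
}
```

The second call shows the case from the paragraph above: as long as events keep arriving within the gap, everything stays in a single session and nothing is emitted until the data pauses.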

 

Interval Join test

    /**
     * Interval join test
     *
     * @param source20 stream from ForJoinSource2 (one element about every 20 ms)
     * @param source30 stream from ForJoinSource1 (one element about every 30 ms)
     * @return for each matched pair, the element with the larger value
     */
    public DataStream<Tuple3<String, Long, Double>> intervalJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> intervalJoinedStream = source20.keyBy(e -> e._1())
                .intervalJoin(source30.keyBy(e -> e._1()))
                // match right elements whose timestamp lies between left - 12 ms and left + 9 ms
                .between(Time.milliseconds(-12), Time.milliseconds(9))
                // by default between() includes both bounds; the two calls below exclude them
                .lowerBoundExclusive()
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                    @Override
                    public void processElement(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right, Context ctx, Collector<Tuple3<String, Long, Double>> out) throws Exception {
                        out.collect(left._3() > right._3() ? left : right);
                    }
                });
        return intervalJoinedStream;
    }

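[Figure: interval join diagram]

The figure above illustrates the interval join. I once ran into a scenario like this: suppose the green and orange streams are the movement tracks of two groups of people. At orange event 2, someone enters a restaurant, and we want the green events that entered the same restaurant within 5 minutes before or after orange event 2. An interval join fits this well.

One more point, noted in the code comment and drawn in the figure: by default, between includes both bounds. To exclude them, call the two methods shown above, lowerBoundExclusive() and upperBoundExclusive(); either one can be used alone to exclude a single bound.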
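The matching condition and the effect of the two exclusive-bound switches can be written down as a small plain-Java predicate. The class, method, and parameter names are hypothetical; the sketch only restates the rule that the right element's timestamp must lie between left + lowerBound and left + upperBound, and it leaves out keys and state handling.

```java
/** Plain-Java illustration of the interval join time condition. */
final class IntervalJoinSketch {
    /**
     * Returns true if rightTs lies in the interval around leftTs given by the two bounds.
     * Bounds are relative offsets in ms; the flags choose inclusive or exclusive ends.
     */
    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound,
                           boolean lowerInclusive, boolean upperInclusive) {
        long diff = rightTs - leftTs;
        boolean aboveLower = lowerInclusive ? diff >= lowerBound : diff > lowerBound;
        boolean belowUpper = upperInclusive ? diff <= upperBound : diff < upperBound;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Bounds from the test above: between(-12 ms, 9 ms), left event at t = 100
        System.out.println("right@88, inclusive: " + matches(100, 88, -12, 9, true, true));
        System.out.println("right@88, exclusive: " + matches(100, 88, -12, 9, false, false));
        System.out.println("right@95, exclusive: " + matches(100, 95, -12, 9, false, false));
        System.out.println("right@110, inclusive: " + matches(100, 110, -12, 9, true, true));
    }
}
```

For the restaurant scenario the same predicate would be used with bounds of -300000 and 300000 ms, that is, five minutes on either side of the orange event.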

 

 

 
