
Flink in Depth: InnerJoin, LeftJoin, and the Window Delay (Watermark) Problem


Building an understanding of joins through a worked example.

I. Prerequisites

  • To join two data streams, both streams must be assigned to the same windows; the join is computed over the data inside each shared window.
  • This example uses event time (EventTime) with tumbling windows.
  • Flink only supports equi-joins, i.e. records whose keys are equal.

II. Data sources

1. StreamDataSourceA
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Extend the parallel source function and declare the output type
public class StreamDataSourceA extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    /**
     * volatile: the write is never optimized away,
     * so when one thread changes this flag the new value is
     * immediately visible to the other threads (visibility guarantee).
     */
    private volatile boolean flag = true;

    // Emits the records
    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        // Test data: (key, value, event timestamp).
        // The bracketed comments show the 10 s tumbling window each record falls into (last digits of the timestamp only).
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L), //[50000 - 60000)
                Tuple3.of("a", "2", 1000000054000L), //[50000 - 60000)
                Tuple3.of("a", "3", 1000000079900L), //[70000 - 80000)
                Tuple3.of("a", "4", 1000000115000L), //[110000 - 120000)  // with delay = 5002: watermark = 115000 - 5002 = 109998 < 109999, so the [100000 - 110000) window does not fire yet
                Tuple3.of("b", "5", 1000000100000L), //[100000 - 110000)
                Tuple3.of("b", "6", 1000000108000L)  //[100000 - 110000)
        };

        // Emit every tuple in the array
        int count = 0;
        while (flag && count < elements.length) {

            ctx.collect(Tuple3.of((String) elements[count].f0,
                    (String) elements[count].f1, (Long) elements[count].f2));
            count++;

            // Sleep 1 s between records so they are emitted one by one
            Thread.sleep(1000);
        }
        //Thread.sleep(1000);
    }

    // run() loops while flag is true; cancel() flips the flag so the loop exits
    @Override
    public void cancel() {
        flag = false;
    }
}
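
The bracketed window comments only show the trailing digits of each timestamp. For 10-second tumbling windows with no offset, the window start is timestamp - (timestamp % 10000), which you can verify with a small standalone snippet (my own check, not part of the demo):

public class WindowAssignmentCheck {
    public static void main(String[] args) {
        long windowSizeMs = 10_000L;  // 10 s tumbling windows, offset 0
        long[] timestamps = {1000000050000L, 1000000054000L, 1000000079900L,
                             1000000115000L, 1000000100000L, 1000000108000L};
        for (long ts : timestamps) {
            // Same formula Flink uses for offset-0 tumbling windows
            long start = ts - (ts % windowSizeMs);
            System.out.println(ts + " -> [" + start + ", " + (start + windowSizeMs) + ")");
        }
    }
}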
2. StreamDataSourceB
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceB extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {

        // Test data: (key, city, event timestamp)
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "hangzhou", 1000000059000L), //[50000, 60000)
                Tuple3.of("b", "beijing",  1000000105000L), //[100000, 110000)
        };

        int count = 0;
        while (flag && count < elements.length) {
            // Emit the record
            ctx.collect(new Tuple3<>((String) elements[count].f0,
                    (String) elements[count].f1, (long) elements[count].f2));

            count++;
            Thread.sleep(1000);
        }

        //Thread.sleep(100000000);
    }

    @Override
    public void cancel() {
        flag = false;
    }
}


III. InnerJoin (keeps only the records that match)

1. Load the data sources
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkTumblingWindowsInnerJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;   // window size, in seconds
        long delay = 5002L;    // watermark delay (max out-of-orderness), in milliseconds
        // With delay = 5002, the watermark after ("a", "4") is 115000 - 5002 = 109998 < 109999,
        // so the [100000, 110000) window does not fire early and the later "b" records are not treated as late.


        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Parallelism 1 for easier testing. (With parallelism > 1 the operator watermark is the
        // minimum over all source subtasks, so a window only fires once every subtask has passed it.)
        env.setParallelism(1);

        // 1. Load the data sources
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftSource =
                env.addSource(new StreamDataSourceA()).name("SourceA");

        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightSource =
                env.addSource(new StreamDataSourceB()).name("SourceB");

2. Assign timestamps and watermarks to both streams
        //("a", "1", 1000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                leftSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,
                        String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {

                        return element.f2;
                    }
                });

        //("a", "hangzhou", 6000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream =
                rightSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,
                        String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });
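
A side note, not part of the original code: BoundedOutOfOrdernessTimestampExtractor and setStreamTimeCharacteristic are deprecated in newer Flink releases. On Flink 1.11+ the left-stream assignment above would look roughly like the sketch below, with the same semantics (watermark = max timestamp seen minus the delay):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

        // Bounded out-of-orderness of `delay` milliseconds, timestamp taken from field f2
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                leftSource.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(delay))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f2));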
3. Perform the inner join
  • Assign both streams to the same tumbling window
  • Call apply to combine every pair of elements that matches inside the window
  • Write two KeySelector classes that extract the join key
        DataStream<Tuple5<String, String, String, Long, Long>> joined = leftStream.join(rightStream)
                // the fields that must be equal for the join
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))  // same tumbling window for both streams
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String,
                        String, String, Long, Long>>() {
                    @Override
                    public Tuple5<String, String, String, Long, Long> join(Tuple3<String, String, Long> first,
                                                                           Tuple3<String, String, Long> second) throws Exception {
                        // Called for every pair whose keys are equal and which falls into the same window.
                        // Output: (key, leftValue, rightValue, leftTimestamp, rightTimestamp)
                        return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
                    }
                });

        joined.print();
        env.execute("FlinkTumblingWindowsInnerJoinDemo");
    }


    // Extracts the join key (f0) from the left stream
    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
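
For reference: with the data sources above, and consistent with the left-join output shown in the next section, the inner join should print only the rows where both sides match inside the same window (the unmatched "a" records are simply dropped). The output below is inferred from the data rather than copied from a run:

(a,1,hangzhou,1000000050000,1000000059000)
(a,2,hangzhou,1000000054000,1000000059000)
(b,5,beijing,1000000100000,1000000105000)
(b,6,beijing,1000000108000,1000000105000)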

IV. LeftJoin code

1. Get the two input streams
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkTumblingWindowsLeftJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5002L;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 1. Get the two input streams
        SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceA =
                env.addSource(new StreamDataSourceA()).name("SourceA");

        SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceB =
                env.addSource(new StreamDataSourceB()).name("SourceB");
                
2. Extract timestamps and assign watermarks
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                sourceA.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream =
                sourceB.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

3. Perform the left join with coGroup
  • ① The left join is implemented with coGroup. Each call to coGroup receives the elements of both streams that share the same key and the same window; either side may be empty.
  • ② Assign both streams to the same tumbling window.
  • ③ Call apply to process the full set of data collected for that key and window.
        leftStream.coGroup(rightStream)
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new LeftJoin())
                .print();

        env.execute("FlinkTumblingWindowsLeftJoinDemo");
        /**
         * Final output:
         * 	(a,1,hangzhou,1000000050000,1000000059000)
         * 	(a,2,hangzhou,1000000054000,1000000059000)
         *	(a,3,null,1000000079900,-1)
         * 	(a,4,null,1000000115000,-1)
         * 	(b,5,beijing,1000000100000,1000000105000)
         * 	(b,6,beijing,1000000108000,1000000105000)
         */
    }
(1) The LeftJoin class passed to window apply
  • It receives, per key and per window, all elements of both streams
  • Because the join condition is key equality, some left elements may have no matching right element after the left join
  • For left elements without a match, the right-side fields are padded with "null" / -1
    public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String,
            Long>, Tuple5<String,
            String, String, Long, Long>> {

        // Called once per key and window with the elements of both streams that share that key and window
        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> first, Iterable<Tuple3<String, String, Long>> second,
                            Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {

            for (Tuple3<String, String, Long> leftElem : first) {
                boolean hadElements = false;

                // If the left element matched the right stream, `second` is non-empty and this loop emits one row per match
                for (Tuple3<String, String, Long> rightElem : second) {
                    // Emit the matched pair
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2,
                            rightElem.f2));
                    hadElements = true;
                }

                if (!hadElements) {
                    // No match on the right: pad the right-side fields with "null" and -1
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }
            }
        }
    }
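
The same CoGroupFunction pattern can be flipped to get a right join. The variant below is my own extension for illustration, not part of the original demo: iterate the right side and pad the left-side fields when there is no match, keeping the same output layout (key, leftValue, rightValue, leftTs, rightTs).

    public static class RightJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String,
            Long>, Tuple5<String, String, String, Long, Long>> {

        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> first, Iterable<Tuple3<String, String, Long>> second,
                            Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {

            for (Tuple3<String, String, Long> rightElem : second) {
                boolean hadElements = false;

                for (Tuple3<String, String, Long> leftElem : first) {
                    // Emit the matched pair
                    out.collect(new Tuple5<>(rightElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2));
                    hadElements = true;
                }

                if (!hadElements) {
                    // No match on the left: pad the left-side fields with "null" and -1
                    out.collect(new Tuple5<>(rightElem.f0, "null", rightElem.f1, -1L, rightElem.f2));
                }
            }
        }
    }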
(2) The KeySelector classes that extract the join key
   public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

V. Notes on the delay setting


int windowSize = 10; // window size, in seconds
long delay = 5002L;  // watermark delay (maximum out-of-orderness), in milliseconds


In StreamDataSourceA the records are:
 Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L), //[50000 - 60000)
                Tuple3.of("a", "2", 1000000054000L), //[50000 - 60000)
                Tuple3.of("a", "3", 1000000079900L), //[70000 - 80000)
                Tuple3.of("a", "4", 1000000115000L), //[110000 - 120000)  // watermark = 115000 - 5002 = 109998 < 109999
                Tuple3.of("b", "5", 1000000100000L), //[100000 - 110000)
                Tuple3.of("b", "6", 1000000108000L)  //[100000 - 110000)
        };

Given the order in which StreamDataSourceA emits these records:

  • If the delay is set below 5002 ms, say to 5000 ms,
  • then when ("a", "4", 1000000115000L) is read, the watermark becomes 115000 - 5000 = 110000 >= 109999 (the window end minus 1 ms), so the [100000 - 110000) window fires.
  • At that point ("b", "5", 1000000100000L) and ("b", "6", 1000000108000L) can no longer get in: their window already fired when a4 was read, so they arrive late and are dropped (they "missed the bus").
  • That is why the last two records do not appear in the printed join result.

How to fix it:

Increase the delay, e.g. delay = 5002L. Then the watermark after ("a", "4") is only 115000 - 5002 = 109998 < 109999, the [100000 - 110000) window has not fired yet, and the last two records are still accepted.
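
The arithmetic can be sanity-checked with a few standalone lines (not Flink code): BoundedOutOfOrdernessTimestampExtractor emits watermark = maxSeenTimestamp - delay, and an event-time window [start, end) fires once the watermark reaches end - 1.

public class WatermarkMath {
    public static void main(String[] args) {
        long windowEnd = 1000000110000L;      // end of the [100000 - 110000) window (full timestamp)
        long lastTimestamp = 1000000115000L;  // the ("a", "4") record

        for (long delay : new long[]{5000L, 5002L}) {
            long watermark = lastTimestamp - delay;
            boolean fires = watermark >= windowEnd - 1;  // event-time trigger condition
            System.out.println("delay=" + delay + "  watermark=" + watermark + "  window fires now: " + fires);
        }
        // delay=5000  watermark=1000000110000  window fires now: true
        // delay=5002  watermark=1000000109998  window fires now: false
    }
}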

But raising the delay is not the best solution.
A better approach is to handle late data explicitly instead of discarding it.
The concrete implementation is covered in the next post; a rough preview follows below.
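
As a preview only (my own sketch using standard Flink mechanisms, not necessarily what the next post implements): on a plain keyed window you can keep late records with allowedLateness and/or route them to a side output instead of dropping them. As far as I know the windowed join/coGroup API does not expose sideOutputLateData, which is exactly why late data around joins needs its own discussion.

// Additional imports on top of the demo above
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.OutputTag;

        // Fragment meant to sit inside the main method above; reuses leftStream, windowSize and LeftSelectKey.
        final OutputTag<Tuple3<String, String, Long>> lateTag =
                new OutputTag<Tuple3<String, String, Long>>("late-left") {};

        SingleOutputStreamOperator<Tuple3<String, String, Long>> windowed = leftStream
                .keyBy(new LeftSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .allowedLateness(Time.seconds(5))        // re-fire the window for records up to 5 s late
                .sideOutputLateData(lateTag)             // anything later than that goes to the side output
                .reduce((a, b) -> a.f2 >= b.f2 ? a : b); // placeholder aggregation: keep the record with the latest timestamp

        // Records that missed the window entirely can then be handled separately, e.g. just printed:
        DataStream<Tuple3<String, String, Long>> lateStream = windowed.getSideOutput(lateTag);
        lateStream.print();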
