赞
踩
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; // 继承可并行的sourceFunction,并制定数据的输出类型 public class StreamDataSourceA extends RichParallelSourceFunction<Tuple3<String, String, Long>> { /** * volatile: 确保本条指令不会因编译器的优化而省略。 * 保证了一个线程修改了某个变量的值, * 这新值对其他线程来说是立即可见的。(实现可见性) */ private volatile boolean flag = true; //执行程序的 @Override public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception { //准备好数据 Tuple3[] elements = new Tuple3[]{ Tuple3.of("a", "1", 1000000050000L), //[50000 - 60000) Tuple3.of("a", "2", 1000000054000L), //[50000 - 60000) Tuple3.of("a", "3", 1000000079900L), //[70000 - 80000) Tuple3.of("a", "4", 1000000115000L), //[110000 - 120000) // 115000 - 5001 = 109998 <= 109999 Tuple3.of("b", "5", 1000000100000L), //[100000 - 110000) Tuple3.of("b", "6", 1000000108000L) //[100000 - 110000) }; // 将tp3数组中的每一个tp都进行输出 int count = 0; while (flag && count < elements.length) { ctx.collect(Tuple3.of((String) elements[count].f0, (String) elements[count].f1, (Long) elements[count].f2)); count++; //程序睡眠1s,保证数据已经全部发出 Thread.sleep(1000); } //Thread.sleep(1000); } //While循环一直进行run读取处理,改变flag退出循环 @Override public void cancel() { flag = false; } }
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; public class StreamDataSourceB extends RichParallelSourceFunction<Tuple3<String, String, Long>> { private volatile boolean flag = true; @Override public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception { // a ,1 hangzhou Tuple3[] elements = new Tuple3[]{ Tuple3.of("a", "hangzhou", 1000000059000L), //[50000, 60000) Tuple3.of("b", "beijing", 1000000105000L), //[100000, 110000) }; int count = 0; while (flag && count < elements.length) { //将数据发出去 ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (long) elements[count].f2)); count++; Thread.sleep(1000); } //Thread.sleep(100000000); } @Override public void cancel() { flag = false; } }
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class FlinkTumblingWindowsInnerJoinDemo { public static void main(String[] args) throws Exception { int windowSize = 10; long delay = 5002L; //wm = // 115000 - 5002 = 109998 <= 109999 // 触发不了窗口,下面的数据能正常接收,不会产生迟到 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设定EventTime作为时间标准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //为了测试方便,这里设置并行度为1 (实际生产中,只有当所有subTask中的窗口都触发,窗口才会触发) env.setParallelism(1); // 一、加载数据源 SingleOutputStreamOperator<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSourceA()).name("SourceA"); SingleOutputStreamOperator<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSourceB()).name("SourceB");
//("a", "1", 1000) SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream = leftSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) { @Override public long extractTimestamp(Tuple3<String, String, Long> element) { return element.f2; } }); //("a", "hangzhou", 6000) SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream = rightSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) { @Override public long extractTimestamp(Tuple3<String, String, Long> element) { return element.f2; } });
DataStream<Tuple5<String, String, String, Long, Long>> joined = leftStream.join(rightStream) //join条件相等的字段 .where(new LeftSelectKey()) .equalTo(new RightSelectKey()) .window(TumblingEventTimeWindows.of(Time.seconds(windowSize))) //划分窗口 .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>>() { @Override public Tuple5<String, String, String, Long, Long> join(Tuple3<String, String, Long> first, Tuple3<String, String, Long> second) throws Exception { // 两个流的key的值相等,并且在同一个窗口内 // (a, 1, "hangzhou", 1000001000, 1000006000) return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2); } }); joined.print(); env.execute("FlinkTumblingWindowsInnerJoinDemo"); } // leftStream获取join 的条件相等字段 public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> { @Override public String getKey(Tuple3<String, String, Long> value) throws Exception { return value.f0; } } public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> { @Override public String getKey(Tuple3<String, String, Long> value) throws Exception { return value.f0; } } }
import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class FlinkTumblingWindowsLeftJoinDemo { public static void main(String[] args) throws Exception { int windowSize = 10; long delay = 5002L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 一、获取两个输入流 SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceA = env.addSource(new StreamDataSourceA()).name("SourceA"); SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceB = env.addSource(new StreamDataSourceB()).name("SourceB");
SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream = sourceA.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) { @Override public long extractTimestamp(Tuple3<String, String, Long> element) { return element.f2; } }); SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream = sourceB.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) { @Override public long extractTimestamp(Tuple3<String, String, Long> element) { return element.f2; } });
leftStream.coGroup(rightStream) .where(new LeftSelectKey()) .equalTo(new RightSelectKey()) .window(TumblingEventTimeWindows.of(Time.seconds(windowSize))) .apply(new LeftJoin()) .print(); env.execute("FlinkTumblingWindowsLeftJoinDemo"); /** * 最终的输出结果: * (a,1,hangzhou,1000000050000,1000000059000) * (a,2,hangzhou,1000000054000,1000000059000) * (a,3,null,1000000079900,-1) * (a,4,null,1000000115000,-1) * (b,5,beijing,1000000100000,1000000105000) * (b,6,beijing,1000000108000,1000000105000) */ }
public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> { // 将key相同,并且在同一窗口的数据取出来 @Override public void coGroup(Iterable<Tuple3<String, String, Long>> first, Iterable<Tuple3<String, String, Long>> second, Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception { for (Tuple3<String, String, Long> leftElem : first) { boolean hadElements = false; //如果左边的流join上了右边的流rightElements就不为空,就会走下面的增强for循环 for (Tuple3<String, String, Long> rightElem : second) { //将join上的数据输出 out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2)); hadElements = true; } if (!hadElements) { //没join上,给右边的数据赋空值 out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L)); } } } }
public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
@Override
public String getKey(Tuple3<String, String, Long> value) throws Exception {
return value.f0;
}
}
public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
@Override
public String getKey(Tuple3<String, String, Long> value) throws Exception {
return value.f0;
}
}
int windowSize = 10; // tumbling window size, in seconds
long delay = 5002L; // watermark delay (allowed out-of-orderness), in milliseconds
StreamDataSourceA数据源中:
Tuple3[] elements = new Tuple3[]{
Tuple3.of("a", "1", 1000000050000L), // window [50000, 60000)
Tuple3.of("a", "2", 1000000054000L), // window [50000, 60000)
Tuple3.of("a", "3", 1000000079900L), // window [70000, 80000)
Tuple3.of("a", "4", 1000000115000L), // window [110000, 120000); with delay = 5002: watermark = 115000 - 5002 = 109998 < 109999
Tuple3.of("b", "5", 1000000100000L), // window [100000, 110000)
Tuple3.of("b", "6", 1000000108000L) // window [100000, 110000)
};
在StreamDataSourceA中数据源是这样的
将延迟时间调大,比如 delay = 5002L,这样 Watermark = 115000 - 5002 = 109998 < 109999,[100000 - 110000) 这个窗口就不会被提前触发,后两条数据也就能接收到!
但是这也不是最好的解决办法。
最好的解决办法应该是:对迟到的数据单独接收处理,而不是直接丢弃。
具体的实现方法,请看下一篇博客!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。