赞
踩
——wirte by 橙心橙意橙续缘,
白话系列
————————————————————————————
也就是我在写作时完全不考虑写作方面的约束,完全把自己学到的东西、以及理由和所思考的东西等等都用大白话诉说出来,这样能够让信息最大化的从自己脑子里输出并且输入到有需要的同学的脑中。PS:较为专业的地方还是会用专业口语诉说,大家放心!
白话Flink系列
————————————————————————————
主要是记录本人(国内某985研究生)在Flink基础理论阶段学习的一些所学,更重要的是一些所思所想,所参考的视频资料或者博客以及文献资料均在文末放出.由于研究生期间的课题组和研究方向与Flink接轨较多,而且Flink的学习对于想进入大厂的同学们来说也是非常的赞,所以该系列文章会随着本人学习的深入来不断修改和完善,希望大家也可以多批评指正或者提出宝贵建议。
DataSet API中的Join操作将两个DataSets连接成一个DataSet。两个数据集的元素在通过一个或多个键
上进行连接,这些键
可以通过使用
这几种不同的方法来进行指定。
.join().where().equalTo()
public static class User { public String name; public int zip; }
public static class Store { public Manager mgr; public int zip; }
DataSet<User> input1 = // [...]
DataSet<Store> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<User, Store>>
result = input1.join(input2)
.where("zip") // key of the first input (users)
.equalTo("zip"); // key of the second input (stores)
.with(new JoinFunction())
// some POJO public class Rating { public String name; public String category; public int points; } // Join function that joins a custom POJO with a Tuple public class PointWeighter implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> { @Override public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) { // multiply the points and rating and construct a new output tuple return new Tuple2<String, Double>(rating.name, rating.points * weight.f1); } } DataSet<Rating> ratings = // [...] DataSet<Tuple2<String, Double>> weights = // [...] DataSet<Tuple2<String, Double>> weightedRatings = ratings.join(weights) // key of the first input .where("category") //fileds of POJO // key of the second input .equalTo("f0") //pos of Tuple // applying the JoinFunction on joining pairs .with(new PointWeighter());
.with(new FlatJoinFunction())
public class PointWeighter implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> { @Override public void join(Rating rating, Tuple2<String, Double> weight, Collector<Tuple2<String, Double>> out) { if (weight.f1 > 0.1) { out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1)); } } } DataSet<Tuple2<String, Double>> weightedRatings = ratings.join(weights) // [...] // key of the first input .where("category") //fileds of POJO // key of the second input .equalTo("f0") //pos of Tuple // applying the JoinFunction on joining pairs .with(new PointWeighter());
Join with Projection主要用来选择JOIN后加入到新的DataSet的字段和其顺序
。
.projectFirst(0)
,.projectSecond()
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>>
result =
input1.join(input2)
// key definition on first DataSet using a field position key
.where(0)
// key definition of second DataSet using a field position key
.equalTo(0)
// select and reorder fields of matching tuples
.projectFirst(0,2).projectSecond(1).projectFirst(1);
projectFirst(int…)和projectSecond(int…)选择第一个DataSet和第二个Dataset加入到Join后的输出的字段,这些字段应该被组装成一个输出元组。索引的顺序定义了输出元组中字段的顺序。
.joinWithTiny()
,joinWithHuge()
DataSet<Tuple2<Integer, String>> input1 = // [...] DataSet<Tuple2<Integer, String>> input2 = // [...] DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> result1 = // hint that the second DataSet is very small input1.joinWithTiny(input2) .where(0) .equalTo(0); DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> result2 = // hint that the second DataSet is very large input1.joinWithHuge(input2) .where(0) .equalTo(0);
手动选择
一种策略,以防你想强制执行特定的Join方式
。.join(#dataset,#JoinHint)
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
JoinHint有以下几种取值。
OPTIMIZER_CHOOSES:相当于完全不给提示,让系统来选择。
BROADCAST_HASH_FIRST:广播第一个输入,并据此建立一个哈希表,由第二个输入探测。如果第一个输入的数据非常小,这是一个很好的策略
。
BROADCAST_HASH_SECOND: 广播第二个输入,并从中建立一个哈希表,由第一个输入探测。如果第二个输入非常小,是一个很好的策略
。
REPARTITION_HASH_FIRST:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第一个输入建立一个哈希表。如果第一个输入比第二个输入小,但两个输入都很大,这个策略就很好
。注意:如果无法估计大小,也无法重新使用已有的分区和排序,系统就会使用这个默认的后备策略。
REPARTITION_HASH_SECOND:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第二个输入建立一个哈希表。如果第二个输入比第一个输入小,但两个输入仍然很大,这个策略就很好
。
REPARTITION_SORT_MERGE:系统对每个输入进行分区(洗牌)(除非输入已经分区),并对每个输入进行排序(除非已经排序)。通过对排序后的输入进行流式合并来加入这些输入。如果一个或两个输入都已经被排序,这种策略是很好的
。
DataSet API中的OuterJoin操作在两个DataSet上执行左、右或全外连接
。外联接与常规(内联接)
类似,为所有键相等的元素创建Tuple对。
OuterJoin与Join的区别
此外,如果在另一侧没有找到匹配的键,"外侧 "的记录(左、右,或者在完全的情况下两者都有)将被保留
。匹配的元素对(或一个元素和另一个输入的空值)被交给JoinFunction将这对元素变成一个元素,或交给FlatJoinFunction将这对元素变成任意多个(包括无)元素。
两个DataSets的元素都是通过一个或多个键连接的,这些键可以通过使用下面的方法来指定
Tuple DataSet only
)OuterJoin操作调用一个用户定义的join Function来处理Joining Tuple。Join Function接收第一个输入DataSet的一个元素和第二个输入DataSet的一个元素,并准确地返回一个元素。根据外部连接的类型(左、右、全),Join Function的两个输入元素中可以有一个是空的
。
下面的代码使用key-selector functions执行DataSet与自定义java对象和Tuple DataSet的左外连接,并展示了如何使用用户定义的Join Function。
// some POJO public class Rating { public String name; public String category; public int points; } // Join function that joins a custom POJO with a Tuple public class PointAssigner implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> { @Override public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) { // Assigns the rating points to the movie. // NOTE: rating might be null return new Tuple2<String, Double>(movie.f0, rating == null ? -1 : rating.points; } } DataSet<Tuple2<String, String>> movies = // [...] DataSet<Rating> ratings = // [...] DataSet<Tuple2<String, Integer>> moviesWithPoints = movies.leftOuterJoin(ratings) // key of the first input .where("f0") // key of the second input .equalTo("name") // applying the JoinFunction on joining pairs .with(new PointAssigner());
类似于Map和FlatMap,带有Flat-Join Function的OuterJoin与带有Join Function的OuterJoin行为相同,但它不是返回一个元素,而是可以返回(收集)、零、一个或多个元素。
public class PointAssigner implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> { @Override public void join(Tuple2<String, String> movie, Rating rating, Collector<Tuple2<String, Integer>> out) { if (rating == null ) { out.collect(new Tuple2<String, Integer>(movie.f0, -1)); } else if (rating.points < 10) { out.collect(new Tuple2<String, Integer>(movie.f0, rating.points)); } else { // do not emit } } DataSet<Tuple2<String, Integer>> moviesWithPoints = movies.leftOuterJoin(ratings) // [...] // key of the first input .where("f0") // key of the second input .equalTo("name") // applying the JoinFunction on joining pairs .with(new PointAssigner());
Flink运行时可以以各种方式执行OuterJoin。每一种可能的方式在不同的情况下都会优于其他方式。系统试图自动选择一种合理的方式,但允许你手动选择一种策略
,以防你想强制执行特定的外连接
方式。
Join Algorithm Hints主要包括以下几种。
OPTIMIZER_CHOOSES。相当于完全不给提示,让系统来选择。
BROADCAST_HASH_FIRST:广播第一个输入,并据此建立一个哈希表,由第二个输入探测。如果第一个输入的数据非常小,这是一个很好的策略
。
BROADCAST_HASH_SECOND: 广播第二个输入,并从中建立一个哈希表,由第一个输入探测。如果第二个输入非常小,是一个很好的策略
。
REPARTITION_HASH_FIRST:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第一个输入建立一个哈希表。如果第一个输入比第二个输入小,但两个输入仍然很大,这个策略就很好
。
REPARTITION_HASH_SECOND:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第二个输入建立一个哈希表。如果第二个输入比第一个输入小,但两个输入仍然很大,这个策略就很好
。
REPARTITION_SORT_MERGE:系统对每个输入进行分区(洗牌)(除非输入已经分区),并对每个输入进行排序(除非已经排序)。通过对排序后的输入进行流式合并来加入这些输入。如果一个或两个输入都已经被排序,这个策略就很好
。
注意:目前还不是所有的执行策略都被每个外连接类型所支持。
LeftOuterJoin支持
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result1 =
input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
DataSet<Tuple2<SomeType, AnotherType> result2 =
input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
Window join将两个流的元素连接起来,这两个流有一个共同的键
,并且位于同一个窗口
中。这些窗口可以通过使用窗口分配器来定义,并对来自两个流的元素进行评估。
然后,来自两边的元素被传递到一个用户定义的或用户可以发出的符合加入标准的结果.JoinFunctionFlatJoinFunction。
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
关于语义的一些说明。
内连接
,这意味着一个流中的元素如果没有另一个流中的相应元素与之连接,就不会发出。窗口中最大的时间戳
作为它们的时间戳
。例如,一个窗口的边界是9,那么加入的元素的时间戳就会是9。当执行Tumbling Window Join时,所有具有共同的键和共同的滚动窗口的元素都会被连接为成对组合。
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream.join(greenStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.milliseconds(2))) .apply (new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });
当执行Sliding Window Join时,所有具有共同键和共同滑动窗口的元素都会以成对组合的方式加入
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream.join(greenStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)) .apply (new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });
当执行Session Window Join时,所有具有相同键的元素,当 "组合 "满足会话标准时,将以成对组合的方式进行连接
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream.join(greenStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(EventTimeSessionWindows.withGap(Time.milliseconds(1))) .apply (new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });
Interval Join将两个流的元素(我们暂且称它们为A和B)用一个共同的键连接起来,流B中的元素的时间戳
与流A中元素的时间戳
处于一个相对的时间间隔
。
这个条件可以用下面的表达式来表示。
其中a和b是A和B的元素,它们有一个共同的键。下界和上界都可以是负的或正的,只要下界总是小于或等于上界。
Interval Join目前只执行内连接和事件时间。
当一对元素被传递,它们将被赋予两个元素中较大的时间戳。
在上面的例子中,我们将两个流’橙色’和’绿色’连接起来,下界为-2毫秒,上界为+1毫秒。默认情况下,这些边界是包容的,但可以应用.lowerBoundExclusive()
和.upperBoundExclusive()
来改变边界行为。
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound。
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream .keyBy(<KeySelector>) .intervalJoin(greenStream.keyBy(<KeySelector>)) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process (new ProcessJoinFunction<Integer, Integer, String(){ @Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(first + "," + second); } });
类似于SQL JOIN子句。连接两个表。两个表必须有不同的字段名,并且必须通过join操作符或使用where或filter操作符定义至少一个平等连接谓词。
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right)
.where($("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
注意:对于流式查询,计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供一个具有有效保留时间间隔
的查询配置,以防止状态大小过大
类似于SQL 中的LEFT/RIGHT/FULL OUTER JOIN
子句。Outer Join用来连接两个表,两个表必须有不同的字段名
,并且必须定义至少一个平等连接谓词。
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
注意:对于流式查询,计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供一个具有有效保留时间间隔
的查询配置,以防止状态大小过大。
注:Interval Join是常规连接的一个子集,可以用流式处理,同时支持内联接和外联接。
一个interval join至少需要一个等价连接谓词和一个Join条件,以限制双方的时间
。这样的条件可以由两个合适的范围谓词(<,<=,>=,>)或一个比较两个输入表的相同类型的时间属性(即处理时间或事件时间)的单一平等谓词来定义。
例如,以下谓词是有效的区间连接条件。
Table left = tableEnv.fromDataSet(ds1, $("a"), $("b"), $("c"), $("ltime").rowtime());
Table right = tableEnv.fromDataSet(ds2, $("d"), $("e"), $("f"), $("rtime").rowtime()));
Table result = left.join(right)
.where(
and(
$("a").isEqual($("d")), // 一个Join条件
$("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())), // ltime >= rtime - 10.minutes
$("ltime").isLess($("rtime").plus(lit(10).minutes())) // ltime < rtime + 10.minutes
))
.select($("a"), $("b"), $("e"), $("ltime"));
Interval Join 是用来限制Join双方的时间的,只有符合连接条件的才会进行Join
用Table Function的结果Join一个Table。左表(外表)的每条记录都与相应的Table Function调用所产生的所有记录合并。如果左(外)表的Table Function调用返回的结果是空的,则放弃该表的某行。
// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
// join
Table orders = tableEnv.from("Orders");
Table result = orders
.joinLateral(call("split", $("c")).as("s", "t", "v"))
.select($("a"), $("b"), $("s"), $("t"), $("v"));
用Table Function的结果Join一个Table。左表(外表)的每条记录都与相应的表函数调用所产生的所有记录合并。如果表函数调用返回的结果为空,则保留相应的外侧行,并将结果用空值填充。
注意:目前,表函数左外侧连接的谓词只能是空或字面为真。
// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);
// join
Table orders = tableEnv.from("Orders");
Table result = orders
.leftOuterJoinLateral(call("split", $("c")).as("s", "t", "v"))
.select($("a"), $("b"), $("s"), $("t"), $("v"));
时态表(Temporal tables)是跟踪随时间变化的表。
Temporal Table Function提供了对时态表在特定时间点的状态的访问。用时态表函数连接表的语法与带表函数的内部连接中的语法相同。
目前只支持与时态表的内联接
。
Table ratesHistory = tableEnv.from("RatesHistory");
// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
"r_proctime",
"r_currency");
tableEnv.registerFunction("rates", rates);
// join with "Orders" based on the time attribute and key
Table orders = tableEnv.from("Orders");
Table result = orders
.joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。