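Like every other big-data component, Flink has broadcast variables. Their purpose is to ship a small dataset to every machine so that each slot can use it directly, which avoids repeatedly fetching the data.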
Usage is similar to Spark's broadcast. Broadcast variables have the following characteristics:
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;

// Jn1, Jn2, Mp1, Mp2 and RESULT are defined elsewhere and are not shown here.

// Initialize the Flink environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set the default operator parallelism
env.setParallelism(1);

// Sc1 generates M parameters a,b,c for second degree polynomials P(x) = ax^2 + bx + c identified by id
// (the type must be Tuple4 to match Jn2 below; the coefficient values are illustrative)
DataSet<Tuple4<String, Integer, Integer, Integer>> sc1 = env
    .fromElements(new Tuple4<>("1", 61, 6, 29), new Tuple4<>("2", 7, 13, 10), new Tuple4<>("3", 8, 13, 27));

// Sc2 generates N x values to be evaluated with the polynomial identified by id
DataSet<Tuple2<String, Integer>> sc2 = env
    .fromElements(new Tuple2<>("1", 5), new Tuple2<>("2", 3), new Tuple2<>("3", 6));

// Sc3 generates N y values to be evaluated with the polynomial identified by id
DataSet<Tuple2<String, Integer>> sc3 = env
    .fromElements(new Tuple2<>("1", 2), new Tuple2<>("2", 3), new Tuple2<>("3", 7));

// Jn1 matches x and y values on id and emits (id, x, y) triples
JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1 =
    sc2.join(sc3).where(0).equalTo(0).with(new Jn1());

// Jn2 matches polynomial and arguments by id, computes p = min(P(x),P(y)) and emits (id, p) tuples
JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2 =
    jn1.join(sc1).where(0).equalTo(0).with(new Jn2());

// Mp1 selects (id, x, y) triples where x = y and broadcasts z (=x=y) to Mp2
FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1 = jn1.flatMap(new Mp1());

// Mp2 filters out all p values which can be divided by z
List<Tuple2<String, Integer>> result = jn2.flatMap(new Mp2()).withBroadcastSet(mp1, "z").collect();

JavaProgramTestBase.compareResultAsText(result, RESULT);
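To make the arithmetic of the pipeline above easier to follow, here is a plain-Java walk-through of the same computation without Flink. It is only a sketch under assumptions: the class name `BroadcastWalkthrough` is made up, the coefficient rows (a, b, c) are illustrative values, and the behaviour of Jn1, Jn2, Mp1 and Mp2 is inferred from the comments in the snippet, since those classes are not shown. The list `z` plays the role of the broadcast set.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BroadcastWalkthrough {

    // P(x) = a*x^2 + b*x + c, with abc = {a, b, c}
    static int poly(int[] abc, int x) {
        return abc[0] * x * x + abc[1] * x + abc[2];
    }

    static List<String> run() {
        // Assumed coefficients (a, b, c) keyed by id; illustrative values only.
        Map<String, int[]> coeffs = new LinkedHashMap<>();
        coeffs.put("1", new int[] {61, 6, 29});
        coeffs.put("2", new int[] {7, 13, 10});
        coeffs.put("3", new int[] {8, 13, 27});

        // x and y values keyed by id, as in sc2 and sc3.
        Map<String, Integer> xs = new LinkedHashMap<>();
        xs.put("1", 5);
        xs.put("2", 3);
        xs.put("3", 6);
        Map<String, Integer> ys = new LinkedHashMap<>();
        ys.put("1", 2);
        ys.put("2", 3);
        ys.put("3", 7);

        // Mp1 (assumed): keep z = x for every id where x == y.
        // This small list is what would be broadcast under the name "z".
        List<Integer> z = new ArrayList<>();
        for (String id : xs.keySet()) {
            if (xs.get(id).equals(ys.get(id))) {
                z.add(xs.get(id));
            }
        }

        // Jn2 + Mp2 (assumed): p = min(P(x), P(y)); drop p if any z divides it.
        List<String> result = new ArrayList<>();
        for (String id : coeffs.keySet()) {
            int[] abc = coeffs.get(id);
            int p = Math.min(poly(abc, xs.get(id)), poly(abc, ys.get(id)));
            boolean divisible = false;
            for (int d : z) {
                if (p % d == 0) {
                    divisible = true;
                }
            }
            if (!divisible) {
                result.add("(" + id + "," + p + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [(2,112)]
    }
}
```

With these inputs only id 2 has x = y, so z = 3. The candidate values are p = 285, 112 and 393; 285 and 393 are divisible by 3, which leaves (2,112).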
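The concrete call is withBroadcastSet(mp1, "z"), which takes two arguments: the DataSet to broadcast and the name under which it is registered.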
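The source of this method is shown below. It also shows that broadcast variables are scoped at the operator level: each operator that uses one simply looks it up by its name and gets a globally consistent result.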
def withBroadcastSet(data: DataSet[_], name: String) = {
  javaSet match {
    case udfOp: UdfOperator[_] => udfOp.withBroadcastSet(data.javaSet, name)
    case _ =>
      throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
        "broadcast variables.")
  }
  this
}
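The dispatch above can be mirrored in a few lines of plain Java to show what "operator-level" means in practice. This is a toy model, not Flink code: `BroadcastScopeDemo`, `Operator` and `UdfOperator` here are hypothetical stand-ins invented for the sketch, and the error message for a missing name is made up. In Flink itself, a rich function reads the registered set with `getRuntimeContext().getBroadcastVariable(name)`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BroadcastScopeDemo {

    // Hypothetical stand-in for a generic operator.
    static class Operator {
        final String label;

        Operator(String label) {
            this.label = label;
        }

        // Mirrors the `case _ =>` branch: a non-UDF operator rejects broadcast sets.
        Operator withBroadcastSet(List<?> data, String name) {
            throw new UnsupportedOperationException(
                "Operator " + label + " cannot have broadcast variables.");
        }
    }

    // Hypothetical stand-in for an operator that runs a user-defined function.
    static class UdfOperator extends Operator {
        // Each operator keeps its own name -> data mapping.
        private final Map<String, List<?>> broadcastSets = new HashMap<>();

        UdfOperator(String label) {
            super(label);
        }

        // Mirrors the `case udfOp: UdfOperator[_]` branch: register the set under a name.
        @Override
        UdfOperator withBroadcastSet(List<?> data, String name) {
            broadcastSets.put(name, data);
            return this;
        }

        // Look the set up by the name it was registered under.
        List<?> getBroadcastVariable(String name) {
            List<?> data = broadcastSets.get(name);
            if (data == null) {
                throw new IllegalArgumentException("No broadcast set named " + name);
            }
            return data;
        }
    }

    public static void main(String[] args) {
        UdfOperator mp2 = new UdfOperator("Mp2").withBroadcastSet(Arrays.asList(3), "z");
        System.out.println(mp2.getBroadcastVariable("z")); // prints [3]

        // A different operator never registered "z", so the lookup fails there.
        UdfOperator other = new UdfOperator("Other");
        try {
            other.getBroadcastVariable("z");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints No broadcast set named z
        }
    }
}
```

The point of the model is that the name "z" only means something on the operator it was attached to, which matches the way the example attaches mp1 to the Mp2 flatMap and nowhere else.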
Broadcast variables on a Flink DataSet are in fact similar to broadcast variables on a Spark RDD/DataFrame.
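However, Flink is a stream processing system and is mostly used on real-time streams, where the term "broadcast variable" no longer fits well. The corresponding concept there is "broadcast state". A typical use case is combining a real-time configuration stream with a data stream, which the next article will cover.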