当前位置:   article > 正文

Flink基础系列-DataSet广播变量_flink中广播流 建议缓存多少数据呀

flink中广播流 建议缓存多少数据呀

广播变量简介

和所有的大数据组件一样,Flink也有广播变量,其目的是:将一份小数据集分发到每台机器上方便每个slot直接使用,减少拉取数据的过程~

类似Spark的broadcast的使用,广播变量有以下的特点:

  • 数据只在广播端进行修改,使用方是没有权利改变数据的结果和具体值的权力;
  • 广播出去的数据是存储在每个节点的memory中,因此这个数据不能太大。建议一般不超过GB,也可以根据环境的实际情况决定;
  • 对于批处理需要手工释放广播变量,但是对于flink流系统有checkpoint维护广播变量的状态;

使用示例:

1、代码级别(参见最后一行)

# 初始化flink环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

# 设置默认算子并行度
env.setParallelism(1);

// Sc1 generates M parameters a,b,c for second degree polynomials P(x) = ax^2 + bx + c identified by id
DataSet<Integer<String>> sc1 = env.fromElements('小明', '小黑''老王');

// Sc2 generates N x values to be evaluated with the polynomial identified by id
DataSet<Tuple2<String, Integer>> sc2 = env
		.fromElements(new Tuple2<>("1", 5), new Tuple2<>("2", 3), new Tuple2<>("3", 6));

// Sc3 generates N y values to be evaluated with the polynomial identified by id
DataSet<Tuple2<String, Integer>> sc3 = env
		.fromElements(new Tuple2<>("1", 2), new Tuple2<>("2", 3), new Tuple2<>("3", 7));

// Jn1 matches x and y values on id and emits (id, x, y) triples
JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1 =
		sc2.join(sc3).where(0).equalTo(0).with(new Jn1());

// Jn2 matches polynomial and arguments by id, computes p = min(P(x),P(y)) and emits (id, p) tuples
JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2 =
		jn1.join(sc1).where(0).equalTo(0).with(new Jn2());

// Mp1 selects (id, x, y) triples where x = y and broadcasts z (=x=y) to Mp2
FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1 =
		jn1.flatMap(new Mp1());

// Mp2 filters out all p values which can be divided by z
List<Tuple2<String, Integer>> result = jn2.flatMap(new Mp2()).withBroadcastSet(mp1, "z").collect();

JavaProgramTestBase.compareResultAsText(result, RESULT);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

具体使用方法 withBroadcastSet(mp1, “z”),接受两个参数

  • 具体数据
  • 广播变量的别名

这个类的源码如下,同时也可以看出广播变量的operate算子级别的,每个operate算子在使用的时候直接更加别名获取就能拿到全局一致的结果。

def withBroadcastSet(data: DataSet[_], name: String) = {
    javaSet match {
      case udfOp: UdfOperator[_] => udfOp.withBroadcastSet(data.javaSet, name)
      case _ =>
        throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
          "broadcast variables.")
    }
    this
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

总结

flink DataSet的广播变量和spark rdd/dataFrame的广播变量其实是类似的。

但是flink作为一个流处理系统,更多的是处理实时流系统的,那么【广播变量】这个词就不太适合了;与此对应的成为【广播状态】,一个典型的应用场景就是【实时配置流+数据流】的课题,下篇文章揭晓。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/696244
推荐阅读
相关标签
  

闽ICP备14008679号