当前位置:   article > 正文

Flink的分区策略_flink分区策略

flink分区策略

目录

1. 随机分区(shuffle)

2. 轮询分区(Round-Robin)

3. 重缩放分区(rescale)

4. 广播(broadcast)

5. 全局分区(global)

6. 自定义分区(Custom)


        顾名思义,“分区”(partitioning )操作就是要将数据进行重新分布,传递到不同的流分区
去进行下一步处理。其实我们对分区操作并不陌生,前面介绍聚合算子时,已经提到了 keyBy
它就是一种按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按
key “分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制
的——所以我们有时也说, keyBy 是一种逻辑分区( logical partitioning )操作。
        如果说 keyBy 这种逻辑分区是一种“软分区”,那真正硬核的分区就应该是所谓的“物理
分区”( physical partitioning )。也就是我们要真正控制分区策略,精准地调配数据,告诉每个
数据到底去哪里。其实这种分区方式在一些情况下已经在发生了:例如我们编写的程序可能对
多个处理任务设置了不同的并行度,那么当数据执行的上下游任务并行度变化时,数据就不应
该还在当前分区以直通( forward )方式传输了——因为如果并行度变小,当前分区可能没有
下游任务了;而如果并行度变大,所有数据还在原先的分区处理就会导致资源的浪费。所以这
种情况下,系统会自动地将数据均匀地发往下游所有的并行任务,保证各个分区的负载均衡。
        有些时候,我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候,系统无
法自动调整,这时就需要我们重新进行负载均衡,将数据流较为平均地发送到下游任务操作分
区中去。 Flink 对于经过转换操作之后的 DataStream ,提供了一系列的底层操作接口,能够帮
我们实现数据流的手动重分区。为了同 keyBy 相区别,我们把这些操作统称为“物理分区”
操作。物理分区与 keyBy 另一大区别在于, keyBy 之后得到的是一个 KeyedStream ,而物理分
区之后结果仍是 DataStream ,且流中元素数据类型保持不变。从这一点也可以看出,分区算子
并不对数据进行转换处理,只是定义了数据的传输方式。
         Flink 中算子改变并行度,默认RebalancePartitioner分区策略。
        常见的物理分区策略有随机分配(Random )、轮询分配(Round-Robin)、重缩放(Rescale) 和广播(Broadcast),下边我们分别来做了解。

1. 随机分区(shuffle

        最简单的重分区方式就是直接“洗牌”。通过调用 DataStream .shuffle() 方法,将数据随
机地分配到下游算子的并行任务中去。
        随机分区服从均匀分布(uniform distribution ),所以可以把流中的数据随机打乱,均匀地
传递到下游任务分区,如图 5-9 所示。因为是完全随机的,所以对于同样的输入数据 , 每次执
行得到的结果也不会相同。

        经过随机分区之后,得到的依然是一个 DataStream
我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为 4
中间经历一次 shuffle 。执行多次,观察结果是否相同。
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class ShuffleTest {
  4. public static void main(String[] args) throws Exception {
  5. // 创建执行环境
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. // 读取数据源,并行度为 1
  10. DataStreamSource<Event> stream = env.addSource(new ClickSource());
  11. // 经洗牌后打印输出,并行度为 4
  12. stream.shuffle().print("shuffle").setParallelism(4);
  13. env.execute();
  14. }
  15. }
可以得到如下形式的输出结果:
shuffle:1> Event{user='Bob', url='./cart', timestamp=...}
shuffle:4> Event{user='Cary', url='./home', timestamp=...}
shuffle:3> Event{user='Alice', url='./fav', timestamp=...}
shuffle:4> Event{user='Cary', url='./cart', timestamp=...}
shuffle:3> Event{user='Cary', url='./fav', timestamp=...}
shuffle:1> Event{user='Cary', url='./home', timestamp=...}
shuffle:2> Event{user='Mary', url='./home', timestamp=...}
shuffle:1> Event{user='Bob', url='./fav', timestamp=...}
shuffle:2> Event{user='Mary', url='./home', timestamp=...}

2. 轮询分区(Round-Robin

        轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分
发,如图 5-10 所示。通过调用 DataStream .rebalance() 方法,就可以实现轮询重分区。 rebalance 使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
注:Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。

我们同样可以在代码中进行测试:
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class RebalanceTest {
  4. public static void main(String[] args) throws Exception {
  5. // 创建执行环境
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. // 读取数据源,并行度为 1
  10. DataStreamSource<Event> stream = env.addSource(new ClickSource());
  11. // 经轮询重分区后打印输出,并行度为 4
  12. stream.rebalance().print("rebalance").setParallelism(4);
  13. env.execute();
  14. }
  15. }
输出结果的形式如下所示,可以看到,数据被平均分配到所有并行任务中去了。
rebalance:2> Event{user='Cary', url='./fav', timestamp=...}
rebalance:3> Event{user='Mary', url='./cart', timestamp=...}
rebalance:4> Event{user='Mary', url='./fav', timestamp=...}
rebalance:1> Event{user='Cary', url='./home', timestamp=...}
rebalance:2> Event{user='Cary', url='./cart', timestamp=...}
rebalance:3> Event{user='Alice', url='./prod?id=1', timestamp=...}
rebalance:4> Event{user='Cary', url='./prod?id=2', timestamp=...}
rebalance:1> Event{user='Bob', url='./prod?id=2', timestamp=...}
rebalance:2> Event{user='Alice', url='./prod?id=1', timestamp=...}

3. 重缩放分区(rescale

        重缩放分区和轮询分区非常相似。当调用 rescale() 方法时,其实底层也是使用 Round-Robin
算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就
是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale
的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

        当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale
的效率明显会更高。比如当上游任务数量是 2 ,下游任务数量是 6 时,上游任务其中一个分区
的数据就将会平均分配到下游任务的 3 个分区中。
        由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节
点的网络传输必然影响效率;而如果我们配置的 task slot 数量合适,用 rescale 的方式进行“局
部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络
传输带来的损耗。
        从底层实现上看,rebalance rescale 的根本区别在于任务之间的连接机制不同。 rebalance
将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这
是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信
通道,节省了很多资源。
可以在代码中测试如下:
  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import
  3. org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  4. public class RescaleTest {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. // 这里使用了并行数据源的富函数版本
  10. // 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
  11. env
  12. .addSource(new RichParallelSourceFunction<Integer>() {
  13. @Override
  14. public void run(SourceContext<Integer> sourceContext) throws
  15. Exception {
  16. for (int i = 0; i < 8; i++) {
  17. // 将奇数发送到索引为 1 的并行子任务
  18. // 将偶数发送到索引为 0 的并行子任务
  19. if ((i + 1) % 2 ==
  20. getRuntimeContext().getIndexOfThisSubtask()) {
  21. sourceContext.collect(i + 1);
  22. }
  23. }
  24. }
  25. @Override
  26. public void cancel() {
  27. }
  28. })
  29. .setParallelism(2)
  30. .rescale()
  31. .print().setParallelism(4);
  32. env.execute();
  33. }
  34. }
这里使用 rescale 方法,来做数据的分区,输出结果是:
4> 3
3> 1
1> 2
1> 6
3> 5
4> 7
2> 4
2> 8
可以将 rescale 方法换成 rebalance 方法,来体会一下这两种方法的区别。

4. 广播(broadcast

        这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一
份,可能进行重复处理。可以通过调用 DataStream broadcast() 方法,将输入数据复制并发送
到下游算子的所有并行任务中去。
具体代码测试如下:
 
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class BroadcastTest {
  4. public static void main(String[] args) throws Exception {
  5. // 创建执行环境
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. // 读取数据源,并行度为 1
  10. DataStreamSource<Event> stream = env.addSource(new ClickSource());
  11. // 经广播后打印输出,并行度为 4
  12. stream. broadcast().print("broadcast").setParallelism(4);
  13. env.execute();
  14. }
  15. }
输出结果的形式如下所示:
broadcast:3> Event{user='Mary', url='./cart', timestamp=...}
broadcast:1> Event{user='Mary', url='./cart', timestamp=...}
broadcast:4> Event{user='Mary', url='./cart', timestamp=...}
broadcast:2> Event{user='Mary', url='./cart', timestamp=...}
broadcast:2> Event{user='Alice', url='./fav', timestamp=...}
broadcast:1> Event{user='Alice', url='./fav', timestamp=...}
broadcast:3> Event{user='Alice', url='./fav', timestamp=...}
broadcast:4> Event{user='Alice', url='./fav', timestamp=...}
可以看到,数据被复制然后广播到了下游的所有并行任务中去了。

5. 全局分区(global

        全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global() 方法,会将所
有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行
度变成了 1 ,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

6. 自定义分区(Custom

        当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用
partitionCustom() 方法来自定义分区策略。
        在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner )对象,第二个
是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,
也可以通过字段位置索引来指定,还可以实现一个 KeySelector
例如,我们可以对一组自然数按照奇偶性进行重分区。代码如下:
 
  1. import org.apache.flink.api.common.functions.Partitioner;
  2. import org.apache.flink.api.java.functions.KeySelector;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. public class CustomPartitionTest {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. // 将自然数按照奇偶分区
  10. env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
  11. .partitionCustom(new Partitioner<Integer>() {
  12. @Override
  13. public int partition(Integer key, int numPartitions) {
  14. return key % 2;
  15. }
  16. }, new KeySelector<Integer, Integer>() {
  17. @Override
  18. public Integer getKey(Integer value) throws Exception {
  19. return value;
  20. }
  21. })
  22. .print().setParallelism(2);
  23. env.execute();
  24. }
  25. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/659835
推荐阅读
相关标签
  

闽ICP备14008679号