赞
踩
在Apache Flink中,"广播"是一种特殊的数据分发模式,用于将数据从一个并行操作传播到整个作业的所有并行任务中。广播操作对于将少量数据有效地分发给并行任务,以便它们能够共享这些数据而不必进行昂贵的网络通信,是非常有用的。它通常用于将配置信息、静态数据集或参考数据传播给Flink作业中的所有并行任务。
广播的主要优势在于,它避免了将数据通过网络发送到所有并行任务的开销,而是直接将数据复制到每个任务的本地内存中。这样,任务可以直接从本地内存访问数据,无需进行远程通信。这对于那些需要频繁访问同一份数据的场景非常有利,可以显著降低作业的整体延迟。
配置参数传播: 广播非常适合将配置参数传播到所有并行任务。例如,如果您的Flink作业需要根据特定的配置进行处理,您可以将这些配置参数广播到所有任务中,避免了每个任务单独获取配置的开销。
规则和字典传递: 在一些数据处理场景中,需要使用一组静态的规则或字典进行数据转换或处理。通过广播这些规则或字典,可以避免每个任务都去加载这些数据,从而减少处理延迟和资源消耗。
静态数据集共享: 如果您有一个静态数据集(不随时间变化)需要在整个作业的所有任务之间共享,广播可以提供一种高效的方式来共享这些数据,而不需要进行重复传输。
机器学习模型参数共享: 在机器学习任务中,有时候需要将训练好的模型参数广播给所有任务,以便它们在进行推断或预测时能够共享这些参数,避免重复计算。
数据分片信息: 在一些任务中,需要根据数据的分片信息来进行特定处理。通过广播数据分片信息,所有任务可以了解到数据的分片情况,从而更有效地进行处理。
数据缓存: 当需要频繁访问同一份数据时,通过广播将数据复制到每个任务的本地内存中,可以避免重复从磁盘或网络读取数据,从而提高处理性能。
需要注意的是,广播适用于较小的数据集。如果广播的数据集非常大,超过了内存容量,反而可能导致性能问题,因为它会消耗过多的内存资源。
在Flink中,要实现广播操作,需要遵循以下步骤:
创建要广播的数据集:首先,您需要准备要广播的数据集。通常,这是一个小型的静态数据集,例如配置参数、字典、规则等。这些数据将被广播到整个作业的所有并行任务中。
广播数据集:一旦您有了要广播的数据集,您需要使用broadcast()
方法将其标记为广播数据集。这将会在作业的整个执行过程中将数据复制到所有任务的本地内存中。
使用广播数据集:在作业的其他部分,您可以通过调用withBroadcastSet()
方法将广播数据集传递给算子函数。这样,每个并行任务都可以访问广播数据集,而不必重新复制或通过网络通信。
- package com.fwmagic.flink.batch;
-
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.functions.RichMapFunction;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.api.java.DataSet;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.configuration.Configuration;
-
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
-
- /**
- * Broadcast操作
- */
- public class Broadcast {
-
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4,Time.of(10, TimeUnit.SECONDS)));
- //1、准备要广播的数据
- ArrayList<Tuple2<String,Integer>> broadData = new ArrayList<>();
- broadData.add(new Tuple2<>("zs", 23));
- broadData.add(new Tuple2<>("ls", 34));
- broadData.add(new Tuple2<>("ww", 45));
-
- //1.1 处理需要广播的数据,把数据集转成map,key为name,value为age
- DataSet<HashMap<String, Integer>> dateSet = env.fromCollection(broadData).map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
- @Override
- public HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {
- HashMap<String,Integer> hashMap = new HashMap<>();
- hashMap.put(tuple.f0, tuple.f1);
- return hashMap;
- }
- });
-
- //源数据
- DataSource<String> dataSource = env.fromElements("zs", "ls", "ww");
-
- //注意:在这里需要使用到RichMapFunction获取广播变量
- DataSet<String> result = dataSource.map(new RichMapFunction<String, String>() {
-
- List<HashMap<String, Integer>> broadcastMap = new ArrayList<>();
- HashMap<String, Integer> allMap = new HashMap<>();
-
-
- @Override
- public void open(Configuration parameters) throws Exception {
- broadcastMap = getRuntimeContext().getBroadcastVariable("mybroadcastdata");
- for (HashMap<String, Integer> map : broadcastMap) {
- allMap.putAll(map);
- }
- }
-
- @Override
- public String map(String name) throws Exception {
- Integer age = allMap.get(name);
- return name + "," + age;
- }
-
- }).withBroadcastSet(dateSet, "mybroadcastdata");
-
- result.print();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。