Broadcasting an element to all partitions means the same data is processed more than once, similar to allGrouping in Storm: dataStream.broadcast(). Let me first introduce this broadcast partitioning. An earlier article covered Flink DataSet hash partitioning and range partitioning, both of which split the source data across different parallel threads. Broadcast partitioning instead hands the same data to every thread: each parallel instance of the downstream map operator receives every element, so all of them see identical data.
So below I first demonstrate broadcast partitioning and look at how it distributes the data.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BrocastHandler {

    public static void main(String[] args) throws Exception {
        // set the parallelism to 4
        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);
        DataStreamSource<Long> streamSource = e.addSource(new SelfdDefinitionSerialSourceFunction());
        // broadcast partitioning
        streamSource.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        }).timeWindowAll(Time.seconds(2)).sum(0).print().setParallelism(1);
        e.execute(BrocastHandler.class.getSimpleName());
    }
}

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SelfdDefinitionSerialSourceFunction implements SourceFunction<Long> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean ISRUNNING = true;

    private Long COUNT = 0L;

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        // emit an increasing number once per second
        while (ISRUNNING) {
            sourceContext.collect(COUNT);
            COUNT++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        ISRUNNING = false;
    }
}

Received data: 0
Received data: 0
Received data: 0
Received data: 0
0
Received data: 1
Received data: 1
Received data: 1
Received data: 1
Received data: 2
Received data: 2
Received data: 2
Received data: 2
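The output confirms the partitioning rule: with the parallelism set to 4, every value emitted by the source is printed four times, once by each parallel map subtask, and the single "0" line is the 2-second window sum printed by the parallelism-1 sink. To make the per-subtask duplication explicit, a RichMapFunction can print its own subtask index. The following is a minimal, hypothetical sketch (the class name SubtaskAwareMapFunction is mine, not from the original code) that could replace the anonymous MapFunction above:

import org.apache.flink.api.common.functions.RichMapFunction;

// Hypothetical variant of the map operator above: prints the index of the
// parallel subtask that received each broadcast copy of the element.
public class SubtaskAwareMapFunction extends RichMapFunction<Long, Long> {

    @Override
    public Long map(Long value) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        System.out.println("subtask " + subtask + " received: " + value);
        return value;
    }
}

Plugged in as streamSource.broadcast().map(new SubtaskAwareMapFunction()), it should show indexes 0 through 3 for every value.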
Requirement: Flink obtains user names from a data source and in the end must print each user's name together with the user's age. In the example below, the name-to-age mapping is shipped to every task as a broadcast variable.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Broadcast variable demo
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: BrocastVarHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-2 9:12 PM
 * @UpdateDate: 19-11-2 9:12 PM
 * @Version: 1.0
 */
public class BrocastVarHandler {

    private static final String BROCASTVALNAME = "employee";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        /**
         * each Tuple2 stores a user's name and age
         */
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("zs", 18));
        data.add(new Tuple2<>("ls", 21));
        data.add(new Tuple2<>("wu", 26));

        DataSource<Tuple2<String, Integer>> source = e.fromCollection(data);
        DataSet<Map<String, Integer>> toBrocastSource = source.map(new MapFunction<Tuple2<String, Integer>, Map<String, Integer>>() {
            @Override
            public Map<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                Map<String, Integer> maps = new HashMap<>();
                maps.put(value.f0, value.f1);
                return maps;
            }
        });

        DataSource<String> dataSource = e.fromElements("zs", "ls", "wu");
        dataSource.map(new RichMapFunction<String, String>() {

            List<Map<String, Integer>> targetList = new ArrayList<>();
            Map<String, Integer> targetMap = new HashMap<>();

            /**
             * open() is executed only once (per task), so initialization
             * work such as fetching the broadcast variable belongs here
             * @param parameters
             * @throws Exception
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                this.targetList = getRuntimeContext().getBroadcastVariable(BROCASTVALNAME);
                for (Map<String, Integer> m : targetList) {
                    this.targetMap.putAll(m);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = targetMap.get(value);
                return value + " , " + age;
            }

        }).withBroadcastSet(toBrocastSource, BROCASTVALNAME).print();
    }
}
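One design note on the example above: wrapping every Tuple2 in a one-entry HashMap before broadcasting is not strictly necessary, because getBroadcastVariable returns the broadcast DataSet's elements as a List, so the Tuple2 data set can be broadcast as-is and turned into a name-to-age map inside open(). A minimal sketch of that variant, with a class name (AgeLookupMapFunction) of my own choosing:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical variant: broadcast the Tuple2 DataSet directly, e.g.
// dataSource.map(new AgeLookupMapFunction()).withBroadcastSet(source, "employee"),
// and build the name -> age map in open().
public class AgeLookupMapFunction extends RichMapFunction<String, String> {

    private final Map<String, Integer> ages = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        List<Tuple2<String, Integer>> employees =
                getRuntimeContext().getBroadcastVariable("employee");
        for (Tuple2<String, Integer> t : employees) {
            ages.put(t.f0, t.f1);
        }
    }

    @Override
    public String map(String value) throws Exception {
        return value + " , " + ages.get(value);
    }
}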

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

/**
 * Accumulator demo:
 * counts how many times the map operator was invoked
 */
public class CounterHandler {

    private static final String COUNTERNAME = "num-lines";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = environment.fromElements("a", "b", "c", "d");

        dataSource.map(new RichMapFunction<String, String>() {

            private IntCounter numLines = new IntCounter();

            @Override
            public String map(String value) throws Exception {
                // increment the accumulator
                numLines.add(1);
                return value;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // register the accumulator
                getRuntimeContext().addAccumulator(COUNTERNAME, this.numLines);
            }
            // a print sink cannot be used here: in the DataSet API print() triggers execution
            // by itself, and it is the environment.execute() call below that returns the accumulator result
        }).setParallelism(4).writeAsText("/home/caozg/Desktop/data/count");

        JobExecutionResult job = environment.execute(CounterHandler.class.getSimpleName());
        int result = job.getAccumulatorResult(COUNTERNAME);
        System.out.println(result); // prints 4
    }
}
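The accumulator is what makes this work with parallelism 4: each of the four map subtasks keeps its own local IntCounter, and Flink merges them into a single result when the job finishes, which is why getAccumulatorResult returns 4. For contrast, here is a purely illustrative sketch (the class name NaiveCounterHandler and the output path are mine) of counting with a plain field instead: every subtask only sees its own copy, so all you can get are per-subtask counts printed on the TaskManagers, with no merged total on the client.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical sketch: counting with a plain field instead of an accumulator.
// With parallelism 4 every subtask has its own copy of "localCount", so the
// values printed in close() are per-subtask counts (e.g. 1, 1, 1, 1) and the
// client has no way to ask the job for a merged total.
public class NaiveCounterHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        environment.fromElements("a", "b", "c", "d")
                .map(new RichMapFunction<String, String>() {

                    private int localCount = 0;

                    @Override
                    public String map(String value) throws Exception {
                        localCount++;
                        return value;
                    }

                    @Override
                    public void close() throws Exception {
                        // printed once per parallel subtask, on the TaskManager side
                        System.out.println("local count: " + localCount);
                    }
                }).setParallelism(4)
                // hypothetical output path, mirroring the original example
                .writeAsText("/home/caozg/Desktop/data/naive-count");

        environment.execute(NaiveCounterHandler.class.getSimpleName());
    }
}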

Broadcast variables let the programmer cache a read-only data set on every machine instead of shipping it along with each task; the variable can be shared across tasks, but it must not be modified. Accumulators, in contrast, let different tasks add to the same variable, and the merged result can be read back after the job finishes. Finally, Flink also offers a distributed cache: a file registered on the client is copied to every TaskManager and can be read locally inside a rich function, as the last example shows.
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.List;

/**
 * Distributed cache demo
 */
public class CacheHandler {

    private static final String CACHEFILENAME = "cache";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        // register the cached file under the name "cache"
        environment.registerCachedFile("/home/caozg/Desktop/data/a.txt", CACHEFILENAME);
        DataSource<String> source = environment.fromElements("a", "b", "c", "d");

        DataSet<String> res = source.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // TODO the cached data could be used for business logic here
                return value;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                // fetch the local copy of the cached file and read it line by line
                File file = getRuntimeContext().getDistributedCache().getFile(CACHEFILENAME);
                List<String> list = FileUtils.readLines(file);
                for (String s : list) {
                    System.out.println("line:" + s);
                }
            }
        });

        res.print();
    }
}
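The TODO in map() only hints that the cached file could drive real business logic. One possible, purely illustrative way to do that is to load the cached lines into a set in open() and use it as a lookup table; the class and field names below (CacheLookupMapFunction, whitelist) are mine, and it would plug into the job above as source.map(new CacheLookupMapFunction()).

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.HashSet;
import java.util.Set;

// Hypothetical use of the cached file: keep its lines in a set and use that
// set as a lookup table inside map(). "whitelist" is an illustrative name.
public class CacheLookupMapFunction extends RichMapFunction<String, String> {

    private final Set<String> whitelist = new HashSet<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        // the file was registered on the client under the name "cache"
        File file = getRuntimeContext().getDistributedCache().getFile("cache");
        whitelist.addAll(FileUtils.readLines(file));
    }

    @Override
    public String map(String value) throws Exception {
        // tag each element depending on whether it appears in the cached file
        return whitelist.contains(value) ? value + " (in cache file)" : value;
    }
}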
