赞
踩
Flink Broadcast(广播变量)
广播变量允许编程人员在每台机器上保持一个只读缓存变量,而不是传送变量的副本给tasks,广播变量创建之后,他可以运行在集群中任何function上,而不需要多次传递给集群节点,另外不要更改广播变量,这样才能确保每个节点获取到的数据是一致的,简单理解就是一个公共的共享变量,我们可以把dataSet数据集广播出去,然后不同的task在节点上都能获取到,这个数值在每个节点上只会在每个节点上保留一份,如果不使用broadcast,则在每个节点中的每个task都要拷贝一份dataset数据集,比较浪费内存也就是一个节点中可能存在多份dataset数据。
##用法 ##1.初始化数据 DataSet<Integer> toBroadcast =env.fromElements(1,2,3) ##2.广播数据 .withBroadcastSet(toBoradcast,”broadcastsetname”); ##3.获取数据 Collection<Integer> broadcastSet=getRuntimeContext().getBroadcastVariable(“broadcastsetname”); ###注意 广播出去的变量存在于每个节点的内存中,所以这个 数据集不能太大,因为广播出去的数据,会常驻内存,除非程序执行结束 广播变量在初始化广播出去之后不支持修改,这样才能保持一致性。 public static void main(String[] args) throws Exception{
|
DataStreaming 中的Broadcast
-把元素广播给所有哦的分区,数据会被重复处理,类似于Storm中的allGrouping
dataStream.broadcast();
Accumulators即累加器,与MapReduce counter应用场景类似,都能很好的观察task在运行期间的数据变化,可以在flink job 任务中的算子函数中操作累加器,但是只能在执行结束后才能获取累加器的最终结果Counters是一个具体的累加器Accumulator实现,
IntCounter,LongCounter和DoubleCounter 用法
Private IntCounter numlines=new IntCounter();
getRuntimContext().addAccumulator(“num-lines”,this.numlines)
This.numlines.add(1)
myJobExcutionResult.getAccumulatorResutl(“num-lines”)
不适用累加器,直接自定义sum进行计数 public static void main(String[] args) throws Exception { ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> data=env.fromElements("a","b","c","d"); DataSet<String> result=data.map(new RichMapFunction<String, String>() { //1.创建累加器 private IntCounter numlines=new IntCounter(); // int sum=0; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //注册累加器 getRuntimeContext().addAccumulator("num-lines",this.numlines); } @Override public String map(String s) throws Exception { //如果并行度为1,使用普通的累加求和,结果正确,但是设置多个并行度,则普通的累加器求和结果不对 //sum++; //System.out.println("sum: "+sum); //3.使用累加器 this.numlines.add(1); return s; } }); result.writeAsText("d:\\counter.text"); JobExecutionResult jobResult=env.execute("counter"); int num=jobResult.getAccumulatorResult("num-lines"); System.out.println("num: "+num); }
累加器是可以在不同任务中对同一个变量进行累加操作
Broadcast允许程序员将一个只读的变量缓存在每台机器上,而不用在任务间传递变量,广播变量可以共享,但是不能修改。 |
Flink distributed cache(分布式缓存)
Flink 提供了一个分布式缓存,类似于hadoop可以是使用户在并行函数中很方便的读取本地文件。工作机制:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称,当程序执行,flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后taskmanager节点的本地文件系统访问他。
###用法
EnvregisterCachedFile(“hdfs://master:9000/data”,”hdfsFlie”);
FilemyFile =getRuntimeContext().getDistributedCache().getFile(“hdfsFile”); 如果出现以下异常 flink Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supply….. 请在pom文件中加入 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop2</artifactId> <version>${flink.version}</version> </dependency> 并启动hadoop集群 public static void main(String[] args) throws Exception { ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment(); //1、注册一个文件 env.registerCachedFile("hdfs://master:9000/data/fof/input/friend.txt","hdfs"); DataSource<String> data= env.fromElements("a","b","c","d"); DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> alist = new ArrayList(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfs"); List<String> list = FileUtils.readLines(myFile); for (String line : list) { this.alist.add(line); System.out.println(line); } } @Override public String map(String s) throws Exception { return s; } }); result.print(); }
|
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。