当前位置:   article > 正文

flink 广播变量&&累加器&&计数器_flink map获取累加值

flink map获取累加值

广播变量

Flink Broadcast(广播变量)

广播变量允许编程人员在每台机器上保持一个只读缓存变量,而不是传送变量的副本给tasks,广播变量创建之后,他可以运行在集群中任何function上,而不需要多次传递给集群节点,另外不要更改广播变量,这样才能确保每个节点获取到的数据是一致的,简单理解就是一个公共的共享变量,我们可以把dataSet数据集广播出去,然后不同的task在节点上都能获取到,这个数值在每个节点上只会在每个节点上保留一份,如果不使用broadcast,则在每个节点中的每个task都要拷贝一份dataset数据集,比较浪费内存也就是一个节点中可能存在多份dataset数据。

 

##用法

##1.初始化数据

DataSet<Integer> toBroadcast =env.fromElements(1,2,3)

##2.广播数据

   .withBroadcastSet(toBoradcast,”broadcastsetname”);

##3.获取数据

   Collection<Integer> broadcastSet=getRuntimeContext().getBroadcastVariable(“broadcastsetname”);

###注意

广播出去的变量存在于每个节点的内存中,所以这个 数据集不能太大,因为广播出去的数据,会常驻内存,除非程序执行结束

广播变量在初始化广播出去之后不支持修改,这样才能保持一致性。

public static void main(String[] args) throws Exception{
  ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment()
;

 
//准备数据
 
ArrayList<Tuple2<String,Integer>> broadcast=new ArrayList<>();
 
broadcast.add(new Tuple2<>("zhangsan",29));
 
broadcast.add(new Tuple2<>("lisi",45));
 
broadcast.add(new Tuple2<>("wangwu",98));
 
DataSet<Tuple2<String,Integer>> ds=env.fromCollection(broadcast);
 
//处理要广播的数据,吧数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
 
DataSet<HashMap<String,Integer>>tobct= ds.map(new MapFunction<Tuple2<String,Integer>, HashMap<String,Integer>>() {
   
@Override
   
public HashMap<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
      HashMap<String
,Integer> res=new HashMap<>();
     
res.put(stringIntegerTuple2.f0,stringIntegerTuple2.f1);
      return
res;
   
}
  })
;


 
//源数据
 
DataSource<String> data=env.fromElements("zhangsan","wangwu","lisi");

 
DataSet<String> result=data.map(new RichMapFunction<String, String>() {
   
//这个方法只会执行一次可以实现初始化的功能
   
List<HashMap<String,Integer>>  broadcastMap=new ArrayList<>();
   
HashMap<String,Integer> allmap=new HashMap<>();
   
@Override
   
public void open(Configuration parameters) throws Exception {
     
//可以获取广播数据
     
super.open(parameters);
       this
.broadcastMap=getRuntimeContext().getBroadcastVariable("ds");
       for
(HashMap map :broadcastMap){
        
allmap.putAll(map);
      
}
    }
   
//来一次数据处理一次,参数表示用户名
   
@Override
   
public String map(String s) throws Exception {
      Integer age=
allmap.get(s);
      return
s+" "+age;
   
}
  }).withBroadcastSet(tobct
,"ds");//执行广播数据操作;
 
result.print();
}

 

DataStreaming 中的Broadcast

-把元素广播给所有哦的分区,数据会被重复处理,类似于Storm中的allGrouping

dataStream.broadcast();

 

 

 

 

 Flink累加器&&计数器

Accumulators即累加器,与MapReduce counter应用场景类似,都能很好的观察task在运行期间的数据变化,可以在flink job 任务中的算子函数中操作累加器,但是只能在执行结束后才能获取累加器的最终结果Counters是一个具体的累加器Accumulator实现,

IntCounter,LongCounter和DoubleCounter

用法

  1. 创建累加器

Private IntCounter numlines=new IntCounter();

  1. 注册累加器

getRuntimContext().addAccumulator(“num-lines”,this.numlines)

  1. 使用累加器

This.numlines.add(1)

  1. 获取累加器结果

myJobExcutionResult.getAccumulatorResutl(“num-lines”)

 

不适用累加器,直接自定义sum进行计数

public static void main(String[] args) throws Exception {

  ExecutionEnvironment env= ExecutionEnvironment.getExecutionEnvironment();

  

  DataSource<String> data=env.fromElements("a","b","c","d");

  

  DataSet<String> result=data.map(new RichMapFunction<String, String>() {

    //1.创建累加器

    private IntCounter numlines=new IntCounter();

  

   // int sum=0;

    @Override

    public void open(Configuration parameters) throws Exception {

      super.open(parameters);

      //注册累加器

      getRuntimeContext().addAccumulator("num-lines",this.numlines);

    }

  

    @Override

    public String map(String s) throws Exception {

      //如果并行度为1,使用普通的累加求和,结果正确,但是设置多个并行度,则普通的累加器求和结果不对

      //sum++;

      //System.out.println("sum: "+sum);

      //3.使用累加器

      this.numlines.add(1);

      return s;

    }

  });

  result.writeAsText("d:\\counter.text");

  JobExecutionResult jobResult=env.execute("counter");

  int num=jobResult.getAccumulatorResult("num-lines");

  System.out.println("num: "+num);

  }

 

累加器是可以在不同任务中对同一个变量进行累加操作

 

Broadcast允许程序员将一个只读的变量缓存在每台机器上,而不用在任务间传递变量,广播变量可以共享,但是不能修改。

 

Flink distributed cache(分布式缓存)

Flink 提供了一个分布式缓存,类似于hadoop可以是使用户在并行函数中很方便的读取本地文件。工作机制:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称,当程序执行,flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后taskmanager节点的本地文件系统访问他。

###用法

  1. 注册一个文件

EnvregisterCachedFile(“hdfs://master:9000/data”,”hdfsFlie”);

  1. 访问数据

FilemyFile =getRuntimeContext().getDistributedCache().getFile(“hdfsFile”);

如果出现以下异常

flink Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supply…..

请在pom文件中加入

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-shaded-hadoop2</artifactId>

   <version>${flink.version}</version>

</dependency>

并启动hadoop集群

public static void main(String[] args) throws Exception {

  ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();

  

  //1、注册一个文件

  env.registerCachedFile("hdfs://master:9000/data/fof/input/friend.txt","hdfs");

  

  DataSource<String> data= env.fromElements("a","b","c","d");

  DataSet<String> result = data.map(new RichMapFunction<String, String>() {

  

    private ArrayList<String> alist = new ArrayList();

  

    @Override

    public void open(Configuration parameters) throws Exception {

      super.open(parameters);

  

      //使用文件

      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfs");

      List<String> list = FileUtils.readLines(myFile);

      for (String line : list) {

        this.alist.add(line);

        System.out.println(line);

      }

    }

  

    @Override

    public String map(String s) throws Exception {

      return s;

    }

  });

  result.print();

  }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/696173
推荐阅读
相关标签
  

闽ICP备14008679号