当前位置:   article > 正文

Hadoop分布式计算框架(MapReduce)—— MapReduce的编程模型_mapreduce编程模型

mapreduce编程模型

目录

2.1 MapReduce分布式计算原理

2.2 MapReduce编程模型

(1)Map阶段

(2)Reduce阶段

2.3 深入剖析MapReduce编程模型

(1)背景分析

(2)问题思路分析

(3)深入剖析MapReduce编程模型


MapReduce自身的命名特点可以看出,MapReduce由Map和Reduce两个部分组成。用户只需实现Mapper和Reducer两个抽象类,编写map和reduce两个函数,即可完成简单的分布式程序的开发。这就是最简单的MapReduce编程模型。

2.1 MapReduce分布式计算原理

MapReduce对外提供了5个可编程组件,分别是InputFormat、Mapper、Partitioner、Reducer、OutputFormat。由于这些组件在MapReduce内部都有默认实现,一般情况下不需要自己开发,只需要实现Mapper和Reducer组件即可。

2.2 MapReduce编程模型

MapReduce编程模型可以分为Map和Reduce阶段。

(1)Map阶段
  • 读取输入文件内容,将输入文件的每一行解析成<key,value>键值对,即[K1,V1]。默认输入格式下,K1表示行偏移量,V1表示读取的行内容。

  • 调用map函数,将[K1,V1]作为参数传入。在map函数中封装了数据处理的逻辑,对输入的,key,value>键值对进行处理。Map任务的具体实现逻辑需要开发者根据不同的业务场景来确定。

  • Map任务的处理结果也是姨<key,value>键值对的形式输出,记为[K2,V2]。

(2)Reduce阶段
  • 数据到达Reduce阶段前,需要经历一个Shuffle过程对多个Map任务的输出进行合并、排序,输出[K2,{V2,...}]。

  • 调用reduce函数,将[K2,{V2...}]作为参数传入。在reduce函数中封装了数据汇总的逻辑,对输入的<key,value>键值对进行汇总处理。

  • Reduce阶段的输出结果可以写到文件系统,如HDFS。

2.3 深入剖析MapReduce编程模型

(1)背景分析

WordCount(单词统计)是最简单也是最能提现MapReduce思想的程序之一,可以称为MapReduce版“HelloWorld"。

为了验证Hadoop集群环境是否安装成功,已经成功运行过WordCount,这里主要从WordCount代码的角度对MapReduce编程模型进行详细分析。WordCount主要完成的功能是统计一系列文本文件中每个单词出现的次数,其实现逻辑如图:

(2)问题思路分析
  • 业务场景

有大量的文件,每个文件里面存储的都是单词。

  • 我们的任务

统计所有文件中每个单词出现的次数。

  • 解决思路

先分别统计出每个文件中各个单词出现的次数,然后再累加不同文件中同一个单词出现次数。

(3)深入剖析MapReduce编程模型
  • 数据分割

    首先将数据文件拆分成分片(Split),分片是用来组织数据块(Block)的,它是一个逻辑概念,用来明确一个分片包含多少个数据块,以及这些数据块存储在哪些DataNode节点上,但它并不实际存储源数据。

    源数据以数据块的形式存储在文件系统上,分片只是连接数据块和Map 任务的一个桥梁。源数据被分割成若干分片,每个分片作为一个Map任务的输入。在Map函数执行过程中,分片会被分解成一个个<key,value>键值对,map函数会迭代处理每条数据。默认情况下,当输入文件较小时,每个数据文件将被划分为一个分片,并将文件按行转换成<key,value>键值对,这一步由MapReduce框架自动完成。

    数据分割过程如图:

  • 数据处理

    将分割好的<key,value>键值对交给用户自定义的map函数进行迭代处理,然后输出新的<key,value>键值对。

    数据处理过程如图:

  • 数据局部合并

    在Map任务处理后的数据写入磁盘之前,线程首先根据Reduce个数对数据进行分区。在每个分区中,后台线程按key值在内存中排序,如果设置了Combiner,它就在排序后的输出上运行。运行Combiner使得Map输出结果更少,从而减少写到磁盘的数据和传递给Reduce的数据。

    数据局部合并过程如图:

  • 数据聚合

    经过复杂的Shuffle过程之后,将Map端的输出数据拉取到Reduce端。Reduce端首先对数据进行合并排序,然后交给reduce函数进行聚合处理。注意,相同key的记录只会交给同一个Reduce进行处理,只有这样才能统计出最终的聚合效果。

    数据聚合过程如图:

  1. public class Wordcount {
  2. /**
  3. Mapper类有4个形参类型:
  4. 前两个参数类型为输入格式的key/value类型
  5. 后两个参数类型为Map输出的key/value类型
  6. Text类型相当于Java 中的String类型
  7. Intwritable类型相当于Java中的Integer类型
  8. */
  9.    public static class TokenizerMapper extends Mapper<object,Text,Text,Intwritable>{
  10.        private final static Intwritable one = new Intwritable(1);
  11.        private Text word = new Text();
  12.        /**map方法有3个参数:
  13.        key:表示每行数据偏移量
  14.        value:表示每行数据内容
  15.        context:保存作业运行的上下文信息,如作业配置、分片,任务 ID 等
  16.        */
  17.        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
  18.            StringTokenizer itr = new StringTokenizer(value.toString());
  19.            while (itr.hasMoreTokens()){
  20.                word.set(itr.nextToken());
  21.                context.write(word,one);
  22.           }
  23.       }
  24.   }
  25.    /**
  26.    Reducer 类有 4 个形参类型
  27.    前两个参数类型为Map输出的key/value类型
  28.    后两个参数类型为Reduce输出的key/value类型
  29.    */
  30.    public static class IntSumReducer extends Reducer<Text,Intwritable,Text,Intwritable>{
  31.         private Intwritable result = new Intwritable();
  32.        /**
  33.        reduce 方法有 3个参数:
  34.        key 表示 Map 端需要聚合的 key
  35.        values 表示同一个 key的value 值的集合
  36.        context 保存了作业运行的上下文信息
  37.        */
  38.        public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
  39.            int sum =0;
  40.            for (Intwritable val : values){
  41.                sum += val.get();
  42.       }
  43.            result.set(sum);
  44.            context.write(key, result);
  45.       }
  46.   }
  47.    public static void main(String[] args) throws Exception {
  48.        //加载 Hadoop 配置文件
  49.        Configuration conf = new Configuration();
  50.        //新建一个job 对象用来提交作业
  51.        Job job = Job.getInstance(conf);
  52.        //Hadoop客户端通过该类可以反推WordCount所在jar文件的绝对路径
  53.        job.setJarByClass(Wordcount.class);
  54.        //设置 job 的名称
  55.        job.setJobName("WordCount");
  56.        //设置输入输出文件格式,默认为文本格式
  57.        job.setInputFormatClass(TextInputFormat.class);
  58.        job.setOutputFormatClass(TextoutputFormat.class);
  59.        //设置MapReduce输入输出路径
  60.        //注意:输出目录不能提前存在,否则Hadoop会报错并拒绝执行作业
  61.        //目的是防止数据丢失,避免长时间运行的作业结果被意外覆盖掉
  62.        FileInputFormat.addInputPath(job,new Path(args[e]));
  63.        FileOutputFormat.setOutputPath(job,new Path(args[1]));
  64.        //设置自定义Mapper和Reducer类
  65.        job.setMapperClass(WordCountMapper.class);
  66.        job.setReducerClass(WordCountReducer.class);
  67.        //设置Map输出key/value类型
  68.        job.setMapOutputKeyClass(Text.class);
  69.        job.setMapOutputValueClass(IntWritable.class);
  70.        //设置Reduce输出key/value类型
  71.        job.setoutputKeyClass(Text.class);
  72.        job.setOutputValueClass(Intwritable.class);
  73.        //提交 job 作业
  74.        job.waitForCompletion(true);
  75.   }
  76. }

以上就是通过WordCount案例对简单的MapReduce程序进行深入剖析的过程。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/777859
推荐阅读
相关标签
  

闽ICP备14008679号