赞
踩
目录
从MapReduce自身的命名特点可以看出,MapReduce由Map和Reduce两个部分组成。用户只需实现Mapper和Reducer两个抽象类,编写map和reduce两个函数,即可完成简单的分布式程序的开发。这就是最简单的MapReduce编程模型。
MapReduce对外提供了5个可编程组件,分别是InputFormat、Mapper、Partitioner、Reducer、OutputFormat。由于这些组件在MapReduce内部都有默认实现,一般情况下不需要自己开发,只需要实现Mapper和Reducer组件即可。
MapReduce编程模型可以分为Map和Reduce阶段。
读取输入文件内容,将输入文件的每一行解析成<key,value>键值对,即[K1,V1]。默认输入格式下,K1表示行偏移量,V1表示读取的行内容。
调用map函数,将[K1,V1]作为参数传入。在map函数中封装了数据处理的逻辑,对输入的,key,value>键值对进行处理。Map任务的具体实现逻辑需要开发者根据不同的业务场景来确定。
Map任务的处理结果也是姨<key,value>键值对的形式输出,记为[K2,V2]。
数据到达Reduce阶段前,需要经历一个Shuffle过程对多个Map任务的输出进行合并、排序,输出[K2,{V2,...}]。
调用reduce函数,将[K2,{V2...}]作为参数传入。在reduce函数中封装了数据汇总的逻辑,对输入的<key,value>键值对进行汇总处理。
Reduce阶段的输出结果可以写到文件系统,如HDFS。
WordCount(单词统计)是最简单也是最能提现MapReduce思想的程序之一,可以称为MapReduce版“HelloWorld"。
为了验证Hadoop集群环境是否安装成功,已经成功运行过WordCount,这里主要从WordCount代码的角度对MapReduce编程模型进行详细分析。WordCount主要完成的功能是统计一系列文本文件中每个单词出现的次数,其实现逻辑如图:
业务场景
有大量的文件,每个文件里面存储的都是单词。
我们的任务
统计所有文件中每个单词出现的次数。
解决思路
先分别统计出每个文件中各个单词出现的次数,然后再累加不同文件中同一个单词出现次数。
数据分割
首先将数据文件拆分成分片(Split),分片是用来组织数据块(Block)的,它是一个逻辑概念,用来明确一个分片包含多少个数据块,以及这些数据块存储在哪些DataNode节点上,但它并不实际存储源数据。
源数据以数据块的形式存储在文件系统上,分片只是连接数据块和Map 任务的一个桥梁。源数据被分割成若干分片,每个分片作为一个Map任务的输入。在Map函数执行过程中,分片会被分解成一个个<key,value>键值对,map函数会迭代处理每条数据。默认情况下,当输入文件较小时,每个数据文件将被划分为一个分片,并将文件按行转换成<key,value>键值对,这一步由MapReduce框架自动完成。
数据分割过程如图:
数据处理
将分割好的<key,value>键值对交给用户自定义的map函数进行迭代处理,然后输出新的<key,value>键值对。
数据处理过程如图:
数据局部合并
在Map任务处理后的数据写入磁盘之前,线程首先根据Reduce个数对数据进行分区。在每个分区中,后台线程按key值在内存中排序,如果设置了Combiner,它就在排序后的输出上运行。运行Combiner使得Map输出结果更少,从而减少写到磁盘的数据和传递给Reduce的数据。
数据局部合并过程如图:
数据聚合
经过复杂的Shuffle过程之后,将Map端的输出数据拉取到Reduce端。Reduce端首先对数据进行合并排序,然后交给reduce函数进行聚合处理。注意,相同key的记录只会交给同一个Reduce进行处理,只有这样才能统计出最终的聚合效果。
数据聚合过程如图:
- public class Wordcount {
- /**
- Mapper类有4个形参类型:
- 前两个参数类型为输入格式的key/value类型
- 后两个参数类型为Map输出的key/value类型
- Text类型相当于Java 中的String类型
- Intwritable类型相当于Java中的Integer类型
- */
- public static class TokenizerMapper extends Mapper<object,Text,Text,Intwritable>{
- private final static Intwritable one = new Intwritable(1);
- private Text word = new Text();
- /**map方法有3个参数:
- key:表示每行数据偏移量
- value:表示每行数据内容
- context:保存作业运行的上下文信息,如作业配置、分片,任务 ID 等
- */
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()){
- word.set(itr.nextToken());
- context.write(word,one);
- }
- }
- }
- /**
- Reducer 类有 4 个形参类型
- 前两个参数类型为Map输出的key/value类型
- 后两个参数类型为Reduce输出的key/value类型
- */
- public static class IntSumReducer extends Reducer<Text,Intwritable,Text,Intwritable>{
- private Intwritable result = new Intwritable();
- /**
- reduce 方法有 3个参数:
- key 表示 Map 端需要聚合的 key
- values 表示同一个 key的value 值的集合
- context 保存了作业运行的上下文信息
- */
- public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
- int sum =0;
- for (Intwritable val : values){
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
- public static void main(String[] args) throws Exception {
- //加载 Hadoop 配置文件
- Configuration conf = new Configuration();
- //新建一个job 对象用来提交作业
- Job job = Job.getInstance(conf);
- //Hadoop客户端通过该类可以反推WordCount所在jar文件的绝对路径
- job.setJarByClass(Wordcount.class);
- //设置 job 的名称
- job.setJobName("WordCount");
- //设置输入输出文件格式,默认为文本格式
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextoutputFormat.class);
- //设置MapReduce输入输出路径
- //注意:输出目录不能提前存在,否则Hadoop会报错并拒绝执行作业
- //目的是防止数据丢失,避免长时间运行的作业结果被意外覆盖掉
- FileInputFormat.addInputPath(job,new Path(args[e]));
- FileOutputFormat.setOutputPath(job,new Path(args[1]));
- //设置自定义Mapper和Reducer类
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
- //设置Map输出key/value类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- //设置Reduce输出key/value类型
- job.setoutputKeyClass(Text.class);
- job.setOutputValueClass(Intwritable.class);
- //提交 job 作业
- job.waitForCompletion(true);
- }
- }
以上就是通过WordCount案例对简单的MapReduce程序进行深入剖析的过程。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。