赞
踩
MapReduce的大致计算处理过程:
数据是经过mapper 然后 通过Shuffle,最后通过Reducer,然后输出。
Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。
根据使用者的意愿,mapper对输入的数据进行操作,选取需要的字段(这个字段可能来自原数据中的值,也可能是原数据中的字段,或者是一个新的字段)和值(这个值可能来自原数据,但是更可能来自经新的定义),组成一个key-value的形式输出返回给reducer,交给reducer后续处理。需要特别注意的是: mapper一次只读取一行(或者说一个记录),也就是说在mapper中的代码逻辑是为了处理一行(一条记录)的,MapReduce本身会自动循环多次,从而达到处理所有数据的效果。(处理所有读入的行的效果)
mapper返回的是 (key1,value1),返回给reducer,这时候reducer接收到一对 (key,value)形式的数据。
这个阶段MapReduce本身已经帮使用者实现了,也就是说使用者只需要实现map和reduce即可完成整个mapreduce过程逻辑,而不需要去开发shuffle程序的逻辑。shuffle实现的是:在map返回的所有数据里,把key相同的放到一个组里面(可以把这个组理解成一个类似数组的东西(可迭代对象)),于是,当map返回的各个单条数据组成的全部数据里有多个不同的key的时候,shuffle里面就会有多个组。这些个组将会一个一个的给reducer处理!当然,map处理过一行数据之后并不会等待其它行的数据也处理完之后才传给shuffle、让shuffle处理划分成组;而是map处理一条数据之后将结果返回给shuffle,这时候shuffle里面之后第一条数据,这条数据达到类似tensorflow里面的占位的作用,当map处理完下一条或者其它条的时候,map会继续将结果传递给shuffle,只要传进去的(key,value)在shuffle中已经有了,它就会归到那个已经有占位的和它有相同的key的组,如果传进去的(key,value)在shuffle中没有,那它将继续占位;直到所有map工作任务达到100%。也就是说在map工作的时候shuffle也是在工作的。
reducer接收到来自shuffle的一组一组的数据,这时候reducer对其进行统计的操作即可达到统计key的个数的效果。当然这看起来所有过程都是在进行统计的过程,但是如果在其中加入一些算法的计算,那么MapReduce将以其强大的集群算力帮我实现大数据集的计算。
MapReduce框架的优点:
不擅长实时计算,像MySQL一样能够立即返回结果
MapReduce的设计本身决定了处理的数据必须是离线数据,因为涉及到数据切分等等。
不擅长DAG(有向图)计算,需要一个Job执行完成之后,另一个Job才能使用他的输出。
MapReduce的总过程可以概括为:
1.输入数据:…
2.map阶段的输入:
<偏移量key1,每一行value1,上下文context>
map阶段的输出:
<key2,value2>,其中key2相当于输入的value1,而value2是经过运算等等得到的。
3.shuffle阶段是MapReduce内部完成的。
shuffler接收来自map的输出,变成shuffler的输入,然后对其以相同的key3(这里的key3其实是map的value2)进行分为一组,然后以<key3,value3>的形式(这里)传给reduce。其实就是达到了多个相同的key分为一组,但是并没有统计(留给reduce做最后的迭代统计)的作用。
4.reduce阶段:
进行一次reduce收到的数据是:多组<key3,value3>组成的一个对象,而且key3是一样的。
这时候需要对value3进行一个牛顿迭代法求和操作,即可得到key3的总value3。达到一个统计的效果。【当然做其它事情也是可以的,按需】
注意: Combiner要慎用,有些场景并不适用Combiner,比如求平均值的场景。
解释: 假如,有1,3,5,7,9五个数为一行的数据文件存在HDFS上,计算其和。
没有Combiner: 首先,读取一行数据进入计算,Mapper返回(偏移量,1),(偏移量,3),(偏移量,5),(偏移量,7),(偏移量,9),进入reducer前根据key对数据进行一个shuffle并返回(key,[1,3,5,7,9])给reducer,reducer根据key对(1,3,5,7,9)进行求和得到结果。
有Combiner:
首先,读取一行数据进入计算,Mapper返回(偏移量1,1),(偏移量1,3),(偏移量1,5),(偏移量2,7),(偏移量2,9),进入reducer前根据key对数据进行一个shuffle并返回(key1,[1,3,5]),(key2,[7,9]);Combiner根据key1和key2进行对[*]求和,然后Reducer再对求和结果进行最终求和,但是与分区不同,Combiner的作用并不会使得数据处理结果产生n个文件。【其实,加入Combiner就相当于是进行了两个reducer】
如何实现Combiner:【main函数下】
job.setCombinerClass(Reducer.class)
分区其实就是根据需求(某个判断条件)把最终的输出数据分发到不同的文件中,其实就是对Mapper划分数据,而负责划分数据的类为Partitioner。
MapReduce默认的Partitioner是HashPartitioner,默认情况下,Partitioner先计算key的散列值(通常为MD5值),然后通过Reducer个数执行求余运算----key的hashCode除以Reducer的个数取余数。
这种方式的好处: 随机的将整个key空间平均分发给每个Reducer,同时也能确保不同Map产生的相同key能被分发到同一个Reducer。
Partitioner的实现例子:
如下根据数组的长度把数据在Mapper阶段划分为2个区,
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class Web_toHivePtion extends Partitioner<NullWritable,Text> { @Override public int getPartition(NullWritable nullWritable, Text text, int numPartition) { String[] tmpL = text.toString().split(","); if (tmpL.length == 5){ return 1 % numPartition; }else{ return 2% numPartition; } } }
mapreduce经典案例解读: 单词统计
题目: 统计文本文件里面的单词的个数,每个单词之间以一个空格分隔。
思路解析: 由mapreduce的基本运作原理,可以得到处理该题目的思路如下:
Mapper阶段:
这里的LongWritable,Text 代表了读入数据之后的key为偏移量,value为一行,也就是说一次读入一行,类型为Text;map输出格式为:key是Text,value是IntWritable。
package com.mytest.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Created by ua07 on 9/20/19. */ public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> { //其实这里LongWritable,Text的LongWritable,Text 代表了读入数据之后的key为偏移量,value为一行,也就是说一次读入一行,类型为Text,概括的话就是mapper的输入格式(后两个);mapper输出格式为:key是Text,value是IntWritable。 /** * * @param key1 * @param value * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key1,Text value,Context context) throws IOException,InterruptedException{ String data = value.toString(); String[] words = data.split(" "); for(String w:words){ context.write(new Text(w),new IntWritable(1)); } } }
Reducer阶段:
package com.mytest.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Created by ua07 on 9/20/19. */ public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //这里前面的的Text,IntWritable是reducer的输入,也就是mapper的输出。后面的Text,IntWritable是reducer的输出。 /** * * @param k3 * @param v3 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k3,Iterable<IntWritable> v3,Context context) throws IOException,InterruptedException{ int total = 0; for (IntWritable v:v3){ total += v.get(); } context.write(k3,new IntWritable(total)); } }
主函数:
package com.mytest.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Created by ua07 on 9/20/19. */ public class WordCountMain { public static void main(String[] args) throws Exception{ Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCountMain.class); job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); } }
欢迎加入QQ群一起学习和交流,只为学习和交流:275259334
或者直接扫码加入:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。