赞
踩
MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。
这两个阶段合起来正是MapReduce思想的体现。
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是键值对。
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MRAppMaster
负责整个程序的过程调度及状态协调MapTask
负责map阶段的整个数据处理流程ReduceTask
负责reduce阶段的整个数据处理流程在Hadoop中,用于执行MapReduce作业的机器角色有两个:JobTracker
和TaskTracker
。JobTracker用于调度作业,TaskTracker用于跟踪任务的执行情况。一个Hadoop集群只有一个JobTracker。
1)Client: 用户编写的MapReduce程序通过Client提交到JobTracker端,用户可通过Client提供的一些接口查看作业运行状态
2)JobTracker: JobTracker负责资源监控和作业调度, JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点, JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
3)TaskTracker:TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等) TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用
4)Task: Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动
①Read:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个的(key/value)。
②Map:该步骤主要将解析出的(key/value)交给用户编写的Map函数处理,并产生一些列新的(key/value)。
③Collect:在用户编写的Map函数中,数据处理完成后,一般会调用OutputCollector.collect()收集结果。在该函数内部,它将会生成(key/value)分片(通过Partitioner),并写入一个环形缓冲区中。
④Spill:即所谓溢写,指当环形缓冲区填满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并,压缩等操作。
⑤Combine:当所有数据处理完成后,Map Task对所有临时变量进行一次合并,以确保最终只会生成一个数据文件。
①Shuffle:也成为Copy阶段。Reduce Task从各个Map Task上远程复制一片数据,并针对某一片数据进行判断,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
②Merge:在远程复制的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘上文件过多。
③Sort:按照MapReduce语义,用户编写的Reduce函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚集在一起,Hadoop采用了基于排序的此策略。由于各个Map Task已经实现了对自己的数据结果进行了局部排序,因此,Reduce Task只需要对所有数据进行一次归并排序即可。
④Reduce:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理
⑤Write:reduce()函数将计算结果写到HDFS。
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
shuffle
在map端和reduce都参与操作,所以可以分为map shuffle
和reduce shuffle
两个过程:
从map的输出,需要经过分区、排序、合并过程输出为一个分区有序的文件。
首先根据数据的key值进行分区(默认是hash分区),然后数据写入一个环形缓冲区中,环形缓冲区的实质是一个字节数组,里面包含两部分数据,分别是数据和索引,索引中记录了每个key-value数据的分区等信息,环形缓冲区为100M,默认达到0.8的内存数据量时开始spill溢出到磁盘中,溢出过程前会对环形缓冲区按照partition和key进行排序操作(一般采用快排),也就是数据分区聚集,分区内按照key升序排列,如果这个时候设置了combiner的话,会按照相同的key进行合并,然后溢写到磁盘的一个文件中,当数据量很大的时候,会有多个这样的溢出小文件,多个小文件会按照分区进行合并,从而得到一个大的按照分区排序的输出文件。这是map shuffle做的事。
总结起来map shuffle需要做的事情有:
①分区partition(分区在进入缓冲区之前进行)
②写入环形内存缓冲区
③执行溢出写
排序sort(快排)—>合并combiner—>生成溢出写文件
④归并merge(归并排序),还可能再调用一次combiner
reduce shuffle
也有两个过程,分别是复制map数据
然后排序合并
。
当map输出文件后,会将map输出和机器位置的映射信息报告给application master,同时reduce也会定期向application master询问,获得所需要复制数据的位置。reduce通过http从map端复制相应的数据到自己的内存缓冲区中,当内存数据量达到一定量的时候,进行merge合并,如果设置了combiner,还会combine操作,因为每个map文件已经是有序的,所以多个文件合并的时候采用的是根据key进行归并排序,这样reduce shuffle就产生了一个整体有序的数据块。
首先看一下MapReduce中的排序的总体流程。
MapReduce框架会确保每一个Reducer的输入都是按Key进行排序的。一般,将排序以及Map的输出传输到Reduce的过程称为混洗(shuffle)。每一个Map都包含一个环形的缓存,默认100M,Map首先将输出写到缓存当中。当缓存的内容达到“阈值”时(阈值默认的大小是缓存的80%),一个后台线程负责将结果写到硬盘,这个过程称为“spill”。Spill过程中,Map仍可以向缓存写入结果,如果缓存已经写满,那么Map进行等待。
**Spill的具体过程如下:**首先,后台线程根据Reducer的个数将输出结果进行分组,每一个分组对应一个Reducer。其次,对于每一个分组后台线程对输出结果的Key进行排序。在排序过程中,如果有Combiner函数,则对排序结果进行Combiner函数进行调用。每一次spill都会在硬盘产生一个spill文件。因此,一个Map task有可能会产生多个spill文件,当Map写出最后一个输出时,会将所有的spill文件进行合并与排序,输出最终的结果文件。在这个过程中Combiner函数仍然会被调用。从整个过程来看,Combiner函数的调用次数是不确定的。
下面重点分析下Shuffle阶段的排序过程:
Shuffle阶段的排序可以理解成两部分,一个是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的<key,value>按照key进行排序,即key值相同的一串<key,value>存放在一起,这样一个partition内按照key值整体有序了。
第二部分并不是排序,而是进行merge,merge有两次,一次是map端将多个spill 按照分区和分区内的key进行merge,形成一个大的文件。第二次merge是在reduce端,进入同一个reduce的多个map的输出 merge在一起,该merge理解起来有点复杂,最终不是形成一个大文件,而且期间数据在内存和磁盘上都有。所以shuffle阶段的merge并不是严格的排序意义,只是将多个整体有序的文件merge成一个大的文件,由于不同的task执行map的输出会有所不同,所以merge后的结果不是每次都相同,不过还是严格要求按照分区划分,同时每个分区内的具有相同key的<key,value>对挨在一起。
Shuffle排序综述:如果只定义了map函数,没有定义reduce函数,那么输入数据经过shuffle的排序后,结果为key值相同的输出挨在一起,且key值小的一定在前面,这样整体来看key值有序(宏观意义的,不一定是按从大到小,因为如果采用默认的HashPartitioner,则key 的hash值相等的在一个分区,如果key为IntWritable的话,每个分区内的key会排序好的),而每个key对应的value不是有序的。
Shuffle如何获取map输出数据给reduce?
答:map执行完后会将map输出和机器位置的映射关系报告给application master,同时reduce会定期向application master询问,获得所需要的数据位置信息,之后reduce会通过http从map端复制相应的数据到reduce端的内存缓冲区中。
Shuffle缺点:
主要是数据传输IO的问题。由于从环形缓冲区需要溢写多个小文件到磁盘,产生较多的磁盘IO。
combiner函数作用和作用在哪些地方?
主要是实现本地key的聚合。一个map都可能会产生大量的本地输出,这些输出会通过网络到达reducer端,这样会浪费带宽。解决这个问题可以通过Combiner。Combiner的作用就是对map端的输出先做一次合并。
combiner有三处:(1)从环形缓冲区写到磁盘(2)小文件合并为大文件(3)reduce的内存缓冲区溢写到磁盘中。
mapreduce执行速度太慢,优化措施有哪些?:
在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个 Reduce 当中进行处理
例如: 为了数据的统计, 可以把一批类似的数据发送到同一个 Reduce 当中, 在同一个 Reduce 当中统计相同类型的数据, 就可以实现类似的数据分区和统计等。
其实就是相同类型的数据,有共性的数据, 送到一起去处理。默认的分区只有1个分区
得到map给的记录后,它们该分配给哪些reducer来处理呢?
hadoop采用的默认的派发方式是根据散列值来派发的,但是实际中,这并不能很高效或者按照我们要求的去执行任务。例如,经过partition处理后,一个节点的reducer分配到了20条记录,另一个却分配道了10W万条,试想,这种情况效率如何。又或者,我们想要处理后得到的文件按照一定的规律进行输出,假设有两个reducer,我们想要最终结果中part-00000中存储的是"h"开头的记录的结果,part-00001中存储其他开头的结果,这些默认的partitioner是做不到的。所以需要我们自己定制partition来根据自己的要求,选择记录的reducer。自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job setPartitionerClass指定一下即可
Map的结果,会通过partition分发到Reducer上。Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,
getPartition(Text key, Text value, int numPartitions)
输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)。
分区出现的必要性,如何使用Hadoop产生一个全局排序的文件?
最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。
事实上我们可以这样做,**首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。**主要的思路是使用一个partitioner来描述全局排序的输出。比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了
基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。
这样,我们就可以对利用分布式文件系统进行大数据量的排序了,我们也可以重写Partitioner类中的compare函数,来定义比较的规则,从而可以实现字符串或其他非数字类型的排序,也可以实现二次排序乃至多次排序。
分区的目的是根据Key值决定Mapper的输出记录被送到哪一个Reducer上去处理。而分组就是与记录的Key相关。在同一个分区里面,具有相同Key值的记录是属于同一个分组的。
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑.
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
很多MapReduce程序受限于集群上可用的带宽,所以它会尽力最小化需要在map和reduce任务之间传输的中间数据。Hadoop允许用户声明一个combiner function来处理map的输出,同时把自己对map的处理结果作为reduce的输入。因为combiner function本身只是一种优化,hadoop并不保证对于某个map输出,这个方法会被调用多少次。换句话说,不管combiner function被调用多少次,对应的reduce输出结果都应该是一样的。
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一
job.setCombinerClass(CustomCombiner.class)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。