赞
踩
输出结果文件个数:
默认情况下
不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个task来处理;
并且最终结果输出到一个文件中,part-r-00000。
改变ReduceTask个数
在MapReduce中,通过Job提供的方法,可以修改reducetask的个数。
默认情况下不设置,reducetask个数为1。
设置job.setNumReduceTasks(6),结果为6个类似于part-r-0000x的文件。
输出结果文件个数和ReduceTask个数关系
通过修改不同reducetask个数值,得出输出结果文件的个数和reducetask个数是一种对等关系。
也就是说有几个reducetask,最终程序就输出几个文件。
当MapReduce中有多个reducetask执行的时候,此时maptask的输出就会面临一个问题:究竟将自己的输出数据交给哪一个reducetask来处理?这就是所谓的数据分区(partition)问题。
Partition概念:
注意:
概述:数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量
MapReduce弊端:
Combiner优化
Combiner中文叫做数据规约,是MapReduce的一种优化手段。
Combiner的作用就是对map端的输出先做一次局部合并,以减少在map和reduce节点之间的数据传输量。
Combiner是MapReduce程序中除了Mapper和Reducer之外的一种组件,默认情况下不启用。
Combiner本质就是Reducer,combiner和reducer的区别在于运行的位置:
combiner是在每一个maptask所在的节点本地运行,是局部聚合;
reducer是对所有maptask的输出结果计算,是全局聚合。
具体实现步骤:
Combiner使用注意事项:
Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
下述场景禁止使用Combiner,因为这样不仅优化了网络传输数据量,还改变了最终的执行结果
- 业务和数据个数相关的。
- 业务和整体排序相关的。
Combiner组件不是禁用,而是慎用。用的好提高程序性能,用不好,改变程序结果且不易发现。
技巧:
key的重要性体现:
在MapReduce编程中,核心是牢牢把握住每个阶段的输入输出key是什么。
因为MapReduce中很多默认行为都跟key相关。
排序:key的字典序a-z 正序
分区:key.hashcode % reducetask 个数
分组:key相同的分为一组
最重要的是,如果觉得默认的行为不满足业务需求,MapReduce还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。
概述:
逻辑规划:
MapTask并行度的决定机制叫做逻辑规划。
客户端提交job之前会对待处理数据进行逻辑切片,形成逻辑规划文件。
逻辑规划机制由FileInputFormat类的getSplits方法完成。
逻辑规划结果写入规划文件(job.split),在客户端提交Job之前,把规划文件提交到任务准备区,供后续使用。
每个逻辑切片最终对应启动一个maptask。
逐个遍历待处理目录下的文件,以切片大小形成规划。
默认情况下,splitsize=block size,而默认情况下,block size=128M。
#比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
#经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M
逻辑切片相关参数:
minsize:默认值1 (mapreduce.input.fileinputformat.split.minsize)
maxsize:默认值Long.MAXValue (mapreduce.input.fileinputformat.split.maxsize)
blocksize 默认值128M
因此,默认情况下,splitsize=block size=128M
注意,当bytesRemaining/splitSize > 1.1不满足的话,那么最后所有剩余的会作为一个切片。
概述:
Reducetask并行度同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置。
job.setNumReduceTasks(N);
注意Reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个Reducetask。
如果数据分布不均匀,有可能在reduce阶段产生数据倾斜。
Step1:
Key表示每行首字符偏移值,value表示这一行文本内容。
RecordReader读取一行数据,这里调用一次map方法进程处理。
Step2:
map处理完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
MapReduce提供Partitioner接口,它的作用就是决定当前的这对输出数据最终应该交由哪个reduce task处理。
默认对key hash后再以reduce task数量取模。如果用户自己对Partitioner有需求,可以订制并设置到job上。
分区计算之后,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。
key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
Step3:
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
Step4:
每次溢写会在磁盘上生成一个临时文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。
当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
环形缓存区:
环形缓冲区就是内存中的一块区域,底层是字节数组。缓冲区是有大小限制,默认是100MB。
当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。
这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
Step1:Copy阶段
Step2: Merge阶段
Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。
merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
默认key的字典序排序。a-z
Step3:
对排序后的键值对调用reduce方法,键相等的键值对组成一组,调用一次reduce方法。所谓分组就是纯粹的前后两个key比较,如果相等就继续判断下一个是否和当前相等。
reduce处理的结果会调用默认输出组件TextOutputFormat写入到指定的目录文件中。TextOutputFormat默认是一次输出写一行,key value之间以制表符\t分割。
概述:
Map端Shuffle
Reducer端shuffle
shuffle机制弊端
Shuffle是MapReduce程序的核心与精髓,是MapReduce的灵魂所在。
Shuffle也是MapReduce被诟病最多的地方所在。MapReduce相比较于Spark、Flink计算引擎慢的原因,跟
Shuffle机制有很大的关系。
Shuffle中频繁涉及到数据在内存、磁盘之间的多次往复。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。