当前位置:   article > 正文

MapReduce基础编程(概念篇)_如果在reduce阶段需要将结果都输出到同一个文件里,那么需要设置reduce任务数,下列

如果在reduce阶段需要将结果都输出到同一个文件里,那么需要设置reduce任务数,下列

1 MapReduce Partition、Combiner

1.1 MapReducePartition分区

输出结果文件个数

  • 默认情况下

    不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个task来处理;

    并且最终结果输出到一个文件中,part-r-00000。

  • 改变ReduceTask个数

    在MapReduce中,通过Job提供的方法,可以修改reducetask的个数。

    默认情况下不设置,reducetask个数为1。

    设置job.setNumReduceTasks(6),结果为6个类似于part-r-0000x的文件。

  • 输出结果文件个数和ReduceTask个数关系

    通过修改不同reducetask个数值,得出输出结果文件的个数和reducetask个数是一种对等关系。

    也就是说有几个reducetask,最终程序就输出几个文件。

当MapReduce中有多个reducetask执行的时候,此时maptask的输出就会面临一个问题:究竟将自己的输出数据交给哪一个reducetask来处理?这就是所谓的数据分区(partition)问题。

Partition概念

  • 默认情况下,MapReduce是只有一个reducetask来进行数据的处理。这就使得不管输入的数据量多大,最终的结果都是输出到一个文件中。
  • 当改变reducetask个数的时候,作为maptask就会涉及到分区的问题,即:MapTask输出的结果如何分配给各个ReduceTask来处理。
  • MapReduce默认分区规则是HashPartitioner。
  • 分区的结果和map输出的key有关。

注意

  • reducetask个数的改变导致了数据分区的产生,而不是有数据分区导致了reducetask个数改变。
  • 数据分区的核心是分区规则。即如何分配数据给各个reducetask。
  • 默认的规则可以保证只要map阶段输出的key一样,数据就一定可以分区到同一个reducetask,但是不能保证数据平均分区。
  • reducetask个数的改变还会导致输出结果文件不再是一个整体,而是输出到多个文件中。
1.2 MapReduceCombiner规约

概述:数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量

MapReduce弊端

  • MapReduce是一种具有两个执行阶段的分布式计算程序,Map阶段和Reduce阶段之间会涉及到跨网络数据传递。
  • 每一个MapTask都可能会产生大量的本地输出,这就导致跨网络传输数据量变大,网络IO性能低。
  • 比如WordCount单词统计案例,假如文件中有1000个单词,其中999个为hello,这将产生999个<hello,1>的键值对在网络中传递,性能及其低下。

Combiner优化

  • Combiner中文叫做数据规约,是MapReduce的一种优化手段。

  • Combiner的作用就是对map端的输出先做一次局部合并,以减少在map和reduce节点之间的数据传输量。

  • Combiner是MapReduce程序中除了Mapper和Reducer之外的一种组件,默认情况下不启用。

  • Combiner本质就是Reducer,combiner和reducer的区别在于运行的位置:

    combiner是在每一个maptask所在的节点本地运行,是局部聚合;

    reducer是对所有maptask的输出结果计算,是全局聚合。

具体实现步骤:

  • 自定义一个CustomCombiner类,继承Reducer,重写reduce方法
  • job.setCombinerClass(CustomCombiner.class)

Combiner使用注意事项

  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。

  • 下述场景禁止使用Combiner,因为这样不仅优化了网络传输数据量,还改变了最终的执行结果

    - 业务和数据个数相关的。

    - 业务和整体排序相关的。

  • Combiner组件不是禁用,而是慎用。用的好提高程序性能,用不好,改变程序结果且不易发现。

2 MapReduce编程指南

技巧

  • MapReduce执行流程了然于心,能够知道数据在MapReduce中流转过程。
  • 业务需求解读准确,即需要明白做什么。
  • 牢牢把握住key的选择,因为MapReduce很多行为跟key相关,比如:排序、分区、分组。
  • 学会自定义组件修改默认行为,当默认的行为不满足业务需求,可以尝试自定义规则。
  • 通过画图梳理业务执行流程,确定每个阶段的数据类型。

key的重要性体现

  • 在MapReduce编程中,核心是牢牢把握住每个阶段的输入输出key是什么。

  • 因为MapReduce中很多默认行为都跟key相关。

    排序:key的字典序a-z 正序

    分区:key.hashcode % reducetask 个数

    分组:key相同的分为一组

  • 最重要的是,如果觉得默认的行为不满足业务需求,MapReduce还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。

3 MapReduce并行度机制

3.1 MapTask并行度机制

概述

  • MapTask的并行度指的是map阶段有多少个并行的task共同处理任务。
  • map阶段的任务处理并行度势必影响到整个job的处理速度。
  • 那么MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

逻辑规划

  • MapTask并行度的决定机制叫做逻辑规划。

  • 客户端提交job之前会对待处理数据进行逻辑切片,形成逻辑规划文件。

  • 逻辑规划机制由FileInputFormat类的getSplits方法完成。

  • 逻辑规划结果写入规划文件(job.split),在客户端提交Job之前,把规划文件提交到任务准备区,供后续使用。

  • 每个逻辑切片最终对应启动一个maptask。

  • 逐个遍历待处理目录下的文件,以切片大小形成规划。

  • 默认情况下,splitsize=block size,而默认情况下,block size=128M。

#比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M

#经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

逻辑切片相关参数

  • 在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));

minsize:默认值1 (mapreduce.input.fileinputformat.split.minsize)

maxsize:默认值Long.MAXValue (mapreduce.input.fileinputformat.split.maxsize)

blocksize 默认值128M

  • 因此,默认情况下,splitsize=block size=128M

  • 注意,当bytesRemaining/splitSize > 1.1不满足的话,那么最后所有剩余的会作为一个切片。

3.2 ReduceTask并行度机制

概述

  • Reducetask并行度同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置。

    job.setNumReduceTasks(N);

  • 注意Reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个Reducetask。

  • 如果数据分布不均匀,有可能在reduce阶段产生数据倾斜

4 MapReduce工作流程详解

4.1 MapTask工作详解
  • 输入文件被逻辑切分为多个split文件,通过LineRecordReader按行读取内容给map(用户自己实现的)进行处理;
  • 数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(HashPartitioner),然后写入内存缓冲区,当缓冲区快满的时候(80%)需要将缓冲区的数据以一个临时文件的方式spill溢出到磁盘;
  • 最后再对磁盘上产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

在这里插入图片描述

Step1

  • 读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
  • 逻辑规划后,由RecordReader对象(默认LineRecordReader)进行读取,以回车换行作为分隔符,读取一行数据,返回<key,value>。

​ Key表示每行首字符偏移值,value表示这一行文本内容。

  • 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。

​ RecordReader读取一行数据,这里调用一次map方法进程处理。

Step2

  • map处理完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

    MapReduce提供Partitioner接口,它的作用就是决定当前的这对输出数据最终应该交由哪个reduce task处理。

    默认对key hash后再以reduce task数量取模。如果用户自己对Partitioner有需求,可以订制并设置到job上。

  • 分区计算之后,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。

​ key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组

Step3

  • 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

    如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

Step4

  • 每次溢写会在磁盘上生成一个临时文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。

    当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

环形缓存区

  • 环形缓冲区就是内存中的一块区域,底层是字节数组。缓冲区是有大小限制,默认是100MB。

  • 当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。

  • 这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

4.2 ReduceTask工作详解
  • Reduce大致分为copysortreduce三个阶段,重点在前两个阶段。
  • copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。
  • 待数据copy完成之后,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段。
  • 完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

在这里插入图片描述

Step1:Copy阶段

  • 简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

Step2: Merge阶段

  • Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。

    merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

  • 把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

    默认key的字典序排序。a-z

Step3:

  • 对排序后的键值对调用reduce方法,键相等的键值对组成一组,调用一次reduce方法。所谓分组就是纯粹的前后两个key比较,如果相等就继续判断下一个是否和当前相等。

  • reduce处理的结果会调用默认输出组件TextOutputFormat写入到指定的目录文件中。TextOutputFormat默认是一次输出写一行,key value之间以制表符\t分割。

5 Shuffle机制

概述

  • Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。
  • 而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
  • 一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。

Map端Shuffle

在这里插入图片描述

  • Collect阶段:将MapTask的结果收集输出到默认大小为100M的环形缓冲区,保存之前会对key进行分区的计算,默认Hash分区。
  • Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
  • Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。

Reducer端shuffle

在这里插入图片描述

  • Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据。
  • Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
  • Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,
    ReduceTask只需保证Copy的数据的最终整体有效性即可。

shuffle机制弊端

  • Shuffle是MapReduce程序的核心与精髓,是MapReduce的灵魂所在。

  • Shuffle也是MapReduce被诟病最多的地方所在。MapReduce相比较于Spark、Flink计算引擎慢的原因,跟

    Shuffle机制有很大的关系。

  • Shuffle中频繁涉及到数据在内存、磁盘之间的多次往复。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号