当前位置:   article > 正文

MapReduce组件总结_mapreduce.input.fileinputformat.split.minsize

mapreduce.input.fileinputformat.split.minsize

MapReduce概述

1 名词解释

Job(作业) :  一个MR程序称为一个Job。

MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。它负责Job中执行状态的监控,容错,和RM申请资源,提交Task等!

Task(任务):  Task是一个进程!负责某项计算!

Map(Map阶段):

Map是MapReduce程序运行的第一个阶段!Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分!
切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算!Task负责Map阶段程序的计算,称为MapTask! 

在一个MR程序的Map阶段,会启动N(取决于切片数)个MapTask。每个MapTask是并行运行!

Reduce(Reduce阶段):

Reduce是MapReduce程序运行的第二个阶段(最后一个阶段)!
Reduce阶段的目的是将Map阶段,每个MapTask计算后的结果进行合并汇总!得到最终结果!
Reduce阶段是可选的,即可以不要。
负责Reduce阶段程序的计算的Task称为ReduceTask。 一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行!且每个ReduceTask最终都会产生一个结果!

2 MapReduce中常用的组件

  • Mapper:   map阶段核心的处理逻辑
  • Reducer:   reduce阶段核心的处理逻辑
  • InputFormat(输入格式):

MR程序必须指定一个输入目录,一个输出目录!InputFormat代表输入目录中文件的格式!
如果是普通文件,可以使用FileInputFormat.
如果是SequeceFile(hadoop提供的一种文件格式),可以使用SequnceFileInputFormat.
如果处理的数据在数据库中,需要使用DBInputFormat

  • RecordReader:  记录读取器,RecordReader负责从输入格式中,读取数据,读取后封装为一组记录(k-v)!
  • OutPutFormat: 输出格式

OutPutFormat代表MR处理后的结果,要以什么样的文件格式写出!
将结果写出到一个普通文件中,可以使用FileOutputFormat!
将结果写出到数据库中,可以使用DBOutPutFormat!
将结果写出到SequeceFile中,可以使用SequnceFileOutputFormat

  • RecordWriter: 记录写出器

RecordWriter将处理的结果以什么样的格式,写出到输出文件中!

  • Partitioner: 分区器

在Mapper将数据写出时,为每组key-value打上标记,进行分区!目的: 一个ReduceTask只会处理一个分区的数据!

3 MapReduce执行大致流程

  1. InputFormat调用RecordReader,从输入目录的文件中,读取一组数据,封装为keyin-valuein对象
  2. 将封装好的key-value,交给Mapper.map()------>将处理的结果写出 keyout-valueout
  3. ReduceTask启动Reducer,使用Reducer.reduce()处理Mapper写出的keyout-valueout,
  4. OutPutFormat调用RecordWriter,将Reducer处理后的keyout-valueout写出到文件

3.1 举例说明MapReduce大致流程

需求: 统计某目录中每个文件的单词数量,a-p开头的单词放入到一个结果文件中,q-z开头的单词放入到一个结果文件中。

总结

Map阶段(MapTask):  切片(Split)-----读取数据(Read)-------交给Mapper处理(Map)------分区和排序(sort)
Reduce阶段(ReduceTask):  拷贝数据(copy)------排序(sort)-----合并(reduce)-----写出(write)

4 切片

由于默认的InputFormat接口使用的是TextInputFormat类,所以我们只需要查看该类的getSplits方法,该方法位于TextInputFormat的父类FileInputFormat中。

  1. public List<InputSplit> getSplits(JobContext job) throws IOException {
  2. StopWatch sw = (new StopWatch()).start();
  3. // minSize从mapreduce.input.fileinputformat.split.minsize和1之间对比,取最大值
  4. long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
  5. // 读取mapreduce.input.fileinputformat.split.maxsize,如果没有设置使用Long.MaxValue作为默认值
  6. long maxSize = getMaxSplitSize(job);
  7. // 开始切片
  8. List<InputSplit> splits = new ArrayList();
  9. // 获取当前job输入目录中所有文件的状态(元数据)
  10. List<FileStatus> files = this.listStatus(job);
  11. boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", false);
  12. Iterator var10 = files.iterator();
  13. while(true) {
  14. while(true) {
  15. while(true) {
  16. FileStatus file;
  17. // 如果输入目录中的文件已全部切片,没有可执行文件了,就结束循环
  18. do {
  19. if (!var10.hasNext()) {
  20. job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
  21. sw.stop();
  22. return splits;
  23. }
  24. file = (FileStatus)var10.next();
  25. } while(ignoreDirs && file.isDirectory());
  26. // 获取文件路径,开始切片逻辑
  27. Path path = file.getPath();
  28. long length = file.getLen();
  29. if (length != 0L) {
  30. // 获取文件的块信息
  31. BlockLocation[] blkLocations;
  32. if (file instanceof LocatedFileStatus) {
  33. blkLocations = ((LocatedFileStatus)file).getBlockLocations();
  34. } else {
  35. FileSystem fs = path.getFileSystem(job.getConfiguration());
  36. blkLocations = fs.getFileBlockLocations(file, 0L, length);
  37. }
  38. // 判断指定文件是否可切,如果可切,就进行切片
  39. if (this.isSplitable(job, path)) {
  40. long blockSize = file.getBlockSize();
  41. // 计算片大小
  42. long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
  43. // 声明待切部分数据的剩余大小
  44. long bytesRemaining;
  45. int blkIndex;
  46. // 如果 待切部分 / 片大小 > 1.1,先切去一片,再判断
  47. for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
  48. // 获取开始切片的offset是哪一个block
  49. blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
  50. // 执行一次切片,并放入切片集合中
  51. splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
  52. }
  53. // 将剩余不能继续切的部分,作为一个片
  54. if (bytesRemaining != 0L) {
  55. blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
  56. splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
  57. }
  58. } else {
  59. // 文件不可切,整个文件作为1片!
  60. splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
  61. }
  62. } else {
  63. // 文件是个空文件,创建一个切片对象,这个切片从当前文件的 0 offset起,向后读取0个字节
  64. splits.add(this.makeSplit(path, 0L, length, new String[0]));
  65. }
  66. }
  67. }
  68. }
  69. }

 总结:

①获取当前输入目录中所有的文件

以文件为单位切片,如果文件为空文件,默认创建一个空的切片

③如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切)

④如果文件不可切,整个文件作为1片

⑤如果文件可切,先获取片大小(默认等于块大小),循环判断  待切部分/ 片大小 > 1.1倍,如果大于先切去一片,再判断…

⑥剩余部分整个作为1片

4.1 TextInputFormat判断文件是否可切

  1. protected boolean isSplitable(JobContext context, Path file) {
  2. // 根据文件的后缀名获取文件使用的相关的压缩格式
  3. CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
  4. // 如果文件不是一个压缩类型的文件,默认都可以切片,否则判断是否是一个可以切片的压缩格式,默认只有Bzip2压缩格式可切片(SplittableCompressionCodec只有一个BZip2Codec子类)
  5. return null == codec ? true : codec instanceof SplittableCompressionCodec;
  6. }

5 片大小的计算

  1. /**
  2. blockSize: 块大小
  3. minSize: minSize从mapreduce.input.fileinputformat.split.minsize和1之间对比,取最大值
  4. maxSize: 读取mapreduce.input.fileinputformat.split.maxsize,如果没有设置使用Long.MaxValue作为默认值
  5. */
  6. protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  7. return Math.max(minSize, Math.min(maxSize, blockSize));
  8. }

可看出默认的片大小就是文件的块大小。文件的块大小默认为128M,所以默认每片就是128M!

调节片大小 > 块大小:配置 mapreduce.input.fileinputformat.split.minsize > 128M

调节片大小 < 块大小:配置 mapreduce.input.fileinputformat.split.maxsize < 128M

理论上来说:如果文件的数据量是一定的话,片越大,切片数量少,启动的MapTask少,Map阶段运算慢!片越小,切片数量多,启动的MapTask多,Map阶段运算快!

6 片和块的关系

片(InputSplit):在计算MR程序时,才会切片。片在运行程序时,临时将文件从逻辑上划分为若干部分!使用的输入格式不同,切片的方式不同,切片的数量也不同!每片的数据最终也是以块的形式存储在HDFS

块(Block): 在向HDFS写文件时,文件中的内容以块为单位存储!块是实际的物理存在!

MapTask在读取切片的内容时,需要根据切片的metainfo,获取到当前切片属于文件的哪部分! 再根据此信息去寻找对应的块,读取数据!

建议: 片大小最好等于块大小!将片大小设置和块大小一致,可以最大限度减少因为切片带来的磁盘IO和网络IO!。

原因: MR计算框架速度慢的原因在于在执行MR时,会发生频繁的磁盘IO和网络IO!

可以看出当切片大小不等于块大小时,MapTask2和MapTask3需要从其他DataNode上去拷贝所需数据至自己机器上参与计算,这无疑增加了网络IO

7 InputFormat

7.1 TextInputFormat

TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的起始字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。

例如文本内容:

解析出来的map:

7.2 KeyValueTextInputFormat

每一行均为一条记录,被分隔符分割为key,value。可以通过在设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");来设定分隔符。默认分隔符是tab(\t)。

例如文本内容,以——>进行分割:

解析出来的map:

此时的键是每行排在制表符之前的Text序列。

7.3 NLineInputFormat

如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即切片数 = 输入文件的总行数/N,如果不整除,切片数=商+1。

举例文本内容:

假设N = 2, 则每个输入分片包含两行。开启2个MapTask。

mapper1:

mapper2:

 

这里的键和值与TextInputFormat生成的一样。

7.4 CombineTextInputFormat

框架默认的TextInputformat切片机制是对任务按文件规划切片,不管文件多小都会是一个单独的切,都会交给一个MapTask,这样如果有大量小文件,就产生大量的MapTask,处理效率极其低下。

应用场景

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

RecoedReader :  LineRecordReader(将一行封装为一个key-value) key(LongWritable): 行的偏移量, value(Text):  行的内容                    

虚拟存储切片最大值设置

如下,可以根据实际的小文件大小情况来设置具体的值:

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

也可以直接设置参数:

mapreduce.input.fileinputformat.split.maxsize

切片机制

流程:

一. 以文件为单位,划分part

①将输入目录下所有文件按照文件名称字典顺序一次读入,记录文件大小

②将每个文件划分为若干part

③判断: 文件的待切部分的大小 <=  maxSize,整个待切部分作为1part

④maxsize < 文件的待切部分的大小 <= 2* maxSize,将整个待切部分均分为2part

⑤文件的待切部分的大小 > 2* maxSize,先切去maxSize大小,作为1部分,剩余待切部分继续判断!

举例:  maxSize=2048

二. 将之前切分的若干part进行累加,累加后一旦累加的大小超过 maxSize,这些作为1片!如上图所示

8 MapTask

  1. 指定带有源数据,需要处理的文件
  2. 根据参数配置进行切片规划,并计算处MapTask的数量
  3. 每个MapTask使用默认的TextInputFormat和RecordReader将文本内容读取为key,value,并将key,value交给Mapper
  4. Mapper将解析出的key/value交给用户编写map()函数处理,并产生一系列所需的key/value
  5. 当数据处理完成后,一般会调用context.write()写出数据,而此方法中其实调用的是OutputCollector.collect()方法,该方法使用OutputBuffer将数据写入缓冲区,在写入缓冲区前会调用一次Partitioner为该条数据计算出一个分区号(决定了将来是由哪个ReduceTask处理这条数据)
  6. 每调用一次write方法就会调用收集线程将数据写入缓冲区,写入缓冲区时同时记录了下列信息:① index(数据写入缓冲区时的顺序) ② partition(分区号)    ③ keystart(key的偏移量) ④ valuestart(value的偏移量) ⑤ key      ⑥value
  7. 缓冲区中的数据达到缓冲区大小的80%,即溢写阈值时(缓冲区默认大小为100M,此处阈值为80M)
    ①将这些数据对key进行一次快速排序(排序时只排索引而不移动数据本身的位置,必要时对数据进行合并、压缩等操作)
    按照分区编号由小到大依次将每个分区中的数据写入到一个名为spillx.out(x为溢写次数)的临时文件(每次溢写均会写出到一个单独的临时文件
    1. // 源码在MapTask类中的sortAndSpill方法
    2. // this.partitions表示ReduceTask的数量(可见init方法中对该参数的赋值)
    3. for(int i = 0; i < this.partitions; ++i) {
    4. // 按照分区将数据一条一条写出到指定溢写文件
    5. writer.append(key, value);
    6. }
    ③ 最终效果:临时文件中的数据是按照分区编号由小到大,并且每个分区的数据都是按照key有序的存储
  8. 所有数据全部溢写完后(最后一次溢写如果没有达到溢写阈值会执行flush操作),对所有溢写文件进行merge操作(将多个临时文件合并成一个文件),merge时将所有临时文件同一个分区的数据进行汇总,汇总后再排序(归并排序,因为每个临时文件内的数据其实都是有序的,那么将已有序的子序列合并,得到完全有序的序列,归并排序效率最高),最后合并成一个文件,最终效果这个文件的每个分区的数据都是有序的

说明:

  1. 收集线程和溢写线程互不干扰,但是当收集线程发现达到溢写阈值时就会唤醒溢写线程,同样的溢写线程写完后也会唤醒收集线程继续收集
  2. 系统执行排序的过程(即将Mapper输出作为输入传给Reducer)称为Shuffle

8.1 分区

怎么决定要多少个分区?是否需要自定义分区?

1. 分区的数量决定决定了ReduceTask的数量,所以间接决定了要生成文件的个数。简而言之,如果你最终需要将一个文件的内容按照某种规则分成N个文件,那么分区的数量就是N

2. 如果没有手动设定分区数量,那么默认采用Hash分区器,即根据key值按照hash算法算出一个分区号,分区号相同的被分到一个区(算法如下)。由此可见,默认的分区器我们是不可控制哪些数量被分到一个分区。所以如果我们有明确需求:需要将具有相同特征的数据分到一组处理,那么我们就需要使用自定义分区

  1. public int getPartition(K2 key, V2 value, int numReduceTasks) {
  2. return (key.hashCode() & 2147483647) % numReduceTasks;
  3. }

案例:

给定一个文件,该文件中每一行都有一个随机的手机号,此时我们需要将手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

  1. 实现自定义分区器
    1. /*
    2. * KEY, VALUE: Mapper输出的Key-value类型
    3. * FlowBean:自定义Mapper输出类型
    4. */
    5. public class MyPartitioner extends Partitioner<Text, FlowBean>{
    6. // 计算分区 numPartitions为总的分区数,reduceTask的数量
    7. // 分区号必须为int型的值,且必须符合 0<= partitionNum < numPartitions
    8. @Override
    9. public int getPartition(Text key, FlowBean value, int numPartitions) {
    10. String suffix = key.toString().substring(0, 3);
    11. int partitionNum=0;
    12. switch (suffix) {
    13. case "136":
    14. partitionNum=numPartitions-1;
    15. break;
    16. case "137":
    17. partitionNum=numPartitions-2;
    18. break;
    19. case "138":
    20. partitionNum=numPartitions-3;
    21. break;
    22. case "139":
    23. partitionNum=numPartitions-4;
    24. break;
    25. default:
    26. break;
    27. }
    28. return partitionNum;
    29. }
    30. }
  2. 主类中设置RedueTask的数量和自定义分区器
    1. public class FlowBeanDriver{
    2. public static void main(String[] args) {
    3. //作为整个Job的配置
    4. Configuration conf = new Configuration();
    5. // ①创建Job
    6. Job job = Job.getInstance(conf);
    7. // TODO 省略一些配置
    8. // 设置ReduceTask的数量为5
    9. job.setNumReduceTasks(5);
    10. // 设置使用自定义的分区器
    11. job.setPartitionerClass(MyPartitioner.class);
    12. }
    13. }

说明:自定义分区器返回的分区号并不能随意指定:0 <= 分区号 < numPartitions( 指定的ReduceTask的数量)

8.2 比较器

在MapTask流程中有排序的步骤,排序是根据key来排序的,那么按照什么进行排序呢?这就需要使用比较器来定义。

  1. public RawComparator getOutputKeyComparator() {
  2. // 获取自定义的比较器
  3. Class<? extends RawComparator> theClass = this.getClass("mapreduce.job.output.key.comparator.class", (Class)null, RawComparator.class);
  4. return (RawComparator)(theClass != null ? (RawComparator)ReflectionUtils.newInstance(theClass, this) : WritableComparator.get(this.getMapOutputKeyClass().asSubclass(WritableComparable.class), this));
  5. }

从上述代码可以看出

  • 如果有自定义的比较器,那么就根据自定义的比较器类直接实例化比较器对象
  • 如果没有自定义,那么就根据Mapper的output的key的类型来获取。假设Map阶段你输出的key的类型是LongWritable,那么就获取LongWritable中定义的Comparator对象,像通用的Writable子类,如IntWritable,Text等都是实现了WritableComparable接口,且都是有自己的内部类Comparator,具体可查看源码,默认升序排列,如果想要降序,那么就需要自定义比较器

那么如何自定义比较器,有以下几种方式:

  • 自定义类继承WritableComparator类,并实现compare方法,最后设置job参数
  1. public class MyDescComparator extends WritableComparator{
  2. // 实现Long的倒序排序
  3. // 如果key是LongWritable类型,降序也可以直接使用LongWritable中的DecreasingComparator
  4. @Override
  5. public int compare(byte[] b1, int s1, int l1,
  6. byte[] b2, int s2, int l2) {
  7. long thisValue = readLong(b1, s1);
  8. long thatValue = readLong(b2, s2);
  9. return (thisValue<thatValue ? 1 : (thisValue==thatValue ? 0 : -1));
  10. }
  11. }
  1. // 创建Job
  2. Job job = Job.getInstance(new Configuration());
  3. // 设置使用自定义的比较器
  4. job.setSortComparatorClass(DecreasingComparator.class);
  5. // 运行Job
  6. job.waitForCompletion(true);
  • 自定义比较器类实现RawComparator接口,实现其中的两个compare方法,最后设置job参数
  1. public class MyRawComparator implements RawComparator<FlowBean>{
  2. private FlowBean key1=new FlowBean();
  3. private FlowBean key2=new FlowBean();
  4. private DataInputBuffer buffer=new DataInputBuffer();
  5. // 负责从缓冲区中解析出要比较的两个key对象,调用 compare(Object o1, Object o2)对两个key进行对比
  6. @Override
  7. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  8. try {
  9. buffer.reset(b1, s1, l1); // parse key1
  10. key1.readFields(buffer);
  11. buffer.reset(b2, s2, l2); // parse key2
  12. key2.readFields(buffer);
  13. buffer.reset(null, 0, 0); // clean up reference
  14. } catch (IOException e) {
  15. throw new RuntimeException(e);
  16. }
  17. return compare(key1, key2);
  18. }
  19. // Comparable的compare(),实现最终的比较
  20. @Override
  21. public int compare(FlowBean o1, FlowBean o2) {
  22. return -o1.getSumFlow().compareTo(o2.getSumFlow());
  23. }
  24. }
  1. // 创建Job
  2. Job job = Job.getInstance(new Configuration());
  3. // 设置使用自定义的比较器
  4. job.setSortComparatorClass(DecreasingComparator.class);
  5. // 运行Job
  6. job.waitForCompletion(true);
  • 当你自定义了key的类型时,那么可以直接让该bean类实现WritableComparable接口,并重写compareTo 方法。这样MapTask在比较时会自动调用compareTo方法,无需在job中进行设置比较器。
  1. @Data
  2. public class FlowBean implements WritableComparable<FlowBean>{
  3. private long upFlow;
  4. private long downFlow;
  5. private Long sumFlow;
  6. // 。。。省略其他代码
  7. // 系统封装的比较器在对比key时,调用key的compareTo进行比较
  8. @Override
  9. public int compareTo(FlowBean o) {
  10. return -this.sumFlow.compareTo(o.getSumFlow());
  11. }
  12. }

8.3 Combiner

Combiner实际上本质是一个Reducer类,且只有在设置了之后,才会运行!

Combiner和Reducer的区别

  1. Reducer是在reduce阶段调用,而Combiner是在shuffle阶段(既可以在MapTask的shuffle,也可以在ReduceTask的shuffle)调用!

  2. 本质都是Reducer类,作用都是对有相同key的key-value进行合并!

Combiner意义

在shuffle阶段对相同key的key-value进行提前合并,从而可以减少磁盘IO和网络IO!

使用条件

使用Combiner必须保证不能影响处理逻辑和结果!经验证,Combiner只能用在加、减操作的场景,不能用在乘、除操作的场景,用在乘、除操作的场景会导致最终计算的结果与不使用Combiner时不一致

调用时机

Combiner既有可能在MapTask端调用:

每次溢写前会调用Combiner对溢写的数据进行局部合并

②在merge(多个溢写出的临时文件合并为一个文件)时,如果溢写的片段数>=3,(即溢写出的临时文件>=3)如果设置了Combiner,Combiner会再次对数据进行Combine!

Combiner也有可能在ReduceTask端调用:

③shuffle线程拷贝多个MapTask同一分区的数据,拷贝后执行merge和sort,

如果数据量过大,需要将部分数据先合并排序后,溢写到磁盘!

如果设置了Combiner,Combiner会再次运行!

9 ReduceTask

ReduceTask的数量取决于分区的数量,因为每个ReduceTask只会处理所有MapTask输出的相同分区的数据,具体流程如下

  1.  ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. 用户编写reduce()函数,输入数据(key,value)是按key进行聚集的一组数据。而为了将key相同的数据聚在一组,Hadoop采用了基于排序的策略,即先针对key进行一次归并排序(各个MapTask已经实现对自己的处理结果进行了局部排序),再使用GroupingComparator实现类将相同的key分到一组。
  3. reduce()函数一次读取一组数据,并使用OutputFormat将计算结果输出到磁盘或写到HDFS上

9.1 分组比较器

直接使用一个案例来解释:

有一个文件内容如下所示,orderid:订单id,pid:商品id,account: 商品金额

需求:统计同一笔订单中,金额最大的商品记录输出

分析得出: 在同一笔订单中,对每条记录的金额进行降序排序,最大的排前边,那么此时就需要进行二次排序,即先按照订单号进行排序(升降都可以)再按照金额进行降序排序。而为了将同一个订单的记录分到一组,就需要使用分组比较器。而MapReduce只能针对key进行排序,所以我们又需要封装一个bean作为key,该bean中至少包含orderid和account两个字段

MapTask阶段:

OrderBean

  1. @Data
  2. public class OrderBean implements WritableComparable<OrderBean>{
  3. private String orderId;
  4. private String pId;
  5. private Double acount;
  6. // 序列化
  7. @Override
  8. public void write(DataOutput out) throws IOException {
  9. out.writeUTF(orderId);
  10. out.writeUTF(pId);
  11. out.writeDouble(acount);
  12. }
  13. // 反序列化
  14. @Override
  15. public void readFields(DataInput in) throws IOException {
  16. orderId=in.readUTF();
  17. pId=in.readUTF();
  18. acount=in.readDouble();
  19. }
  20. // 二次排序,先按照orderid排序(升降序都可以),再按照acount(降序)排序
  21. @Override
  22. public int compareTo(OrderBean o) {
  23. //先按照orderid排序升序排序
  24. int result=this.orderId.compareTo(o.getOrderId());
  25. if (result==0) {
  26. //再按照acount(降序)排序,降序前面加个负号即可
  27. result=-this.acount.compareTo(o.getAcount());
  28. }
  29. return result;
  30. }
  31. }

OrderMapper

  1. /*
  2. * LongWritable, Text: 默认使用TextInputFormat,所以LongWritable表示行号,Text表示那一行的内容
  3. */
  4. public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
  5. private OrderBean out_key=new OrderBean();
  6. private NullWritable out_value=NullWritable.get();
  7. @Override
  8. protected void map(LongWritable key, Text value,
  9. Context context)
  10. throws IOException, InterruptedException {
  11. String[] words = value.toString().split("\t");
  12. out_key.setOrderId(words[0]);
  13. out_key.setPId(words[1]);
  14. out_key.setAcount(Double.parseDouble(words[2]));
  15. context.write(out_key, out_value);
  16. }
  17. }

经过MapTask的shuffle阶段后,传入Reduce的数据应是如下所示,即先按orderid升序排序,再按照金额降序排序:

ReduceTask阶段
在此阶段会获取分组比较器,如果没设置默认使用MapTask排序时key的比较器!而在这个案例中默认的比较器比较策略不符合要求,因为它认为orderId一样且acount一样的记录才是一组。那我们就需要自定义分组比较器,只按照orderId进行对比,只要OrderId一样,认为key相等,这样可以将orderId相同的分到一个组!


实现自定义GroupingComparator

实现自定义的分组比较器和之前实现自定义比较器一样,都是两种方式,继承WritableCompartor 或 实现RawComparator

  1. //1. 继承WritableCompartor
  2. public class MyGroupingComparator2 extends WritableComparator{
  3. // 必须调用一次父类构造器,且s第三个参数要传true,否则会报空指针异常,第二个conf参数不传的话会默认构造
  4. public MyGroupingComparator2() {
  5. super(OrderBean.class,null,true);
  6. }
  7. public int compare(WritableComparable a, WritableComparable b) {
  8. OrderBean o1=(OrderBean) a;
  9. OrderBean o2=(OrderBean) b;
  10. return o1.getOrderId().compareTo(o2.getOrderId());
  11. }
  12. }
  13. // 或2. 实现RawComparator
  14. public class MyGroupingComparator implements RawComparator<OrderBean>{
  15. private OrderBean key1=new OrderBean();
  16. private OrderBean key2=new OrderBean();
  17. private DataInputBuffer buffer=new DataInputBuffer();
  18. @Override
  19. public int compare(OrderBean o1, OrderBean o2) {
  20. return o1.getOrderId().compareTo(o2.getOrderId());
  21. }
  22. @Override
  23. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  24. try {
  25. buffer.reset(b1, s1, l1); // parse key1
  26. key1.readFields(buffer);
  27. buffer.reset(b2, s2, l2); // parse key2
  28. key2.readFields(buffer);
  29. buffer.reset(null, 0, 0); // clean up reference
  30. } catch (IOException e) {
  31. throw new RuntimeException(e);
  32. }
  33. return compare(key1, key2);
  34. }
  35. }

Reducer

  1. public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
  2. @Override
  3. protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  4. // 由于从mapper传入reducer的数据已经按照商品价格降序排了序
  5. // 所以下面这行代码是获取最大的商品价格
  6. Double maxAcount = key.getAcount();
  7. // 这里遍历values有一个注意点:
  8. // 每次调用迭代器迭代下个记录时,使用反序列化器从文件中或内存中读取下一个key-value数据的值,key-value对象始终是同一个对象,即指针不变,只是在循环过程中会重新给key-value赋值。
  9. // 按照此案例,这一组数据如下所示,value为NULLWritable
  10. // 10000001 Pdt_02 222.8
  11. // 10000001 Pdt_01 222.8
  12. // 10000001 Pdt_05 25.8
  13. // 那么第一次遍历,key值为10000001 Pdt_02 222.8,第二次遍历key变为10000001 Pdt_01 222.8,第三次变为10000001 Pdt_05 25.8
  14. for (NullWritable nullWritable : values) {
  15. // 由于每次遍历,key会被重新赋值,所以只需要和获取的maxAcount进行比较,相同就写出。
  16. if (!key.getAcount().equals(maxAcount)) {
  17. break;
  18. }
  19. //复合条件的记录
  20. context.write(key, nullWritable);
  21. }
  22. }
  23. }

启动类

  1. public class OrderBeanDriver {
  2. public static void main(String[] args) throws Exception {
  3. //作为整个Job的配置
  4. Configuration conf = new Configuration();
  5. // ①创建Job
  6. Job job = Job.getInstance(conf);
  7. // ②设置Job
  8. // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
  9. job.setMapperClass(OrderMapper.class);
  10. job.setReducerClass(OrderReducer.class);
  11. // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
  12. // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
  13. job.setOutputKeyClass(OrderBean.class);
  14. job.setOutputValueClass(NullWritable.class);
  15. // 设置输入目录和输出目录
  16. FileInputFormat.setInputPaths(job, new Path("E:\\mrinput\\groupcomparator"));
  17. FileOutputFormat.setOutputPath(job, new Path("e:/mroutput/groupcomparator"));
  18. // 设置自定义的分组比较器
  19. job.setGroupingComparatorClass(MyGroupingComparator2.class);
  20. // ③运行Job
  21. job.waitForCompletion(true);
  22. }
  23. }

10 OutputFormat

OutputFormat是一个MapReduce输出数据的基类

10.1 文本输出TextOutputFormat

  默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。

10.2 SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入

10.3 自定义OutputFormat

步骤:

  1. 自定义一个类继承FileOutputFormat
  2. 改写RecordWriter,具体改写输出数据的方法write()。

案例说明:

需求:过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log,log日志内容如下:

Mapper

  1. /*
  2. * 1.什么时候需要Reduce
  3. * ①合并
  4. * ②需要对数据排序
  5. * 2. 没有Reduce阶段,key-value不需要实现序列化
  6. */
  7. public class CustomOFMapper extends Mapper<LongWritable, Text, String, NullWritable>{
  8. @Override
  9. protected void map(LongWritable key, Text value, Context context)
  10. throws IOException, InterruptedException {
  11. String content = value.toString();
  12. context.write(content+"\r\n", NullWritable.get());
  13. }
  14. }

OutputFormat

  1. public class MyOutPutFormat extends FileOutputFormat<String, NullWritable>{
  2. @Override
  3. public RecordWriter<String, NullWritable> getRecordWriter(TaskAttemptContext job)
  4. throws IOException, InterruptedException {
  5. return new MyRecordWriter(job);
  6. }
  7. }

RecordWriter

  1. public class MyRecordWriter extends RecordWriter<String, NullWritable> {
  2. private Path atguiguPath=new Path("e:/atguigu.log");
  3. private Path otherPath=new Path("e:/other.log");
  4. private FSDataOutputStream atguguOS ;
  5. private FSDataOutputStream otherOS ;
  6. private FileSystem fs;
  7. private TaskAttemptContext context;
  8. public MyRecordWriter(TaskAttemptContext job) throws IOException {
  9. context=job;
  10. Configuration conf = job.getConfiguration();
  11. fs=FileSystem.get(conf);
  12. atguguOS = fs.create(atguiguPath);
  13. otherOS = fs.create(otherPath);
  14. }
  15. // 负责将key-value写出到文件
  16. @Override
  17. public void write(String key, NullWritable value) throws IOException, InterruptedException {
  18. if (key.contains("atguigu")) {
  19. atguguOS.write(key.getBytes());
  20. context.getCounter("MyCounter", "atguiguCounter").increment(1);
  21. // 统计输出的含有atguigu 的key-value个数
  22. }else {
  23. otherOS.write(key.getBytes());
  24. context.getCounter("MyCounter", "otherCounter").increment(1);
  25. }
  26. }
  27. // 关闭操作
  28. @Override
  29. public void close(TaskAttemptContext context) throws IOException, InterruptedException {
  30. if (atguguOS != null) {
  31. IOUtils.closeStream(atguguOS);
  32. }
  33. if (otherOS != null) {
  34. IOUtils.closeStream(otherOS);
  35. }
  36. if (fs != null) {
  37. fs.close();
  38. }
  39. }
  40. }

启动类

  1. Job job = Job.getInstance(new Configuration());
  2. job.setMapperClass(CustomOFMapper.class);
  3. // 设置输入和输出格式
  4. job.setOutputFormatClass(MyOutPutFormat.class);
  5. // 取消reduce阶段
  6. job.setNumReduceTasks(0);
  7. // ...

11 计数器

计数器可以记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量

计数器API

1. 采用枚举的方式统计计数

context.getCounter(枚举).increment(1);

2.采用计数器组、计数器名称的方式统计,组名和计数器名称随便起,但最好有意义

context.getCounter("counterGroup", "counter").increment(1);

案例可查看自定义OutputFormat中的RecordWriter

12 Reduce Join

Reduce Join类似sql中的Join,将两个表的数据通过某个字段关联起来,返回两个表的数据。

假设有order.txt文件,字段有orderId,pid,amount,pd.txt文件:pid,pname,输出:orderId,pid,amount,pname

可见我们只需要将AB文件的pid进行关联即可。那么实现如下:

1. 在Map阶段,封装数据。 自定义的Bean需要能够封装两个切片中的所有的数据。而两种不同的数据,经过同一个Mapper的map()处理,因此需要在map()中,判断切片数据的来源,且将来源存入Bean中,以便在reduce阶段根据来源执行不同的封装策略

2. 相同pid的数据,需要分到同一个区,即以pid为条件分区,pid相同的分到一个区
3. 在reduce输出时,只需要将来自于order.txt中的数据,将pid替换为pname,而不需要输出所有的key-value(在Map阶段对数据打标记,标记哪些key-value属于order.txt,哪些属于pd.txt)

13 MapJoin

ReduceJoin需要在Reduce阶段实现Join功能,一旦数据量过大,效率低,因为需要分区和排序才能进行合并,那么可以使用MapJoin解决ReduceJoin低效的问题!即每个MapTask在map()中完成Join!

做法:

只需要将要Join的数据order.txt作为切片,让MapTask读取pd.txt(不以切片形式读入),而直接在MapTask中使用HDFS下载此文件,下载后,使用输入流手动读取其中的数据!在map()之前通常是将大文件以切片形式读取,小文件手动读取!

JoinBean

  1. public class JoinBean implements Writable{
  2. private String orderId;
  3. private String pid;
  4. private String pname;
  5. private String amount;
  6. // TODO 。。。
  7. }
MapJoinMapper
  1. /*
  2. * 1. 在Hadoop中,hadoop为MR提供了分布式缓存(job.addCacheFile(new URI("file://e:/cache/xx.txt"));)
  3. * 用来缓存一些Job运行期间的需要的文件(普通文件,jar,归档文件(har))
  4. * 分布式缓存会假设当前的文件已经上传到了HDFS,并且在集群的任意一台机器都可以访问到这个URI所代表的文件
  5. * 分布式缓存会在每个节点的task运行之前,提前将文件发送到节点
  6. * 分布式缓存的高效是由于每个Job只会复制一次文件,且可以自动在从节点对归档文件解归档
  7. */
  8. public class MapJoinMapper extends Mapper<LongWritable, Text, JoinBean, NullWritable>{
  9. private JoinBean out_key=new JoinBean();
  10. private Map<String, String> pdDatas=new HashMap<String, String>();
  11. //在map之前手动读取pd.txt中的内容
  12. @Override
  13. protected void setup(Context context)
  14. throws IOException, InterruptedException {
  15. //从分布式缓存中读取数据
  16. URI[] files = context.getCacheFiles();
  17. for (URI uri : files) {
  18. BufferedReader reader = new BufferedReader(new FileReader(new File(uri)));
  19. String line="";
  20. //循环读取pd.txt中的每一行
  21. while(StringUtils.isNotBlank(line=reader.readLine())) {
  22. String[] words = line.split("\t");
  23. pdDatas.put(words[0], words[1]);
  24. }
  25. reader.close();
  26. }
  27. }
  28. //对切片中order.txt的数据进行join,输出
  29. @Override
  30. protected void map(LongWritable key, Text value, Context context)
  31. throws IOException, InterruptedException {
  32. String[] words = value.toString().split("\t");
  33. out_key.setOrderId(words[0]);
  34. out_key.setPname(pdDatas.get(words[1]));
  35. out_key.setAmount(words[2]);
  36. context.write(out_key, NullWritable.get());
  37. }
  38. }

启动类

  1. public class MapJoinDriver {
  2. public static void main(String[] args) throws Exception {
  3. // ①创建Job
  4. Job job = Job.getInstance(new Configuration());
  5. job.setJarByClass(MapJoinDriver.class);
  6. // 为Job创建一个名字
  7. job.setJobName("xxx");
  8. // ②设置Job
  9. // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
  10. job.setMapperClass(MapJoinMapper.class);
  11. // 设置输入目录和输出目录
  12. FileInputFormat.setInputPaths(job, inputPath);
  13. FileOutputFormat.setOutputPath(job, outputPath);
  14. // 设置分布式缓存
  15. job.addCacheFile(new URI("file:///e:/pd.txt"));
  16. //取消reduce阶段
  17. job.setNumReduceTasks(0);
  18. // ③运行Job
  19. job.waitForCompletion(true);
  20. }
  21. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/777851
推荐阅读
相关标签
  

闽ICP备14008679号