当前位置:   article > 正文

MapReduce 运行原理(万字长篇 原理 + 案例)_mapreduce 工作原理及简单应用案例分享 site:csdn.net

mapreduce 工作原理及简单应用案例分享 site:csdn.net

所有实例都是在本地环境下测试的,无需启动集群!

版本说明:

idea:2021.2.2

jdk:1.8

maven:3.8.2(用idea自带的也行)

1. MapReduce 框架原理

image-20211123235213014

运行大致步骤:

MapReduce
FileOutputFormat
ReduceTask
Shuffle
MapTask
FileInputFormat

1.1 MapTask

工作机制简述: 输入的数据通过 split 被逻辑切分为多个 split 文件,通过 Record 按行读取内容给 map(用户实现)进行处理,数据被 map 处理结束后交给 OutputCollector 收集器,对其结果 key 进行分区(hash),然后写入 buffer,每个 maptask 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满时需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 maptask 结束后,再对磁盘中 maptask 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reducetask 获取该数据。

详解见下文…

1.2 ReduceTask

工作机制简述: Reduce 分为 copy、sort、reduce 三个阶段。ReduceTask 从各个 MapTask 上远程 copy 一片数据,进行一次归并排序,最后由 reduce(用户实现)将数据写到 HDFS 上。

详解见下文…

2. 数据输入

数据输入:输入文件的格式包括:日志文件、二进制文件、数据库表等,具体的文件类型,就需要对应的 InputFormat 来读取数据。

常用的 InputFormat 的实现类:

  1. FileInputFormat: 按照文件的内容长度进行切片,切片大小默认等于 Block 大小,切片时不需要考虑数据集整体,只是针对每一个文件单独切片。
  2. TextInputFormat: 是 FileInputFormat 默认的实现类,按照行读取每行记录,键是存储改行在整个文件中的起始字节偏移量(offset),LongWritable 类型;值是该行的内容,Text 类型。
  3. CombineTextInputFormat: 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,及将多个小文件交给一个 MapTask 处理。

FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。

数据块: 是 HDFS 存储的数据单位,实际上是 HDFS 将文件分成一块一块(Block)

数据切片: 是 MapReduce 程序计算输入数据的单位,一个切片会启动一个对应的 MapTask(数据切片只是逻辑上对输入的数据进行分片)

一个 Job 的 Map 阶段的并行度由切片数决定,多少个切片就分配多少 MapTask,默认切片大小 = BlockSize

2.1分片机制

MapReduce 程序启动时,会使用 InputFormat 计算任务的分片,分片的个数与 MapReduce 任务启动的 MapTask 的线程数(并发度)对应。

通常使用两种分片机制: FileInputFormat、 CombineTextInputFormat

2.2 FileInputFormat 分片机制

2.2.1 getSplits 方法

源码中获取切片的方法为 org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getSplits

为啥分片是用这个方法呢?

源码类的开头可以看到: This provides a generic implementation of getSplits(JobContext).

通过打断点,我们发现,程序运行时,也确实进入了该方法

image-20211124093734850

2.2.2 getSplits 源码

/**
 * 生成文件列表并将它们划分为FileSplits(列表中有多少 InputSpilt 就启动多少 MapTask)
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {

    // 启动线程,用于监控分片是否结束,实则在开始运行时就的计时
    StopWatch sw = new StopWatch().start();
    // getFormatMinSplitSize() = 1
    // getMinSplitSize(job) ==> job.getConfiguration().getLong(SPLIT_MINSIZE, default: 1L) ==> SPLIT_MINSIZE
    // SPLIT_MINSIZE 静态成员变量 默认值为 0
    // minSize ==> SPLIT_MINSIZE = 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // getMaxSplitSize(job) ==> context.getConfiguration().getLong(SPLIT_MAXSIZE, default: Long.MAX_VALUE) ==> Long.MAX_VALUE
    // Long.MAX_VALUE = 0x7fffffffffffffffL = 9223372036854775807L
    // getMinSplitSize(job) ==> Long.MAX_VALUE = 9223372036854775807L
    long maxSize = getMaxSplitSize(job);


    // 实例化集合用于存放分片信息,getSplits方法就是返回该集合 ============================
    // InputSplit 是一个抽象类,我们通常使用 FileSplit 实现类
    // FileSplit 的成员属性包括 :
    // Path file  文件路径
    // long start  当前分片的开始位置
    // long length  当前分片的长度
    // String[] hosts  文件(Block)的主机列表
    // SplitLocationInfo[] hostInfos  文件在主机上存储路径
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // 通过 job 对象获取文件的状态对象(初学 HDFS JavaAPI 时,FileSystem 通过 Path 获取过 FileStatus)
    List<FileStatus> files = listStatus(job);
    // 通过遍历获取每一个文件的状态信息
    for (FileStatus file : files) {
        Path path = file.getPath();    // 获取文件路径
        long length = file.getLen();   // 获取文件长度(大小)
        if (length != 0) {
            // 若文件非空,创建一个存储当前块地址信息的数组
            BlockLocation[] blkLocations;
            // 判断文件是否属于本地文件状态
            if (file instanceof LocatedFileStatus) {
                // 若是本地文件 ==================================================
                // 将 file 向上强转为 LocatedFileStatus,并获取文件的快位置
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                // 若是集群文件 ==============================================
                // 获取拥有该 Path 的文件系统
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                // 通过 fs 获取一个数组
                // 其中包含:给定文件的 hosts(主机名)、cachedHosts(副本主机名)、names(主机IP)
                //           、topologyPaths()、offset(偏移量)、length(大小)、corrupt(是否损坏)
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // 判断文件是否支持分片,默认为 true ============================================
            if (isSplitable(job, path)) {
                // 若支持分片 ==============================================================
                // 获取 HDFS 的块大小
                // blockSize ==> dfs.blockSize = 134217728B = 128MB
                long blockSize = file.getBlockSize();
                // 计算分片大小 ============================================================
                // computeSplitSize ==> Math.max(minSize, Math.min(maxSize, blockSize)) = 134217728
                // Math.min(maxSize, blockSize) ==> Math.min(9223372036854775807L, 134217728) = 134217728
                // Math.max(minSize, Math.min(maxSize, blockSize)) ==> Math.max(1, 134217728) = 134217728
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                // 声明变量 剩余字节数 = 文件大小
                long bytesRemaining = length;
                // 判断文件是否大于分片的 1.1 倍 ==============================================
                // SPLIT_SLOP = 1.1
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    // 若大于 1.1 倍
                    // 根据当前读取的数据字节开始位置  获取块的序号(索引)
                    // 比如:300MB的文件将分成3片
                    // 第一片 offset 为 length - bytesRemaining = 300 - 300 ==> 0,也就是第一块
                    // 第二片 offset 为 length - bytesRemaining = 300 - 128 ==> 1,也就是第二块
                    // 第三片 offset 为 length - bytesRemaining = 172 - 128 ==> 2,也就是第三块
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    // 添加到分片列表中
                    // path  文件位置
                    // length - bytesRemaining  起始位置(offset)
                    // splitSize  块大小
                    // blkLocations[blkIndex].getHosts()  当前分片文件的块存储主机列表
                    // blkLocations[blkIndex].getCachedHosts()))  当前文件块缓存对应的副本主机列表
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                                         blkLocations[blkIndex].getHosts(),
                                         blkLocations[blkIndex].getCachedHosts()));
                    // 移除掉已经产生分片的部分
                    bytesRemaining -= splitSize;
                }

                // 当上面的 while 循环不成立后,判断是否文件中(还)有字节
                if (bytesRemaining != 0) {
                    // 若(还)有,则将整个部分作为一个分片
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                                         blkLocations[blkIndex].getHosts(),
                                         blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // 若不支持分片 ===================================================
                // 则获取该文件的全部信息,并将该文件作为一个完整分片,添加到分片列表中
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                                     blkLocations[0].getCachedHosts()));
            }
        } else {
            // 若文件为空,就创建一个空的分片信息,添加到分片列表中
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                  + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116

2.2.3 getSplits 举例

有两个文件:

File1.txt :300MB

File1 将分割为:File1.txt.split1(128MB)、File1.txt.split2(128MB)、File1.txt.split3(44MB)

File2.txt :130MB

130 < 128 * 1.1,所以将文件整体作为一个分片

File2 将分割为:File2.txt.split1(130MB)

2.2.4 案例实操

案例:对IP进行数量的统计

public class Job_IPCountDriver {

    public static class Job_IPCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String ip = value.toString().split(" ")[0];
            k.set(ip);
            context.write(k, v);
        }
    }

    public static class Job_IPCountReducer
        extends Reducer<Text, IntWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            LongWritable v = new LongWritable(count);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Job_IPCountDriver.class);
        job.setMapperClass(Job_IPCountMapper.class);
        job.setReducerClass(Job_IPCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/240296
推荐阅读
相关标签