赞
踩
所有实例都是在本地环境下测试的,无需启动集群!
版本说明:
idea:2021.2.2
jdk:1.8
maven:3.8.2(用idea自带的也行)
运行大致步骤:
工作机制简述: 输入的数据通过 split 被逻辑切分为多个 split 文件,通过 Record 按行读取内容给 map(用户实现)进行处理,数据被 map 处理结束后交给 OutputCollector 收集器,对其结果 key 进行分区(hash),然后写入 buffer,每个 maptask 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满时需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 maptask 结束后,再对磁盘中 maptask 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reducetask 获取该数据。
详解见下文…
工作机制简述: Reduce 分为 copy、sort、reduce 三个阶段。ReduceTask 从各个 MapTask 上远程 copy 一片数据,进行一次归并排序,最后由 reduce(用户实现)将数据写到 HDFS 上。
详解见下文…
数据输入:输入文件的格式包括:日志文件、二进制文件、数据库表等,具体的文件类型,就需要对应的 InputFormat 来读取数据。
常用的 InputFormat 的实现类:
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。
数据块: 是 HDFS 存储的数据单位,实际上是 HDFS 将文件分成一块一块(Block)
数据切片: 是 MapReduce 程序计算输入数据的单位,一个切片会启动一个对应的 MapTask(数据切片只是逻辑上对输入的数据进行分片)
一个 Job 的 Map 阶段的并行度由切片数决定,多少个切片就分配多少 MapTask,默认切片大小 = BlockSize
MapReduce 程序启动时,会使用 InputFormat 计算任务的分片,分片的个数与 MapReduce 任务启动的 MapTask 的线程数(并发度)对应。
通常使用两种分片机制: FileInputFormat、 CombineTextInputFormat
源码中获取切片的方法为 org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getSplits
为啥分片是用这个方法呢?
源码类的开头可以看到: This provides a generic implementation of getSplits(JobContext).
通过打断点,我们发现,程序运行时,也确实进入了该方法
/** * 生成文件列表并将它们划分为FileSplits(列表中有多少 InputSpilt 就启动多少 MapTask) * @param job the job context * @throws IOException */ public List<InputSplit> getSplits(JobContext job) throws IOException { // 启动线程,用于监控分片是否结束,实则在开始运行时就的计时 StopWatch sw = new StopWatch().start(); // getFormatMinSplitSize() = 1 // getMinSplitSize(job) ==> job.getConfiguration().getLong(SPLIT_MINSIZE, default: 1L) ==> SPLIT_MINSIZE // SPLIT_MINSIZE 静态成员变量 默认值为 0 // minSize ==> SPLIT_MINSIZE = 1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // getMaxSplitSize(job) ==> context.getConfiguration().getLong(SPLIT_MAXSIZE, default: Long.MAX_VALUE) ==> Long.MAX_VALUE // Long.MAX_VALUE = 0x7fffffffffffffffL = 9223372036854775807L // getMinSplitSize(job) ==> Long.MAX_VALUE = 9223372036854775807L long maxSize = getMaxSplitSize(job); // 实例化集合用于存放分片信息,getSplits方法就是返回该集合 ============================ // InputSplit 是一个抽象类,我们通常使用 FileSplit 实现类 // FileSplit 的成员属性包括 : // Path file 文件路径 // long start 当前分片的开始位置 // long length 当前分片的长度 // String[] hosts 文件(Block)的主机列表 // SplitLocationInfo[] hostInfos 文件在主机上存储路径 List<InputSplit> splits = new ArrayList<InputSplit>(); // 通过 job 对象获取文件的状态对象(初学 HDFS JavaAPI 时,FileSystem 通过 Path 获取过 FileStatus) List<FileStatus> files = listStatus(job); // 通过遍历获取每一个文件的状态信息 for (FileStatus file : files) { Path path = file.getPath(); // 获取文件路径 long length = file.getLen(); // 获取文件长度(大小) if (length != 0) { // 若文件非空,创建一个存储当前块地址信息的数组 BlockLocation[] blkLocations; // 判断文件是否属于本地文件状态 if (file instanceof LocatedFileStatus) { // 若是本地文件 ================================================== // 将 file 向上强转为 LocatedFileStatus,并获取文件的快位置 blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { // 若是集群文件 ============================================== // 获取拥有该 Path 的文件系统 FileSystem fs = path.getFileSystem(job.getConfiguration()); // 通过 fs 获取一个数组 // 其中包含:给定文件的 hosts(主机名)、cachedHosts(副本主机名)、names(主机IP) // 、topologyPaths()、offset(偏移量)、length(大小)、corrupt(是否损坏) blkLocations = fs.getFileBlockLocations(file, 0, length); } // 判断文件是否支持分片,默认为 true ============================================ if (isSplitable(job, path)) { // 若支持分片 ============================================================== // 获取 HDFS 的块大小 // blockSize ==> dfs.blockSize = 134217728B = 128MB long blockSize = file.getBlockSize(); // 计算分片大小 ============================================================ // computeSplitSize ==> Math.max(minSize, Math.min(maxSize, blockSize)) = 134217728 // Math.min(maxSize, blockSize) ==> Math.min(9223372036854775807L, 134217728) = 134217728 // Math.max(minSize, Math.min(maxSize, blockSize)) ==> Math.max(1, 134217728) = 134217728 long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 声明变量 剩余字节数 = 文件大小 long bytesRemaining = length; // 判断文件是否大于分片的 1.1 倍 ============================================== // SPLIT_SLOP = 1.1 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { // 若大于 1.1 倍 // 根据当前读取的数据字节开始位置 获取块的序号(索引) // 比如:300MB的文件将分成3片 // 第一片 offset 为 length - bytesRemaining = 300 - 300 ==> 0,也就是第一块 // 第二片 offset 为 length - bytesRemaining = 300 - 128 ==> 1,也就是第二块 // 第三片 offset 为 length - bytesRemaining = 172 - 128 ==> 2,也就是第三块 int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); // 添加到分片列表中 // path 文件位置 // length - bytesRemaining 起始位置(offset) // splitSize 块大小 // blkLocations[blkIndex].getHosts() 当前分片文件的块存储主机列表 // blkLocations[blkIndex].getCachedHosts())) 当前文件块缓存对应的副本主机列表 splits.add(makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); // 移除掉已经产生分片的部分 bytesRemaining -= splitSize; } // 当上面的 while 循环不成立后,判断是否文件中(还)有字节 if (bytesRemaining != 0) { // 若(还)有,则将整个部分作为一个分片 int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // 若不支持分片 =================================================== // 则获取该文件的全部信息,并将该文件作为一个完整分片,添加到分片列表中 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { // 若文件为空,就创建一个空的分片信息,添加到分片列表中 splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; }
有两个文件:
File1.txt :300MB
File1 将分割为:File1.txt.split1(128MB)、File1.txt.split2(128MB)、File1.txt.split3(44MB)
File2.txt :130MB
130 < 128 * 1.1,所以将文件整体作为一个分片
File2 将分割为:File2.txt.split1(130MB)
案例:对IP进行数量的统计
public class Job_IPCountDriver { public static class Job_IPCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String ip = value.toString().split(" ")[0]; k.set(ip); context.write(k, v); } } public static class Job_IPCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (IntWritable value : values) { count += value.get(); } LongWritable v = new LongWritable(count); context.write(key, v); } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Job_IPCountDriver.class); job.setMapperClass(Job_IPCountMapper.class); job.setReducerClass(Job_IPCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。