赞
踩
TopN分析法
是指从研究对象中按照某一个指标进行倒序或正序排列,取其中所需的N个数据,并对这N个数据进行重点分析的方法。
num.txt
,现要求使用MapReduce技
术提取上述文本中最大的5个数据,并最终将结果汇总到一个文件中。MapReduce
分区为1,即ReduceTask
个数一定只有一个。我们需要提取TopN
,即全局的前N条数据,不管中间有几个Map
、Reduce
,最终只能有一个用来汇总数据。Map
阶段,使用TreeMap
数据结构保存TopN
的数据,TreeMap
默认会根据其键的自然顺序进行排序,也可根据创建映射时提供的Comparator
进行排序,其firstKey()
方法用于返回当前集合最小值的键。Reduce
阶段,将Map
阶段输出数据进行汇总,选出其中的TopN
数据,即可满足需求。这里需要注意的是,TreeMap
默认采取正序排列,需求是提取5个最大的数据,因此要重写Comparator
类的排序方法进行倒序排序.启动hadoop服务
创建topn
目录,在里面创建num.txt
文件
创建/topn/input
目录,执行命令:hdfs dfs -mkdir -p /topn/input
将文本文件num.txt
,上传到HDFS
的/topn/input
目录
使用IntelliJ
开发工具创建Maven
项目TopN
,并且新建hsl.aex.mr
包,在该路径下编写自定义Mapper
类TopNMapper
,主要用于将文件中的每行数据进行切割提取,并把数据保存到TreeMap
中,判断TreeMap
是否大于5
,如果大于5就需要移除最小的数据
。TreeMap
保存了当前文件最大5条数据
后,再输出到Reduce
阶段。
创建`hsl.aex.mr`包,在包里创建`TopNScoreMapper`类 package hsl.aex.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.TreeMap; public class TopNMapper extends Mapper<LongWritable,Text,NullWritable,IntWritable>{ private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(); // <0,10 3 8 7 6 5 1 2 9 4> // <xx,11 12 17 14 15 20> @Override public void map(LongWritable key, Text value, Context context) { String line = value.toString(); String[] nums = line.split(" "); for (String num : nums) { repToRecordMap.put(Integer.parseInt(num), " "); if (repToRecordMap.size() > 5) { repToRecordMap.remove(repToRecordMap.firstKey()); } } } @Override protected void cleanup(Context context) { for (Integer i : repToRecordMap.keySet()) { try { context.write(NullWritable.get(), new IntWritable(i)); } catch (Exception e) { e.printStackTrace(); } } } }
根据Map
阶段的输出结果形式,同样在hsl.aex.mr
包下,自定义Reducer
类TopNReducer
,主要用于编写TreeMap
自定义排序规则,当需求取最大值时,只需要在compare()
方法中返回正数即可满足倒序排列,reduce()
方法依然是要满足时刻判断TreeMap
中存放数据是前五个数,并最终遍历输出最大的5个数。
创建前N归并器类
在hsl.aex.mr
包里创建TopNReducer
类
package hsl.aex.mr; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Comparator; import java.util.TreeMap; public class TopNReducer extends Reducer<NullWritable,IntWritable,NullWritable,IntWritable>{ private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() { /** * 谁大排后面 * * @param a * @param b * @return 一个整数 */ public int compare(Integer a, Integer b) { return b - a; } }); public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { repToRecordMap.put(value.get(), " "); if (repToRecordMap.size() > 5) { repToRecordMap.remove(repToRecordMap.firstKey()); } } for (Integer i : repToRecordMap.keySet()) { context.write(NullWritable.get(), new IntWritable(i)); } } }
编写MapReduce
程序运行主类TopNDriver
,主要用于设置MapReduce
工作任务的相关参数,对HDFS
上/topn/input
目录下的源文件求前N数据,并将结果输入到HDFS
的/topn/output
目录下。
创建前N驱动器类
在hsl.aex.mr
包里创建TopNDriver
类
package hsl.aex.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; public class TopNDriver { public static void main(String[] args) throws Exception { // 创建配置对象 Configuration conf = new Configuration(); // 设置数据节点主机名属性 conf.set("dfs.client.use.datanode.hostname", "true"); // 获取作业实例 Job job = Job.getInstance(conf); // 设置作业启动类 job.setJarByClass(TopNDriver.class); //这几个 // 设置Mapper类 job.setMapperClass(TopNMapper.class); // 设置map任务输出键类型 job.setMapOutputKeyClass(NullWritable.class);//类型的和Mapper里面的对应起来 // 设置map任务输出值类型 job.setMapOutputValueClass(IntWritable.class); // 设置Reducer类 job.setReducerClass(TopNReducer.class); // 设置reduce任务输出键类型 job.setOutputKeyClass(Text.class); //类型的和Reduce里面的对应起来 // 设置reduce任务输出值类型 job.setOutputValueClass(NullWritable.class); // 定义uri字符串 String uri = "hdfs://master:9000"; // 创建输入目录 Path inputPath = new Path(uri + "/topn/input/num.txt"); // 创建输出目录 Path outputPath = new Path(uri + "/topn/output"); // 获取文件系统 FileSystem fs = FileSystem.get(new URI(uri), conf); // 删除输出目录 fs.delete(outputPath, true); // 给作业添加输入目录 FileInputFormat.addInputPath(job, inputPath); // 给作业设置输出目录 FileOutputFormat.setOutputPath(job, outputPath); // 等待作业完成 job.waitForCompletion(true); // 输出统计结果 System.out.println("======统计结果======"); FileStatus[] fileStatuses = fs.listStatus(outputPath); for (int i = 1; i < fileStatuses.length; i++) { // 输出结果文件路径 System.out.println(fileStatuses[i].getPath()); // 获取文件输入流 FSDataInputStream in = fs.open(fileStatuses[i].getPath()); // 将结果文件显示在控制台 IOUtils.copyBytes(in, System.out, 4096, false); } } }
可以看到排名前五的,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。