当前位置:   article > 正文

Java MapReduce用值进行降序排序_protected void cleanup(reducer

protected void cleanup(reducer.c

题:

编写 MapReduce 程序,实现以下功能:统计学生的成绩按降序排序

原始数据:

1001,张三,56
1002,李四,1
1003,王二,5
1004,赵六,200
1005,麻子,58
1006,王五,89

Mapper代码:

  1. private static class Mapp extends Mapper<LongWritable, Text, Text, DoubleWritable> {
  2. @Override
  3. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  4. String[] split = value.toString().split(","); // 将Text类型转换为String,分隔符为逗号(,)
  5. double a = Double.parseDouble(split[2]); //split[2] 为数值类型
  6. context.write(new Text(split[1]), new DoubleWritable(a)); // split[1] 为地区
  7. }
  8. }

Reduce代码:

  1. private static class Red extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  2. private TreeMap<Double, List<String>> sorted;
  3. // 定义一个Tree Map对象 数值为键,姓名为值
  4. // 将mapper传入的键值进行调换,供后续使用键进行排序处理
  5. // setup方法用于在每个reducer实例初始化时被调用
  6. // setup方法用于初始化操作
  7. @Override
  8. protected void setup(Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  9. sorted = new TreeMap<>(Collections.reverseOrder());
  10. // 创建的Tree Map对象,并指定了一个反向排序的比较器
  11. // 根据值进行降序排序(升序排序时可以不填写方法)
  12. }
  13. @Override
  14. protected void reduce(Text key, Iterable<DoubleWritable> values, Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  15. // 创建一个空列表用于存储每个姓名的数值,并求出最大值
  16. List<Double> a = new ArrayList<>();
  17. for (DoubleWritable value : values) {
  18. // 遍历数值放入空列表
  19. a.add(value.get());
  20. }
  21. // 取出最大值并赋值给 变量s
  22. double s = Collections.max(a);
  23. // 将键值放入创建好的Tree Map对象中,便于进行排序
  24. if (sorted.containsKey(s)) { // 列表中包含这个键(s 数值)就将key(地区)添加到sorted对应的s列表中
  25. sorted.get(s).add(key.toString());
  26. } else {
  27. List<String> keyl = new ArrayList<>(); // 如果没有包含,就创建一个新的字符串列表(keyl)
  28. keyl.add(key.toString());
  29. sorted.put(s, keyl); // 将key放入列表中(keyl) 然后将该值s对应的键列表keyl放入sorted对象中
  30. }
  31. }
  32. // 清理收尾工作
  33. @Override
  34. protected void cleanup(Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  35. // 遍历并使用Map函数映射Tree Map对象取出需要的键值对
  36. for (Map.Entry<Double, List<String>> en : sorted.entrySet()) {
  37. Double value = en.getKey(); // 将需要的值取出
  38. List<String> keylist = en.getValue(); // 将Tree Map对象中的字符串列表取出赋值给新的列表
  39. for (String s : keylist) { //遍历新的字符串列表
  40. // 将键值对放入容器中,显示上下文,输出键值对
  41. context.write(new Text(s), new DoubleWritable(value)); // 排序完成输出键值
  42. }
  43. }
  44. }
  45. }

Driver代码:

  1. public static void main(String[] args) throws Exception {
  2. Configuration conf = new Configuration();
  3. Job job = Job.getInstance(conf); // 创建一个job对象
  4. job.setJarByClass(Sorted.class); // 设置Driver驱动类
  5. job.setMapperClass(Mapp.class); // 关联mapper类
  6. job.setReducerClass(Red.class); // 关联Reducer类
  7. job.setMapOutputKeyClass(Text.class); // 设置Mapper输出键类型
  8. job.setMapOutputValueClass(DoubleWritable.class); // 设置Mapper输出值类型
  9. job.setOutputKeyClass(Text.class); // 设置最终输出键类型
  10. job.setOutputValueClass(DoubleWritable.class); // 设置最终输出值类型
  11. // 设置文件出入路径 打包后可设置为jar包输入的文件路径 arge[0]
  12. FileInputFormat.setInputPaths(job, new Path("D:\\data\\eurasia_mainland.csv"));
  13. Path path = new Path("d:\\data\\tmp2"); // 创建一个Path对象,放入文件输出路径
  14. FileSystem file = path.getFileSystem(conf); // 调用conf对象用于存储参数
  15. if (file.exists(path)) { // 判断
  16. file.delete(path, true); // 如果文件存在则先删除文件
  17. }
  18. FileOutputFormat.setOutputPath(job, path); // 指定文件输出路径
  19. System.exit(job.waitForCompletion(true) ? 0 : 1); // 任务结束,提交任务
  20. }

处理后结果:

赵六    200.0
王五    89.0
麻子    58.0
张三    56.0
王二    5.0
李四    1.0
 

完整代码:

  1. package org.example;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.DoubleWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import java.io.IOException;
  14. import java.util.*;
  15. public class Sorted {
  16. private static class Mapp extends Mapper<LongWritable, Text, Text, DoubleWritable> {
  17. @Override
  18. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  19. String[] split = value.toString().split(","); // 将Text类型转换为String,分隔符为逗号(,)
  20. double a = Double.parseDouble(split[2]); //split[2] 为成绩
  21. context.write(new Text(split[1]), new DoubleWritable(a)); // split[1] 为学生姓名
  22. }
  23. }
  24. private static class Red extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  25. private TreeMap<Double, List<String>> sorted;
  26. // 定义一个Tree Map对象 数值为键,姓名为值
  27. // 将mapper传入的键值进行调换,供后续使用键进行排序处理
  28. // setup方法用于在每个reducer实例初始化时被调用
  29. // setup方法用于初始化操作
  30. @Override
  31. protected void setup(Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  32. sorted = new TreeMap<>(Collections.reverseOrder());
  33. // 创建的Tree Map对象,并指定了一个反向排序的比较器
  34. // 根据值进行降序排序(升序排序时可以不填写方法)
  35. }
  36. @Override
  37. protected void reduce(Text key, Iterable<DoubleWritable> values, Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  38. // 创建一个空列表用于存储每个姓名的数值,并求出最大值
  39. List<Double> a = new ArrayList<>();
  40. for (DoubleWritable value : values) {
  41. // 遍历数值放入空列表
  42. a.add(value.get());
  43. }
  44. // 取出最大值并赋值给 变量s
  45. double s = Collections.max(a);
  46. // 将键值放入创建好的Tree Map对象中,便于进行排序
  47. if (sorted.containsKey(s)) { // 列表中包含这个键(s 数值)就将key(地区)添加到sorted对应的s列表中
  48. sorted.get(s).add(key.toString());
  49. } else {
  50. List<String> keyl = new ArrayList<>(); // 如果没有包含,就创建一个新的字符串列表(keyl)
  51. keyl.add(key.toString());
  52. sorted.put(s, keyl); // 将key放入列表中(keyl) 然后将该值s对应的键列表keyl放入sorted对象中
  53. }
  54. }
  55. // 清理收尾工作
  56. @Override
  57. protected void cleanup(Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
  58. // 遍历并使用Map函数映射Tree Map对象取出需要的键值对
  59. for (Map.Entry<Double, List<String>> en : sorted.entrySet()) {
  60. Double value = en.getKey(); // 将需要的值取出
  61. List<String> keylist = en.getValue(); // 将Tree Map对象中的字符串列表取出赋值给新的列表
  62. for (String s : keylist) { //遍历新的字符串列表
  63. // 将键值对放入容器中,显示上下文,输出键值对
  64. context.write(new Text(s), new DoubleWritable(value)); // 排序完成输出键值
  65. }
  66. }
  67. }
  68. }
  69. public static void main(String[] args) throws Exception {
  70. Configuration conf = new Configuration();
  71. Job job = Job.getInstance(conf); // 创建一个job对象
  72. job.setJarByClass(Sorted.class); // 设置Driver驱动类
  73. job.setMapperClass(Mapp.class); // 关联mapper类
  74. job.setReducerClass(Red.class); // 关联Reducer类
  75. job.setMapOutputKeyClass(Text.class); // 设置Mapper输出键类型
  76. job.setMapOutputValueClass(DoubleWritable.class); // 设置Mapper输出值类型
  77. job.setOutputKeyClass(Text.class); // 设置最终输出键类型
  78. job.setOutputValueClass(DoubleWritable.class); // 设置最终输出值类型
  79. // 设置文件出入路径 打包后可设置为jar包输入的文件路径 arge[0]
  80. FileInputFormat.setInputPaths(job, new Path("D:\\data\\eurasia_mainland.csv"));
  81. Path path = new Path("d:\\data\\tmp2"); // 创建一个Path对象,放入文件输出路径
  82. FileSystem file = path.getFileSystem(conf); // 调用conf对象用于存储参数
  83. if (file.exists(path)) { // 判断
  84. file.delete(path, true); // 如果文件存在则先删除文件
  85. }
  86. FileOutputFormat.setOutputPath(job, path); // 指定文件输出路径
  87. System.exit(job.waitForCompletion(true) ? 0 : 1); // 任务结束,提交任务
  88. }
  89. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/747995
推荐阅读
相关标签
  

闽ICP备14008679号