当前位置:   article > 正文

MapReduce案例6——学生成绩增强版_数据解释:第一个字段是课程名称,总共四个课程,computer,math,english,algorithm

数据解释:第一个字段是课程名称,总共四个课程,computer,math,english,algorithm,

题目及数据:

  1. computer,huangxiaoming,85,86,41,75,93,42,85
  2. computer,xuzheng,54,52,86,91,42
  3. computer,huangbo,85,42,96,38
  4. english,zhaobenshan,54,52,86,91,42,85,75
  5. english,liuyifei,85,41,75,21,85,96,14
  6. algorithm,liuyifei,75,85,62,48,54,96,15
  7. computer,huangjiaju,85,75,86,85,85
  8. english,liuyifei,76,95,86,74,68,74,48
  9. english,huangdatou,48,58,67,86,15,33,85
  10. algorithm,huanglei,76,95,86,74,68,74,48
  11. algorithm,huangjiaju,85,75,86,85,85,74,86
  12. computer,huangdatou,48,58,67,86,15,33,85
  13. english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
  14. english,huangbo,85,42,96,38,55,47,22
  15. algorithm,liutao,85,75,85,99,66
  16. computer,huangzitao,85,86,41,75,93,42,85
  17. math,wangbaoqiang,85,86,41,75,93,42,85
  18. computer,liujialing,85,41,75,21,85,96,14,74,86
  19. computer,liuyifei,75,85,62,48,54,96,15
  20. computer,liutao,85,75,85,99,66,88,75,91
  21. computer,huanglei,76,95,86,74,68,74,48
  22. english,liujialing,75,85,62,48,54,96,15
  23. math,huanglei,76,95,86,74,68,74,48
  24. math,huangjiaju,85,75,86,85,85,74,86
  25. math,liutao,48,58,67,86,15,33,85
  26. english,huanglei,85,75,85,99,66,88,75,91
  27. math,xuzheng,54,52,86,91,42,85,75
  28. math,huangxiaoming,85,75,85,99,66,88,75,91
  29. math,liujialing,85,86,41,75,93,42,85,75
  30. english,huangxiaoming,85,86,41,75,93,42,85
  31. algorithm,huangdatou,48,58,67,86,15,33,85
  32. algorithm,huangzitao,85,86,41,75,93,42,85,75
  33. 一、数据解释
  34. 数据字段个数不固定:
  35. 第一个是课程名称,总共四个课程,computer,math,english,algorithm,
  36. 第二个是学生姓名,后面是每次考试的分数
  37. 二、统计需求:
  38. 1、统计每门课程的参考人数和课程平均分
  39. 2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数
  40. 3、求出每门课程参考学生平均分最高的学生的信息:课程,姓名和平均分

题目解析:1、课程平均分需要在map中先计算每个人的课程平均成绩,然后在reduce中求出整体的平均成绩

  1. /**
  2. * @author: lpj
  3. * @date: 2018年3月16日 下午7:16:47
  4. * @Description:
  5. */
  6. package lpj.reduceWork;
  7. import java.io.IOException;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FileSystem;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.io.IntWritable;
  12. import org.apache.hadoop.io.LongWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapreduce.Job;
  15. import org.apache.hadoop.mapreduce.Mapper;
  16. import org.apache.hadoop.mapreduce.Reducer;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. /**
  20. *
  21. */
  22. public class StudentScore3MR {
  23. public static void main(String[] args) throws Exception {
  24. Configuration conf = new Configuration();
  25. // conf.addResource("hdfs-site.xml");//使用配置文件
  26. // System.setProperty("HADOOP_USER_NAME", "hadoop");//使用集群
  27. FileSystem fs = FileSystem.get(conf);//默认使用本地
  28. Job job = Job.getInstance(conf);
  29. job.setJarByClass(StudentScore3MR.class);
  30. job.setMapperClass(StudentScore3MR_Mapper.class);
  31. job.setReducerClass(StudentScore3MR_Reducer.class);
  32. job.setMapOutputKeyClass(Text.class);
  33. job.setMapOutputValueClass(Text.class);
  34. job.setOutputKeyClass(Text.class);
  35. job.setOutputValueClass(Text.class);
  36. //
  37. // String inputpath = args[0];
  38. // String outpath = args[1];
  39. Path inputPath = new Path("d:/a/homework6.txt");
  40. Path outputPath = new Path("d:/a/homework6");
  41. if (fs.exists(inputPath)) {
  42. fs.delete(outputPath, true);
  43. }
  44. FileInputFormat.setInputPaths(job, inputPath);
  45. FileOutputFormat.setOutputPath(job, outputPath);
  46. boolean isdone = job.waitForCompletion(true);
  47. System.exit(isdone ? 0 : 1);
  48. }
  49. //1、统计每门课程的参考人数和课程平均分
  50. public static class StudentScore3MR_Mapper extends Mapper<LongWritable, Text, Text, Text>{
  51. Text kout = new Text();
  52. Text valueout = new Text();
  53. @Override
  54. protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
  55. //algorithm,huangzitao,85,86,41,75,93,42,85,75
  56. String [] reads = value.toString().trim().split(",");
  57. String kk = reads[0];
  58. int sum = 0;
  59. int count = 0;
  60. double avg = 0;
  61. for(int i = 2; i < reads.length; i++){
  62. sum += Integer.parseInt(reads[i]);
  63. count++;
  64. }
  65. avg = 1.0 * sum / count;
  66. String vv = avg + "";
  67. kout.set(kk);
  68. valueout.set(vv);
  69. context.write(kout, valueout);
  70. }
  71. }
  72. public static class StudentScore3MR_Reducer extends Reducer<Text, Text, Text, Text>{
  73. Text kout = new Text();
  74. Text valueout = new Text();
  75. @Override
  76. protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
  77. double sum = 0;
  78. int count = 0;
  79. double avg = 0;
  80. for(Text text : values){
  81. sum += Double.parseDouble(text.toString());
  82. count ++;
  83. }
  84. avg = sum / count;
  85. String vv = count + "\t" + avg;
  86. valueout.set(vv);
  87. context.write(key, valueout);
  88. }
  89. }
  90. }

结果:

  1. algorithm 6 71.60119047619047
  2. computer 10 69.79896825396825
  3. english 9 66.22655122655122
  4. math 7 72.88265306122449

2、输出结果存储到不同的结果文件中,需要指定setNumReduceTasks,分区规则通过使用partitioner进行分区设定,平均成绩需要进行排序,可以使用封装对象的方式,通过实现WritableComparable接口进行设置排序规则

实体类定义:

  1. /**
  2. * @author: lpj
  3. * @date: 2018年3月14日 下午9:46:02
  4. * @Description:
  5. */
  6. package lpj.day2.homeworkbean;
  7. import java.io.DataInput;
  8. import java.io.DataOutput;
  9. import java.io.IOException;
  10. import java.text.DecimalFormat;
  11. import java.text.ParseException;
  12. import java.text.SimpleDateFormat;
  13. import org.apache.hadoop.io.WritableComparable;
  14. /**
  15. *
  16. */
  17. public class Student implements WritableComparable<Student>{
  18. private String name;
  19. private double score;
  20. private String course;
  21. public String getName() {
  22. return name;
  23. }
  24. public void setName(String name) {
  25. this.name = name;
  26. }
  27. public double getScore() {
  28. return score;
  29. }
  30. public void setScore(double score) {
  31. this.score = score;
  32. }
  33. public String getCourse() {
  34. return course;
  35. }
  36. public void setCourse(String course) {
  37. this.course = course;
  38. }
  39. @Override
  40. public String toString() {
  41. DecimalFormat fs = new DecimalFormat("#.#");
  42. return course + "\t" +name+ "\t"+ fs.format(score);
  43. }
  44. public Student() {
  45. }
  46. public Student(String name, double score, String course) {
  47. super();
  48. this.name = name;
  49. this.score = score;
  50. this.course = course;
  51. }
  52. @Override
  53. public int compareTo(Student o) {
  54. int diff = this.course.compareTo(o.course);
  55. if (diff == 0) {
  56. return (int)(o.score - this.score);
  57. }else{
  58. return diff > 0 ? 1 : -1;
  59. }
  60. }
  61. /* (non-Javadoc)
  62. * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
  63. */
  64. @Override
  65. public void readFields(DataInput in) throws IOException {
  66. name = in.readUTF();
  67. score = in.readDouble();
  68. course = in.readUTF();
  69. }
  70. /* (non-Javadoc)
  71. * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
  72. */
  73. @Override
  74. public void write(DataOutput out) throws IOException {
  75. out.writeUTF(name);
  76. out.writeDouble(score);
  77. out.writeUTF(course);
  78. }
  79. }

分区器定义:

  1. /**
  2. * @author: lpj
  3. * @date: 2018年3月16日 下午10:13:24
  4. * @Description:
  5. */
  6. package lpj.reduceWorkbean;
  7. import org.apache.hadoop.io.NullWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Partitioner;
  10. /**
  11. *
  12. */
  13. public class MyPatitioner extends Partitioner<Student, NullWritable>{
  14. /* (non-Javadoc)
  15. * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
  16. */
  17. @Override
  18. public int getPartition(Student key, NullWritable value, int numPartitions) {
  19. if (key.toString().startsWith("math")) {
  20. return 0;
  21. }else if (key.toString().startsWith("english")) {
  22. return 1;
  23. }else if (key.toString().startsWith("computer")) {
  24. return 2;
  25. }else {
  26. return 3;
  27. }
  28. }
  29. }

主体程序:

  1. /**
  2. * @author: lpj
  3. * @date: 2018年3月16日 下午7:16:47
  4. * @Description:
  5. */
  6. package lpj.reduceWork;
  7. import java.io.IOException;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FileSystem;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.io.IntWritable;
  12. import org.apache.hadoop.io.LongWritable;
  13. import org.apache.hadoop.io.NullWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import lpj.reduceWorkbean.MyPatitioner;
  21. import lpj.reduceWorkbean.Student;
  22. /**
  23. *
  24. */
  25. public class StudentScore3_2MR2 {
  26. public static void main(String[] args) throws Exception {
  27. Configuration conf = new Configuration();
  28. // conf.addResource("hdfs-site.xml");//使用配置文件
  29. // System.setProperty("HADOOP_USER_NAME", "hadoop");//使用集群
  30. FileSystem fs = FileSystem.get(conf);//默认使用本地
  31. Job job = Job.getInstance(conf);
  32. job.setJarByClass(StudentScore3_2MR2.class);
  33. job.setMapperClass(StudentScore3MR_Mapper.class);
  34. job.setReducerClass(StudentScore3MR_Reducer.class);
  35. job.setMapOutputKeyClass(Student.class);
  36. job.setMapOutputValueClass(NullWritable.class);
  37. job.setOutputKeyClass(Student.class);
  38. job.setOutputValueClass(NullWritable.class);
  39. job.setPartitionerClass(MyPatitioner.class);//设置分区器
  40. job.setNumReduceTasks(4);//设置任务数目
  41. //
  42. // String inputpath = args[0];
  43. // String outpath = args[1];
  44. Path inputPath = new Path("d:/a/homework6.txt");
  45. Path outputPath = new Path("d:/a/homework6_2");
  46. if (fs.exists(inputPath)) {
  47. fs.delete(outputPath, true);
  48. }
  49. FileInputFormat.setInputPaths(job, inputPath);
  50. FileOutputFormat.setOutputPath(job, outputPath);
  51. boolean isdone = job.waitForCompletion(true);
  52. System.exit(isdone ? 0 : 1);
  53. }
  54. //2统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数
  55. public static class StudentScore3MR_Mapper extends Mapper<LongWritable, Text, Student, NullWritable>{
  56. Text kout = new Text();
  57. Text valueout = new Text();
  58. Student stu = new Student();
  59. @Override
  60. protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
  61. //algorithm,huangzitao,85,86,41,75,93,42,85,75
  62. String [] reads = value.toString().trim().split(",");
  63. String kk = reads[0];
  64. int sum = 0;
  65. int count = 0;
  66. double avg = 0;
  67. for(int i = 2; i < reads.length; i++){
  68. sum += Integer.parseInt(reads[i]);
  69. count++;
  70. }
  71. avg = 1.0 * sum / count;
  72. stu.setCourse(kk);
  73. stu.setName(reads[1]);
  74. stu.setScore(avg);
  75. context.write(stu, NullWritable.get());
  76. }
  77. }
  78. public static class StudentScore3MR_Reducer extends Reducer< Student, NullWritable, Student, NullWritable>{
  79. Text kout = new Text();
  80. Text valueout = new Text();
  81. @Override
  82. protected void reduce(Student key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
  83. context.write(key, NullWritable.get());
  84. }
  85. }
  86. }






3、题目涉及排序以及分组,分组使用WritableComparator,进行分组字段设置。其中需要注意的是分组字段与排序字段的关系:分组字段一定是排序字段中的前几个

举例:排序规则:a,b,c,d,e。那么分组规则就只能是以下情况中的任意一种:

a   /    a,b    /  a,b,c    / a,b,c,d    /    a,b,c,d,e    不能跳跃

排序字段一定大于等于分组字段,并且包含分组字段

使用分组组件进行:

实体类如题2

分组类代码:

  1. /**
  2. * @author: lpj
  3. * @date: 2018年3月16日 下午10:36:55
  4. * @Description:
  5. */
  6. package lpj.reduceWorkbean;
  7. import org.apache.hadoop.io.WritableComparable;
  8. import org.apache.hadoop.io.WritableComparator;
  9. /**
  10. *
  11. */
  12. public class MyGroup extends WritableComparator{
  13. public MyGroup() {
  14. super(Student.class,true);//创建对象
  15. }
  16. @Override
  17. public int compare(WritableComparable a, WritableComparable b) {
  18. Student s1 = (Student)a;
  19. Student s2 = (Student)b;
  20. return s1.getCourse().compareTo(s2.getCourse());//设置课程分组器
  21. }
  22. }

主体类代码;


  1. /**
  2. * @author: lpj
  3. * @date: 2018年3月16日 下午7:16:47
  4. * @Description:
  5. */
  6. package lpj.reduceWork;
  7. import java.io.IOException;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FileSystem;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.io.IntWritable;
  12. import org.apache.hadoop.io.LongWritable;
  13. import org.apache.hadoop.io.NullWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import lpj.reduceWorkbean.MyGroup;
  21. import lpj.reduceWorkbean.MyPatitioner;
  22. import lpj.reduceWorkbean.Student;
  23. /**
  24. *
  25. */
  26. public class StudentScore3_3MR3 {
  27. public static void main(String[] args) throws Exception {
  28. Configuration conf = new Configuration();
  29. // conf.addResource("hdfs-site.xml");//使用配置文件
  30. // System.setProperty("HADOOP_USER_NAME", "hadoop");//使用集群
  31. FileSystem fs = FileSystem.get(conf);//默认使用本地
  32. Job job = Job.getInstance(conf);
  33. job.setJarByClass(StudentScore3_3MR3.class);
  34. job.setMapperClass(StudentScore3MR_Mapper.class);
  35. job.setReducerClass(StudentScore3MR_Reducer.class);
  36. job.setMapOutputKeyClass(Student.class);
  37. job.setMapOutputValueClass(NullWritable.class);
  38. job.setOutputKeyClass(Student.class);
  39. job.setOutputValueClass(NullWritable.class);
  40. job.setGroupingComparatorClass(MyGroup.class);//调用分组
  41. Path inputPath = new Path("d:/a/homework6.txt");
  42. Path outputPath = new Path("d:/a/homework6_3");
  43. if (fs.exists(inputPath)) {
  44. fs.delete(outputPath, true);
  45. }
  46. FileInputFormat.setInputPaths(job, inputPath);
  47. FileOutputFormat.setOutputPath(job, outputPath);
  48. boolean isdone = job.waitForCompletion(true);
  49. System.exit(isdone ? 0 : 1);
  50. }
  51. //3求出每门课程参考学生平均分最高的学生的信息:课程,姓名和平均分
  52. public static class StudentScore3MR_Mapper extends Mapper<LongWritable, Text, Student, NullWritable>{
  53. Text kout = new Text();
  54. Text valueout = new Text();
  55. Student stu = new Student();
  56. @Override
  57. protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
  58. //algorithm,huangzitao,85,86,41,75,93,42,85,75
  59. String [] reads = value.toString().trim().split(",");
  60. String kk = reads[0];
  61. int sum = 0;
  62. int count = 0;
  63. double avg = 0;
  64. for(int i = 2; i < reads.length; i++){
  65. sum += Integer.parseInt(reads[i]);
  66. count++;
  67. }
  68. avg = 1.0 * sum / count;
  69. stu.setCourse(kk);
  70. stu.setName(reads[1]);
  71. stu.setScore(avg);
  72. context.write(stu, NullWritable.get());
  73. }
  74. }
  75. public static class StudentScore3MR_Reducer extends Reducer< Student, NullWritable, Student, NullWritable>{
  76. Text kout = new Text();
  77. Text valueout = new Text();
  78. @Override
  79. protected void reduce(Student key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
  80. context.write(key, NullWritable.get());
  81. }
  82. }
  83. }




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/747943
推荐阅读
相关标签
  

闽ICP备14008679号