
MapReduce Basics Review Exercise

Compute the total sales of each counter, then sort the counters by total sales in descending order. Each record below has the format: week, product ID, unit price, quantity sold.
week1,A0001,10,20
week1,A0002,8.5,15
week1,A0003,9.2,30
week1,B0001,10.5,50
week2,A0001,11,30
week2,A0002,8,20
week2,A0003,9.2,20
week2,B0001,10,55
week3,A0001,9.5,10
week3,A0002,8.8,30
week3,A0003,9.8,30
week3,B0001,9,58
week4,A0001,9.2,14
week4,A0002,8.5,22
week4,A0003,10.3,45
week4,B0001,7,12

Requirements: using the data above, compute the following with MapReduce (the sales amount of one record is unit price × quantity, e.g. the first record contributes 10 × 20 = 200 to A0001's total):
1. The total sales amount of each product, sorted in descending order
2. The three weeks with the highest sales for each product

Problem 1: group, sum, and sort the totals in descending order.
This problem is solved with two chained MapReduce jobs: the first computes the total sales of each product, and the second sorts those totals in descending order.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Problem 1: total sales amount of each product, sorted in descending order.
 * Record format: week, product ID, unit price, quantity (e.g. week1,A0001,10,20)
 */
public class QuestionMR_1_1 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Uncomment the two lines below to run against the cluster;
        // leave them commented out to run locally.
        //conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        //System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Get the FileSystem after fs.defaultFS is (optionally) set,
        // so it points at the right filesystem.
        FileSystem fs = FileSystem.get(conf);

        // Job 1: sum up the sales amount of each product
        Job job = Job.getInstance(conf);
        job.setJarByClass(QuestionMR_1_1.class);
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // A custom partitioner, grouping comparator, or combiner could be
        // plugged in here when needed:
        //job.setPartitionerClass(...);
        //job.setGroupingComparatorClass(...);
        //job.setCombinerClass(MRReducer.class);

        Path inputPath = new Path("G:/test/q1/input");
        Path outputPath = new Path("G:/test/q1/output_1_1");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Job 2: sort the per-product totals in descending order
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(QuestionMR_1_1.class);
        job2.setMapperClass(MRMapper2.class);
        job2.setReducerClass(MRReducer2.class);
        job2.setOutputKeyClass(Product.class);
        job2.setOutputValueClass(NullWritable.class);

        Path inputPath2 = new Path("G:/test/q1/output_1_1");
        Path outputPath2 = new Path("G:/test/q1/output_1_2");
        if (fs.exists(outputPath2)) {
            fs.delete(outputPath2, true);
        }
        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Chain the two jobs with JobControl: job2 depends on job1's output
        JobControl control = new JobControl("CF");
        ControlledJob conjob1 = new ControlledJob(job.getConfiguration());
        ControlledJob conjob2 = new ControlledJob(job2.getConfiguration());
        conjob2.addDependingJob(conjob1);
        control.addJob(conjob1);
        control.addJob(conjob2);

        // JobControl implements Runnable: run it on its own thread and
        // poll until both jobs have finished.
        Thread t = new Thread(control);
        t.start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();
        System.exit(0);
    }

    // Input record: week1,A0001,10,20 (week, product ID, unit price, quantity)
    public static class MRMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text k = new Text();
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            // Key on the product ID so the reduce phase can accumulate
            // the sales amount of each product.
            k.set(line[1]);
            // Sales amount of one record = unit price * quantity
            double sum = Double.parseDouble(line[2]) * Double.parseDouble(line[3]);
            v.set(sum);
            context.write(k, v);
        }
    }

    public static class MRReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0D;
            // All records of the same product reach the same reduce call,
            // so the per-product total can be accumulated here.
            for (DoubleWritable num : values) {
                sum += num.get();
            }
            v.set(sum);
            context.write(key, v);
        }
    }

    /**
     * The descending sort is driven by the custom key type: the shuffle
     * orders map output keys with Product.compareTo.
     */
    public static class MRMapper2 extends Mapper<LongWritable, Text, Product, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Job 1's output lines look like "A0001<TAB>753.8"
            String[] line = value.toString().split("\t");
            // week is left empty: compareTo then sorts purely by total sales
            Product p = new Product(line[0], Double.parseDouble(line[1]), "");
            context.write(p, NullWritable.get());
        }
    }

    public static class MRReducer2 extends Reducer<Product, NullWritable, Product, NullWritable> {
        @Override
        protected void reduce(Product key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted; just write them out.
            context.write(key, NullWritable.get());
        }
    }
}
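
For reference, working the sample data by hand (sales amount per record = unit price × quantity, summed over the four weeks), the second job's output should rank the products like this; the exact decimals may differ slightly because of floating-point printing:

B0001, 1681.0
A0003, 1217.5
A0001, 753.8
A0002, 738.5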
Problem 2: finding the three weeks with the highest sales for each product is a simple grouped top-N problem.
Group the records by product, sort each group by sales amount in descending order, and take the top three.
A custom grouping comparator is used here, which cuts down the number of reduce-side iterations (see the illustration after the ProductGC listing below).

The sort order itself is controlled by the compareTo method of the custom key type Product.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Problem 2: the three weeks with the highest sales for each product.
 * Record format: week, product ID, unit price, quantity (e.g. week1,A0001,10,20)
 */
public class QuestionMR_1_2 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Uncomment the two lines below to run against the cluster;
        // leave them commented out to run locally.
        //conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        //System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(QuestionMR_1_2.class);
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);
        job.setOutputKeyClass(Product.class);
        job.setOutputValueClass(NullWritable.class);
        // Custom grouping comparator: keys with the same product ID are
        // treated as one group, so each reduce call sees all weeks of one
        // product, already sorted by sales amount (descending).
        job.setGroupingComparatorClass(ProductGC.class);

        Path inputPath = new Path("G:/exam/q1/input");
        Path outputPath = new Path("G:/exam/q1/output_2_1");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    // Input record: week1,A0001,10,20
    public static class MRMapper extends Mapper<LongWritable, Text, Product, NullWritable> {
        // Build the custom key (bean) directly from each record.
        // The shuffle sorts the keys with Product.compareTo; the grouping
        // comparator then merges keys of the same product into one group.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            double sum = Double.parseDouble(line[2]) * Double.parseDouble(line[3]);
            Product p = new Product(line[1], sum, line[0]);
            context.write(p, NullWritable.get());
        }
    }

    public static class MRReducer extends Reducer<Product, NullWritable, Product, NullWritable> {
        @Override
        protected void reduce(Product key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Within a group the keys arrive sorted by sales amount,
            // descending; as the iterator advances, `key` is re-filled with
            // the current record, so the first three iterations are exactly
            // the top three weeks.
            int count = 0;
            for (NullWritable nv : values) {
                if (count >= 3) {
                    break;
                }
                context.write(key, nv);
                count++;
            }
        }
    }
}
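
Again working the sample data by hand, the job should emit the top three weeks of each product, with the groups ordered by product ID (decimals may again vary with floating-point printing):

A0001, week2, 330.0
A0001, week1, 200.0
A0001, week4, 128.8
A0002, week3, 264.0
A0002, week4, 187.0
A0002, week2, 160.0
A0003, week4, 463.5
A0003, week3, 294.0
A0003, week1, 276.0
B0001, week2, 550.0
B0001, week1, 525.0
B0001, week3, 522.0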

Custom grouping comparator: ProductGC

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ProductGC extends WritableComparator {

    public ProductGC() {
        // true: let WritableComparator instantiate Product objects so the
        // serialized keys can be deserialized before comparing.
        super(Product.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Product pa = (Product) a;
        Product pb = (Product) b;
        // Group by product ID only: keys with the same ID compare as equal
        // and therefore end up in the same reduce call.
        return pa.getNamenum().compareTo(pb.getNamenum());
    }
}
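
To see how the sort and the grouping interact, take product A0001 in the sample data. The shuffle first orders all map output keys with Product.compareTo (product ID ascending, sales descending within a product); ProductGC then makes every key with the same product ID fall into a single reduce call, so the reducer's value iterator walks that product's weeks in descending order of sales:

reduce call for A0001: week2 (330.0) → week1 (200.0) → week4 (128.8) → week3 (95.0)

The first three iterations are exactly the top three weeks, which is why the reducer can simply stop after writing three keys.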

Custom key type: Product

package cn.zhao.exam.mapreduce.q1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Product implements WritableComparable<Product> {

    private String namenum;    // product ID, e.g. A0001
    private double totalMoney; // sales amount
    private String week;       // week label; left empty by problem 1's sort job

    public String getWeek() {
        return week;
    }

    public void setWeek(String week) {
        this.week = week;
    }

    public String getNamenum() {
        return namenum;
    }

    public void setNamenum(String namenum) {
        this.namenum = namenum;
    }

    public double getTotalMoney() {
        return totalMoney;
    }

    public void setTotalMoney(double totalMoney) {
        this.totalMoney = totalMoney;
    }

    public Product() {
        // Writable types need a public no-arg constructor for deserialization.
    }

    public Product(String namenum, double totalMoney, String week) {
        this.namenum = namenum;
        this.totalMoney = totalMoney;
        this.week = week;
    }

    @Override
    public String toString() {
        if ("".equals(week)) {
            return namenum + ", " + totalMoney;
        } else {
            return namenum + ", " + week + ", " + totalMoney;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Field order must match readFields exactly.
        out.writeUTF(namenum);
        out.writeDouble(totalMoney);
        out.writeUTF(week);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        namenum = in.readUTF();
        totalMoney = in.readDouble();
        week = in.readUTF();
    }

    @Override
    public int compareTo(Product o) {
        // This bean serves both problems, so compareTo covers both cases.
        // Problem 1's sort job leaves week empty: order purely by total
        // sales, descending, to produce a global ranking.
        if ("".equals(this.week) && "".equals(o.week)) {
            return Double.compare(o.totalMoney, this.totalMoney);
        }
        // Problem 2: order by product ID first so keys of the same product
        // are adjacent (as the grouping comparator requires), then by total
        // sales descending within each product.
        int cmp = this.namenum.compareTo(o.namenum);
        if (cmp != 0) {
            return cmp;
        }
        return Double.compare(o.totalMoney, this.totalMoney);
    }
}