week1,A0001,10,20
week1,A0002,8.5,15
week1,A0003,9.2,30
week1,B0001,10.5,50
week2,A0001,11,30
week2,A0002,8,20
week2,A0003,9.2,20
week2,B0001,10,55
week3,A0001,9.5,10
week3,A0002,8.8,30
week3,A0003,9.8,30
week3,B0001,9,58
week4,A0001,9.2,14
week4,A0002,8.5,22
week4,A0003,10.3,45
week4,B0001,7,12
Requirement: each record above is week, product ID, unit price, quantity sold. Using MapReduce on this data, compute:
1. The total sales amount of each product, sorted in descending order
2. The three weeks with the highest sales amount for each product
Question 1: group by product and sum, then sort the totals in descending order.
This question is solved with two MapReduce jobs: the first computes the total sales amount of each product, the second sorts the results in descending order.
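As a quick sanity check on the sample data (each record contributes unit price × quantity, summed per product): B0001 = 1681, A0003 = 1217.5, A0001 = 753.8, A0002 = 738.5. Sorted by total sales in descending order, the expected ranking is therefore B0001, A0003, A0001, A0002 (the raw double output may show small floating-point rounding artifacts).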
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 1. Total sales amount per product, sorted in descending order.
 *    Columns: week, product ID, unit price, quantity sold
 *    Sample record: week1 A0001 10 20
 * 2. The three weeks with the highest sales for each product.
 */
public class QuestionMR_1_1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        /**
         * Running locally: keep the two lines below commented out.
         * Uncomment them (before FileSystem.get) to run against the cluster.
         */
        //conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        //System.setProperty("HADOOP_USER_NAME", "hadoop");

        FileSystem fs = FileSystem.get(conf);

        // Job 1: sum the sales amount of each product
        Job job = Job.getInstance(conf);
        job.setJarByClass(QuestionMR_1_1.class);

        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        /**
         * Custom partitioner component (not used here)
         */
        //job.setPartitionerClass(Object.class);

        /**
         * Custom grouping comparator (not used here)
         */
        //job.setGroupingComparatorClass(Object.class);

        /**
         * A custom combiner can be added when necessary
         */
        //job.setCombinerClass(MRReducer.class);

        Path inputPath = new Path("G:/test/q1/input");
        Path outputPath = new Path("G:/test/q1/output_1_1");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Job 2: sort the per-product totals in descending order
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(QuestionMR_1_1.class);

        job2.setMapperClass(MRMapper2.class);
        job2.setReducerClass(MRReducer2.class);

        job2.setOutputKeyClass(Product.class);
        job2.setOutputValueClass(NullWritable.class);

        Path inputPath2 = new Path("G:/test/q1/output_1_1");
        Path outputPath2 = new Path("G:/test/q1/output_1_2");
        if (fs.exists(outputPath2)) {
            fs.delete(outputPath2, true);
        }

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Chain the two MapReduce jobs with JobControl: job2 depends on job1's output
        JobControl control = new JobControl("CF");

        ControlledJob conjob1 = new ControlledJob(job.getConfiguration());
        ControlledJob conjob2 = new ControlledJob(job2.getConfiguration());

        conjob2.addDependingJob(conjob1);

        control.addJob(conjob1);
        control.addJob(conjob2);

        Thread t = new Thread(control);
        t.start();

        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        // Stop the JobControl thread before exiting
        control.stop();

        System.exit(0);
    }

    // Input record: week1,A0001,10,20  (week, product ID, unit price, quantity)
    // Goal: total sales amount per product, sorted in descending order
    public static class MRMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text k = new Text();
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");
            // Use the product ID as the key so that the reducer can accumulate
            // the sales amount of each product
            k.set(line[1]);
            // Sales amount of one record = unit price * quantity
            double sum = Double.parseDouble(line[2]) * Double.parseDouble(line[3]);
            v.set(sum);
            context.write(k, v);
        }
    }

    public static class MRReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        DoubleWritable v = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {

            double sum = 0D;
            // All records of the same product reach the same reduce call,
            // so the per-product sales amounts can be accumulated here
            for (DoubleWritable num : values) {
                sum += num.get();
            }
            v.set(sum);
            context.write(key, v);
        }
    }

    /**
     * Sorting is handled through the custom data type Product:
     * its compareTo method controls the order of the map output keys.
     */
    public static class MRMapper2 extends Mapper<LongWritable, Text, Product, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Job 1's TextOutputFormat separates key and value with a tab
            String[] line = value.toString().split("\t");

            Product p = new Product(line[0], Double.parseDouble(line[1]), "");
            context.write(p, NullWritable.get());
        }
    }

    public static class MRReducer2 extends Reducer<Product, NullWritable, Product, NullWritable> {
        @Override
        protected void reduce(Product key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            context.write(key, NullWritable.get());
        }
    }
}
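With the default TextOutputFormat, job 1 writes each product ID and its total separated by a tab (e.g. A0001, then a tab, then 753.8), which is why MRMapper2 splits on \t. One caveat: because Product.compareTo compares the product ID before the total amount (so that the same bean can also serve question 2), job 2's output is ordered by product ID rather than strictly by total sales; a possible workaround is sketched after the Product class at the end of this post.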
Question 2: finding the three weeks with the highest sales for each product is a simple grouped top-N problem.
The sort order is controlled by the compareTo method of the custom data type Product.
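For the sample data, the expected result of question 2 is: A0001 → week2 (330), week1 (200), week4 (≈128.8); A0002 → week3 (264), week4 (187), week2 (160); A0003 → week4 (463.5), week3 (294), week1 (276); B0001 → week2 (550), week1 (525), week3 (522). Amounts are unit price × quantity; the program's double output may carry floating-point rounding artifacts.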
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Sample record: week1 A0001 10 20  (week, product ID, unit price, quantity)
 * 2. The three weeks with the highest sales for each product
 */
public class QuestionMR_1_2 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        /**
         * Running locally: keep the two lines below commented out.
         * Uncomment them (before FileSystem.get) to run against the cluster.
         */
        //conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        //System.setProperty("HADOOP_USER_NAME", "hadoop");

        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(QuestionMR_1_2.class);

        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        job.setOutputKeyClass(Product.class);
        job.setOutputValueClass(NullWritable.class);

        /**
         * Custom partitioner component (not used here)
         */
        //job.setPartitionerClass(Object.class);

        /**
         * Custom grouping comparator: groups the keys by product ID so that
         * all weekly records of one product arrive in the same reduce call
         */
        job.setGroupingComparatorClass(ProductGC.class);

        /**
         * A custom combiner can be added when necessary
         */
        //job.setCombinerClass(MRReducer.class);

        Path inputPath = new Path("G:/exam/q1/input");
        Path outputPath = new Path("G:/exam/q1/output_2_1");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    // Input record: week1,A0001,10,20
    public static class MRMapper extends Mapper<LongWritable, Text, Product, NullWritable> {
        // Build the custom data type (bean) directly from each input record.
        // The grouping comparator groups keys by product ID, and the keys are
        // sorted automatically by Product's compareTo method
        // (same product first, then sales amount in descending order).
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] line = value.toString().split(",");
            double sum = Double.parseDouble(line[2]) * Double.parseDouble(line[3]);
            Product p = new Product(line[1], sum, line[0]);
            context.write(p, NullWritable.get());
        }
    }

    public static class MRReducer extends Reducer<Product, NullWritable, Product, NullWritable> {
        @Override
        protected void reduce(Product key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            // As the values are iterated, the framework advances the key instance
            // to the current underlying record, and within a product group the keys
            // arrive in descending order of sales amount, so the first three
            // iterations emit the top three weeks.
            int count = 0;
            for (NullWritable nv : values) {
                if (count < 3) {
                    context.write(key, nv);
                } else {
                    break;
                }
                count++;
            }
        }
    }
}
Custom grouping comparator: ProductGC. It compares only the product ID, so all weekly records of one product fall into a single reduce call while Product.compareTo still controls the order within the group.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ProductGC extends WritableComparator {

    public ProductGC() {
        super(Product.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Product pa = (Product) a;
        Product pb = (Product) b;

        // Group by product ID only
        return pa.getNamenum().compareTo(pb.getNamenum());
    }
}
Custom data type: Product
package cn.zhao.exam.mapreduce.q1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Product implements WritableComparable<Product> {
    private String namenum;    // product ID
    private double price;      // unit price (not used in this exercise)
    private int count;         // quantity (not used in this exercise)
    private double totalMoney; // sales amount
    private String week;       // week; empty string for question 1's totals

    public String getWeek() {
        return week;
    }
    public void setWeek(String week) {
        this.week = week;
    }
    public String getNamenum() {
        return namenum;
    }
    public void setNamenum(String namenum) {
        this.namenum = namenum;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }
    public double getTotalMoney() {
        return totalMoney;
    }
    public void setTotalMoney(double totalMoney) {
        this.totalMoney = totalMoney;
    }

    public Product() {
        super();
    }

    public Product(String namenum, double totalMoney, String week) {
        super();
        this.namenum = namenum;
        this.totalMoney = totalMoney;
        this.week = week;
    }

    @Override
    public String toString() {
        if ("".equals(week)) {
            return namenum + ", " + totalMoney;
        } else {
            return namenum + ", " + week + ", " + totalMoney;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(namenum);
        out.writeDouble(totalMoney);
        out.writeUTF(week);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        namenum = in.readUTF();
        totalMoney = in.readDouble();
        week = in.readUTF();
    }

    @Override
    public int compareTo(Product o) {
        // Two fields are compared because both questions reuse this bean:
        // the product ID first (so question 2's groups stay contiguous),
        // then the sales amount within the same product.
        if (o.getNamenum().compareTo(this.getNamenum()) == 0) {
            // Descending order: o - this > 0 returns 1 -> descending;
            // this - o > 0 returning 1 would give ascending order instead
            double flag = o.totalMoney - this.totalMoney;
            if (flag == 0) {
                return 0;
            } else if (flag > 0) {
                return 1;
            } else {
                return -1;
            }
        } else {
            // Different products: order by product ID (reversed). This means
            // question 1's second job is ordered by product ID rather than
            // strictly by total amount.
            return o.getNamenum().compareTo(this.getNamenum());
        }
    }
}
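As noted above, compareTo orders by product ID first so that question 2's grouping comparator sees all records of one product as adjacent keys; the side effect is that question 1's second job is not strictly ordered by total amount. One possible workaround, not part of the original solution, is to register a dedicated sort comparator on job2 of QuestionMR_1_1 via job2.setSortComparatorClass(...). A minimal sketch, assuming a hypothetical class name TotalMoneyDescComparator:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical helper, not part of the original code: sorts Product keys by
// total sales amount in descending order, with the product ID as a tie-breaker
// so that distinct products with equal totals do not collapse into one group.
public class TotalMoneyDescComparator extends WritableComparator {

    public TotalMoneyDescComparator() {
        super(Product.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Product pa = (Product) a;
        Product pb = (Product) b;
        // Larger total first
        int cmp = Double.compare(pb.getTotalMoney(), pa.getTotalMoney());
        if (cmp != 0) {
            return cmp;
        }
        // Tie-breaker keeps keys with equal totals distinct
        return pa.getNamenum().compareTo(pb.getNamenum());
    }
}

With job2.setSortComparatorClass(TotalMoneyDescComparator.class); added in QuestionMR_1_1, the second job would emit the totals for the sample data as B0001, A0003, A0001, A0002, i.e. strictly in descending order of total sales.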