赞
踩
本关任务:根据用户行为数据,编写 MapReduce 程序来统计出商品点击量排行。
在 MapReduce 的 Shuffle 的过程中执行了三次排序,分别是: map 中的溢写阶段:根据分区以及 key 进行快速排序。 map 中合并溢写文件:将同一分区的多个溢写文件进行归并排序,合成一个大的溢写文件。 reduce 输入阶段:将同一分区,来自不同 map task 的数据文件进行归并排序。
在 MapReduce 整个过程中,默认是会对输出的键值对按照 key 进行排序的,而且是使用快速排序。map 输出的排序的,其实也就是上面的溢写过程中的排序。reduce 输出的排序,即 reduce 处理完数据后,MapReduce内部会自动对输出的键值对按照key进行排序。
这是编程中用到的电商数据,为 CSV 格式,文件名user_behavior.csv
,大小9948行,前几行示例如下:
1002309,1008608,手机,pv
1002573,1009007,耳机,pv
1001541,1008614,手机,pv
1001192,1008612,手机,pv
1001016,1008909,平板电脑,buy
1001210,1008605,手机,pv
1001826,1008704,笔记本,pv
1002208,1008906,平板电脑,pv
1002308,1008702,笔记本,pv
1002080,1008702,笔记本,cart
1001525,1008702,笔记本,cart
1002749,1008702,笔记本,pv
1002134,1008704,笔记本,cart
1002497,1008608,手机,pv
···
---总共 9948行---
即统计出每个商品id中用户行为是pv
(点击浏览)的数量,reduce的输出最后是按点击量的大小从大到小排序。
编程中可能会用到 cleanup() 方法,cleanup 方法是 mapper/reduce 对象执行完所有的 map/reduce 方法之后最后执行的方法,可用于清理资源释放或清理工作;默认继承的父类方法为空,什么也不做。
根据提示,在右侧编辑器补充代码,计算得出商品点击量排行。
预期输出格式(按点击量从大到小):
商品id,点击量
商品id,点击量
···
···
平台会对你编写的代码进行测试,如果编写的 MapReduce 输出与预期一致,则通过。
注:出于显示原因,网页端的 mapreduce 的输出结果中制表符统一用逗号代替显示,但在实际 reduce 结果中 key\value 仍是原样制表符分割,这只是显示上的变化,不影响编程与评测结果。
开始你的任务吧,祝你成功!
代码如下:
package educoder;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* UserLoseDriver
*/
public class ItemClickRankDriver {
public static class ThisMap extends Mapper<Object, Text, Text, IntWritable> {
private static IntWritable one = new IntWritable(1);
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
/*** 在这编写map内容 ****/
/********** Begin **********/
//1. 分割每行数据
String[] atts=value.toString().split(",");
//2. 得到商品id
String item=atts[1];
//3. 得到行为属性
String behavior=atts[3];
//4. 如果行为属性是 'pv',则写入到map输出
if(behavior.equals("pv")){
context.write(new Text(item),one);
}
/********** End **********/
}
}
public static class ThisReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
//对象实例,用来保存reduce方法中处理的数据
List<Object[]> list=new LinkedList<>();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
/*** 在这编写reduce内容 ****/
/********** Begin **********/
int sum=0;
for(IntWritable one:values){
sum+=one.get();
}
list.add(new Object[] {key.toString(),Integer.valueOf(sum)});
// 统计同key总数, 把key和sum写入到list中
/********** End **********/
}
//cleanup方法,即reduce对象执行完所有的reduce方法后最后执行的方法
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
list=list.stream().sorted((o1,o2) -> {return ((int)o1[1] - (int)o2[1]);}).collect(Collectors.toList());
for(int i=list.size()-1;i>=0;i--){
Object[] o=list.get(i);
context.write(new Text((String) o[0]),new IntWritable((int) o[1]));
}
/********** Begin **********/
/********** End **********/
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "商品点击量排行");
job.setJarByClass(ItemClickRankDriver.class);
job.setMapperClass(ThisMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(ThisReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。