The data is movie rating data; each record contains, in order, the movie name, the rating, the timestamp, and the reviewer's ID.

(Task 1: for each movie, compute the total of rate over all records that share the same movie)
The code is as follows:
The Mapper, Reducer, and Driver are written as three separate classes:

(1) Map

package com.jasmine;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // One record per line: movie, rate, timestamp, uid.
        // The delimiter follows the original post (a single space);
        // adjust split(...) if the real file uses another separator.
        String[] fields = value.toString().split(" ");
        String movie = fields[0];
        int rate = Integer.parseInt(fields[1]);
        // Emit (movie, rate) so the reducer can total the rates per movie.
        context.write(new Text(movie), new IntWritable(rate));
    }
}

(2) Reduce

package com.jasmine;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up all rates emitted for the same movie.
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

(3) Driver

package com.jasmine;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        BasicConfigurator.configure(); // quick default log4j setup

        Configuration conf = new Configuration();
        conf.set("yarn.resourcemanager.hostname", "jasmine01");
        conf.set("fs.defaultFS", "hdfs://jasmine01:9000/");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);

        // The map and reduce classes this job uses
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        // Output types of the map phase and of the whole job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input component: the source is a text file on HDFS, so use TextInputFormat
        job.setInputFormatClass(TextInputFormat.class);
        // Output component: the result goes back to HDFS as text, so use TextOutputFormat
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths come from the command line
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the jar to YARN and stay attached: the client prints progress,
        // and waitForCompletion(true) returns true on success, false on failure.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
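Since the per-movie total is plain addition, which is associative and commutative, the summing reducer can also be reused as a combiner to pre-aggregate map output and shrink shuffle traffic. This line is not in the original post; it is an optional addition that would sit next to setReducerClass in the driver above:

        // Optional (not in the original): pre-aggregate on the map side.
        // Safe here because integer addition is associative and commutative.
        job.setCombinerClass(WordCountReduce.class);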
After the job runs, the output contains one tab-separated line per movie, in the form movie \t ratesum; this is exactly the input the next job parses.
(Task 2: take the per-movie totals produced in Task 1 and sort them in descending order)
The code is as follows:
This job needs a bean class that overrides the Writable serialization methods and implements the comparison interface (WritableComparable), so that the MapReduce shuffle itself performs the sorting.

(1) Bean class

package com.SumRateMovie;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UserRateSum implements WritableComparable<UserRateSum> {
    private String movie;
    private Integer rate;
    private String timeStamp;
    private String uid;
    private Integer ratesum;

    // Only movie and ratesum cross the wire, so only they are (de)serialized.
    // readFields and write must use the same field order.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.movie = dataInput.readUTF();
        this.ratesum = dataInput.readInt();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.movie);
        dataOutput.writeInt(this.ratesum);
    }

    public String getMovie() { return movie; }
    public void setMovie(String movie) { this.movie = movie; }

    public Integer getRate() { return rate; }
    public void setRate(Integer rate) { this.rate = rate; }

    public String getTimeStamp() { return timeStamp; }
    public void setTimeStamp(String timeStamp) { this.timeStamp = timeStamp; }

    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }

    public Integer getRatesum() { return ratesum; }
    public void setRatesum(Integer ratesum) { this.ratesum = ratesum; }

    @Override
    public String toString() {
        return "movie:" + movie + ";" + "rateSum:" + ratesum;
    }

    // Descending order: a bean with a larger ratesum compares as "smaller",
    // so it sorts to the front.
    @Override
    public int compareTo(UserRateSum o) {
        return o.getRatesum() - this.ratesum;
    }
}

(2) Main class

package com.SumRateMovie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class UserRateSort {

    public static class UserRateMap extends Mapper<LongWritable, Text, UserRateSum, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is one line of the first job's output: movie \t ratesum
            String[] split = value.toString().split("\t");
            UserRateSum userRateSum = new UserRateSum();
            userRateSum.setMovie(split[0]);
            userRateSum.setRatesum(Integer.parseInt(split[1]));
            // The bean itself is the key, so the shuffle sorts records by compareTo.
            context.write(userRateSum, NullWritable.get());
        }
    }
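    // The original post breaks off at this point. What follows is a minimal
    // reconstruction, assuming the same wiring style as the first driver:
    // an identity reducer that passes the already-sorted beans through,
    // plus a main() that configures the job.

    public static class UserRateReduce
            extends Reducer<UserRateSum, NullWritable, UserRateSum, NullWritable> {
        @Override
        protected void reduce(UserRateSum key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive sorted by compareTo (descending ratesum). Movies with
            // equal totals are grouped into one call, and the key object is
            // refreshed as the values iterator advances, so emit once per value.
            for (NullWritable v : values) {
                context.write(key, NullWritable.get());
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(UserRateSort.class);

        job.setMapperClass(UserRateMap.class);
        job.setReducerClass(UserRateReduce.class);

        job.setMapOutputKeyClass(UserRateSum.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(UserRateSum.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // args[0]: the first job's output directory; args[1]: the result directory.
        // A single reducer (the default) keeps the output totally ordered.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}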