赞
踩
// ===== MusicDriver.java — driver / main class =====
package com.hadoop.demo.sort;

/**
 * @author: 易霭珞
 * @description Chains two MapReduce jobs: (1) count plays per song,
 *              (2) sort songs by play count descending via InverseMapper + MusicSort.
 * @date: 2022/11/7 19:15
 */
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MusicDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hduser");

        Configuration conf = new Configuration();
        // BUG FIX: Hadoop property keys are case-sensitive. The original key
        // "fs.defaultFs" is silently ignored; the correct key is "fs.defaultFS".
        conf.set("fs.defaultFS", "hdfs://192.168.56.100:9000");

        // Temporary directory holding the intermediate (song, count) pairs
        // passed from the counting job to the sorting job.
        Path tempDir = new Path("wordcount-temp-"
                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

        // Job.getInstance(...) replaces the deprecated new Job(conf, ...) ctor.
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(MusicDriver.class);
        try {
            job.setMapperClass(MusicMapper.class);
            // Summation is associative/commutative, so the reducer doubles as combiner.
            job.setCombinerClass(MusicReducer.class);
            job.setReducerClass(MusicReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job,
                    new Path("hdfs://192.168.56.100:9000/user/hduser/music"));
            FileOutputFormat.setOutputPath(job, tempDir);
            // SequenceFile output lets the sort job read typed keys/values back.
            job.setOutputFormatClass(SequenceFileOutputFormat.class);

            if (job.waitForCompletion(true)) {
                Job sortJob = Job.getInstance(conf, "sort");
                sortJob.setJarByClass(MusicDriver.class);
                FileInputFormat.addInputPath(sortJob, tempDir);
                sortJob.setInputFormatClass(SequenceFileInputFormat.class);
                // InverseMapper swaps (song, count) -> (count, song) so the
                // shuffle sorts by the count.
                sortJob.setMapperClass(InverseMapper.class);
                // FIX: a single reducer is required for one globally sorted output.
                sortJob.setNumReduceTasks(1);
                FileOutputFormat.setOutputPath(sortJob,
                        new Path("hdfs://192.168.56.100:9000/user/hduser/music/out"));
                sortJob.setOutputKeyClass(IntWritable.class);
                sortJob.setOutputValueClass(Text.class);
                // Descending comparator so the most-played songs come first.
                sortJob.setSortComparatorClass(MusicSort.class);
                System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
            }
        } finally {
            // Remove the intermediate directory when the JVM exits.
            FileSystem.get(conf).deleteOnExit(tempDir);
        }
    }
}

// ===== MusicLine.java — input-record parser =====
package com.hadoop.demo.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * @author: 易霭珞
 * @description Parses one raw input line into the (song, 1) pair emitted by
 *              the mapper. Invalid lines are flagged via {@link #isRight()}.
 * @date: 2022/11/7 22:00
 */
public class MusicLine {
    private String music;                               // song title (first field)
    private final IntWritable one = new IntWritable(1); // constant map-output value
    private boolean right = true;                       // false => line is unusable

    public MusicLine(String musicLine) {
        if (musicLine == null || "".equals(musicLine)) {
            this.right = false;
            return;
        }
        // NOTE(review): "//" is assumed to be the dataset's field delimiter,
        // with the song title in the first field — confirm against the data.
        String[] strs = musicLine.split("//");
        // ROBUSTNESS FIX: a line consisting only of delimiters (e.g. "////")
        // yields an EMPTY array (split drops trailing empty strings), so the
        // original strs[0] threw ArrayIndexOutOfBoundsException.
        if (strs.length == 0) {
            this.right = false;
            return;
        }
        this.music = strs[0];
    }

    /** @return true when the line parsed into a usable record. */
    public boolean isRight() {
        return right;
    }

    /** @return the song title as the map-output key. */
    public Text getMusicCountMapOutKey() {
        return new Text(this.music);
    }

    /** @return the constant count of 1 as the map-output value. */
    public IntWritable getMusicCountMapOutValue() {
        return this.one;
    }
}

// ===== MusicMapper.java — Mapper implementation =====
package com.hadoop.demo.sort;

/**
 * @author: 易霭珞
 * @description Emits (song, 1) for every valid input line; malformed lines
 *              are skipped via MusicLine.isRight().
 * @date: 2022/11/7 22:30
 */
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MusicMapper extends Mapper<Object, Text, Text, IntWritable> {
    // CLEANUP: removed the unused fields `one` and `word` — all key/value
    // construction is delegated to MusicLine.

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        MusicLine musicLine = new MusicLine(value.toString());
        if (musicLine.isRight()) {
            context.write(musicLine.getMusicCountMapOutKey(),
                    musicLine.getMusicCountMapOutValue());
        }
    }
}

// ===== MusicReducer.java — Reducer implementation =====
package com.hadoop.demo.sort;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author: 易霭珞
 * @description Sums the per-song counts; also used as the combiner.
 * @date: 2022/11/7 22:50
 */
public class MusicReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable(); // reused output value

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

// ===== MusicSort.java — descending sort comparator =====
package com.hadoop.demo.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author: 易霭珞
 * @description Sorts songs by play count in DESCENDING order by negating the
 *              natural IntWritable comparison.
 * @date: 2022/11/7 23:30
 */
public class MusicSort extends IntWritable.Comparator {

    // BUG FIX: the original signature took (WritableComparator, WritableComparator),
    // which does NOT override WritableComparator.compare(WritableComparable,
    // WritableComparable) — the framework would never invoke it. The parameters
    // must be WritableComparable.
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }

    // Byte-level fast path used during the shuffle sort; negated for
    // descending order.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}
上传的数据集
音乐数据集
运行后的结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。