赞
踩
MapReduce 执行过程中的一些特殊操作:
partition(分区):决定改组 key-value 交由哪个 reduce 进行汇总
sort(排序):数据按照 key 进行升序排列
grouping(聚合):把 key 相同 value 聚集到一个集合中,形成新的 key-value 输入 reduce 进行汇总
如果想实现复杂的数据分析,那么必须手动控制这些特殊操作
自定义分区:
当有多个 reduce 的时候,分区才有意义
默认的分区方案:HashPartition
拿着 key 的 hashCode 对 reduce 个数取余,余数就是对应的 reduce 编号
hashCode 是一串数字,是对象的唯一标识(身份证号)
100 % 3 = 1 表示要把 key-value 交给编号为 1 的 reduce 进行汇总
101 % 3 = 2 表示要把 key-value 交给编号为 2 的 reduce 进行汇总
102 % 3 = 0 表示要把 key-value 交给编号为 0 的 reduce 进行汇总
把学生成绩分成 2 份,一份是及格的,一份是不及格的: 需要 2 个 reduce,一个汇总及格的数据,一个汇总不及格的数据 及格的成绩发送给编号为 0 的 reduce 进行汇总 不及格的成绩发送给编号为 1 的 reduce 进行汇总 map 以成绩为 key,名字为 value 进行输出 自定义分区规则,按照成绩是否及格指配 reduce 编号 0 或 1 1. 创建类继承 Partitioner 并设置两个泛型为 map 输出的 key 和 value 类型 2. 实现父类的 getPartition 方法 1. 修改 3 个参数名为 key,value,reduceCount 2. 根据 key,value,reduceCount 返回 reduce 编号 reduce 以名字为 key,成绩为 value 进行输出 job 配置使用自定义分区方案 job.setPartitionerClass(Score1Partitioner.class); job 配置 reduce 个数 job.setNumReduceTasks(2); 把学生成绩分成 A,B,C,D 四份
学生成绩文档:https://pan.baidu.com/s/1pJSZAzamOEIhb2D0S1Xsqg
package com.it666.student; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author 吴振峰 * 编程夜当午,手握小滑鼠。 * 谁知编程辛,行行皆'心'苦。 * / public class Core { private static class ScoreA1Mapper extends Mapper<LongWritable, Text, IntWritable, Text>{ private IntWritable outputkey = new IntWritable(); private Text outputvalue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\\s+"); String name = words[0]; // Integer.parseInt() 用于把 String 转换为 int 类型 int score = Integer.parseInt(words[1]); // 以成绩为 key,名字为 value 进行输出 outputkey.set(score); outputvalue.set(name); context.write(outputkey, outputvalue); } } private static class ScoreA1Partitioner extends Partitioner<IntWritable, Text> { @Override public int getPartition(IntWritable key, Text value, int reduceCount) { if (key.get() >=60) { // 及格的成绩发送给编号为 0 的 reduce 进行汇总 return 0; }else { // 不及格的成绩发送给编号为 1 的 reduce 进行汇总 return 1; } } } private static class ScoreA1Reducer extends Reducer<IntWritable, Text, Text, IntWritable> { @Override protected void reduce(IntWritable score, Iterable<Text> names, Reducer<IntWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { for (Text name : names) { // 以名字为 key,成绩为 value 进行输出 context.write(name, score); } } } public static void main(String[] args) { Configuration conf = new Configuration(); try { Job job = Job.getInstance(conf); job.setJarByClass(Core.class); job.setMapperClass(ScoreA1Mapper.class); job.setReducerClass(ScoreA1Reducer.class); // 设置分区方案为自定义的方案:Score1Partitioner job.setPartitionerClass(ScoreA1Partitioner.class); //设置reduce的个数为2 job.setNumReduceTasks(2); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path inputPath = new Path("E:/java/MavenHadoopData/04-Students Grade/scoreA.txt"); FileInputFormat.addInputPath(job, inputPath); Path outputPath = new Path("E:/java/MavenHadoopData/04-Students Grade/scoreA1"); FileSystem.get(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。