当前位置:   article > 正文

二、MapReduce练习----自定义分区:把学生成绩分成 2 份,一份是及格的,一份是不及格的_mapreduce 根据成绩进行分区

mapreduce 根据成绩进行分区

学生成绩分区:及格 不及格

MapReduce 执行过程中的一些特殊操作:
partition(分区):决定改组 key-value 交由哪个 reduce 进行汇总
sort(排序):数据按照 key 进行升序排列
grouping(聚合):把 key 相同 value 聚集到一个集合中,形成新的 key-value 输入 reduce 进行汇总
  • 1
  • 2
  • 3
  • 4

如果想实现复杂的数据分析,那么必须手动控制这些特殊操作

自定义分区:
当有多个 reduce 的时候,分区才有意义
默认的分区方案:HashPartition
拿着 key 的 hashCode 对 reduce 个数取余,余数就是对应的 reduce 编号
hashCode 是一串数字,是对象的唯一标识(身份证号)
100 % 3 = 1 表示要把 key-value 交给编号为 1 的 reduce 进行汇总
101 % 3 = 2 表示要把 key-value 交给编号为 2 的 reduce 进行汇总
102 % 3 = 0 表示要把 key-value 交给编号为 0 的 reduce 进行汇总

把学生成绩分成 2 份,一份是及格的,一份是不及格的:
    需要 2 个 reduce,一个汇总及格的数据,一个汇总不及格的数据
        及格的成绩发送给编号为 0 的 reduce 进行汇总
        不及格的成绩发送给编号为 1 的 reduce 进行汇总

    map 以成绩为 key,名字为 value 进行输出

    自定义分区规则,按照成绩是否及格指配 reduce 编号 0 或 1
        1. 创建类继承 Partitioner 并设置两个泛型为 map 输出的 key 和 value 类型
        2. 实现父类的 getPartition 方法
            1. 修改 3 个参数名为 key,value,reduceCount
            2. 根据 key,value,reduceCount 返回 reduce 编号

    reduce 以名字为 key,成绩为 value 进行输出

    job 配置使用自定义分区方案
        job.setPartitionerClass(Score1Partitioner.class);
    
    job 配置 reduce 个数
        job.setNumReduceTasks(2);

把学生成绩分成 A,B,C,D 四份
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

学生成绩文档:https://pan.baidu.com/s/1pJSZAzamOEIhb2D0S1Xsqg

代码:

package com.it666.student;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author 吴振峰
 *					编程夜当午,手握小滑鼠。
 *					谁知编程辛,行行皆'心'苦。
 *	/
 
public class Core {
	
	private static class ScoreA1Mapper extends Mapper<LongWritable, Text, IntWritable, Text>{
		
		private IntWritable outputkey = new IntWritable();
		private Text outputvalue = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
				throws IOException, InterruptedException {
			
			String line = value.toString();
			String[] words = line.split("\\s+");
			String name = words[0];
			// Integer.parseInt() 用于把 String 转换为 int 类型
			int score = Integer.parseInt(words[1]);
			// 以成绩为 key,名字为 value 进行输出
			outputkey.set(score);
			outputvalue.set(name);
			
			context.write(outputkey, outputvalue);
			
		}
	}
	private static class ScoreA1Partitioner extends Partitioner<IntWritable, Text> {

		@Override
		public int getPartition(IntWritable key, Text value, int reduceCount) {
			
			if (key.get() >=60) {
				// 及格的成绩发送给编号为 0 的 reduce 进行汇总
				return 0;
			}else {
				// 不及格的成绩发送给编号为 1 的 reduce 进行汇总
				return 1;
			}
			
		}
	}
	private static class ScoreA1Reducer extends Reducer<IntWritable, Text, Text, IntWritable> {

		@Override
		protected void reduce(IntWritable score, Iterable<Text> names,
				Reducer<IntWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			
			for (Text name : names) {
				// 以名字为 key,成绩为 value 进行输出
				context.write(name, score);
			}
			
		}
	}
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		try {
			Job job = Job.getInstance(conf);
			
			job.setJarByClass(Core.class);
			
			job.setMapperClass(ScoreA1Mapper.class);
			job.setReducerClass(ScoreA1Reducer.class);
			
			// 设置分区方案为自定义的方案:Score1Partitioner
			job.setPartitionerClass(ScoreA1Partitioner.class);
			
			//设置reduce的个数为2
			job.setNumReduceTasks(2);
			
			job.setMapOutputKeyClass(IntWritable.class);
			job.setMapOutputValueClass(Text.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			
			Path inputPath  = new Path("E:/java/MavenHadoopData/04-Students Grade/scoreA.txt");
			FileInputFormat.addInputPath(job, inputPath);
			
			Path outputPath = new Path("E:/java/MavenHadoopData/04-Students Grade/scoreA1");
			FileSystem.get(conf).delete(outputPath, true);
			FileOutputFormat.setOutputPath(job, outputPath);
			
			job.waitForCompletion(true);
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/797095
推荐阅读
相关标签
  

闽ICP备14008679号