一、Multiple Map Input Formats
-------------------------------------------------
1. Set the input formats: TextInputFormat.class and SequenceFileInputFormat.class
MultipleInputs.addInputPath(job,new Path(args[0]), TextInputFormat.class,MyMapper_Text.class);
MultipleInputs.addInputPath(job,new Path(args[1]), SequenceFileInputFormat.class,MyMapper_Sequence.class);
2. Assign the corresponding Mapper classes: MyMapper_Text.class and MyMapper_Sequence.class
Note in particular that the key for the Text input format is of type LongWritable (the byte offset), while the key for the Sequence input format is whatever key type is stored in the sequence file itself. Each Mapper's input types must match its input format (see the sketch below).
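A minimal sketch of the two Mapper classes, assuming the text input holds whitespace-separated words and the sequence file stores IntWritable keys and values; the class names match the driver calls above, everything else is an illustrative assumption. Both mappers must emit the same output key/value types, because they feed the same job:
- import java.io.IOException;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- //Text input: the key is the byte offset (LongWritable), the value is one line (Text).
- public class MyMapper_Text extends Mapper<LongWritable, Text, Text, IntWritable> {
-     @Override
-     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-         for (String w : value.toString().split(" ")) {
-             context.write(new Text(w), new IntWritable(1));
-         }
-     }
- }
-
- //Sequence-file input: key/value are whatever types were written into the file (assumed IntWritable/IntWritable here).
- public class MyMapper_Sequence extends Mapper<IntWritable, IntWritable, Text, IntWritable> {
-     @Override
-     protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
-         context.write(new Text(key.toString()), new IntWritable(value.get()));
-     }
- }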
二、Total Order Sort
---------------------------------------------------
0. A total order sort of the MR output: the outputs of all reducers, concatenated together, are still sorted; the result is ordered as a whole.
1. With only one reducer the output is automatically in total order, but this causes data skew and is only feasible for very small data sets.
2. Use a custom partition function to control how the map output is partitioned:
a. Set the boundary intervals by hand; this depends too heavily on human judgment and can easily cause data skew (see the sketch after this list).
b. Use Hadoop's sampling mechanism to determine the partition boundaries:
1. A sampler generates a partition file.
2. Hadoop's TotalOrderPartitioner reads the partition file and splits the key space accordingly.
c. Code:
0. Set the path where the partition file is saved (by default it goes under the project directory):
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("d://mr/1.lst"));
1. Create a random sampler:
//most commonly used -- random sampler
//freq: 0.1 -- sampling rate, the probability that each key is sampled
//numSamples: 10000 -- maximum total number of samples kept
//maxSplitsSampled: 100 -- maximum number of input splits that are sampled
InputSampler.Sampler<IntWritable, IntWritable> sampler =
new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1, 10000, 100);
2. Set the partitioner -- total order partitioning:
job.setPartitionerClass(TotalOrderPartitioner.class);
d. Notes on using the partitioner:
1. The partitioner must be configured after the number of reduce tasks has been set [job.setNumReduceTasks(3)].
2. The input format [job.setInputFormatClass] must not be TextInputFormat.class,
because TextInputFormat uses the byte offset as the key, and such a key is meaningless to the partitioner.
3. Because of how InputSampler.writePartitionFile is implemented,
the map() input key type and output key type must be identical, otherwise an exception like this is thrown: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable
4. When setting the partition file path, obtain the Configuration with job.getConfiguration() rather than from the conf object created earlier,
because once Job job = Job.getInstance(conf) has run, the data in conf has already been copied into the job, and further use of conf has no effect.
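Before the full code below, a minimal sketch of option 2.a above: a partitioner with hand-picked boundary values (assuming LongWritable keys and 3 reducers; the boundaries 1000 and 5000 are illustrative guesses, which is exactly why this approach risks skew):
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.mapreduce.Partitioner;
-
- public class ManualRangePartitioner extends Partitioner<LongWritable, NullWritable> {
-     @Override
-     public int getPartition(LongWritable key, NullWritable value, int numPartitions) {
-         long k = key.get();
-         //hand-picked boundaries, assuming numPartitions == 3:
-         //(-inf, 1000) -> reducer 0, [1000, 5000) -> reducer 1, [5000, +inf) -> reducer 2
-         if (k < 1000) {
-             return 0;
-         } else if (k < 5000) {
-             return 1;
-         }
-         return 2;
-     }
- }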
------------------------ [Code] -------------------------------
- **************1.MyApp*******************
- public class MyApp {
-
- public static void main(String [] args)
- {
- try {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- if(args.length > 1)
- {
- FileSystem.get(conf).delete(new Path(args[1]));
- }
- //configure the job
- job.setJobName("AllSort");
- job.setJarByClass(MyApp.class);
- FileInputFormat.addInputPath(job,new Path(args[0]));
- FileOutputFormat.setOutputPath(job,new Path(args[1]));
- job.setMapperClass(MyMapper.class);
- job.setReducerClass(MyReducer.class);
- job.setInputFormatClass(SequenceFileInputFormat.class); //input format: sequence file
- job.setMapOutputKeyClass(LongWritable.class); //map output key type
- job.setMapOutputValueClass(LongWritable.class); //map output value type
- job.setOutputKeyClass(IntWritable.class); //reduce output key type
- job.setOutputValueClass(LongWritable.class); //reduce output value type
- job.setNumReduceTasks(3);
-
- //set the path where the partition file is saved
- TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("d://mr/1.lst"));
- //additional job settings
- //create a random sampler
- //most commonly used -- random sampler
- InputSampler.Sampler<LongWritable, Text> sampler =
- new InputSampler.RandomSampler<LongWritable, Text>(1.0, 6000, 3);
- //write the partition boundaries to the partition file
- InputSampler.writePartitionFile(job,sampler);
- //set the partitioner -- total order partitioning
- job.setPartitionerClass(TotalOrderPartitioner.class);
-
- //
- job.waitForCompletion(true);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
-
-
- **************2.Mapper*******************
- /**
- * Mapper class
- */
- public class MyMapper extends Mapper<LongWritable,Text ,LongWritable,LongWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- context.write(new LongWritable((long)key.get()),new LongWritable(Long.parseLong(value.toString())));
- }
- }
-
- **************3.MyReducer*******************
- /**
- * Reducer class
- */
- public class MyReducer extends Reducer<LongWritable, LongWritable,IntWritable,LongWritable> {
-
- @Override
- protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
- Iterator<LongWritable> it = values.iterator();
- while(it.hasNext())
- {
- context.write(new IntWritable((int)key.get()), it.next());
- }
- }
- }
三、Inverted Sort
----------------------------------
1. Essentially this just swaps the key and the value (see the sketch below).
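A minimal sketch, assuming the upstream output is (Text word, IntWritable count) pairs and we want the shuffle to sort by count; the class name is illustrative. Hadoop also ships org.apache.hadoop.mapreduce.lib.map.InverseMapper, which performs the same swap generically:
- import java.io.IOException;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- //Swap (word, count) into (count, word) so that the shuffle sorts the records by count.
- public class SwapMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
-     @Override
-     protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
-         context.write(value, key);
-     }
- }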
四、Secondary Sort (Map side: the output of the map function goes through the map-side partitioner (which decides which reducer it goes to), then through the reduce-side grouping comparator (which decides which reduce() call it enters), and finally into the reduce function for aggregation)
---------------------------------------
1. Keys are sorted by default during the map-reduce process, but values are not. Sorting the values as well is what secondary sort does.
2. Implementation steps:
a. Define a custom key class ComboKey that implements the WritableComparable interface and its methods:
-- fields of the custom class: year and temperature, standard JavaBean style
-- implement ComboKey's serialization and deserialization methods
-- implement ComboKey's comparison method compareTo();
-- note: a no-argument constructor is mandatory (if you have defined a constructor with parameters), otherwise Hadoop cannot instantiate the key and reports an init error.
- /**
- * Custom composite key
- */
- public class ComboKey implements WritableComparable<ComboKey> {
-
- private int year;
- private int temp;
-
- public ComboKey(int year, int temp) {
- this.year = year;
- this.temp = temp;
- }
-
- public ComboKey()
- {
-
- }
-
- public int getYear() {
- return year;
- }
-
- public void setYear(int year) {
- this.year = year;
- }
-
- public int getTemp() {
- return temp;
- }
-
- public void setTemp(int temp) {
- this.temp = temp;
- }
-
-
- /**
- * Comparison method
- * goal: year ascending, temperature descending
- */
- @Override
- public int compareTo(ComboKey o) {
- int y0 = o.getYear();
- int t0 = o.getTemp();
-
- if(this.year == y0)
- {
- return -(this.temp - t0);
- }
- else
- {
- return (this.year - y0);
- }
- }
-
- /**
- * Serialization
- */
- @Override
- public void write(DataOutput out) throws IOException {
- //write the year
- out.writeInt(year);
- //write the temperature
- out.writeInt(temp);
- }
-
- /**
- * Deserialization
- */
- @Override
- public void readFields(DataInput in) throws IOException {
-
- this.year = in.readInt();
- this.temp = in.readInt();
- }
- }
b. Map side: write the map function:
- /**
- * Mapper class
- */
- public class MyMapper extends Mapper<LongWritable,Text ,ComboKey, NullWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
- String v = value.toString();
- if(v != null && !v.isEmpty())
- {
- String [] strs = v.split(" ");
- int year = Integer.parseInt(strs[0]);
- int tem = Integer.parseInt(strs[1]);
-
- ComboKey com = new ComboKey(year,tem);
- context.write(com,NullWritable.get());
- }
- }
- }
c. Map side: define a custom partitioner MyParttitioner extending Partitioner and override getPartition so that ComboKeys with the same year land in the same partition:
-- return year % numPartitions;
- /**
- * Custom partitioner
- */
- public class MyParttitioner extends Partitioner<ComboKey, NullWritable> {
-
- /**
- * Send keys with the same year to the same partition
- */
- @Override
- public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
- int year = key.getYear();
- return year % numPartitions;
- }
- }
d. Reduce side: define a custom sort comparator ComboKeyComparator extends WritableComparator that sorts the (year, temperature) keys delivered to each reducer according to the custom rule:
- /**
- * Custom key sort comparator, used to order ComboKey instances
- */
- public class ComboKeyComparator extends WritableComparator {
- protected ComboKeyComparator() {
- super(ComboKey.class, true);
- }
-
- public int compare(WritableComparable a, WritableComparable b) {
- ComboKey k1 = (ComboKey) a;
- ComboKey k2 = (ComboKey) b;
- return k1.compareTo(k2);
- }
- }
e. Reduce side: define a grouping comparator MyGroup extends WritableComparator that puts ComboKeys with the same year into one group:
- /**
- * Grouping comparator: reduce-side keys with the same year form one group and enter the same reduce() call
- */
- public class MyGroup extends WritableComparator {
-
- protected MyGroup() {
- super(ComboKey.class, true);
- }
-
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- ComboKey k1 = (ComboKey)a ;
- ComboKey k2 = (ComboKey)b ;
- return k1.getYear() - k2.getYear() ;
- }
- }
f. Write the reduce function.
Note: the data passed into a reduce() call has already been grouped and sorted, so all records share the same year and the temperatures are in descending order.
-- Pay particular attention to the key and values parameters of reduce(): key looks like a single object and values like a collection, but in fact every value has its own corresponding key. As the values iterator advances, key is updated to match the current value. So the key changes during iteration, even within a single reduce() call.
-- Iterating over values is like moving a stream pointer: it advances one value at a time. If you never advance the iterator, you only ever see the first record.
- /**
- * Reducer class
- */
- public class MyReducer extends Reducer<ComboKey, NullWritable,IntWritable,IntWritable> {
-
- @Override
- protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
-
- System.out.println("=======================> reduce");
- for (NullWritable v : values) {
- System.out.println(key.getYear() + "_" + key.getTemp());
- //return;
- break;
- }
- context.write(new IntWritable(key.getYear()),new IntWritable(key.getTemp()));
- System.out.println(key.getYear() + "_---_" + key.getTemp());
- }
- }
g. Write the MyApp driver:
- /**
- * App main
- */
- public class MyApp {
-
- public static void main(String [] args)
- {
- try {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- if(args.length > 1)
- {
- FileSystem.get(conf).delete(new Path(args[1]));
- }
- //configure the job
- job.setJobName("SecondarySort");
- job.setJarByClass(MyApp.class);
-
- job.setInputFormatClass(TextInputFormat.class); //input format: plain text
-
- FileInputFormat.addInputPath(job,new Path(args[0]));
- FileOutputFormat.setOutputPath(job,new Path(args[1]));
-
- job.setMapperClass(MyMapper.class);
- job.setReducerClass(MyReducer.class);
-
-
- job.setMapOutputKeyClass(ComboKey.class); //map output key type
- job.setMapOutputValueClass(NullWritable.class); //map output value type
-
- job.setOutputKeyClass(IntWritable.class); //reduce output key type
- job.setOutputValueClass(IntWritable.class); //reduce output value type
-
- //set the partitioner class
- job.setPartitionerClass(MyParttitioner.class);
- //set the grouping comparator class
- job.setGroupingComparatorClass(MyGroup.class);
- //set the custom key sort comparator
- job.setSortComparatorClass(ComboKeyComparator.class);
- //
- job.setNumReduceTasks(3);
- job.waitForCompletion(true);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
五、Execution Order of the Functions in a Complete MR Job
----------------------------------------------------------------------
[Data preparation stage]
1. InputFormat -- determines the input format of the data
[Data input stage]
2. RecordReader -- reads the data one record at a time
[Map stage]
3. map() -- each record is passed into the map() function, where the mapping logic is applied
4. Partitioner -- the records produced by map() are partitioned by the default partition function (hash) or a custom one (total order sort, secondary sort, custom sort, etc.)
5. Combiner -- performs a map-side aggregation of the data inside each partition, reducing the amount of data sent over the network
[Data transfer stage]
6. Shuffle -- each partition is sent to its reducer, so records with the same key end up on the same reduce side
[Reduce stage]
7. Sort comparator -- all incoming data (from many map-side partitions) is sorted, either by the key's default ordering or by a custom sort comparator (total order sort, secondary sort, custom sort, etc.)
8. Grouping comparator -- the sorted data is grouped: by default, equal keys form one group, or a custom grouping comparator decides which records enter the same reduce() call
9. reduce() -- the groups produced by the grouping comparator are passed, in order, to reduce() calls, which aggregate the data and write the results to the Context
[Data output stage]
10. The data in the Context is written out to storage: local disk, HDFS, etc. The driver hooks for the pluggable steps above are sketched below.
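A minimal driver sketch that maps the numbered steps above onto the corresponding Job setters. The My* class names are placeholders for your own classes (the combiner often just reuses the reducer, WordCount-style); imports, try/catch, and input/output paths are omitted:
- Job job = Job.getInstance(new Configuration(), "pipeline-hooks");
- job.setInputFormatClass(TextInputFormat.class);          //step 1: input format (its RecordReader covers step 2)
- job.setMapperClass(MyMapper.class);                       //step 3: map()
- job.setPartitionerClass(HashPartitioner.class);           //step 4: partitioner (default, shown explicitly)
- job.setCombinerClass(MyReducer.class);                    //step 5: optional map-side combiner
- job.setSortComparatorClass(MySortComparator.class);       //step 7: optional custom sort comparator
- job.setGroupingComparatorClass(MyGroupComparator.class);  //step 8: optional grouping comparator
- job.setReducerClass(MyReducer.class);                     //step 9: reduce()
- job.setOutputFormatClass(TextOutputFormat.class);         //step 10: output format
- job.waitForCompletion(true);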
六、Handling Data Skew
---------------------------------------------------------
1. Custom partition function on the map side (see the sketch below)
2. Chaining the map tasks (see the Job Chaining section below)
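A minimal sketch of option 1, assuming Text/IntWritable keys and one known hot key: the hot key is spread across all reducers at random, while other keys keep normal hash partitioning. The hot-key value and class name are illustrative assumptions, and the partial results for the hot key must then be merged downstream (for example in a second, small job):
- import java.util.Random;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Partitioner;
-
- public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {
-
-     private static final String HOT_KEY = "the";   //illustrative hot key
-     private final Random random = new Random();
-
-     @Override
-     public int getPartition(Text key, IntWritable value, int numPartitions) {
-         if (key.toString().equals(HOT_KEY)) {
-             //spread the hot key over all reducers instead of overloading a single one
-             return random.nextInt(numPartitions);
-         }
-         //default hash partitioning for all other keys
-         return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
-     }
- }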
七、Job Chaining (Chain)
---------------------------------------------------------
1. General form of a chained job: [MAP+ / REDUCE MAP*]
-- one or more Mappers make up a ChainMapper
-- one Reducer followed by zero or more Mappers make up a ChainReducer
2. The advantage of chaining is that it greatly reduces intermediate disk I/O and data transfer, because the chained mappers pass records along in memory instead of running separate jobs with intermediate files.
3. Code: a word count that filters out sensitive words and finally outputs only the words whose count is greater than 5.
- ***********************************MyApp**********************************
- /**
- * App main
- */
- public class MyApp {
-
- public static void main(String [] args)
- {
- try {
- Configuration conf = new Configuration();
-
- Job job = Job.getInstance(conf);
- if(args.length > 1)
- {
- FileSystem.get(conf).delete(new Path(args[1]));
- }
- //configure the job
- job.setJobName("test chain");
- job.setJarByClass(MyApp.class);
-
-
- FileInputFormat.addInputPath(job,new Path(args[0]));
- FileOutputFormat.setOutputPath(job,new Path(args[1]));
-
- //configure the ChainMapper
- ChainMapper.addMapper(job,MyMapMapper_1.class, LongWritable.class,Text.class,Text.class,IntWritable.class,job.getConfiguration());
- ChainMapper.addMapper(job,MyMapMapper_2.class, Text.class,IntWritable.class,Text.class,IntWritable.class,job.getConfiguration());
- //configure the ChainReducer
- ChainReducer.setReducer(job,MyReducer.class,Text.class,IntWritable.class,Text.class,IntWritable.class,job.getConfiguration());
- ChainReducer.addMapper(job,MyReducer_Mapper_1.class,Text.class,IntWritable.class,Text.class,IntWritable.class,job.getConfiguration());
-
- //job settings
- job.setNumReduceTasks(3);
- //
- job.waitForCompletion(true);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }
-
- ***********************************ChainMapper_map1**********************************
- /**
- * ChainMapper_map1
- */
- public class MyMapMapper_1 extends Mapper<LongWritable,Text ,Text,IntWritable> {
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
- String v = value.toString();
- String [] strs = v.split(" ");
- for (String s : strs) {
- context.write(new Text(s), new IntWritable(1));
- }
- }
- }
-
- ***********************************ChainMapper_map2**********************************
- /**
- * ChainMapper_map2
- */
- public class MyMapMapper_2 extends Mapper<Text,IntWritable ,Text,IntWritable> {
-
- @Override
- protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
-
- String sp = "badword"; //an example sensitive word to filter out
-
- if(!key.toString().contains(sp))
- {
- context.write(key, value);
- }
- }
- }
-
- ***********************************MyReducer **********************************
- /**
- * Reducer class
- */
- public class MyReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
-
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
-
- int count = 0;
- Iterator<IntWritable> it = values.iterator();
- while(it.hasNext())
- {
- int i = it.next().get();
- count += i;
- }
- context.write(key,new IntWritable(count));
- }
- }
-
-
- ***********************************MyReducer_Mapper_1**********************************
- /**
- * Mapper that runs after the Reducer in the chain
- */
- public class MyReducer_Mapper_1 extends Mapper<Text, IntWritable,Text,IntWritable> {
-
- @Override
- protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
- //keep only words whose count is greater than 5
- if(value.get() > 5)
- {
- context.write(key,value);
- }
- }
- }
-