赞
踩
MapRedece
的
Shuffle
过程中溢出前在分区内进行的是
快速排序
,该过程是
自动进行
的。无论
reduceTask
还是
mapTask
默认是
按字典顺序排序
,并且默认是对
key的字典顺序排序
。
Shuffle
相关可以参考:
MapReduce学习4:框架原理详解
部分排序
:MapReduce
根据输入记录的键对数据集排序。保证输出的每个文件内部有序
全排序
:最终输出结果只有一个文件
,且文件内部有序。实现方式是只设置一个reduceTask
。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce
所提供的并行架构
辅助排序
:(GroupingComparator
分组):在Reduce端
对key
进行分组。应用于在接收的key为bean
对象时,想让一个或几个字段相同
〈全部字段比较不相同)的key
进入到同一个reduce
方法时,可以采用分组排序
二次排序
:在自定义排序过程中
,如果compareTo
中的判断条件为两个即为二次排序
Java中
比较相关的类和接口:
comparable
: 比较接口,通过compareTo方法
定义比较规则comparator
: 比较器,通过compare
方法定义比较规则hadoop
比较使用的类和接口:
WritableComparable
:支持序列化和比较的接口WritableComparator
: 比较器hadoop
实现排序实际就是实现比较,实际是调用的比较器处理:包括自定义比较器
以及默认比较器
。但是无论哪种比较器,发生比较的对象的类都需要实现WritableComparable接口
,并且只有key
才会发生比较而value
不会。例如自定义wordcount官方实例中。在Map阶段
输出到环形缓冲区的时候,在达到一定阈值的时候将要将要输出到磁盘之前会在每个分区内进行快速排序,在实例中,map
输出的key
是单词,而value
固定是1
,表示1个单词,那么分组排只能是key
也就是单词进行排序,并且默认是按字典顺序
,也就是对于两个单词ab
和ba
,ab
会排在前边。key
的类型是Text
,那么Text.class
就需要实现WritableComparable接口
。接下来详细解析
对于上述的环形缓冲区等概念涉及Shuffle原理,可以参考这里:MapReduce学习4:框架原理详解
从源码中可以看到WritableComparable接口
如下,其继承了Writable
以及Comparable
,Writable
是用来实现hadoop序列化
的接口,Writable
为Comparable
处理序列化问题,所以在hadoop
中进行比较也是需要序列化的,因为继承的两个接口可以说是绑定的。关于hadoop序列化
以及Writable接口
,可以参考这里:MapReduce学习3:序列化
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
我们一般运行实例并且map
输出的key
的类型使用hadoop
基本数据类型,输出的数据都是一个局部有序
的状态,这是因为基本数据类型都自定义了比较器用以排序比较
如下为Text.class
源码片段,可以看到其比较器:
自定义比较器都是统一继承的WritableComparator类
,如上是Text
自定义比较器:Comparator类
那么既然有了比较器怎么用它进行比较排序?他是通过将该类的对应的比较器进行注册
,然后使后续逻辑能通过该类找到他所对应的比较器。注册相关是在Text类
中的静态代码块中,通过实现的WritableComparator
的define方法
进行注册该比较类
而define方法
中,主要是将对应的类作为key
还有Comparator对象
放到一个线程安全的HashMap
:comparators
中,也就是这里的注册即使添加一个k-v
关系,后续逻辑从该HashMap
中获取Text.class
对应的比较器
上述举例了Text类
中自定义的比较器,自定义了比较器后,就需要在后续逻辑进行比较执之前进行获取比较器进行比较。如下是在数据即将序列化输出到磁盘之前进行排序时调用的获取排序器的方法getOutputKeyComparator
获取比较器。该源码片段可以在MapTask.class
找到
通过getOuputKeyComparator方法
最终获取比较器,然后进行配置比较器,等待排序时机就会进行调用该比较器进行排序比较。如下是该方法getOuputKeyComparator方法
的逻辑getClass方法
中,首先是首先是获取配置文件中mapreduce.job.output.key.comparator.class
定义的比较器,该配置项可以在mapred-site.xml
中进行配置。默认配置文件(hadoop 3.1.2
)没有相关配置
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
如果配置文件配置了相关的比较器的类,那么就会通过getClass
进行返回,如果没有就是null
getMapOutputKeyClass
进行获取输出的键的类型(OutputKeyClass
可以通过job对象
进行配置)。addSubClass方法
是getMapOutputKeyClass方法
获取的类强制转换成WritableComparable.class
,这样做是保证获取的类是WritableComparable.class
的子类,因为Shuffle过程
中的涉及排序的key
的类型都是统一实现的WritableComparable接口的
,例如默认的Text.class
都是实现了WritableComparable接口的
。这么做的一个直接的原因是:上述获取比较器然后进行后续的比较步骤,但是实际进行比较的时候实际调用的是比较器的compare方法,而对于自定义比较器,compare方法
的参数类型都是重写的WritableComparator类
的compare方法
,而WritableComparator类
的compare方法
的参数类型是WritableComparable
在没有在配置文件的配置项中配置,那么就进行最后一步逻辑,首先是调用getMapOutputKeyClass方法
getMapOutputKeyClass方法
首先也是获取配置文件上mapreduce.map.output.key.class
是否相应的值,如果存在,那么返回配置的类
如果不存在就获取配置的输出键的类
,该类可以在自定义的Driver文件
中配置Map阶段
输出的key
的类型
job.setMapOutputKeyClass(CompareBean.class);
public static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
最终结果是返回设置的类或者客户端设置的Map阶段
的输出key
的类并被WritableComparator.get
方法接受
get方法
就用到了上述线程安全的HashMap
,如果通过该HashMap
可以获取到类对应的比较器那么正常返回,例如hadoop基本类型
中Text
就自定义了比较器,那么就会第一段逻辑中通过comparatos
获取到该对应关系并最终拿到Text
对应的比较器
若获取的比较器不存在,首先是进行类的强制初始化(强制类加载
),执行forceInit方法
,因为类有可能没有加载进来或者在内存中已经被销毁了。强制初始化类就会重新加载类的静态方法进行加载比较器以及注册比较器
如果强制初始化也没用,那么就新建一个比较器对象(WritableComparator
)
comparator = new WritableComparator(c, conf, true);
其中第一个参数就是发生比较的数据的类,第二个参数是配置对象,而对于第三个参数,如果是自定义的比较器,那么就需要传true
,具体可以从源码看到
如上述红色箭头,当传入为true
的时候,才会新建key1
和key2
,新建的两个对象就是要比较的对象,从newKey()方法
可以看到,他是通过反射的方式创建keyClass
,也就是传入的要比较的数据的类。最后新建的两个对象发生比较排序,如果不是true
,就没有新建两个对象,也就不会存在比较排序了
上述对排序比较有了一个大致的认识,接下来是介绍自定义比较规则,为什么是自定义比较规则呢?因为实现排序比较的时候,可以自定义比较器,也可以使用默认比较器
排序只针对key
,对于一个传入的key
,他要实现比较,可能有两种情况为:
key
有单独设置了比较器类,也就是上述中在配置文件直接配置了mapreduce.job.output.key.comparator.class
的值或者通过在driver文件
中通过job对象
配置比较器类,这种情况就可以直接使用该属性对应的比较器类key
没有设置比较器类,那么hadoop
就会自行创建一个比较器对象。比较规则是可以自定义的,正常自定义都是没有设置比较器类的,也就是不会像Text.class
这种在comparators
这个线程安全的HashMap
中进行注册比较器,所以会按照逻辑,对于自定义比较规则的就会按上述源码,就会走到如下的位置。那么就会创建一个默认的比较器那么如何实现自定义的排序规则呢?上文是最终获取了比较器,这个比较器可以是自定义的,也可以是默认的,最终进行过比较的事实就是比较器中调用compare方法。所以实现自定义的比较,就有两种方案
重写compare方法
key
对应的类型的类(例如key
的类型是Text
等),那么对应的类只需要继承WritableComparator接口
并实现compareTo方法
即可可能在这里比较模糊,接下来的实例会更形象地进行介绍
1、需求:对于每个手机号产生一定的上行和下行流量,统计每个手机的总流量(上行流量+下行流量)并按总流量降序排序
输出,输出格式如下:
手机号 上行流量 下行流量 总流量
2、输入数据
#ID 手机号 IP 上行流量 下行流量 网络状态码
1 13660436666 120.196.101.99 1126 954 200
2 13760436667 120.196.102.99 1136 954 200
3 13860436668 120.196.103.99 1146 954 200
4 13960436669 120.196.104.99 1156 954 200
5 13660436676 120.196.105.99 1166 954 200
6 13760436686 120.196.106.99 1176 954 200
7 13860436666 120.196.107.99 1126 954 200
8 13960436667 120.196.108.99 1136 954 200
9 13660436668 120.196.109.99 1146 954 200
3、分析:对于上述数据中,因为需要输出上行流量和下行流量以及总流量,那么可以创建一个bean对象
去装载这些数据。因为需要按总流量降序进行排序,但是hadoop
的排序规则只是针对输出的key
处理的,所以在Map输出
后将bean对象
作为key
并定义compare方法
按总流量排序
3、CompareBean.class
:用以存储上下行流量以及总流量的bean对象
package com.compare.maven; import com.writable.maven.FlowBean; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /* 实现WritableComparable接口,对于情况1就是实现自己的比较器,并且重写compare方法 而被重写的compare方法的参数是WritableComparable参数,而本次实例中就需要将CompareBean 作为比较的参数,所以需要实现WritableComparable接口 */ public class CompareBean implements WritableComparable<CompareBean> { private Long downFlow; private Long upFlow; private Long sumFlow; public CompareBean(){ } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getSumFlow() { return sumFlow; } public void setSumFlow(Long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow(){ this.sumFlow = this.upFlow + this.downFlow; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow); } @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } @Override public String toString() { return downFlow + "\t" + upFlow +"\t"+ sumFlow ; } /* WritableComparable实现方式,这里不需要管,默认定义即可 因为通过WritableComparable方式重写compare方法,而底层进行比较是 直接调用compare方法,所以没有经过该方法 */ @Override public int compareTo(CompareBean o) { return 0; } }
4、CompareMapper.class
package com.compare.maven; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CompareMapper extends Mapper<LongWritable, Text, CompareBean, Text> { private Text outK = new Text(); private CompareBean cb = new CompareBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); String line = value.toString(); String[] words = line.split("\\W+"); String phoneNum = words[1]; outK.set(phoneNum); int len = words.length; cb.setUpFlow(Long.parseLong(words[len-3])); cb.setDownFlow(Long.parseLong(words[len-2])); cb.setSumFlow(); /* 因为需要对总流量进行排序,所以总流量必须作为key,因为仅对key进行排序 */ context.write(cb,outK); } }
5、CompareReducer.class
package com.compare.maven; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CompareReducer extends Reducer<CompareBean, Text, Text, CompareBean> { @Override protected void reduce(CompareBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); for(Text value: values){ context.write(value, key); } } }
6、CompareDriver.class
package com.compare.maven; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class CompareDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(CompareDriver.class); job.setMapperClass(CompareMapper.class); job.setReducerClass(CompareReducer.class); job.setMapOutputKeyClass(CompareBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CompareBean.class); FileInputFormat.setInputPaths(job, new Path("E:\\bigdata\\study\\test_files\\compareinput")); FileOutputFormat.setOutputPath(job, new Path("E:\\bigdata\\study\\test_files\\compareoutput")); // 设置比较器类 job.setSortComparatorClass(CompareWritableComparator.class); job.waitForCompletion(true); } }
通过job.setSortComparatorClass(CompareWritableComparator.class)
,实际是设置了配置配置项mapreduce.job.output.key.comparator.class
,使得他的值为CompareWritableComparator.class
,那么接下来的逻辑就是直接获取该配置项的值,然后通过反射创建比较器实例并返回,在进行比较的时候就会调用我们的compare方法
7、CompareWritableComparator.class
:继承WritableComparator
,实现自定义的比较器
package com.compare.maven; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class CompareWritableComparator extends WritableComparator { public CompareWritableComparator(){ super(CompareBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompareBean aBean = (CompareBean)a; CompareBean bBean = (CompareBean)b; /* 这里直接调用Long方法的compareTo方法 aBean.getSumFlow().compareTo(bBean.getSumFlow())表示: aBean.sumFlow - bBean.sumFlow,正值返回1,相等返回0,负值返回-1 aBean.getSumFlow().compareTo(bBean.getSumFlow())是升序,前边加 负号是降序 */ return -aBean.getSumFlow().compareTo(bBean.getSumFlow()); } }
上述是自定义比较器,在排序阶段会调用自定义的排序器的compare方法实现排序
对于本次案例实现,要首先取消自定义比较器的设置
package com.compare.maven; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class CompareDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(CompareDriver.class); job.setMapperClass(CompareMapper.class); job.setReducerClass(CompareReducer.class); job.setMapOutputKeyClass(CompareBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CompareBean.class); FileInputFormat.setInputPaths(job, new Path("E:\\bigdata\\study\\test_files\\compareinput")); FileOutputFormat.setOutputPath(job, new Path("E:\\bigdata\\study\\test_files\\compareoutput")); // 取消配置job.setSortComparatorClass(CompareWritableComparator.class); job.waitForCompletion(true); } }
并且重写CompareBean.class
方法中的comapreTo方法
package com.compare.maven; import com.writable.maven.FlowBean; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class CompareBean implements WritableComparable<CompareBean> { private Long downFlow; private Long upFlow; private Long sumFlow; public CompareBean(){ } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getSumFlow() { return sumFlow; } public void setSumFlow(Long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow(){ this.sumFlow = this.upFlow + this.downFlow; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow); } @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } @Override public String toString() { return downFlow + "\t" + upFlow +"\t"+ sumFlow ; } @Override public int compareTo(CompareBean o) { return -this.sumFlow.compareTo(o.sumFlow); } }
为什么可以直接设置compareTo方法
呢?结合上述源码分析,在没有配置比较器类的时候,那么就会走get方法
对于基本数据类型会进行注册相关的比较器,但是我们这里是没有注册的步骤的,在没有设置Comparator
的时候也没有注册就会走compare为null
的逻辑,最终就会new
一个WritableComparator
,也就是创建一个默认的比较器
而最终进行比较是调用默认比较器的compare方法
,如下可以看到默认的compare方法
实际是调用输出类的compareTo方法
,上述排序的时候输出的类是compareBean类,
所以就调用他的compareTo方法
进行排序比较
1、需求:在4需求
的基础上将不同手机分区为不同的省份并且在不同的分区内按总流量降序排序
2、分析:添加分区器并配置分区器,只要配置了比较器,就会自动进行区内排序
3、分区器设置
PhonePatitioner.class
package com.compare.maven; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PhonePatitioner extends Partitioner<CompareBean, Text> { @Override public int getPartition(CompareBean compareBean, Text text, int numPartitions) { String preNum = text.toString().substring(0, 3); int partition=4; if("136".equals(preNum)){ partition=0; }else if("137".equals(preNum)){ partition=1; }else if("138".equals(preNum)){ partition=2; }else if("139".equals(preNum)){ partition=3; } return partition; } } }
分区相关原理和详细实例可以看这里:MapReduce学习4:框架原理详解
4、在CompareDriver.class
中配置分区器并设置reduceTask
的数量
job.setPartitionerClass(PhonePatitioner.class);
job.setNumReduceTasks(4);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。