赞
踩
MapReduce是由两部分构成,即Map和Reduce分别对应的实现类时Mapper和Reducer。官网 中这么介绍的
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
简单来说就是MapReduce是一个用于在大型集群中可以对TB规模数据处理的计算框架,并且编程起来很easy。之所以简单是因为MapReduce有着固定的“套路”。
MapReduce在编程时有三个必不可少的组成:Mapper,Reducer和Driver。分别负责对数据的分区映射,排序合并以及驱动程序。
写MR程序时,需要自己自定义一个Mapper类
并且继承org.apache.hadoop.mapreduce 的 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,同时还得有map函数
:
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
context.write(key, value);
}
有了Mapper之后还得有Reducer将数据聚合处理,自定义Reducer类
继承org.apache.hadoop.mapreduce 的 Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,同时还得有reduce函数
:
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
Iterator i$ = values.iterator();
while(i$.hasNext()) {
VALUEIN value = i$.next();
context.write(key, value);
}
}
作为MapReduce的驱动程序,需要多个步骤,下面介绍一般程序都有的步骤。
创建环境Configuration
如果有需要还可以设置相应的环境变量,通过应用变量添加新的变量或者覆盖原先配置的xml文件中的变量,例如可以通过设置configuration.setBoolean(Job.MAP_OUTPUT_COMPRESS,true)让map输出进行压缩,可以减少磁盘空间,减少网络传输。
创建作业Job
,Job通过静态方法getInstance()得到Job实例,该方法根据不同的参数有不同的实现,例如:
public static Job getInstance(Configuration conf) throws IOException {
JobConf jobConf = new JobConf(conf);
return new Job(jobConf);
}
设置作业的驱动类
public void setJarByClass(Class<?> cls) {
this.ensureState(Job.JobState.DEFINE);
this.conf.setJarByClass(cls);
}
设置作业的Mapper类和Reducer类
//设置Mapper
public void setMapperClass(Class<? extends Mapper> cls) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setClass("mapreduce.job.map.class", cls, Mapper.class);
}
//设置Reducer
public void setReducerClass(Class<? extends Reducer> cls) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setClass("mapreduce.job.reduce.class", cls, Reducer.class);
}
设置Mapper的输出键-值对
//键
public void setMapOutputKeyClass(Class<?> theClass) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setMapOutputKeyClass(theClass);
}
//值
public void setMapOutputValueClass(Class<?> theClass) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setMapOutputValueClass(theClass);
}
设置Reducer的输出键-值对
public void setOutputKeyClass(Class<?> theClass) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setOutputKeyClass(theClass);
}
public void setOutputValueClass(Class<?> theClass) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setOutputValueClass(theClass);
}
设置输入输出路径
输入输出路径分别由FileInputFormat
和FileOutputFormat
的静态方法setInputPaths()
与setOutputPath()
public static vo
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。