赞
踩
属性名 | 属性类型 | 描述 |
---|---|---|
mapreduce.job.id | String | 作业ID |
mapreduce.task.id | String | 任务ID |
mapreduce.task.attempt.id | String | 任务尝试ID |
mapreduce.task.partition | int | 作业中的任务索引,该属性不是很理解 |
mapreduce.task.ismap | boolean | 是否为map任务 |
任务的推测执行
不适合使用推测执行的情况
如果任务因为软件缺陷运行缓慢或挂起,则推测任务也可能会存在相同的问题
解决办法: 修复软件缺陷,使其不会因为软件缺陷运行缓慢或挂起
非幂等任务,每次执行的结果不一致,使用推测执行无法得到与原始任务相同的结果
一般对map任务开启推测执行,reduce任务关闭推测执行。
原因: reduce任务会复制map任务的输出,使得宝贵的网络带宽被占用,反而会降低任务运行速度
推测执行的目的
作业的提交协议
setupJob(): 提交作业,会为作业创建临时工作空间, _temporary
,是整个输出目录的子目录
commitJob(): 作业执行成功,删除临时工作空间,在输出目录中创建隐藏的 _SUCCESS
标志文件。
_SUCCESS
标志文件杨宏宇告知client,作业执行成功
abortJob(): 作业执行失败或终止,删除作业的临时空间(默认实现)
任务的提交协议
false
,表示关闭提交阶段
不必为任务运行分布提交协议,即
commitTask()
或abortTask()
都无需执行
- 如果任务因为失败而重试,则重试成功的任务调用
commitTask()
;之前失败的任务,调用abortTask()
- 如果因为推测执行存在多个任务,则
commitTask()
将处理先执行成功的任务,abortTask()
处理执行较慢的任务
如果不考虑combine,MR的流程为:
map
→
\rightarrow
→ partition(shuffle中的一步)
→
\rightarrow
→ reduce
map、reduce函数有 KEYIN
、VALUEIN
、KEYOUT
、VALUEOUT
,分别对应不同的Hadoop序列化数据类型
- map: (K1, V1) → \rightarrow → list(K2, V2),将输入进行处理后,转化为key-value集合
- partition: (K2, V2) + partitionNums → \rightarrow → int,对每条记录的key进行hash操作,以决定其属于哪个分区
- reduce: (K2, list(V2) ) → \rightarrow → list(K3, V3),将分区、合并后的map输出进行汇总,输出按key排序的、key-value集合
不难看出:map的输入与输入文件有关,map的输出与reduce的输入保持一致,reduce的输出由程序决定
如果考虑combine,MR的流程为:
map
→
\rightarrow
→ partition(shuffle中的一步)
→
\rightarrow
→ combine
→
\rightarrow
→ reduce
输入输出的key、value类型关系如下
- map: (K1, V1) → \rightarrow → list(K2, V2),将输入进行处理后,转化为key-value集合
- partition: (K2, V2) + partitionNums → \rightarrow → int,对每条记录的key进行hash操作,以决定其属于哪个分区
- combine:(K2, list(V2) ) → \rightarrow → list(K2, V2),对map输出进行combine,较少传输给reduce的数据量
- reduce: (K2, list(V2) ) → \rightarrow → list(K3, V3),将分区、合并后的map输出进行汇总,输出按key排序的、key-value集合
MR作业需要设置map、reduce函数、设置输入输出的数据格式等,其默认值如下
public class Main { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = new Job(); job.setJarByClass(Main.class); job.setJobName("default config"); FileInputFormat.addInputPath(job, new Path("/input/1")); FileOutputFormat.setOutputPath(job, new Path("/output/1")); // 默认的输入格式,对应的key:LongWritable, value:text job.setInputFormatClass(TextInputFormat.class); // 默认的map函数,直接将输入的key-value,写到context中 job.setMapperClass(Mapper.class); // 设置map输出的key、value类型 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // 设置partition job.setPartitionerClass(HashPartitioner.class); // 设置reduce任务数 job.setNumReduceTasks(1); // 默认的reduce函数,直接将输入的key-value,写到context中 job.setReducerClass(Reducer.class); // 设置输出的key、value类型 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // 设置输出格式,key、value之间通过制表符分隔 job.setOutputFormatClass(TextOutputFormat.class); System.exit(job.waitForCompletion(true) ? 0:2); } }
默认的map函数如下:
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
默认的reduce函数如下:
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
注意: 如果,map输出的key、value类型与reduce输出一致,则可以省略对map输出数据类型的定义,即省略以下代码:
// 设置map输出的key、value类型
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
注意: TextInputFormat
对应的key为LongWritable
类型,表示每行记录首部在文件中的offset,并非每行记录的行号
stdin
和stdout
的streaming作业streaming的分隔符
\t
作为默认值,还可以自定义分隔符a,b,c
,组合字段数为2,则a,b
为key,c
为value任务推测执行
MR的提交协议
4. 自己的理解:作业或任务在执行的不同阶段,系统所需完成的操作
5. 书上的介绍:提交协议可以保证作业或任务,完全失败或成功
6. 作业的提交协议:
- setupJob()
,创建临时工作目录;
- commitJob()
,作业成功,删除临时工作空间、创建_SUCCESS
标志文件;
- abortJob()
,作业执行失败或被终止,删除临时工作空间(默认实现)
7. 任务的提交协议:
- setupTask()
,不做任何操作(默认实现);
- needsTask()
, 如果将其设置为false
,关闭提交阶段,不会执行commitTask()
或abortTask()
;
- commitTask()
,任务执行成功,将临时输出目录中的内容移动到输出目录
- abortTask()
,任务执行不成功,删除临时输出目录
- 如果存在推测执行,先执行成功的任务调用commitTask()
,否则调用abortTask()
;如果失败重试,重试成功的任务调用commitTask()
,之前失败的任务调用 abortTask()
MR的默认设置
TextInputFormat
、TextOutputFormat
Mapper
、Reducer
,partition类的设置:HashPartitioner
setNumReduceTasks(1)
streaming作业的默认设置
\t
,可以通过属性自定义分隔符Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。