当前位置:   article > 正文

MR的其他知识 —— Hadoop权威指南12_mr作业

mr作业

1. 关于任务的一些知识

1.1 作业/任务的属性
  • 《Hadoop权威指南》中,介绍了一些任务执行环境的属性。通过学习,我更倾向于这是作业或任务的属性
属性名属性类型描述
mapreduce.job.idString作业ID
mapreduce.task.idString任务ID
mapreduce.task.attempt.idString任务尝试ID
mapreduce.task.partitionint作业中的任务索引,该属性不是很理解
mapreduce.task.ismapboolean是否为map任务
1.2 任务推测执行
  • 一个MR作业,会划分成若干个可以并行执行的map任务、reduce任务
  • 整个作业运行时间,取决于map任务或reduce任务中,运行最慢的那个任务 —— 木桶效应
  • 因此,想要提高任务的执行效率,就应该尽量避免存在执行缓慢的任务拖后腿
  • 任务执行缓慢的原因:硬件老化、软件配置错误
  • 检测任务执行缓慢很容易,但是想要知道任务执行缓慢的原因、甚至修复是非常难的

任务的推测执行

  • 在任务运行过程中,调度器会跟踪任务,为明显低于平均水平的少量任务创建一个备份任务
  • 两个任务同时运行,原始任务先运行结束,则kill备份任务;备份任务先运行结束,则kill原始任务
  • 这样的设计,叫做任务的推测执行,创建备份任务又称推测任务

不适合使用推测执行的情况

  • 如果任务因为软件缺陷运行缓慢或挂起,则推测任务也可能会存在相同的问题

    解决办法: 修复软件缺陷,使其不会因为软件缺陷运行缓慢或挂起

  • 幂等任务,每次执行的结果不一致,使用推测执行无法得到与原始任务相同的结果

  • 一般对map任务开启推测执行,reduce任务关闭推测执行。

    原因: reduce任务会复制map任务的输出,使得宝贵的网络带宽被占用,反而会降低任务运行速度

推测执行的目的

  • 以牺牲集群的整体吞吐量为代价,通过冗余任务来减少作业的执行时间
1.3 MR的提交协议
  • 《Hadoop权威指南》说,MR通过提交协议,保证作业或任务都完全成功或失败
  • 通过学习,书中所讲的提交协议,让我更多体会到了,任务在不同阶段,MR会做些什么事情。与分布式或数据中的一致性协议,好像没啥关系

作业的提交协议

  1. setupJob(): 提交作业,会为作业创建临时工作空间, _temporary,是整个输出目录的子目录

  2. commitJob(): 作业执行成功,删除临时工作空间,在输出目录中创建隐藏的 _SUCCESS标志文件。

    _SUCCESS标志文件杨宏宇告知client,作业执行成功

  3. abortJob(): 作业执行失败或终止,删除作业的临时空间(默认实现)

任务的提交协议

  1. setupTask(): 提交任务,不做任何事情(默认实现)
  2. needsTask(): 任务提交阶段是可选的,如果该方法返回false,表示关闭提交阶段

    不必为任务运行分布提交协议,即commitTask()abortTask()都无需执行

  3. commitTask(): 任务执行成功,调用该方法将临时输出目录中的内容移动到输出目录。
  4. abortTask(): 任务执行不成功,调用该方法删除其临时输出目录
    1. 如果任务因为失败而重试,则重试成功的任务调用commitTask();之前失败的任务,调用abortTask()
    2. 如果因为推测执行存在多个任务,则commitTask()将处理先执行成功的任务,abortTask()处理执行较慢的任务

2. MR的输入输出类型

2.1 默认的MR作业
  • 如果不考虑combine,MR的流程为:
    map → \rightarrow partition(shuffle中的一步) → \rightarrow reduce

  • map、reduce函数KEYINVALUEINKEYOUTVALUEOUT,分别对应不同的Hadoop序列化数据类型

    • map: (K1, V1) → \rightarrow list(K2, V2),将输入进行处理后,转化为key-value集合
    • partition: (K2, V2) + partitionNums → \rightarrow int,对每条记录的key进行hash操作,以决定其属于哪个分区
    • reduce: (K2, list(V2) ) → \rightarrow list(K3, V3),将分区、合并后的map输出进行汇总,输出按key排序的、key-value集合
  • 不难看出:map的输入与输入文件有关,map的输出与reduce的输入保持一致,reduce的输出由程序决定

  • 如果考虑combine,MR的流程为:
    map → \rightarrow partition(shuffle中的一步) → \rightarrow combine → \rightarrow reduce

  • 输入输出的key、value类型关系如下

    • map: (K1, V1) → \rightarrow list(K2, V2),将输入进行处理后,转化为key-value集合
    • partition: (K2, V2) + partitionNums → \rightarrow int,对每条记录的key进行hash操作,以决定其属于哪个分区
    • combine:(K2, list(V2) ) → \rightarrow list(K2, V2),对map输出进行combine,较少传输给reduce的数据量
    • reduce: (K2, list(V2) ) → \rightarrow list(K3, V3),将分区、合并后的map输出进行汇总,输出按key排序的、key-value集合
  • MR作业需要设置map、reduce函数、设置输入输出的数据格式等,其默认值如下

    public class Main {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = new Job();
            job.setJarByClass(Main.class);
            job.setJobName("default config");
    
            FileInputFormat.addInputPath(job, new Path("/input/1"));
            FileOutputFormat.setOutputPath(job, new Path("/output/1"));
    
            // 默认的输入格式,对应的key:LongWritable, value:text
            job.setInputFormatClass(TextInputFormat.class);
            // 默认的map函数,直接将输入的key-value,写到context中
            job.setMapperClass(Mapper.class);
            // 设置map输出的key、value类型
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            
            // 设置partition
            job.setPartitionerClass(HashPartitioner.class);
           
            // 设置reduce任务数
            job.setNumReduceTasks(1);
            // 默认的reduce函数,直接将输入的key-value,写到context中
            job.setReducerClass(Reducer.class);
            // 设置输出的key、value类型
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            // 设置输出格式,key、value之间通过制表符分隔
            job.setOutputFormatClass(TextOutputFormat.class);
            
            System.exit(job.waitForCompletion(true) ? 0:2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  1. 默认的map函数如下:

    protected void map(KEYIN key, VALUEIN value, 
                       Context context) throws IOException, InterruptedException {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
    
    • 1
    • 2
    • 3
    • 4
  2. 默认的reduce函数如下:

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                          ) throws IOException, InterruptedException {
      for(VALUEIN value: values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  3. 注意: 如果,map输出的key、value类型与reduce输出一致,则可以省略对map输出数据类型的定义,即省略以下代码:

    // 设置map输出的key、value类型
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    
    • 1
    • 2
    • 3
  4. 注意: TextInputFormat对应的key为LongWritable类型,表示每行记录首部在文件中的offset,并非每行记录的行号

2.2 默认的streaming作业
  • streaming作业,自己虽然从未接触过,但是《Hadoop编程指南》一书,有很多地方介绍了基于stdinstdout的streaming作业
  • streaming作业,可以设置map输入输出、reduce输入输出的分隔符,用于获取/创建对应的key、value对
    在这里插入图片描述

streaming的分隔符

  1. 输入、输出的分隔符都以\t作为默认值,还可以自定义分隔符
  2. 输出时,还可以将多个字段组合成一个key,例如,reduce输出为a,b,c,组合字段数为2,则a,b为key,c为value
    在这里插入图片描述

3. 总结

任务推测执行

  1. 为了避免运行缓慢的任务拖后腿,调度器会跟踪任务运行速度,为明显低于平均水平的任务创建推测副本
  2. 任务推测执行的目的:以牺牲集群的整体吞吐量为代价,通过冗余任务来提高作业的运行效率
  3. 不适合推测执行的三种情况:软件缺陷、非幂等任务、reduce任务

MR的提交协议
4. 自己的理解:作业或任务在执行的不同阶段,系统所需完成的操作
5. 书上的介绍:提交协议可以保证作业或任务,完全失败或成功
6. 作业的提交协议:
- setupJob(),创建临时工作目录;
- commitJob(),作业成功,删除临时工作空间、创建_SUCCESS标志文件;
- abortJob(),作业执行失败或被终止,删除临时工作空间(默认实现)
7. 任务的提交协议:
- setupTask(),不做任何操作(默认实现);
- needsTask(), 如果将其设置为false,关闭提交阶段,不会执行commitTask()abortTask()
- commitTask(),任务执行成功,将临时输出目录中的内容移动到输出目录
- abortTask(),任务执行不成功,删除临时输出目录
- 如果存在推测执行,先执行成功的任务调用commitTask(),否则调用abortTask();如果失败重试,重试成功的任务调用commitTask(),之前失败的任务调用 abortTask()

MR的默认设置

  1. 输入、输出格式的默认设置:TextInputFormatTextOutputFormat
  2. map和reduce类的设置:MapperReducer,partition类的设置:HashPartitioner
  3. map的输出key-value类型设置:使用默认mapper,将与输入的key-value类型一致
  4. reduce的输出key-value类型设置:使用默认reducer,将与输入的key-value类型一致
  5. 设置reduce任务数:setNumReduceTasks(1)

streaming作业的默认设置

  1. map和reduce任务,输入输出的默认分隔符为\t,可以通过属性自定义分隔符
  2. 可以将字段组合,作为key
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/777806
推荐阅读
相关标签
  

闽ICP备14008679号