当前位置:   article > 正文

Hadoop复习(五) --- Map多输入格式,全排序,二次排序,MR作业流程,作业链条化Chain_filesystem.get(conf).delete(new path(args[1]), tru

filesystem.get(conf).delete(new path(args[1]), true);

一、Map多输入格式问题
-------------------------------------------------
    1.设置输入格式为:TextInputFormat.class,SequenceFileInputFormat.class
        MultipleInputs.addInputPath(job,new Path(args[0]), TextInputFormat.class,MyMapper_Text.class);
        MultipleInputs.addInputPath(job,new Path(args[1]), SequenceFileInputFormat.class,MyMapper_Sequence.class);

    2.设定对应的Map处理类MyMapper_Text.class,MyMapper_Sequence.class
        需要特别注意的是Text输入格式的key是LongWriable类型,Sequence输入格式的key则是自身的key类型。对应的map类要做好输入类型的对应。


二、全排序
---------------------------------------------------
    0.对MR结果的全排序,所有的reduce输出,合在一起仍然是有序的,整体有序

    1.只有1个reduce的时候,就是全排序,但是会产生数据倾斜,仅限数据量很少的情况

    2.通过自定义分区函数,控制map函数的分区
        a.自行设置分界区间,但是此方法人为因素太严重,可能导致数据倾斜

        b.通过hadoop采样机制,进行分区界限限定。
            1.通过采样器生成分区文件
            2.hadoop TotalOrderPartitioner通过分区文件,进行分区划分

        c.代码实现
            0.设定分区文件保存路径,默认为工程路径下
                TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("d://mr/1.lst"));

            1.创建随机采样器
                //最常用 -- 随机采样器
                //freq: 0.1 -- 采样率,每个key被采样的概率
                //numSamples: 10000-- 被采样的样本总数
                //maxSplitsSampled:10 -- 采样分区数。采样区间被分成10个
                InputSampler.Sampler<IntWritable, IntWritable> sampler =
                                new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1, 10000, 100);

            2.设定分区器--全排序分区
                job.setPartitionerClass(TotalOrderPartitioner.class);

        d.分区器使用的注意事项
            1.分区器一定要定义在,设定reduce任务数量[job.setNumReduceTasks(3)], 之后

            2.InputFormat类的输入类型[job.setInputFormatClass]不能是TextInputFormat.class,
            因为如果使用的话,会默认使用偏移量作为key,这样的key对于分区器是毫无意义的。

            3.由于InputSampler.writePartitionFile 函数实现的原因,
            必须要求 map() 函数输入和输出 Key 的类型一致,否则会出现如下的异常:org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable

            4.设定分区文件保存路径的时候,Conf文件请使用,job.getConfiguration()获取,而不要使用前文的new 出来的conf.
            因为,当Job job = Job.Ins(conf)之后,conf就没用了。conf内部的数据已经拷贝到job中了。以后再使用conf也没用了。

            ------------------------【代码实现】-------------------------------          

  1.  **************1.MyApp*******************
  2.             public class MyApp {
  3.                 public static void main(String [] args)
  4.                 {
  5.                     try {
  6.                         Configuration conf = new Configuration();
  7.                         Job job = Job.getInstance(conf);
  8.                         if(args.length > 1)
  9.                         {
  10.                             FileSystem.get(conf).delete(new Path(args[1]));
  11.                         }
  12.                         //设置job
  13.                         job.setJobName("AllSort");
  14.                         job.setJarByClass(MyApp.class);
  15.                         FileInputFormat.addInputPath(job,new Path(args[0]));
  16.                         FileOutputFormat.setOutputPath(job,new Path(args[1]));
  17.                         job.setMapperClass(MyMapper.class);
  18.                         job.setReducerClass(MyReducer.class);
  19.                         job.setInputFormatClass(SequenceFileInputFormat.class); //设定输入文件格式为seq
  20.                         job.setMapOutputKeyClass(LongWritable.class);    //map输出key类型
  21.                         job.setMapOutputValueClass(LongWritable.class);  //map输出value类型
  22.                         job.setOutputKeyClass(IntWritable.class);      //Reduce输出key类型
  23.                         job.setOutputValueClass(LongWritable.class);    //Reduce输出value类型
  24.                         job.setNumReduceTasks(3);
  25.                         //设定分区文件保存路径
  26.                         TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("d://mr/1.lst"));
  27.                         //设定特殊属性
  28.                         //创建随机采样器
  29.                         //最常用 -- 随机采样器
  30.                         InputSampler.Sampler sampler =
  31.                                 new InputSampler.RandomSampler (1, 6000, 3);
  32.                         //将分区信息写入到分区文件
  33.                         InputSampler.writePartitionFile(job,sampler);
  34.                         //设定分区器--全排序分区
  35.                         job.setPartitionerClass(TotalOrderPartitioner.class);
  36.                         //
  37.                         job.waitForCompletion(true);
  38.                     } catch (Exception e) {
  39.                         e.printStackTrace();
  40.                     }
  41.                 }
  42.             }
  43.             **************2.Mapper*******************
  44.             /**
  45.              * Mapper类
  46.              */
  47.             public class MyMapper extends Mapper<LongWritable,Text ,LongWritable,LongWritable> {
  48.                 @Override
  49.                 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  50.                     context.write(new LongWritable((long)key.get()),new LongWritable(Long.parseLong(value.toString())));
  51.                 }
  52.             }
  53.             **************3.MyReducer*******************
  54.             /**
  55.              * Reducer类
  56.              */
  57.             public class MyReducer extends Reducer<LongWritable, LongWritable,IntWritable,LongWritable> {
  58.                 @Override
  59.                 protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  60.                     int max = Integer.MIN_VALUE;
  61.                     Iterator<LongWritable> it = values.iterator();
  62.                     while(it.hasNext())
  63.                     {
  64.                         context.write(new IntWritable((int)key.get()), it.next());
  65.                     }
  66.                 }
  67.             }

 

三、倒排序
----------------------------------
    1.实质上就是将k 和  v进行对调

 

四、二次排序(Map端:Map函数的输出,进入Map端的分区类(决定进入哪个Reduce端),然后进入Reduce端的分组类(决定进入哪个Reduce函数),然后进入reduce函数,进行聚合处理)
---------------------------------------
    1.key在map-reduce的过程中默认是会被排序的,但是value不会。对value排序就是2次排序    2.实现步骤
        a.自定义key类 ComboKey,实现 WritableComparable接口,实现其方法
            -- 自定义类中的字段,year 和 tempuature,标准javabean
            -- 实现ComboKey类的串行和反串行的方法
            -- 实现ComboKey类的比较方法compareTo();
            -- 注意一点,空构造一定要有(如果自定义了带参构造),不然会报无法init的错误。
         

  1.    /**
  2.              * 自定义组合key
  3.              */
  4.             public class ComboKey implements WritableComparable<ComboKey> {
  5.                 private int year;
  6.                 private int temp;
  7.                 public ComboKey(int year, int temp) {
  8.                     this.year = year;
  9.                     this.temp = temp;
  10.                 }
  11.                 public ComboKey()
  12.                 {
  13.                 }
  14.                 public int getYear() {
  15.                     return year;
  16.                 }
  17.                 public void setYear(int year) {
  18.                     this.year = year;
  19.                 }
  20.                 public int getTemp() {
  21.                     return temp;
  22.                 }
  23.                 public void setTemp(int temp) {
  24.                     this.temp = temp;
  25.                 }
  26.                 /**
  27.                  * 自定义对比器
  28.                  * 目的 : 年份升序,温度降序
  29.                  */
  30.                 @Override
  31.                 public int compareTo(ComboKey o) {
  32.                     int y0 = o.getYear();
  33.                     int t0 = o.getTemp();
  34.                     if(this.year == y0)
  35.                     {
  36.                         return -(this.temp - t0);
  37.                     }
  38.                     else
  39.                     {
  40.                         return (this.year - y0);
  41.                     }
  42.                 }
  43.                 /**
  44.                  * 串行化
  45.                  */
  46.                 @Override
  47.                 public void write(DataOutput out) throws IOException {
  48.                     //写入年份
  49.                     out.writeInt(year);
  50.                     //写入温度
  51.                     out.writeInt(temp);
  52.                 }
  53.                 /**
  54.                  *反串行化
  55.                  */
  56.                 @Override
  57.                 public void readFields(DataInput in) throws IOException {
  58.                     this.year = in.readInt();
  59.                     this.temp = in.readInt();
  60.                 }
  61.             }

        b.Map端:编写Map函数
            

  1. /**
  2.              * Mapper类
  3.              */
  4.             public class MyMapper extends Mapper<LongWritable,Text ,ComboKey, NullWritable> {
  5.                 @Override
  6.                 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  7.                     String v = value.toString();
  8.                     if(v != null && v != "")
  9.                     {
  10.                         String [] strs = v.split(" ");
  11.                         int year = Integer.parseInt(strs[0]);
  12.                         int tem = Integer.parseInt(strs[1]);
  13.                         ComboKey com = new ComboKey(year,tem);
  14.                         com.setTemp(tem);
  15.                         com.setYear(year);
  16.                         context.write(com,NullWritable.get());
  17.                     }
  18.                 }
  19.             }

        c.Map端:自定义分区类MyParttitioner,继承Partitioner,重写getPartition方法, 将年份相同的Combokey分到一个区
            -- return year % numPartitions;
          

  1.  /**
  2.              * 自定义分区器
  3.              */
  4.             public class MyParttitioner extends Partitioner<ComboKey, NullWritable> {
  5.                 /**
  6.                  * 将年份相同的,分到一个组
  7.                  */
  8.                 @Override
  9.                 public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
  10.                     int year = key.getYear();
  11.                     return year % numPartitions;
  12.                 }
  13.             }


        d.Reduce端:自定义排序对比器MyComboComparator extends WritableComparator ,将同一个组内的数据,(年份温度)按照自定义规则进行排序
          

  1.  /**
  2.              * 自定义key的排序对比器,用于Combokey的排序
  3.              */
  4.             public class ComboKeyComparator extends WritableComparator {
  5.                 protected ComboKeyComparator() {
  6.                 super(ComboKey.class, true);
  7.                 }
  8.                 public int compare(WritableComparable a, WritableComparable b) {
  9.                 ComboKey k1 = (ComboKey) a;
  10.                 ComboKey k2 = (ComboKey) b;
  11.                 return k1.compareTo(k2);
  12.                 }
  13.             }

        e.Reduce端:自定义分组对比器MyGroup extends WritableComparator, 将年份相同的Combokey组成一个组
          

  1.  /**
  2.              * 分组对比器,将reduce输入数据,年份相同的key编成一个组,进入一个reduce函数中
  3.              */
  4.             public class MyGroup extends WritableComparator {
  5.                 protected MyGroup() {
  6.                 super(ComboKey.class, true);
  7.                 }
  8.                 @Override
  9.                 public int compare(WritableComparable a, WritableComparable b) {
  10.                 ComboKey k1 = (ComboKey)a ;
  11.                 ComboKey k2 = (ComboKey)b ;
  12.                 return k1.getYear() - k2.getYear() ;
  13.                 }
  14.             }


        f.编写Reduce函数
            注意:--reduce函数传入的数据:是已经经过分组和排序的了,所以,肯定是同一年份,并且温度从高到底排序
                 -- 尤其注意,reduce函数的参数key和values,看似key是单一,values是集合,但实质上,每个key都是和values的单个value对应的。当value发生改变,key就会变成相应的值。所以,key是会变的,虽然是同一个reduce函数,但是key是不同的
                 -- 对values进行遍历,相当于流指针的位移。会一个value一个value的向下移。如果不遍历迭代器values,那么数据永远是第一个
          

  1.  /**
  2.              * Reducer类
  3.              */
  4.             public class MyReducer extends Reducer<ComboKey, NullWritable,IntWritable,IntWritable> {
  5.                 @Override
  6.                 protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  7.                 System.out.println("=======================> reduce");
  8.                 for (NullWritable v : values) {
  9.                     System.out.println(key.getYear() + "_" + key.getTemp());
  10.                     //return;
  11.                     break;
  12.                 }
  13.                 context.write(new IntWritable(key.getYear()),new IntWritable(key.getTemp()));
  14.                 System.out.println(key.getYear() + "_---_" + key.getTemp());
  15.                 }
  16.             }


        g.编写MyApp函数
         

  1.    /**
  2.              * AppMian
  3.              */
  4.             public class MyApp {
  5.                 public static void main(String [] args)
  6.                 {
  7.                 try {
  8.                     Configuration conf = new Configuration();
  9.                     Job job = Job.getInstance(conf);
  10.                     if(args.length > 1)
  11.                     {
  12.                     FileSystem.get(conf).delete(new Path(args[1]));
  13.                     }
  14.                     //设置job
  15.                     job.setJobName("SecondarySort");
  16.                     job.setJarByClass(MyApp.class);
  17.                     job.setInputFormatClass(TextInputFormat.class); //设定输入文件格式为seq
  18.                     FileInputFormat.addInputPath(job,new Path(args[0]));
  19.                     FileOutputFormat.setOutputPath(job,new Path(args[1]));
  20.                     job.setMapperClass(MyMapper.class);
  21.                     job.setReducerClass(MyReducer.class);
  22.                     job.setMapOutputKeyClass(ComboKey.class);       //map输出key类型
  23.                     job.setMapOutputValueClass(NullWritable.class);  //map输出value类型
  24.                     job.setOutputKeyClass(IntWritable.class);      //Reduce输出key类型
  25.                     job.setOutputValueClass(IntWritable.class);    //Reduce输出value类型
  26.                     //设定分区类
  27.                     job.setPartitionerClass(MyParttitioner.class);
  28.                     //设定分组类
  29.                     job.setGroupingComparatorClass(MyGroup.class);
  30.                     //设定自定义key的排序对比器
  31.                     job.setSortComparatorClass(ComboKeyComparator.class);
  32.                     //
  33.                     job.setNumReduceTasks(3);
  34.                     job.waitForCompletion(true);
  35.                 } catch (Exception e) {
  36.                     e.printStackTrace();
  37.                 }
  38.                 }
  39.             }


五、整个MR作业流程函数执行顺序分析
----------------------------------------------------------------------

    [数据准备阶段]
        1.InputFormat -- 确定数据的输入格式

    [数据输入阶段]
        2.Reader -- 通过reader一条一条读取数据

    [Map阶段]
        3.map() -- 一条条数据传入map()函数,在map函数中进行映射处理

        4.分区函数Partitioner -- 将map()函数处理好的数据,按照默认分区函数(hash)或者自定义分区函数(全排序,二次排序,自定义排序等)进行分区

        5.聚合函数Combiner -- 将每个分区里面的数据,进行一次map端的聚合,减少网络间传输的数据量

    [数据传输阶段]
        6.混洗 shuffle -- 将key相同的分区送入同一个Reduced端

    [Reduce阶段]
        7.排序对比 SortCombiner -- 将传入的所有数据(很多个分区的数据),按照默认排序(hash)或者自定义排序函数(全排序,二次排序,自定义排序等)进行排序

        8.分组对比 GroupCombiner -- 将拍好序的数据,进行分组。默认将key相同的分为一组或者自定义分组函数(全排序,二次排序,自定义排序等)进行分组,决定哪部分数据进行同一个reduce函数,进行Reduce化简

        9.reduce(): 将分组对比器传入的一组组数据,按照传入顺序,开始执行reduce函数。对数据进行化简,并将结果写入Context

    [数据输出阶段]
        10. 将Context中的数据输出到存储设备,硬盘,HDFS等


六、解决数据倾斜
---------------------------------------------------------
    1.Map端自定义分区函数

    2.Map作业链条化


七、作业链条化Chain
---------------------------------------------------------
    1.Job作业的一般形式 [Map + / Reduce Map *]
        -- 1个或者多个Map组成一个ChainMapper
        -- 1个Reduce 和 0个或者多个Map 组成一个ChainReducer

    2.作业链条化的优势在于:大大减小IO的读写和传输

    3.代码实现:实现单词统计,并且过滤掉敏感词汇,最后输出单词数量大于5的所有记录      

  1.  ***********************************MyApp函数**********************************
  2.         /**
  3.          * AppMian
  4.          */
  5.         public class MyApp {
  6.             public static void main(String [] args)
  7.             {
  8.             try {
  9.                 Configuration conf = new Configuration();
  10.                 Job job = Job.getInstance(conf);
  11.                 if(args.length > 1)
  12.                 {
  13.                 FileSystem.get(conf).delete(new Path(args[1]));
  14.                 }
  15.                 //设置job
  16.                 job.setJobName("test chain");
  17.                 job.setJarByClass(MyApp.class);
  18.                 FileInputFormat.addInputPath(job,new Path(args[0]));
  19.                 FileOutputFormat.setOutputPath(job,new Path(args[1]));
  20.                 //设定ChainMapper
  21.                 ChainMapper.addMapper(job,MyMapMapper_1.class, LongWritable.class,Text.class,Text.class,IntWritable.class,job.getConfiguration());
  22.                 ChainMapper.addMapper(job,MyMapMapper_2.class, Text.class,IntWritable.class,Text.class,IntWritable.class,job.getConfiguration());
  23.                 //设定ChainReducer
  24.                 ChainReducer.setReducer(job,MyReducer.class,Text.class,IntWritable.class,Text.class,IntWritable.class,job.getConfiguration());
  25.                 ChainReducer.addMapper(job,MyReducer_Mapper_1.class,Text.class,IntWritable.class,Text.class,IntWritable.class,job.getConfiguration());
  26.                 //设定任务属性
  27.                 job.setNumReduceTasks(3);
  28.                 //
  29.                 job.waitForCompletion(true);
  30.             } catch (Exception e) {
  31.                 e.printStackTrace();
  32.             }
  33.             }
  34.         }
  35.         ***********************************ChainMapper_map1**********************************
  36.         /**
  37.          * ChainMapper_map1
  38.          */
  39.         public class MyMapMapper_1 extends Mapper<LongWritable,Text ,Text,IntWritable> {
  40.             @Override
  41.             protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  42.             String v = value.toString();
  43.             String [] strs = v.split(" ");
  44.             for (String s : strs) {
  45.                 context.write(new Text(s), new IntWritable(1));
  46.             }
  47.             }
  48.         }
  49.         ***********************************ChainMapper_map2**********************************
  50.         /**
  51.          * ChainMapper_map1
  52.          */
  53.         public class MyMapMapper_2 extends Mapper<Text,IntWritable ,Text,IntWritable> {
  54.             @Override
  55.             protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
  56.             String sp = "falungong";
  57.             if(!key.toString().contains(sp))
  58.             {
  59.                 context.write(key, value);
  60.             }
  61.             }
  62.         }
  63.         ***********************************MyReducer **********************************
  64.         /**
  65.          * Reducer类
  66.          */
  67.         public class MyReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
  68.             @Override
  69.             protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  70.             int count = 0;
  71.             Iterator<IntWritable> it = values.iterator();
  72.             while(it.hasNext())
  73.             {
  74.                 int i = it.next().get();
  75.                 count += i;
  76.             }
  77.             context.write(key,new IntWritable(count));
  78.             }
  79.         }
  80.         ***********************************MyReducer_Mapper_1**********************************
  81.         /**
  82.          * Reducer类
  83.          */
  84.         public class MyReducer_Mapper_1 extends Mapper<Text, IntWritable,Text,IntWritable> {
  85.             @Override
  86.             protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
  87.             //只留下数量大于5的单词
  88.             if(value.get() > 5)
  89.             {
  90.                 context.write(key,value);
  91.             }
  92.             }
  93.         }


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/563989
推荐阅读
相关标签
  

闽ICP备14008679号