当前位置:   article > 正文

最全Mapreduce保姆级教程(建议收藏,随用随查)_mapreduce数据处理

mapreduce数据处理

1.1 Mapreduce的设计思想

  Mapreduce遵循的是的分而治之的设计思想,将一个大的任务拆分成若干个小的任务,然后并行的进行处理。在任务处理中,有两类任务,一个是MapTask,另一个是ReduceTask。

1.2 Mapreduce的模板类

  当前这个模板类是从官网直接找到的,功能是单词统计

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

1.3 Mapreduce的数据类型

因为在Mapreduce程序运行中会大量用到网络I/O进行数据传输,所以在Mapreduce程序中所有类型必须使用序列化类型。可以自定义数据类型,查看Mapreduce的自定义数据类型。 Mapreduce自带的数据类型有如下:

  • IntWritable
  • LongWritable
  • DoubleWritable
  • BooleanWritable
  • NullWritable
  • Text

1.4 Mapreduce的Maven依赖

<properties>
        <hadoop.version>2.6.0-cdh5.14.0</hadoop.version>
    </properties>

    <dependencies>
        <!--引入单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- Hadoop Client 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
 	<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

1.5 Mapreduce的处理流程

  Mapreduce的处理流程分为五大阶段,分别是Input、Map、Shuffle、reduce和Output。Mapreduce在处理流程中所有数据都是以key-value形式存在。

1.5.1 Input

  负责整个程序的读入,默认读取HDFS上面的文件,读取的时候会按照分片规则进行分片,可以通过job的属性值进行修改,如下示例:

/*该值默认为TextInputFormat.class,可以指定为其他值。
TextInputFormat.class
功能:从hdfs上读取数据,并且将HDFS上读取到的内容变成key-value格式
		文件中一行会变成一个 keyvalue
		key:行的偏移量
		value:行的内容
*/
job.setInputFormatClass();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.5.2 Map

  根据Input阶段中的split个数来启动对应的MapTask,每个MapTask处理一部分数据,但是处理的逻辑是相同的。split个数由分片规则决定,通过查看FileInputFormat类的源码getSplits()方法和computeSplitSize()方法可以看出一个分片大概在128M左右,可以通过调整mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize这两个参数修改分片规则大小。需要构建一个类继承Mapper类实现Mapper类的map方法并提供具体的实现。每一条key-value数据会调用一次map方法。

1.5.3.Shuffle

  Shuffle有分区、排序、分组的功能,Mapreduce的Shuffle默认会执行所有的功能,Shuffle的设计初衷是为了统计数据。

1.5.3.1 Shuffle功能
1.5.3.1.1 分区

  分区是为了多进程的并行,有几个ReduceTask就有几个分区,分区决定了当前这条数据会进入到那个ReduceTask中,被那个ReduceTask处理。默认的分区规则是根据key的hash值取余ReduceTask的个数,由默认的分区器HashPartitioner实现。Mapreduce的Shuffle分区规则也可以自定义,自定义后可以通过job.setPartitionerClass()方式设置到Mapreduce程序中。实现方式请查看Shuffle的自定义分区

1.5.3.1.2 排序

  默认按照key的字典顺序进行排序。

1.5.3.1.3 分组

  按照key进行分组,相同的key的value会放在同一个迭代器中。

1.5.3.2 Shuffle过程
1.5.3.2.1 Map端的Shuffle

  处理MapTask的结果。

1.5.3.2.1.1 溢写spill

  将内存中的数据写入文件,每一个MapTask会将自己处理的结果放入一个环形的内存缓冲区中,缓冲区的大小默认是100M,当缓冲区数据达到80%的时候,会触发溢写,所有即将被溢写到磁盘的文件都会进行分区和排序,分区默认就是按照key的hash取余进行的,也可以自定义分区,排序的话会调用默认的排序器或者compareTo方法,排序的算法是快排,这里的排序不是整个批次数据有序,而是各个分区的数据内部有序。

1.5.3.2.1.2 合并merge

  溢写完成后每个分区都会有很多小文件,合并的功能就是把每个分区的所有小文件合并成一个大文件,保证每个MapTask只有一个大文件。合并的话因为是很多小文件合并成一个大文件,所以也会进行排序,这里用到的排序算法是归并排序,排序的逻辑还是相同的。

1.5.3.2.2 Reduce端的Shuffle

  将自己的结果给reduce方法。

1.5.3.2.2.1 拉取pull

  通过Http协议,每个RedcueTask会到每个MapTask中拉取属于自己的数据。

1.5.3.2.2.2 合并merge

  数据拉取到ReduceTask后是乱序的,这里也会发生排序,这里的排序算法也是归并排序(内存中只放索引),排序逻辑相同。

1.5.3.2.2.3 分组group

  在这里会将相同key的value放到一个迭代器中。

1.5.4 Reduce

  合并,启动ReduceTask并把MapTask阶段处理完成的数据拉取到各个ReduceTask上,默认情况下ReduceTask只有一个。并且,在ReduceTask上面进行数据的最后处理。需要构建一个类继承Reducer类实现reuduce方法并提供具体的实现。每一条key-value数据会调用一次reduce方法。

1.5.5 Output

  负责将上一步Reduce的结果进行输出,几个ReduceTask就产生几个结果文件,默认输出到HDFS上变成文件,并且key-value之间以制表符分隔。可以通过job的属性值进行修改,如下示例:

/*该值默认为TextOutputFormat.class,可以指定为其他值。
TextOutputFormat.class
功能:从hdfs上读取数据,并且将HDFS上读取到的内容变成key-value格式
		文件中一行会变成一个 keyvalue
		key:行的偏移量
		value:行的内容
*/
job.setOutputFormatClass();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.5.6 处理流程图

1.6 Mapreduce的两种程序类型

  • 没有Shuffle,只有Map端,没有Reduce端
  • 有Shuffle,既有Map端,也有Reduce端

1.7 Shuffle自定义分区

  继承Partitioner抽象类并实现抽象类的getPartition方法。

import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @ClassName Test
 * @Description: TODO
 * @Author qywang
 * @Date 2021/6/19
 * @Version V1.0
 **/
public class Test<K, V> extends Partitioner<K, V> {
    /**
     * @param k             一条数据的key
     * @param v             一条数据的value
     * @param numPartitions ReduceTask的个数
     * @return
     */
    @Override
    public int getPartition(K k, V v, int numPartitions) {
        return (k.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

1.8 Mapreduce的自定义数据类型

1.8.1 实现Writable接口

/**
 * @ClassName WordCountBean
 * @Description TODO 自定义数据类型,用于封装单词和单词长度
 */
public class WordCountBean implements Writable {
    //定义属性
    private String word;
    private int length ;
    public WordCountBean() {
    }
    public WordCountBean(String word, int length) {
        this.setWord(word);
        this.setLength(length);
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    //序列化
    @Override
    public void write(DataOutput out) throws IOException {
        //输出两个对象
        out.writeUTF(this.word);
        out.writeInt(this.length);
    }
    //反序列化:注意顺序必须与序列化的顺序保持一致,不然会导致值错乱
    @Override
    public void readFields(DataInput in) throws IOException {
        //读出两个对象
        this.word = in.readUTF();
        this.length = in.readInt();
    }
    //toString方法,在输出的 时候会被调用,写入文件或者打印
    @Override
    public String toString() {
        return this.word+"\t"+this.length;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

1.8.2 实现WritableComparable接口

/**
 * @ClassName WordCountBean
 * @Description TODO 自定义数据类型,用于封装单词和单词长度
 */
public class WordCountBean implements WritableComparable<WordCountBean> {
    //定义属性
    private String word;
    private int length ;
    //构造
    public WordCountBean() {
    }
    public void WordCountBean(String word, int length) {
        this.setWord(word);
        this.setLength(length);
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    //序列化
    @Override
    public void write(DataOutput out) throws IOException {
        //输出两个对象
        out.writeUTF(this.word);
        out.writeInt(this.length);
    }
    //反序列化:注意顺序必须与序列化的顺序保持一致,不然会导致值错乱
    @Override
    public void readFields(DataInput in) throws IOException {
        //读出两个对象
        this.word = in.readUTF();
        this.length = in.readInt();
    }
    //toString方法,在输出的 时候会被调用,写入文件或者打印
    @Override
    public String toString() {
        return this.word+"\t"+this.length;
    }
    //用于在shuffle阶段的排序以及分组
    @Override
    public int compareTo(WordCountBean o) {
        //先比较第一个值
        int comp = this.getWord().compareTo(o.getWord());
        if(0 == comp){
            //如果第一个值,相等,以第二个值的比较结果作为最终的结果
            return Integer.valueOf(this.getLength()).compareTo(Integer.valueOf(o.getLength()));
        }
        return comp;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

1.8.3 两种数据类型实现方式的区别

这种主要观察程序是否需要进行Shuffle,如果程序要经过Shuffle,则必须实现WritableComparable接口。实现WritableComparable接口主要是在Shuffle阶段对key进行排序使用,可以实现compareTo方法自定义。

1.9 Shuffle的自定义排序

  在源代码中调用排序器的逻辑,如果有自定义的排序器就用自定义的排序器,如果没有的话判断该类是否实现了compareTo方法,有的话就用compareTo方法,如果还是没有的话,则会调用默认的key的字典升序进行排序。

1.9.1 自定义数据类型

  自定义数据类型可以调用该类型中的compareTo方法实现排序。

1.9.2 官方自带的类型

  官方自带的数据类型默认的排序方式为对key进行升序,如果要对key降序的话,需要自定义一个排序器,然后通过job.setSortComparatorClass()方法设置自定义的排序器进去就可以。示例:

/**
 * @ClassName UserSort
 * @Description TODO 用户自定义排序器
 */
public class UserSort extends WritableComparator {
    /**
     * 注册类型的转换
     */
    public UserSort() {
        //类型的转换注册,允许将类型转换为Text
        super(Text.class,true);
    }
    /**
     * 调用排序的方法
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //需求:比较Text类型,传入WritableComparable,要进行转换
        Text o1 = (Text) a;
        Text o2 = (Text) b;
        //降序
        return -o1.compareTo(o2);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

1.10 Shuffle中的两个优化

1.10.1 Combiner

  Combiner也称为Map端的聚合,默认情况下是关闭的,如果要使用的话,必须手动设置。判断程序是否可以使用Combiner,只需要看reduce方法的输入是否等于输出。如果相同就可以使用Combiner,如果不相同,就不行。使用Combiner主要是因为正常情况下Map端的并发数是要远远大于Reduce端的并发数的,将Reduce端的聚合操作先在Map端分别完成,能有效降低数据量,减轻Reduce端的负载。可以通过job.setCombinerClass()方法进行设置,Combinesr的类一般就是Reducer的类,聚合逻辑是一致的。

1.10.2 Compress压缩

  Mapreduce的压缩主要使用在Shuffle阶段的数据写入磁盘中,具体的压缩配置可以参考Hive教程这篇文件中Hive调优中提到的压缩。

1.11 Mapreduce的join解决方案

1.11.1 Reduce join

  Reduce join在Mapreduce中适用于大的数据量和大的数据量进行join,效率比较低。

/**
 * 定义Mapper的实现类以及Map过程中的处理逻辑
 */
public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //先判断当前的数据是哪个文件的
        FileSplit inputSplit = (FileSplit) context.getInputSplit();//先获取这条数据属于哪个文件分片
        String fileName = inputSplit.getPath().getName();//获取文件名称
        //如果是订单数据,key为第三个字段,value是其他剩余字段
        if("orders.txt".equals(fileName)){
            String[] items = value.toString().split(",");
            this.outputKey.set(items[2]);//商品id
            this.outputValue.set(items[0]+"\t"+items[1]+"\t"+items[3]);//其他字段
            context.write(this.outputKey,this.outputValue);//将订单数据输出
        }else{
            //如果是商品数据,key为第一个字段,value是第二个字段
            String[] split = value.toString().split(",");
            this.outputKey.set(split[0]);//商品id
            this.outputValue.set(split[1]);//商品名称
            context.write(this.outputKey,this.outputValue);//输出商品数据
        }
    }
}
/**
* 定义Reducer的实现类以及Reduce过程中的处理逻辑
*/
public static class MRJoinReduce extends Reducer<Text,Text,Text,Text>{
  private Text outputValue = new Text();
  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder stringBuilder = new StringBuilder();
        for (Text value : values) {
            stringBuilder.append(value.toString()+"\t");//将所有的商品的名称和对应的所有订单进行拼接
        }
        this.outputValue.set(stringBuilder.toString());//将商品名称及订单作为value
        context.write(key,this.outputValue);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

1.11.2 Map join

  Map join在Mapreduce中适用于小的数据量和大的数据量进行join。将小数据量数据放入分布式缓存,大数据的每个MapTask需要用到时,从分布式缓存中取,然后直接在Map端完成join,不需要进行Shuffle。
示例:

public class MapJoin extends Configured implements Tool {
    /**
     * 具体整个MapReduce job的定义:构建、配置、提交
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        /**
         * 构建一个job
         */
        //创建一个job的实例
        Job job = Job.getInstance(this.getConf(),"mrjob");
        //设置job运行的类
        job.setJarByClass(MapJoin.class);
        /**
         * 配置job
         */
        //input:定义输入的方式,输入的路径
        Path orderPath = new Path("datas/mrjoin/orders.txt");
        TextInputFormat.setInputPaths(job,orderPath);
        //将商品的数据放入分布式缓存
        Path productPath = new Path("hdfs://v6:8020/datas/mrjoin/product.txt");
        //必须使用绝对路径
        job.addCacheFile(productPath.toUri());
        //map:定义Map阶段的类及输出类型
        job.setMapperClass(MRJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //shuffle:定义shuffle阶段实现的类
        //reduce:定义reduce阶段的类及输出类型
//        job.setReducerClass(MRJoinReduce.class);
//        job.setOutputKeyClass(Text.class);
//        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);//设置Reduce的个数,就是分区的个数
        //output:定义输出的类以及输出的路径
        Path outputPath = new Path("datas/output/join/mapjoin");
        //如果输出存在,就删除
        FileSystem hdfs = FileSystem.get(this.getConf());
        if(hdfs.exists(outputPath)){
            hdfs.delete(outputPath,true);
        }
        TextOutputFormat.setOutputPath(job,outputPath);
        /**
         * 提交job:并根据job运行的结果返回
         */
        return job.waitForCompletion(true) ? 0:-1;
}
/**
 * 定义Mapper的实现类以及Map过程中的处理逻辑
 */
public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{

    private Text outputKey = new Text();
    private Text outputValue = new Text();
    Map<String,String> maps = new HashMap<>();
/**
 * Map和Reduce的类:三个方法
 *      1-setup:会在map或者reduce方法之前执行
 *      2-map/reduce:map逻辑或者reduce逻辑
 *      3-close:最后执行的方法
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
        //将分布式缓存的 数据读取进来
        URI[] cacheFiles = context.getCacheFiles();//获取所有的缓存数据
        //读取文件内容
        BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath()));
        String line = null;
        while(StringUtils.isNotBlank(line = bufferedReader.readLine())){
            //读取到每一行的内容
            String pid = line.split(",")[0];//商品id
            String productName = line.split(",")[1];//商品名称
            //将商品id和名称放入map集合
            maps.put(pid,productName);
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取订单数据
        String[] items = value.toString().split(",");
        String pid = items[2]; //订单中的商品id
        String productName = maps.get(pid);
        this.outputKey.set(productName);
        this.outputValue.set(value.toString());
        context.write(this.outputKey,this.outputValue);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

1.12 自定义Input

  Mapreduce不适合处理小文件,一个小文件就占一个块,对应就有一个MapTask,导致资源的浪费。一个MapTask从启动申请资源,到处理数据,最后关闭资源,如果数据小,处理数据时间非常短,可能都没有启动和关闭的时间长。在文件处理前做合并:将多个小文件合并成一个文件【Input中直接实现】,多个分片实现合并输出。自定义一个读取器。

/**
 * @ClassName UserRecordReader
 * @Description TODO 自定义一个读取器,将整个文件的内容,变成一个keyvalue
 * return new LineRecordReader(recordDelimiterBytes); 将每一行变成一个keyvalue返回
 */
public class UserRecordReader extends RecordReader<NullWritable, BytesWritable> {

    //定义keyvalue,用于赋值和返回
    private NullWritable key = NullWritable.get();
    private BytesWritable value = new BytesWritable();
    //定义全局的配置文件对象
    Configuration conf = null;
    //定义全局的分片对象
    FileSplit fileSplit = null;
    //设置标志变量
    boolean flag = false;

    /**
     * 初始化的方法,只调用一次
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        conf = context.getConfiguration(); //从上下文中获取当前程序的配置对象
        fileSplit = (FileSplit) split; //获取当前切片的信息
    }

    /**
     * 读取并封装每一条keyvalue
     *  如果返回true:表示还有下一条,继续调用该方法读取
     *      第一次执行时,必须返回true,将第一次得到的keyvalue保存,然后读取下一条
     *  如果返回false:表示没有下一条,将当前结果进行返回
     *      如果第一次直接返回false,表示该分片中没有任何数据
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(!flag){
            //从每个分片中读取数据,变成keyvalue:读取每个文件,将文件的内容放入value中
            //先构建一个 文件系统对象
            FileSystem hdfs = FileSystem.get(conf);
            //先从切片中获取该切片对应 的文件路径
            Path path = fileSplit.getPath();
            //打开这个文件
            FSDataInputStream open = hdfs.open(path);
            //将输入流赋值给字节数组
            byte[] bytes = new byte[(int) fileSplit.getLength()];
            IOUtils.readFully(open,bytes,0, (int) fileSplit.getLength());
            //将字节数组赋值给value
            this.value.set(bytes,0, (int) fileSplit.getLength());
            //关闭资源
            open.close();
            hdfs.close();
            //修改标志
            flag = true;
            //返回
            return true;
        }
        return false;
    }

    //返回当前读取到的key
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    //返回当前读取到的value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    //获取当前读取数据的进度的
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        //释放资源
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89

  自定义一个输入类

/**
 * @ClassName UserInputFormat
 * @Description TODO 自定义的输入:将读取到的每个小文件 = 每个分片 = 变成一个keyvalue
 */
public class UserInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    /**
     * 构建一个读取者对象
     * @param split
     * @param context
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //构建一个读取者对象
        UserRecordReader userRecordReader = new UserRecordReader();
        //调用初始化方法
        userRecordReader.initialize(split,context);//将当前方法传入进来的分片和 上下文对象传递给初始化方法
        return userRecordReader;
    }

    /**
     * 代表是否可分割:需求中不需要分割
     * @param context
     * @param filename
     * @return
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;//输入的 数据不可分割
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
/**
 * @ClassName UserInputMR
 * @Description TODO 用于实现自定义输入
 */
public class UserInputMR extends Configured implements Tool {


    /**
     * 具体整个MapReduce job的定义:构建、配置、提交
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        /**
         * 构建一个job
         */
        //创建一个job的实例
        Job job = Job.getInstance(this.getConf(),"userinput");
        //设置job运行的类
        job.setJarByClass(UserInputMR.class);

        /**
         * 配置job
         */
        //input:定义输入的方式,输入的路径
        Path inputPath = new Path("datas/inputformat");
        job.setInputFormatClass(UserInputFormat.class);//自定义输入,将每个文件的内容只变成一个keyvalue,调用一次map方法
        UserInputFormat.setInputPaths(job,inputPath);
        //map:定义Map阶段的类及输出类型
        job.setMapperClass(MRModelMapper.class);
        job.setMapOutputKeyClass(Text.class);//在map方法中获取文件名
        job.setMapOutputValueClass(BytesWritable.class);//在map方法中获取这个文件所有内容
        //shuffle:定义shuffle阶段实现的类
        //reduce:定义reduce阶段的类及输出类型
//        job.setReducerClass(MRModelReduce.class);
        job.setOutputKeyClass(Text.class);              //文件名
        job.setOutputValueClass(BytesWritable.class);   //文件内容
        job.setNumReduceTasks(1);//设置Reduce的个数,就是分区的个数
        //output:定义输出的类以及输出的路径
        Path outputPath = new Path("datas/output/inputformat");
        //如果输出存在,就删除
        FileSystem hdfs = FileSystem.get(this.getConf());
        if(hdfs.exists(outputPath)){
            hdfs.delete(outputPath,true);
        }
        //将结果保存为一个Sequencefile文件
        job.setOutputFormatClass(SequenceFileOutputFormat.class);//该类型文件为keyvalue的二进制类型文件
        SequenceFileOutputFormat.setOutputPath(job,outputPath);

        /**
         * 提交job:并根据job运行的结果返回
         */
        return job.waitForCompletion(true) ? 0:-1;
    }


    /**
     * 程序的入口
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //构建一个Conf对象,用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        //调用当前类的run方法
        int status = ToolRunner.run(conf, new UserInputMR(), args);
        //根据job运行的状态,来退出整个程序
        System.exit(status);
    }

    /**
     * 定义Mapper的实现类以及Map过程中的处理逻辑
     */
    public static class MRModelMapper extends Mapper<NullWritable,BytesWritable,Text, BytesWritable>{
        private Text outputKey = new Text();

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            //当前输入的value就是该文件的所有内容
            //获取文件名封装成Text
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();
            this.outputKey.set(fileName);
            //输出这个文件的名称和所有内容
            context.write(outputKey,value);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

1.13 自定义Output

  自定义一个输出类。

/**
 * @ClassName UserOutputFormat
 * @Description TODO 自定义输出器,将reduce的数据,输出到两个文件
 */
public class UserOutputFormat extends FileOutputFormat<Text, NullWritable>{
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        //构建hdfs对象
        Configuration conf = context.getConfiguration();
        FileSystem hdfs = FileSystem.get(conf);
        //构建输出流
        FSDataOutputStream goodContent = hdfs.create(new Path("datas/output/outputformat/good"));
        FSDataOutputStream badContent = hdfs.create(new Path("datas/output/outputformat/bad"));
        UserRecordWriter userRecordWriter = new UserRecordWriter(goodContent, badContent);
        //返回一个 输出器
        return userRecordWriter;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

  自定义一个输出器。

/**
 * @ClassName UserRecordWriter
 * @Description TODO 真正实现将数据输出的对象
 */
public class UserRecordWriter extends RecordWriter<Text, NullWritable> {
    //定义属性
    FSDataOutputStream good = null;
    FSDataOutputStream bad = null;
    public UserRecordWriter(FSDataOutputStream goodContent,FSDataOutputStream badContent){
        good = goodContent;
        bad = badContent;
    }
    /**
    * 将keyvalue数据输出的方法
    * @param key
    * @param value
    * @throws IOException
    * @throws InterruptedException
    */
     @Override
     public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String content = key.toString().split("\t")[9];
        //将差评写入一个文件:2
        if("2".equals(content)){
            bad.write(key.toString().getBytes());//将整条差评输出
            bad.write("\r\n".getBytes());//输出一个换行
        }else {
            //其他评价写入另一个文件:0,1
            good.write(key.toString().getBytes());
            good.write("\r\n".getBytes());
        }
    }
    //关闭资源的方法
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        good.close();
        bad.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
/**
 * @ClassName UserOutputMR
 * @Description TODO 将评论数据拆分到不同的文件中
 */
public class UserOutputMR extends Configured implements Tool {


    /**
     * 具体整个MapReduce job的定义:构建、配置、提交
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        /**
         * 构建一个job
         */
        //创建一个job的实例
        Job job = Job.getInstance(this.getConf(),"mrjob");
        //设置job运行的类
        job.setJarByClass(UserOutputMR.class);

        /**
         * 配置job
         */
        //input:定义输入的方式,输入的路径
        Path inputPath = new Path("datas/outputformat/ordercomment.csv");
        TextInputFormat.setInputPaths(job,inputPath);
        //map:定义Map阶段的类及输出类型
        job.setMapperClass(MRModelMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        //shuffle:定义shuffle阶段实现的类
        //reduce:定义reduce阶段的类及输出类型
        job.setReducerClass(MRModelReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);//设置Reduce的个数,就是分区的个数
        //output:定义输出的类以及输出的路径
        Path outputPath = new Path("datas/output/outputformat/normal");
        //如果输出存在,就删除
        FileSystem hdfs = FileSystem.get(this.getConf());
        if(hdfs.exists(outputPath)){
            hdfs.delete(outputPath,true);
        }
        //修改输出的类
        job.setOutputFormatClass(UserOutputFormat.class);
        UserOutputFormat.setOutputPath(job,outputPath);

        /**
         * 提交job:并根据job运行的结果返回
         */
        return job.waitForCompletion(true) ? 0:-1;
    }


    /**
     * 程序的入口
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //构建一个Conf对象,用于管理当前程序的所有配置
        Configuration conf = new Configuration();
        //调用当前类的run方法
        int status = ToolRunner.run(conf, new UserOutputMR(), args);
        //根据job运行的状态,来退出整个程序
        System.exit(status);
    }

    /**
     * 定义Mapper的实现类以及Map过程中的处理逻辑
     */
    public static class MRModelMapper extends Mapper<LongWritable,Text,Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //将每一行的内容作为key,直接输出
            context.write(value,NullWritable.get());
        }
    }

    /**
     * 定义Reducer的实现类以及Reduce过程中的处理逻辑
     */
    public static class MRModelReduce extends Reducer<Text,NullWritable,Text,NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //直接输出每行内容
            context.write(key,NullWritable.get());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

1.14 Shuffle的自定义分组

  Mapreduce默认按照key进行分组,如果想按照其他方式进行分组,可以自定义分组,然后通过job.setGroupingComparatorClass()方法设置分组类。如下示例:

package cn.itcast.hadoop.mapreduce.group;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import java.util.Collections;
import java.util.Comparator;
/**
 * @ClassName OrderGroup
 * @Description TODO 自定义分组,按照key的第一个值来分组
 */
public class OrderGroup extends WritableComparator {
    //注册类型,用于强转
    public OrderGroup(){
        super(OrderBean.class,true);
    }
    //实现两个OrderBean的比较
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean o1 = (OrderBean) a;
        OrderBean o2 = (OrderBean) b;
        //只比较第一个值,订单id,如果订单id,相同就是同一组
        return o1.getOrderId().compareTo(o2.getOrderId());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/892998
推荐阅读
相关标签
  

闽ICP备14008679号