当前位置:   article > 正文

Hadoop学习——MapReduce的组件及简单API(一)_mapreduce常用编程组件

mapreduce常用编程组件

  上一篇参考Hadoop学习——MapReduce的简单介绍及执行步骤

MapReduce的组件

  组件是实现MapReduce的真正干活的东西,即我们的业务逻辑,就是要写到这里边来的。MapReduce共有4个组件

一、Mapper组件
介绍

  可以读取文件,默认是一行一行读取,把输入 keyvalue通过map()传给程序员,输出keyvalue由业务来决定。MR框架会按照Mapper输出key做排序,输出key如果要自定义排序,可以实现WritableComparable接口

补充

  MapTask的数量 = 切片数量,即有几个切片,代码执行的时候,就会有几个mapTask。上一篇说的是等于切块数量,实际上等于切片更加贴切,切片本质上是一个对象,封装了文件块的描述信息,其中是不包含真正的数据的,切块是真正的数据。

二、Reducer组件
介绍

  接收Mapper组件的输出keyvalue,然后按相同key做聚合。

补充

  ReduceTask任务数量通过代码来指定。默认为1。

三、Partitioner组件
介绍

  分区组件,分区概念等同于ReduceTask即有几个ReduceTask,就有几个分区。默认的分区器是HashPartitioner,作用是按照Mapper输出key的hash分区,可以确保相同的key落到同一个分区,此外可以自定义分区,即写一个类继承Partitioner,最后在Driver指定分区方法即可。

补充

  类:HashPartitioner是默认的排序组件,底层用的是简单的hash算法,这种分区发可能会产生数据倾斜。

四、Combiner组件
介绍

  合并组件,作用是让合并工作在MapTask提前发生,可以减少reduceTask的合并负载,然后再发给Reduce端进一步执行。

补充

  开发一个Combine组件即写一个类,同样继承Reducer。然后在Driver中通过job.setCombinerClass()来指定。
  combine组件,如果不设定,默认是没有combine过程的。
  使用combine机制,不能改变最后的结果,即写法跟后边的reducer内容是一样的

JAVA API写法

  因为要利用到API,所以需要先引入包,引包的话,尽量与hadoop的版本一致,首先你要创建一个java project项目,并下载一个hadoop项目到本地,需要用到它里边的jar包。如果想知道引哪些包,可以参考这篇Hadoop Intellij IDEA本地开发环境搭建

1、简单的例子(利用mapper组件)

  首先将待处理文件上传到分布式文件系统。比如,如下文件word.txt的内容,现在对于它简单的输出一下:

hello word
hello word
hellohadoop01
hellohadoop01
  • 1
  • 2
  • 3
  • 4

  将该文件上传到hdfs里的word文件夹下。如果hdfs环境还没有搭建,可以参考我的Hadoop学习——简单介绍及单点配置步骤


  然后在开发工具里创建一个新的,编写一个mapper类,这个类里面实际上相当于创建一个MapTask,具体实现如下:


package mrDay1.mapreduce.word;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * * 新建一个WordCountMapper类,并继承Mapper
 * 1.job的MapTask如何处理文件块数据,是由Mapper组件来决定的,此类的代码需要程序员自行编写
 * 2.开发一个Mapper组件的方式是让一个类继承Mapper
 * 3.第一个泛型类型对应的MapTask的输入key的类型(输入key:每行的行首偏移量,其类型是LongWritable)
 * 4.第二个泛型类型对应的MapTask的输入Value的类型(输入Value:每行的内容,其类型是Text)
 * 5.writable机制是hadoop的序列化机制
 * 	常用的类型:LongWritable、IntWritable、Text(String)、NullWritable
 * 6.在类里面重写map方法,用于将输入key和输入value传给程序员,有一行数据,该方法调用一次
 * 7.WordCountMapper中的第三个泛型类型是MapTask的输出key类型
 * 8.WordCountMapper中的第四个发型类型是MapTask的输出value类型
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

	@Override  //重写map()方法
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
			throws IOException, InterruptedException {
		//将输入key和输入value直接输出,来验证一下输入key为行首偏移量,value为每行数据
		context.write(key, value);
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

  想要执行该任务,现在还少一个驱动程序,接下来编写驱动程序的类,具体实现如下

package mrDay1.mapreduce.word;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//获取job对象
		Job job = Job.getInstance(conf);
		//设置job方法入口的驱动类
		job.setJarByClass(WordCountDriver.class);
		//设置Mapper组件类
		job.setMapperClass(WordCountMapper.class);
		//设置mapper的输出key类型
		job.setMapOutputKeyClass(LongWritable.class);
		//设置Mappper的输出value类型,注意Text的导包问题
		job.setMapOutputValueClass(Text.class);
		//设置输入路径,下边的ip即是hadoop的安装主机名
		//9000端口是表示hdfs的rpc通信端口
		//如果指定的是目录,会执行当前目录下的所有非“_”开头的文件。
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/word"));
		//设置输出结果路径,要求结果路径事先不能存在
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/word/result"));
		//提交job,产生阻塞,直到job任务执行完成后才放开
		job.waitForCompletion(true);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

  然后将编写完成的项目打成jar包,并指定执行类。

  将jar包上传到Linux服务器,首先运行jps命令,确保ResourceManagerNodeManager这两个进程已经运行,如果这两个进程没有运行,当执行hadoop jar命令时会报错。

  如果运行成功,则可以直接输入如下命令执行jar包

hadoop jar wordcount.jar
  • 1

  其中wordcount.jar即是打好的jar包。当执行结束,可以通过命令查看HDFS里的word/result文件夹下,会有一个part-r-00000文件(如果reduce任务有多个,会出来多个文件),即结果,可以通过命令找到并查看该文件。

hadoop fs -cat /word/result/part-r-00000
  • 1

  会得到如下内容,即是我们的计算结果:

0 hello word
0 hello word
11 hellohadoop01
11 hellohadoop01
  • 1
  • 2
  • 3
  • 4

  以上即是一个简单的含有Mapper组件的写法。

2、统计文件中的单词数量(利用mapper+reducer组件)

  统计一个“word.txt”文件中不同单词出现的次数。文件内容具体如下:

hello word
hello hadoop01
i am a programer
  • 1
  • 2
  • 3

  实现方法如下:

  首先编写Mapper类:

package mrDay1.mapreduce.word.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EXWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {

		String line = value.toString();
		//text类型没有截取方法,则转为string
		String[] words = line.split(" ");
		for (String word : words) {
			context.write(new Text(word), new IntWritable(1));
		}
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

  如果单纯的只写mapper组件,结果会如下:

a       1
am      1
hadoop01        1
hello   1
hello   1
i       1
programer       1
word    1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

  现在假如还要将上边的结果按照相同的key做聚合,那就需要reduce组件了

  Reduce的工作原理:将相同的key做聚合,将value形成迭代器。以下为其API实现:

首先开发ExWordCountReducer

package mrDay1.mapreduce.word.reduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 1.第一个泛型类型对应的是reducer的输出key类型
 * 2.第二个泛型类型对应的是reducer的输出value类型
 * 3.第三个泛型类型是reduce的输出key类型
 * 4.第四个泛型类型是reduce的输出value类型
 * 5.reduce组件不能单独存在,因为要接收Mapper组件的输出
 * 6.Mapper组件可以单独存在,当只有Mapper时,最后的结果文件时MapTask的输出
 * 7.当既有Mapper又有Reduce时,最后的结果文件时Reduce的输出而Mapper的输出作为中间结果。
 *
 */
public class ExWordCountReducer extends Reducer<Text, IntWritable, Text, Text>{
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, Text>.Context context)
			throws IOException, InterruptedException {
		String result = "";
		for(IntWritable v : values) {
			result = result + "," + v.get();
		}
		//做测试,看一下reducer组件传进来的key和Iterable
		context.write(key, new Text(result));
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

然后修改driver,只需在driver中配置reduce即可:

package mrDay1.mapreduce.word;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mrDay1.mapreduce.word.mapper.ExWordCountMapper;
import mrDay1.mapreduce.word.reduce.ExWordCountReducer;

public class ExWordCountDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//获取job对象
		Job job = Job.getInstance(conf);
		//设置job方法入口的驱动类
		job.setJarByClass(ExWordCountDriver.class);
		//设置Mapper组件类
		job.setMapperClass(ExWordCountMapper.class);
		//设置mapper的输出key类型
		job.setMapOutputKeyClass(LongWritable.class);
		//设置Mappper的输出value类型,注意Text的导包问题
		job.setMapOutputValueClass(Text.class);
		//设置reduce组件类
		job.setReducerClass(ExWordCountReducer.class);
		//设置reduce输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		//设置输入路径
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/word"));
		//设置输出结果路径,要求结果路径事先不能存在
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/word/result"));
		//提交job
		job.waitForCompletion(true);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

配置完成之后,打jar包并上传到分布式文件系统,执行即可,执行结果如下图。

a       ,1
am      ,1
hadoop01        ,1
hello   ,1,1
i       ,1
programer       ,1
word    ,1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

注意:

 ① 要保证MapperTask类中的第三、四泛型类型与ReduceTask类中的第一、二泛型类型相同,因为MapperTask中输出作为ReduceTask中的输入使用,如果不相同则会错误。
 ② 要保证MapperTask中的输出key和输出value的类型与driver中设置的输出的key和value相同,否则也会报错。
ReduceTask默认是一个1个分区,在生成结果文件的时候,只会生成一个,默认是以0开始的,比如part-r-00000,如果想要设置成3个分区,只需要在Driver里,加一行配置job.setNumReduceTasks(int tasks); 即可,最终的结果文件,也会有3个,依然是从0开始。比如part-r-00000part-r-00001part-r-00002

  关于Partitioner组件Combiner组件 写到了下一篇里。

  下一篇,Hadoop学习——MapReduce的组件及简单API(二)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/777842
推荐阅读
相关标签
  

闽ICP备14008679号