赞
踩
上一篇参考Hadoop学习——MapReduce的简单介绍及执行步骤
组件是实现MapReduce
的真正干活的东西,即我们的业务逻辑,就是要写到这里边来的。MapReduce
共有4个组件
可以读取文件,默认是一行一行读取,把输入 key
和value
通过map()
传给程序员,输出key
和value
由业务来决定。MR框架会按照Mapper
的输出key
做排序,输出key
如果要自定义排序,可以实现WritableComparable
接口
MapTask
的数量 = 切片数量,即有几个切片,代码执行的时候,就会有几个mapTask
。上一篇说的是等于切块数量,实际上等于切片更加贴切,切片本质上是一个对象,封装了文件块的描述信息,其中是不包含真正的数据的,切块是真正的数据。
接收Mapper
组件的输出key
和value
,然后按相同key做聚合。
ReduceTask
任务数量通过代码来指定。默认为1。
分区组件,分区概念等同于ReduceTask
,即有几个ReduceTask,就有几个分区。默认的分区器是HashPartitioner
,作用是按照Mapper
输出key的hash分区,可以确保相同的key落到同一个分区,此外可以自定义分区,即写一个类继承Partitioner
,最后在Driver指定分区方法即可。
类:HashPartitioner
是默认的排序组件,底层用的是简单的hash算法,这种分区发可能会产生数据倾斜。
合并组件,作用是让合并工作在MapTask提前发生,可以减少reduceTask的合并负载,然后再发给Reduce端进一步执行。
开发一个Combine
组件即写一个类,同样继承Reducer
。然后在Driver
中通过job.setCombinerClass()
来指定。
combine
组件,如果不设定,默认是没有combine
过程的。
使用combine机制,不能改变最后的结果,即写法跟后边的reducer内容是一样的。
因为要利用到API,所以需要先引入包,引包的话,尽量与hadoop
的版本一致,首先你要创建一个java project
项目,并下载一个hadoop
项目到本地,需要用到它里边的jar包。如果想知道引哪些包,可以参考这篇Hadoop Intellij IDEA本地开发环境搭建
首先将待处理文件上传到分布式文件系统。比如,如下文件word.txt
的内容,现在对于它简单的输出一下:
hello word
hello word
hellohadoop01
hellohadoop01
将该文件上传到hdfs
里的word
文件夹下。如果hdfs
环境还没有搭建,可以参考我的Hadoop学习——简单介绍及单点配置步骤。
然后在开发工具里创建一个新的,编写一个mapper类,这个类里面实际上相当于创建一个MapTask,具体实现如下:
package mrDay1.mapreduce.word; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * * 新建一个WordCountMapper类,并继承Mapper * 1.job的MapTask如何处理文件块数据,是由Mapper组件来决定的,此类的代码需要程序员自行编写 * 2.开发一个Mapper组件的方式是让一个类继承Mapper * 3.第一个泛型类型对应的MapTask的输入key的类型(输入key:每行的行首偏移量,其类型是LongWritable) * 4.第二个泛型类型对应的MapTask的输入Value的类型(输入Value:每行的内容,其类型是Text) * 5.writable机制是hadoop的序列化机制 * 常用的类型:LongWritable、IntWritable、Text(String)、NullWritable * 6.在类里面重写map方法,用于将输入key和输入value传给程序员,有一行数据,该方法调用一次 * 7.WordCountMapper中的第三个泛型类型是MapTask的输出key类型 * 8.WordCountMapper中的第四个发型类型是MapTask的输出value类型 * */ public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ @Override //重写map()方法 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException { //将输入key和输入value直接输出,来验证一下输入key为行首偏移量,value为每行数据 context.write(key, value); } }
想要执行该任务,现在还少一个驱动程序,接下来编写驱动程序的类,具体实现如下
package mrDay1.mapreduce.word; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //设置job方法入口的驱动类 job.setJarByClass(WordCountDriver.class); //设置Mapper组件类 job.setMapperClass(WordCountMapper.class); //设置mapper的输出key类型 job.setMapOutputKeyClass(LongWritable.class); //设置Mappper的输出value类型,注意Text的导包问题 job.setMapOutputValueClass(Text.class); //设置输入路径,下边的ip即是hadoop的安装主机名 //9000端口是表示hdfs的rpc通信端口 //如果指定的是目录,会执行当前目录下的所有非“_”开头的文件。 FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/word")); //设置输出结果路径,要求结果路径事先不能存在 FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/word/result")); //提交job,产生阻塞,直到job任务执行完成后才放开 job.waitForCompletion(true); } }
然后将编写完成的项目打成jar
包,并指定执行类。
将jar
包上传到Linux服务器,首先运行jps
命令,确保ResourceManager
、NodeManager
这两个进程已经运行,如果这两个进程没有运行,当执行hadoop jar
命令时会报错。
如果运行成功,则可以直接输入如下命令执行jar包。
hadoop jar wordcount.jar
其中wordcount.jar
即是打好的jar包。当执行结束,可以通过命令查看HDFS
里的word/result
文件夹下,会有一个part-r-00000
文件(如果reduce任务有多个,会出来多个文件),即结果,可以通过命令找到并查看该文件。
hadoop fs -cat /word/result/part-r-00000
会得到如下内容,即是我们的计算结果:
0 hello word
0 hello word
11 hellohadoop01
11 hellohadoop01
以上即是一个简单的含有Mapper组件的写法。
统计一个“word.txt”文件中不同单词出现的次数。文件内容具体如下:
hello word
hello hadoop01
i am a programer
实现方法如下:
首先编写Mapper类:
package mrDay1.mapreduce.word.example; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class EXWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); //text类型没有截取方法,则转为string String[] words = line.split(" "); for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }
如果单纯的只写mapper组件,结果会如下:
a 1
am 1
hadoop01 1
hello 1
hello 1
i 1
programer 1
word 1
现在假如还要将上边的结果按照相同的key做聚合,那就需要reduce组件了
Reduce的工作原理:将相同的key做聚合,将value形成迭代器。以下为其API实现:
首先开发ExWordCountReducer
package mrDay1.mapreduce.word.reduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 1.第一个泛型类型对应的是reducer的输出key类型 * 2.第二个泛型类型对应的是reducer的输出value类型 * 3.第三个泛型类型是reduce的输出key类型 * 4.第四个泛型类型是reduce的输出value类型 * 5.reduce组件不能单独存在,因为要接收Mapper组件的输出 * 6.Mapper组件可以单独存在,当只有Mapper时,最后的结果文件时MapTask的输出 * 7.当既有Mapper又有Reduce时,最后的结果文件时Reduce的输出而Mapper的输出作为中间结果。 * */ public class ExWordCountReducer extends Reducer<Text, IntWritable, Text, Text>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, Text>.Context context) throws IOException, InterruptedException { String result = ""; for(IntWritable v : values) { result = result + "," + v.get(); } //做测试,看一下reducer组件传进来的key和Iterable context.write(key, new Text(result)); } }
然后修改driver,只需在driver中配置reduce即可:
package mrDay1.mapreduce.word; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import mrDay1.mapreduce.word.mapper.ExWordCountMapper; import mrDay1.mapreduce.word.reduce.ExWordCountReducer; public class ExWordCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //设置job方法入口的驱动类 job.setJarByClass(ExWordCountDriver.class); //设置Mapper组件类 job.setMapperClass(ExWordCountMapper.class); //设置mapper的输出key类型 job.setMapOutputKeyClass(LongWritable.class); //设置Mappper的输出value类型,注意Text的导包问题 job.setMapOutputValueClass(Text.class); //设置reduce组件类 job.setReducerClass(ExWordCountReducer.class); //设置reduce输出的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入路径 FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/word")); //设置输出结果路径,要求结果路径事先不能存在 FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/word/result")); //提交job job.waitForCompletion(true); } }
配置完成之后,打jar包并上传到分布式文件系统,执行即可,执行结果如下图。
a ,1
am ,1
hadoop01 ,1
hello ,1,1
i ,1
programer ,1
word ,1
注意:
① 要保证MapperTask
类中的第三、四泛型类型与ReduceTask
类中的第一、二泛型类型相同,因为MapperTask
中输出作为ReduceTask
中的输入使用,如果不相同则会错误。
② 要保证MapperTask
中的输出key和输出value的类型与driver
中设置的输出的key和value相同,否则也会报错。
ReduceTask
默认是一个1个分区,在生成结果文件的时候,只会生成一个,默认是以0开始的,比如part-r-00000
,如果想要设置成3个分区,只需要在Driver
里,加一行配置job.setNumReduceTasks(int tasks);
即可,最终的结果文件,也会有3个,依然是从0开始。比如part-r-00000
、part-r-00001
、part-r-00002
Partitioner组件
和 Combiner组件
写到了下一篇里。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。