赞
踩
目录
参考《Hadoop安装部署》实验,安装部署配置了三个数据节点的Hadoop集群
本实验的工作目录为~/course/hadoop/mr_pro
,使用以下命令创建和初始化工作目录:
- $ mkdir -p ~/course/hadoop/mr_pro
- $ cd ~/course/hadoop/mr_pro
在桌面右键打开终端输入如下命令打开eclipse:
eclipse &
打开eclipse后选择/headless/course/hadoop/mr_pro
做为工作空间
1.在eclipse中依次点击:File->New->Project->Map/Reduce Project->Next
。
2.在项目名称(Project Name)处填入WordCount
,将工程位置设置为文件夹/headless/course/hadoop/mr_pro/WordCount
,点击Finish
。
新建终端,使用如下命令新建一个文本文件:
- # cd ~/course/hadoop/mr_pro/WordCount/
- # mkdir target
- # mkdir data
- # cd data
- # echo "Hello World" >> file1.txt
- # echo "Hello MapReduce" >> file2.txt
使用如下命令进入master节点:
# docker exec -it --privileged master /bin/bash
主机的~/course
目录挂载到了master节点的/course
目录。
在master节点中使用如下命令新建目录,并将文本文件上传到目录:
- # hadoop fs -mkdir -p mapreduce/WordCount/input
- # cd /course/hadoop/mr_pro/WordCount/data
- # hadoop fs -put file1.txt file2.txt mapreduce/WordCount/input
- # hadoop fs -ls mapreduce/WordCount/input
- Found 2 items
- -rw-r--r-- 3 bd1_cg bd1 12 2018-12-20 17:59 mapreduce/WordCount/input/file1.txt
- -rw-r--r-- 3 bd1_cg bd1 16 2018-12-20 17:59 mapreduce/WordCount/input/file2.txt
MapReduce 编程框架分为三个部分,请在 Eclipse 中的 WordCount
下分别创建如下三个类,内容分别如下:
(1).WcMapper
- public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
- //重写map这个方法
- //mapreduce框架每读一行数据就调用一次该方法
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value
- //key是这一行数据的起始偏移量,value是这一行的文本内容
- }
- }
(2).WcReducer
- public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
-
- //继承Reducer之后重写reduce方法
- //第一个参数是key,第二个参数是集合。
- //框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法
- protected void reduce(Text key, Iterable<LongWritable> values,Context context)
- throws IOException, InterruptedException {
-
- }
- }
(3).WcRunner
- public class WcRunner {
-
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- //创建配置文件
- Configuration conf = new Configuration();
- //获取一个作业
- Job job = Job.getInstance(conf);
-
- //设置整个job所用的那些类在哪个jar包
- job.setJarByClass(WcRunner.class);
-
- //本job使用的mapper和reducer的类
- job.setMapperClass(WcMap.class);
- job.setReducerClass(WcReduce.class);
-
- //指定reduce的输出数据key-value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
-
-
- //指定mapper的输出数据key-value类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- //指定要处理的输入数据存放路径
- FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.51.149:9000/user/cg/input"));
-
- //指定处理结果的输出数据存放路径
- FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.51.149:9000/user/cg/output"));
-
- //将job提交给集群运行
- job.waitForCompletion(true);
- }
- }

Map过程需要继承org.apache.hadoop.mapreduce
包中Mapper
类,并重写其map
方法。通过在map
方法中添加两句把key
值和value
值输出到控制台的代码,可以发现map
方法中value
值存储的是文本文件中的一行(以回车符为行结束标记),而key
值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer
类将每一行拆分成为一个个的单词,并将<word,1>
作为map
方法的结果输出,其余的工作都交有MapReduce
框架处理。
完整代码与解析如下:
- import java.io.IOException;
- import org.apache.commons.lang.StringUtils;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- /***
- *
- * @author Administrator
- * 1:4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的值
- * KEYOUT是输入的key的类型,VALUEOUT是输入的value的值
- * 2:map和reduce的数据输入和输出都是以key-value的形式封装的。
- * 3:默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
- * 4:key-value数据是在网络中进行传递,节点和节点之间互相传递,在网络之间传输就需要序列化,但是jdk自己的序列化很冗余
- * 所以使用hadoop自己封装的数据类型,而不要使用jdk自己封装的数据类型;
- * Long--->LongWritable
- * String--->Text
- */
- public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
- //重写map这个方法
- //mapreduce框架每读一行数据就调用一次该方法
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value
- //key是这一行数据的起始偏移量,value是这一行的文本内容
-
- //1:
- String str = value.toString();
- //2:切分单词,空格隔开,返回切分开的单词
- String[] words = StringUtils.split(str," ");
- //3:遍历这个单词数组,输出为key-value的格式,将单词发送给reduce
- for(String word : words){
- //输出的key是Text类型的,value是LongWritable类型的
- context.write(new Text(word), new LongWritable(1));
- }
- }
- }

Reduce
过程需要继承org.apache.hadoop.mapreduce
包中Reducer
类,并重写其reduce
方法。Map
过程输出<key,values>
中key
为单个单词,而values
是对应单词的计数值所组成的列表,Map
的输出就是Reduce
的输入,所以reduce
方法只要遍历values
并求和,即可得
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。