赞
踩
参考本例子前:
1.需要确保搭建好了hadoop集群环境。
2.安装了eclipse开发hadoop的环境。
3.这是关于Hadoop实战之WordCount统计单词数目。
新建一个MapReducer项目:
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* * KEYIN:输入kv数据对中key的数据类型 * VALUEIN:输入kv数据对中value的数据类型 * KEYOUT:输出kv数据对中key的数据类型 * VALUEOUT:输出kv数据对中value的数据类型 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /* * map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法 * map task在调用map方法时,传递的参数: * 一行的起始偏移量LongWritable作为key * 一行的文本内容Text作为value */ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //拿到一行文本内容,转换成String 类型 String line = value.toString(); //将这行文本切分成单词,以空格切分。 String[] words=line.split(" "); //输出格式<单词,1> for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
WordCount程序中的Map步的输出结果为<单词,1>对,上面有一个合并处理,将拥有相同key值的键值对进行合并,形成一个<key,valuelist>,这个<key,valuelist>的键值对集合,作为Reduce步的输入。
<key,valuelist>或者Iterable 的键值对集合,作为Reduce步的输入。
代码如下:
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * KEYIN:对应mapper阶段输出的key类型 * VALUEIN:对应mapper阶段输出的value类型 * KEYOUT:reduce处理完之后输出的结果kv对中key的类型 * VALUEOUT:reduce处理完之后输出的结果kv对中value的类型 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override /* * reduce方法提供给reduce task进程来调用 * * reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组 * 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法 * 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1> * hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理 * 调用时传递的参数: * key:一组kv中的key * values:一组kv中所有value的迭代器 */ protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { //定义一个计数器 int count = 0; //通过value这个迭代器,遍历这一组kv中所有的value,进行累加 for(IntWritable value:values){ count+=value.get(); } //输出这个单词的统计结果 context.write(key, new IntWritable(count)); } }
context 对象是用来传递数据以及其他运行状态信息,map中的key、value写入context,让它传递给下一层。
3.xxxJob.java
提供主函数入口,加载job下的jar包,mapper,reducer文件。
代码如下:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job wordCountJob = Job.getInstance(conf); //重要:指定本job所在的jar包 wordCountJob.setJarByClass(WordCountJob.class); //设置wordCountJob所用的mapper逻辑类为哪个类 wordCountJob.setMapperClass(WordCountMapper.class); //设置wordCountJob所用的reducer逻辑类为哪个类 wordCountJob.setReducerClass(WordCountReducer.class); //设置map阶段输出的kv数据类型 wordCountJob.setMapOutputKeyClass(Text.class); wordCountJob.setMapOutputValueClass(IntWritable.class); //设置最终输出的kv数据类型 wordCountJob.setOutputKeyClass(Text.class); wordCountJob.setOutputValueClass(IntWritable.class); //设置要处理的文本数据所存放的路径 FileInputFormat.setInputPaths(wordCountJob, args[0]); FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1])); //提交job给hadoop集群 wordCountJob.waitForCompletion(true); } }
编码完成以后了,一般不再eclipse中运行,而是编码好了以后,打包为jar包以后,导出到hadoop下面去运行。
然后linux桌面上便出现了导出来的jar包
《注意:由于我是先做了再写文章的,可能图中名字不太一样,但是不影响。》
在home/gznc/下新建一个myjar文件夹,把wc.jar移动到myjar文件夹下。(你可以自己选择其他路径。无影响)
其次准备一个文本文件1.txt,放在那里无所谓。
里面直接填入一些单词!
这里我就随便写一个了:
启动你的hadoop集群环境。
我的是在root权限下操作。(你可以不一样。根据你自己的来启动)
1.命令:start-all.sh
或者单独启动:
使用start-dfs.sh 和 start-yarn.sh代替start-all.sh命令
启动完成:
这里需要注意的是在将文件内容上传至HDFS时,需要是已经创建了存放文件内容的文件夹,如果在上传之间我没有建立wordconut这个文件夹名,
那么在上传之前我需要利用命令hadoop fs –mkdir /user/gznc/wordconut先创建wordconut文件夹。
1.使用hadoop命令新建一个wordcount文件夹
hadoop fs -mkdir /wordcount
查看是否创建成功:
hadoop fs -ls / 或者 –ls –R / 查看所有
2.使用hadoop命令把txt文本上传到新建的那个文件夹中去。
把1.txt上传到新建的文件夹里面去和查看是否上传成功:
hadoop fs -put /home/gznc/1.txt /wordcount
hadoop fs -ls /wordcount
查看一下文本内容是不是正常的:
hadoop fs -cat /wordcount/1.txt
hadoop jar /home/gznc/myjar/wc.jar cn.edu.gznc.wc.WcJob hdfs://master:9000/wordcount hdfs://master:9000/outputfile
解释如下:
hadoop jar :hadoop命令
wc.jar:导出的jar包
cn.edu.gznc.wc.WcJob:wcjob文件的路径。
Wordcount:新建的,用来保存单词文本的文件夹。
Outputfile:统计以后输出的文件夹—自动创建。
需要注意的是这里输出的路径在之前不需要建立,但每次跑集群时一定要注意输出路径或者路径名不能一致。
运行结果:
4.使用hadoop命令查看统计的单词结果文本:
查看统计结果:
hadoop fs -ls /outputfile
hadoop fs -cat /outputfile/part-r-00000
延展知识:
hadoop fs –ls /----查看建立的hadoop创建的文件目录
hadoop fs –ls –R /—查看文件系统目录的根目录
hadoop fs –rm –r /output 删除hadoop文件系统目录:
总结:
HDFS和MapReduce是Hadoop的两个重要核心,其中MapReduce是Hadoop的分布式计算模型。
MapReduce主要分为两步Map步和Reduce步,引用一个故事来解释:
现在你要统计一个图书馆里面有多少本书,为了完成这个任务,你可以指派小明去统计书架1,指派小红去统计书架2,这个指派的过程就是Map步,最后,每个人统计完属于自己负责的书架后,再对每个人的结果进行累加统计,累计统计过程就是Reduce步。
假设现在有n个文本,WordCount程序就是利用MR计算模型来统计这n个文本中每个单词出现的总次数。
现在有两个文件:
• File 0:有两行,第一行的内容为“Hello World”,第二行的内容为“Hello Hadoop”
• File 1:有两行,第一行的内容为“Bye World”,第二行的内容为“Bye Hadoop”
假设我们现在要统计这两个文件每种单词出现的次数,首先我们要对每个文本进行处理,即把其中的句子划分成词语,按照上面讲到的统计图书的故事,我们会将这两个文件分派给两个人,让这两个人各自去处理,待这两个人都处理完成之后,再对结果进行汇总统计,在图中充当这两个人角色的就是Map1和Map2,Map步的输入为<key,value>对,输出也为<key,value>对。
统计单词原理图示:
到这里我们的Hadoop实战之WordCount统计单词数目就完成了。
下一篇文章,将继续hadoop实战。
需要源代码的可以下方留下邮箱。
You got a dream, you gotta protect it.
如果你有梦想的话,就要去捍卫它 。 ——《当幸福来敲门》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。