赞
踩
from educoder实训
实训项目地址:https://www.educoder.net/shixuns/aekgf6pz/challenges
词频统计是最能体现MapReduce思想的程序,结构简单,上手容易。
词频统计的大致功能是:统计单个或者多个文本文件中每个单词出现的次数,并将每个单词及其出现频率按照<k,v>键值对的形式输出,其基本执行流程如下图所示:
由图可知:
主节点对预设文本文档进行词频统计,并将最终结果输出。
注:输入和输出事先已经预定,只要比较输出是否达到预期即可判断是否达到要求。
MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题。将处理过程高度抽象为两个函数:map和reduce。
map负责把任务分解成多个任务;
reduce负责把分解后多任务处理的结果汇总起来。
注:MapReduce处理的数据集必须可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。不是关系型数据库,而是结构化的。
对于给定的待处理文本文档,其map阶段的处理如下:
通过Text对象,获取文本文档的内容。
逐行处理文档,将单词提取出来。
每个单词为key,对应的value设为1,将<k2,v2>对输出。
关键性说明:
map阶段的处理,主要是如何对文本进行逐行的单词分割,从而获取单词,以及将键值对分发到各个节点(此处由hadoop隐性提供,用户先不必关心hdfs存储过程)。
可以参考的单词分割提取代码模板如下:
public void map(Object key,Text value,Context context)throws IOException,InterruptedException
{
//对文本内容对象value进行分割
StringTokenizer itr=new StringTokenizer(valu e.toString());
while(itr.hasMoreTokens()) {
String word=itr.nextToken();/*获取分割好的单词*/
/*
可以在该循环体中,使用获取好的单词word变量进行key和value的设定。
*/
}
}
在Wordcount的reduce阶段,主要是将每个单词的数量统计出来,包括:
在各个节点并行循环统计每个单词出现的次数。
将各个节点的结果汇总以list(<k3,v3>)的形式输出。
reduce函数参考模板:
public void reduce(Object key,Iterable<IntWritable> values,Context context)throws IOException, InterruptedException
{
int count=0;
for(IntWritable itr:vlaues)
{
count+=itr.get(); /*循环统计*/
}
/*统计完成后,将结果输出.....*/
}
本关的编程任务是补全右侧代码片段中map和reduce函数中的代码,具体要求及说明如下:
在主函数main中已初始化hadoop的系统设置,包括hadoop运行环境的连接。
在main函数中,已经设置好了待处理文档路径(即input),以及结果输出路径(即output)。
在main函数中,已经声明了job对象,程序运行的工作调度已经设定好。
本关只要求在map和reduce函数的指定区域进行代码编写,其他区域请勿改动。
以下是测试样例:
测试输入样例数据集:文本文档test1.txt和test2.txt
文档test1.txt中的内容为:
tale as old as time
true as it can be
beauty and the beast
文档test2.txt中的内容为:
ever just the same
ever as before
beauty and the beast
预期输出result.txt文档中的内容为:
and 2
as 4
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1
注:由于启动服务、编译等耗时,以及MapReduce过程资源消耗较大,评测时间较长(30s左右)!
请耐心等待!相信自己!通往成功的路上不会太久!
建议完成本关后尝试在本机上根据相关指导搭建环境运行程序,这样理解更深刻!
合抱之木,生于毫末;九层之台,起于累土!复杂源于简单,要想铸就高楼大厦必须打牢基础!
开始你的任务吧,祝你成功!
如果你觉得这一关的内容对你有帮助,请你在下面点赞。
import java.io.IOException; import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; import
org.apache.hadoop.mapreduce.Job; import
org.apache.hadoop.mapreduce.Mapper; import
org.apache.hadoop.mapreduce.Reducer; import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import
org.apache.hadoop.util.GenericOptionsParser; public class
WordCount { /* * MapReduceBase类:实现Mapper和Reducer接口的基类 *
Mapper接口: *
WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。
*/ public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ /* *LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
*都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。 */ private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值
/* * Mapper接口中的map方法: * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter) *
映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对 *
中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。 *
OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。 *
OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output * Reporter
用于报告整个应用的运行进度
*/
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
/*
* 原始数据(以test1.txt为例):
* tale as old as time true as it can be beauty and the beast map阶段,数据如下形式作为map的输入值:key为偏移量 <0 tale as old as time>
<21 world java hello> <39 you me too>
*/
/**
* 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)
* 格式如下:前者是键值,后者数字是值
* tale 1
* as 1
* old 1
* as 1
* time 1
* true 1
* as 1
* it 1
* can 1
* be 1
* beauty 1
* and 1
* the 1
* beast 1
* 这些键值对作为map的输出数据
*/
//****请补全map函数内容****// /*********begin*********/
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
/*********end**********/
} } public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
/* * reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例): *
(tablie [1]) * (as [1,1,1]) * (old [1]) * (time [1]) * (true
[1]) * (it [1]) * (can [1]) * (be [1]) * (beauty [1]) *
(and [1]) * (the [1]) * (beast [1]) * 作为reduce的输入 * */
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException { //****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****//
/*********begin*********/
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
/*********end**********/
//****请将list(<k3,v3>)统计输出****//
/*********begin*********/
/*********end**********/ } } public static void main(String[] args) throws Exception { /**
* JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
* 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
*/ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
/* * 需要配置输入和输出的HDFS的文件路径参数 * 可以使用"Usage: wordcount <in>
<out>"实现程序运行时动态指定输入输出 */ if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2); } Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称
job.setJarByClass(WordCount.class);//为job设置Mapper类
/*********begin*********/
//****请为job设置Mapper类****// job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类
//****请为job设置Reduce类****// job.setReducerClass(IntSumReducer.class);
//****请设置输出key的参数类型****// job.setOutputKeyClass(Text.class);
//****请设置输出value的类型****//
job.setOutputValueClass(IntWritable.class);
/*********end**********/
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径
FileOutputFormat.setOutputPath(job, new
Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1); } }
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。