赞
踩
词频统计是最能体现MapReduce
思想的程序,结构简单,上手容易。
词频统计的大致功能是:统计单个或者多个文本文件中每个单词出现的次数,并将每个单词及其出现频率按照<k,v>
键值对的形式输出,其基本执行流程如下图所示:
由图可知:
<k1,v1>
键值对,具体形式很多,例如<行数,字符偏移>
等;Spliting
将<k1,v1>
细化为单词键值对<k2,v2>
;Map
分发到各个节点,同时将<k2,v2>
归结为list(<k2,v2>);Shuffing
将相同主键k2归结在一起形成<k2,list(v2)>
;Reduce
阶段直接对<k2, list(v2)>
进行合计得到list(<k3,v3>)
并将结果返回主节点。MapReduce
采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题。将处理过程高度抽象为两个函数:map
和reduce
。
map负责把任务分解成多个任务;
reduce负责把分解后多任务处理的结果汇总起来。
注:MapReduce
处理的数据集必须可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。不是关系型数据库,而是结构化的。
对于给定的待处理文本文档,其map阶段的处理如下:
通过Text对象,获取文本文档的内容。
逐行处理文档,将单词提取出来。
每个单词为key,对应的value设为1,将<k2,v2>对输出。
关键性说明:
map
阶段的处理,主要是如何对文本进行逐行的单词分割,从而获取单词,以及将键值对分发到各个节点(此处由hadoop
隐性提供,用户先不必关心hdfs存储过程)。
可以参考的单词分割提取代码模板如下:
public void map(Object key,Text value,Context context)throws IOException,InterruptedException
{
//对文本内容对象value进行分割
StringTokenizer itr=new StringTokenizer(valu e.toString());
while(itr.hasMoreTokens()) {
String word=itr.nextToken();/*获取分割好的单词*/
/*
可以在该循环体中,使用获取好的单词word变量进行key和value的设定。
*/
}
}
在Wordcount
的reduce
阶段,主要是将每个单词的数量统计出来,包括:
reduce
函数参考模板:public void reduce(Object key,Iterable<IntWritable> values,Context context)throws IOException, InterruptedException
{
int count=0;
for(IntWritable itr:vlaues)
{
count+=itr.get(); /*循环统计*/
}
/*统计完成后,将结果输出.....*/
}
本关的编程任务是补全右侧代码片段中map
和reduce
函数中的代码,具体要求及说明如下:
以下是测试样例:
测试输入样例数据集:文本文档test1.txt
和test2.txt
文档test1.txt中的内容为:
tale as old as time
true as it can be
beauty and the beast
文档test2.txt中的内容为:
ever just the same
ever as before
beauty and the beast
预期输出result.txt
文档中的内容为:
and 2
as 4
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1
``
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
/*
* MapReduceBase类:实现Mapper和Reducer接口的基类
* Mapper接口:
* WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
/*
*LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
*都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值
/*
* Mapper接口中的map方法:
* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
* 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对
* 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
* OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。
* OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
* Reporter 用于报告整个应用的运行进度
*/
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
/*
* 原始数据(以test1.txt为例):
* tale as old as time
true as it can be
beauty and the beast
map阶段,数据如下形式作为map的输入值:key为偏移量
<0 tale as old as time>
<21 world java hello>
<39 you me too>
*/
/**
* 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)
* 格式如下:前者是键值,后者数字是值
* tale 1
* as 1
* old 1
* as 1
* time 1
* true 1
* as 1
* it 1
* can 1
* be 1
* beauty 1
* and 1
* the 1
* beast 1
* 这些键值对作为map的输出数据
*/
//****请补全map函数内容****//
/*********begin*********/
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
/*********end**********/
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
/*
* reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):
* (tablie [1])
* (as [1,1,1])
* (old [1])
* (time [1])
* (true [1])
* (it [1])
* (can [1])
* (be [1])
* (beauty [1])
* (and [1])
* (the [1])
* (beast [1])
* 作为reduce的输入
*
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
//****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****//
/*********begin*********/
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
/*********end**********/
//****请将list(<k3,v3>)统计输出****//
/*********begin*********/
/*********end**********/
}
}
public static void main(String[] args) throws Exception {
/**
* JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
* 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
*/
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
/*
* 需要配置输入和输出的HDFS的文件路径参数
* 可以使用"Usage: wordcount <in> <out>"实现程序运行时动态指定输入输出
*/
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称
job.setJarByClass(WordCount.class);//为job设置Mapper类
/*********begin*********/
//****请为job设置Mapper类****//
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
//****请为job设置Reduce类****//
//****请设置输出key的参数类型****//
//****请设置输出value的类型****//
/*********end**********/
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
利用HDFS文件系统开放的API对HDFS系统进行文件的创建和读写
要求:
在HDFS的路径/user/hadoop/下新建文件myfile,并且写入内容“china cstor cstor cstor china”;
输出HDFS系统中刚写入的文件myfile的内容
HDFS文件系统
HDFS设计成能可靠地在集群中大量机器之间存储大量的文件,它以块序列的形式存储文件。文件中除了最后一个块,其他块都有相同的大小(一般64M)。属于文件的块为了故障容错而被复制到不同节点备份(备份数量有复制因子决定)。块的大小和读写是以文件为单位进行配置的。HDFS中的文件是一次写的,并且任何时候都只有一个写操作,但是可以允许多次读。
创建HDFS文件
HDFS文件还提供文件数据流操作API,利用这些可以将文件读取简化为三大步骤。
//读取hadoop文件系统配置
Configuration conf = new Configuration(); //实例化设置文件,configuration类实现hadoop各模块之间值的传递
FileSystem fs = FileSystem.get(conf); //是hadoop访问系统的抽象类,获取文件系统, FileSystem的get()方法得到实例fs,然后fs调动create()创建文件,open()打开文件
System.out.println(fs.getUri());
Path file = new Path(""); //命名一个文件及路径
if (fs.exists(file)) {
System.out.println("File exists.");
} else
{
通过输入数据流进行写入
FSDataOutputStream outStream = fs.create(file); //获取文件流
outStream.writeUTF("XXXXXXXX"); //使用文件流写入文件内容
通过输出数据流将文件内容输出
// FSDataInputStream实现了和接口,从而使Hadoop中的文件输入流具有流式搜索和流式定位读取的功能
String data = inStream.readUTF(); //使用输出流读取文件
本关的编程任务是补全右侧代码片段中的代码,具体要求及说明如下:
本关无测试样例,直接比较文件内容确定输出是否为“china cstor cstor cstor china”
import java.io.IOException;
import java.sql.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class hdfs {
public static void main(String[] args) throws IOException {
//throws IOException捕获异常声明
//****请根据提示补全文件创建过程****//
/*********begin*********/
//实现文件读写主要包含以下步骤:
//读取hadoop文件系统配置
//实例化设置文件,configuration类实现hadoop各模块之间值的传递
//FileSystem是hadoop访问系统的抽象类,获取文件系统, FileSystem的get()方法得到实例fs,然后fs调动create()创建文件,调用open()打开文件,调用close()关闭文件
// 读取hadoop文件系统配置
Configuration conf = new Configuration();
// 实例化文件系统
FileSystem fs = FileSystem.get(conf);
//*****请按照题目填写要创建的路径,其他路径及文件名无法被识别******//
Path file = new Path("/user/hadoop/myfile");
/*********end**********/
if (fs.exists(file)) {
System.out.println("File exists.");
} else
{
//****请补全使用文件流将字符写入文件过程,使用outStream.writeUTF()函数****//
/*********begin*********/
// 使用文件流将字符写入文件
FSDataOutputStream outStream = fs.create(file);
outStream.writeUTF("china cstor cstor cstor china");
outStream.close();
/*********end**********/
}
//****请补全读取文件内容****//
/*********begin*********/
// 提示:FSDataInputStream实现接口,使Hadoop中的文件输入流具有流式搜索和流式定位读取的功能
// 读取文件内容
FSDataInputStream inStream = fs.open(file);
String data = inStream.readUTF();
/*********end**********/
//输出文件状态
//FileStatus对象封装了文件的和目录的元数据,包括文件长度、块大小、权限等信息
FileSystem hdfs = file.getFileSystem(conf);
FileStatus[] fileStatus = hdfs.listStatus(file);
for(FileStatus status:fileStatus)
{
System.out.println("FileOwer:"+status.getOwner());//所有者
System.out.println("FileReplication:"+status.getReplication());//备份数
System.out.println("FileModificationTime:"+new Date(status.getModificationTime()));//目录修改时间
System.out.println("FileBlockSize:"+status.getBlockSize());//块大小
}
System.out.println(data);
System.out.println("Filename:"+file.getName());
inStream.close();
fs.close();
}
}
要求:编写处理带词频属性的文档倒排索引程序,运行程序,对莎士比亚文集文档数据进行倒排索引处理,结果输出到指定文件。
注:输入输出文件的路径已经指定
文档(Document):一般搜索引擎的处理对象是互联网网页,而文档这个概念要更宽泛些,代表以文本形式存在的存储对象,相比网页来说,涵盖更多种形式,比如Word,PDF,html,XML等不同格式的文件都可以称之为文档。再在本关后续内容,很多情况下会使用文档来表征文本信息。
文档集合(Document Collection):由若干文档构成的集合称之为文档集合。
文档编号(Document ID):在搜索引擎内部,会将文档集合内每个文档赋予一个唯一的内部编号,以此编号来作为这个文档的唯一标识,这样方便内部处理,每个文档的内部编号即称之为“文档编号”,后文有时会用DocID来便捷地代表文档编号。
单词编号(Word ID):与文档编号类似,搜索引擎内部以唯一的编号来表征某个单词,单词编号可以作为某个单词的唯一表征。
倒排索引:
倒排索引(Inverted Index):倒排索引是实现“单词-文档矩阵”的一种具体存储形式,通过倒排索引,可以根据单词快速获取包含这个单词的文档列表。倒排索引主要由两个部分组成:“单词词典”和“倒排文件”。
单词词典(Lexicon):搜索引擎的通常索引单位是单词,单词词典是由文档集合中出现过的所有单词构成的字符串集合,单词词典内每条索引项记载单词本身的一些信息以及指向“倒排列表”的指针。
倒排列表(PostingList):倒排列表记载了出现过某个单词的所有文档的文档列表及单词在该文档中出现的位置信息,每条记录称为一个倒排项(Posting)。根据倒排列表,即可获知哪些文档包含某个单词。
倒排文件(Inverted File):所有单词的倒排列表往往顺序地存储在磁盘的某个文件里,这个文件即被称之为倒排文件,倒排文件是存储倒排索引的物理文件。
编程要求
本关的编程任务是补全右侧代码片段中map和reduce函数中的代码,具体要求及说明如下:
测试输入样例数据集:文本文档test1.txt, test2.txt
文档test1.txt中
的内容为:
tale as old as time
true as it can be
beauty and the beast
文档test2.txt
中的内容为:
ever just the same
ever as before
beauty and the beast
import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.util.Iterator;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
String word;
IntWritable frequence=new IntWritable();
int one=1;
Hashtable<String,Integer> hashmap=new Hashtable();//key关键字设置为String
StringTokenizer itr = new StringTokenizer(value.toString());
//****请用hashmap定义的方法统计每一行中相同单词的个数,key为行值是每一行对应的偏移****//
/*********begin*********/
while (itr.hasMoreTokens()) {
word = itr.nextToken();
if (hashmap.containsKey(word)) {
hashmap.put(word, hashmap.get(word) + 1);
} else {
hashmap.put(word, one);
}
}
/*********end**********/
for(Iterator<String> it=hashmap.keySet().iterator();it.hasNext();){
word=it.next();
frequence=new IntWritable(hashmap.get(word));
Text fileName_frequence = new Text(fileName+"@"+frequence.toString());//以<K2,“单词 文件名@出现频次”> 的格式输出
context.write(new Text(word),fileName_frequence);
}
}
}
public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{
protected void reduce(Text key,Iterable<Text> values,Context context)
throws IOException ,InterruptedException{
//****请合并mapper函数的输出,并提取“文件@1”中‘@’后面的词频,以<K2,list(“单词 文件名@出现频次”)>的格式输出****//
/*********begin*********/
HashMap<String, Integer> map = new HashMap<String, Integer>();
for (Text value : values) {
String str = value.toString();
String[] parts = str.split("@");
String fileName = parts[0];
int frequency = Integer.parseInt(parts[1]);
if (map.containsKey(fileName)) {
map.put(fileName, map.get(fileName) + frequency);
} else {
map.put(fileName, frequency);
}
}
for (String fileName : map.keySet()) {
Text result = new Text(fileName + "@" + map.get(fileName));
context.write(key, result);
}
}
}
/*********end**********/
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>
{ @Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{ Iterator<Text> it = values.iterator();
StringBuilder all = new StringBuilder();
if(it.hasNext()) all.append(it.next().toString());
for(;it.hasNext();) {
all.append(";");
all.append(it.next().toString());
}
//****请输出最终键值对list(K3,“单词", “文件1@频次; 文件2@频次;...")****//
/*********begin*********/
// 输出最终键值对list(K3,“单词", “文件1@频次; 文件2@频次;...")//
context.write(key, new Text(all.toString()));
/*********end**********/
}
}
public static void main(String[] args)
{
if(args.length!=2){
System.err.println("Usage: InvertedIndex <in> <out>");
System.exit(2);
}
try {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "invertedindex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
//****请为job设置Combiner类****//
/*********begin*********/
job.setCombinerClass(InvertedIndexCombiner.class);
/*********end**********/
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
//****请设置输出value的类型****//
/*********begin*********/
job.setOutputValueClass(Text.class);
/*********end**********/
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
编写实现网页数据集PageRank算法的程序,对网页数据集进行处理得到网页权重排序。
######PageRank算法原理
基本思想:
如果网页T存在一个指向网页A的连接,则表明T的所有者认为A比较重要,从而把T的一部分重要性得分赋予A。这个重要性得分值为:PR(T)/L(T)
其中PR(T)为T的PageRank值,L(T)为T的出链数。则A的PageRank值为一系列类似于T的页面重要性得分值的累加。
即一个页面的得票数由所有链向它的页面的重要性来决定,到一个页面的超链接相当于对该页投一票。一个页面的PageRank是由所有链向它的页面(链入页面)的重要性经过递归算法得到的。一个有较多链入的页面会有较高的等级,相反如果一个页面没有任何链入页面,那么它没有等级。
PageRank简单计算:
假设一个由只有4个页面组成的集合:A,B,C和D。如图所示,如果所有页面都链向A,那么A的PR(PageRank)值将是B,C及D的和。
继续假设B也有链接到C,并且D也有链接到包括A的3个页面。一个页面不能投票2次。所以B给每个页面半票。以同样的逻辑,D投出的票只有三分之一算到了A的PageRank上。
换句话说,根据链出总数平分一个页面的PR值。
完整PageRank计算公式
由于存在一些出链为0不链接任何其他网页的网页,因此需要对 PageRank公式进行修正,即在简单公式的基础上增加了阻尼系数(damping factor)q, q一般取值q=0.85
更加准确的表达为:
P1,P2,…,Pn是被研究的页面,M(Pi)是Pi链入页面的数量,L(Pj)是Pj链出页面的数量,而N是所有页面的数量。PageRank值是一个特殊矩阵中的特征向量。这个特征向量为:
R是如下等式的一个解:
如果网页i有指向网页j的一个链接,则
否则
=0.
PageRank计算过程
PageRank 公式可以转换为求解的值,
其中矩阵为 A = q × P + ( 1 一 q) * 。 P 为概率转移矩阵,为 n 维的全 1 行.
则=
幂法计算过程如下:
X 设任意一个初始向量, 即设置初始每个网页的 PageRank值均。一般为1。R = AX。
while (1){
if ( |X - R| < e)
return R; //如果最后两次的结果近似或者相同,返回R
else {
X =R;
R = AX;
}
}
本关的编程任务是补全右侧代码片段中map和reduce
函数中的代码,具体要求及说明如下:
main
中已初始化hadoop
的系统设置,包括hadoop运行环境的连接。main
函数中,已经设置好了待处理文档路径(即input
),在评测中设置了结果输出路径(即output
),不要修改循环输出路径即可保证完成。job
对象,程序运行的工作调度已经设定好。map
和reduce
函数的指定区域进行代码编写,其他区域请勿改动。以下是测试样例:
输入文件格式如下:
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
注:为了简化运算,已经对网页集关系进行了规整,并且给出了相应的初始PR值。
以第一行为例: 1表示网址(以tab键隔开),1.0为给予的初始pr值,2,3,4,5,6,7,8为从网址1指向的网址。
输出文件格式:
The origin result
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
The 1th result
1 0.150 1.121 _2 3 4 5 6 7 8
2 0.150 1.243 _3 4 5 6 7 8
3 0.150 1.526 _4 5 6 7 8
4 0.150 2.036 _5 6 7 8
5 0.150 2.886 _6 7 8
6 0.150 4.303 _7 8
7 0.150 6.853 _8
8 0.150 11.831 _1 2 3 4 5 6 7
The 2th result
1 0.150 1.587 _2 3 4 5 6 7 8
2 0.150 1.723 _3 4 5 6 7 8
3 0.150 1.899 _4 5 6 7 8
4 0.150 2.158 _5 6 7 8
5 0.150 2.591 _6 7 8
6 0.150 3.409 _7 8
7 0.150 5.237 _8
8 0.150 9.626 _1 2 3 4 5 6 7
The 3th result
1 0.150 1.319 _2 3 4 5 6 7 8
2 0.150 1.512 _3 4 5 6 7 8
3 0.150 1.756 _4 5 6 7 8
4 0.150 2.079 _5 6 7 8
5 0.150 2.537 _6 7 8
6 0.150 3.271 _7 8
7 0.150 4.720 _8
8 0.150 8.003 _1 2 3 4 5 6 7
The 4th result
1 0.150 1.122 _2 3 4 5 6 7 8
2 0.150 1.282 _3 4 5 6 7 8
3 0.150 1.496 _4 5 6 7 8
4 0.150 1.795 _5 6 7 8
5 0.150 2.236 _6 7 8
6 0.150 2.955 _7 8
7 0.150 4.345 _8
8 0.150 7.386 _1 2 3 4 5 6 7
The 5th result
1 0.150 1.047 _2 3 4 5 6 7 8
2 0.150 1.183 _3 4 5 6 7 8
3 0.150 1.365 _4 5 6 7 8
4 0.150 1.619 _5 6 7 8
5 0.150 2.000 _6 7 8
6 0.150 2.634 _7 8
7 0.150 3.890 _8
8 0.150 6.686 _1 2 3 4 5 6 7
注:迭代方法和次数不同会对结果产生影响,不必完全与答案匹配,只需运行结果趋于合理即可。(第二列为多余值)
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class PageRank {
public static class MyMapper extends Mapper<Object, Text, Text, Text>
{
private Text id = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString();
if(line.substring(0,1).matches("[0-9]{1}"))
{
boolean flag = false;
if(line.contains("_"))
{
line = line.replace("_","");
flag = true;
}
String[] values = line.split("\t");
Text t = new Text(values[0]);
String[] vals = values[1].split(" ");
String url="_";
double pr = 0;
int i = 0;
int num = 0;
if(flag)
{
i=2;
pr=Double.valueOf(vals[1]);
num=vals.length-2;
}
else
{
i=1;
pr=Double.valueOf(vals[0]);
num=vals.length-1;
}
for(;i<vals.length;i++)
{
url=url+vals[i]+" ";
id.set(vals[i]);
Text prt = new Text(String.valueOf(pr/num));
context.write(id,prt);
}
context.write(t,new Text(url));
}
}
}
public static class MyReducer extends Reducer<Text,Text,Text,Text>
{
private Text result = new Text();
private Double pr = new Double(0);
public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException
{
double sum=0;
String url="";
for(Text val:values)
{
if(!val.toString().contains("_"))
{
sum=sum+Double.valueOf(val.toString());
}
else
{
url=val.toString();
}
}
pr=0.15+0.85*sum;
String str=String.format("%.3f",pr);
result.set(new Text(str+" "+url));
context.write(key,result);
}
}
public static void main(String[] args) throws Exception
{
String paths="file:///tmp/input/Wiki0";
String path1=paths;
String path2="";
for(int i=1;i<=20;i++)
{
System.out.println("This is the "+i+"th job!");
System.out.println("path1:"+path1);
System.out.println("path2:"+path2);
Configuration conf = new Configuration();
Job job = new Job(conf, "PageRank");
path2=paths+i;
job.setJarByClass(PageRank.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(path1));
FileOutputFormat.setOutputPath(job, new Path(path2));
path1=path2;
job.waitForCompletion(true);
System.out.println(i+"th end!");
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。