Mahout is an Apache framework for recommender systems, and Hadoop is a distributed computing framework. Chapter 6 of Mahout in Action covers the distributed implementation on Hadoop, so let us start with how Mahout computes item similarity from a co-occurrence matrix.
This post walks through the concrete steps I took to implement the functionality described in my previous post. Hadoop currently ships two generations of APIs (old and new), so pay close attention to whether each class you import belongs to the new API or the old one. The Hadoop version used here is 2.6.
Mahout in Action uses a Wikipedia dataset, which is fairly large and makes it inconvenient to verify that our test program behaves correctly, so here we use a small hand-written data file, mywiki.txt:
- 100 101 102 103
- 101 100 105 106
- 102 100
- 103 109 101 102
- 104 100 103
- 105 106
- 106 110 107
- 107 101 100
- 108 101 100
- 109 102 103
- 110 108 105
Each line starts with a userID followed by the itemIDs that user has interacted with (itemID1 itemID2 ...). For example, the first line means user 100 has expressed a preference for items 101, 102, and 103.
The overall workflow is covered in my earlier post. Here are the project layout and the pom.xml file.
Project layout of the wiki project:
A quick note on the classes: WikipediaToItemPrefsMapper and WikipediaToUserVectorReducer make up the first MapReduce pass, and UserVectorToCooccurrenceMapper and UserVectorToCooccurrenceReducer make up the second. What each pass accomplishes is also explained in the previous post, and the data flow is sketched below.
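Roughly, the two passes transform the data as follows (my own summary, based on the code in this post):
- mywiki.txt line:  userID itemID1 itemID2 ...
- first pass   -->  userID -> user preference vector {itemID1:1.0, itemID2:1.0, ...}
- second pass  -->  itemID -> co-occurrence row {coItemID:count, ...}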
pom.xml of the wiki project:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.jd.bigdata</groupId>
- <artifactId>1.0</artifactId>
- <version>1.0-SNAPSHOT</version>
-
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.6.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>1.2.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-examples</artifactId>
- <version>1.2.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.mahout</groupId>
- <artifactId>mahout-examples</artifactId>
- <version>0.9</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.mahout</groupId>
- <artifactId>mahout-integration</artifactId>
- <version>0.9</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.cassandra</groupId>
- <artifactId>cassandra-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>me.prettyprint</groupId>
- <artifactId>hector-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
-
- </project>
A word of caution: this pom.xml is configured for my cluster environment, Hadoop 2.6 and Mahout 0.9. If your cluster differs, you will need to adjust the versions accordingly.
The source code mostly follows Mahout in Action, with a few adjustments for this setup.
WikipediaToItemPrefsMapper.java
- package com.chenbiaolong.wiki;
-
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.mahout.math.VarLongWritable;
- import java.io.IOException;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
-
- public final class WikipediaToItemPrefsMapper extends
- Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
- private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- String line = value.toString();
- Matcher m = NUMBERS.matcher(line);
- // the first number on a line is the userID; skip lines with no numbers
- if (!m.find()) {
- return;
- }
- VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
- VarLongWritable itemID = new VarLongWritable();
- // every remaining number is an itemID; emit one (userID, itemID) pair per item
- while (m.find()) {
- itemID.set(Long.parseLong(m.group()));
- context.write(userID, itemID);
- }
- }
- }
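For example, for the first input line 100 101 102 103, this mapper emits the following (userID, itemID) pairs (shown here only as an illustration of the intermediate key/value stream):
- (100, 101)
- (100, 102)
- (100, 103)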
WikipediaToUserVectorReducer.java
- package com.chenbiaolong.wiki;
-
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.mahout.math.RandomAccessSparseVector;
- import org.apache.mahout.math.VarLongWritable;
- import org.apache.mahout.math.Vector;
- import org.apache.mahout.math.VectorWritable;
- import java.io.IOException;
-
- public class WikipediaToUserVectorReducer
- extends
- Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
- public void reduce(VarLongWritable userID,
- Iterable<VarLongWritable> itemPrefs, Context context)
- throws IOException, InterruptedException {
- Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- for (VarLongWritable itemPref : itemPrefs) {
- userVector.set((int) itemPref.get(), 1.0f);
- }
- context.write(userID, new VectorWritable(userVector));
- }
- }
The first MapReduce pass turns input lines of the form userID itemID1 itemID2 ... into one sparse preference vector per user, of the form userID {itemID1:1.0, itemID2:1.0, ...}.
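For the sample data above, this conceptually yields records such as the following (the actual job output is a binary SequenceFile of VarLongWritable/VectorWritable pairs, so this is just a readable rendering):
- 100 {101:1.0,102:1.0,103:1.0}
- 101 {100:1.0,105:1.0,106:1.0}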
UserVectorToCooccurrenceMapper.java
- package com.chenbiaolong.wiki;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.mahout.math.VarLongWritable;
- import org.apache.mahout.math.Vector;
- import org.apache.mahout.math.VectorWritable;
- import java.io.IOException;
- import java.util.Iterator;
- public class UserVectorToCooccurrenceMapper extends
- Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {
- public void map(VarLongWritable userID, VectorWritable userVector,
- Context context) throws IOException, InterruptedException {
- Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator();
- while (it.hasNext()) {
- int index1 = it.next().index();
- Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
- while (it2.hasNext()) {
- int index2 = it2.next().index();
- context.write(new IntWritable(index1), new IntWritable(index2));
- }
- }
- }
- }
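To see what the double loop over nonZeroes produces, take user 106, whose preference vector from the first pass is {110:1.0, 107:1.0}. The mapper emits every ordered pair of that user's items, including each item paired with itself, which is why the final co-occurrence rows carry a diagonal count:
- (110, 110)
- (110, 107)
- (107, 110)
- (107, 107)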
UserVectorToCooccurrenceReducer.java
- package com.chenbiaolong.wiki;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.mahout.math.RandomAccessSparseVector;
- import org.apache.mahout.math.Vector;
- import org.apache.mahout.math.VectorWritable;
- import java.io.IOException;
- public class UserVectorToCooccurrenceReducer extends
- Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {
- public void reduce(IntWritable itemIndex1,
- Iterable<IntWritable> itemIndex2s, Context context)
- throws IOException, InterruptedException {
- Vector cooccurrenceRow = new RandomAccessSparseVector(
- Integer.MAX_VALUE, 100);
- for (IntWritable intWritable : itemIndex2s) {
- int itemIndex2 = intWritable.get();
- cooccurrenceRow.set(itemIndex2,
- cooccurrenceRow.get(itemIndex2) + 1.0);
- }
- context.write(itemIndex1, new VectorWritable(cooccurrenceRow));
- }
- }
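Continuing that example: item 107 appears only in user 106's list, so the reducer for key 107 receives the values {110, 107} and sums them into the row
- 107 {107:1.0,110:1.0}
which matches the corresponding line of the final output shown further below.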
Next comes the driver code. The first MapReduce pass is set up as job1 and the second as job2; job2's input is job1's output, so a temporary HDFS directory is used to hold job1's results. The details are shown in the source below:
wikiDriver.java
- package com.chenbiaolong.wiki;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
- import org.apache.mahout.math.VarLongWritable;
- import org.apache.mahout.math.VectorWritable;
-
- import java.io.IOException;
-
- public class wikiDriver {
- // Hadoop installation directory
- static final String HADOOP_HOME = "/users/sunwangdong/hadoop/hadoop-2.5.2";
- // temporary HDFS directory that holds job1's (the first pass's) output; note that this path lives on HDFS, not on the local filesystem
- static final String TMP_PATH = "/sunwangdong/tmp/part-r-00000";
-
- // prepare the HDFS paths: warn on a missing input path and delete any existing output/tmp paths
- static void OperatingFiles(String input, String output) {
- Path inputPath = new Path(input);
- Path outputPath = new Path(output);
- Path tmpPath = new Path(TMP_PATH);
- Configuration conf = new Configuration();
- conf.addResource(new Path(HADOOP_HOME+"/etc/hadoop/core-site.xml"));
- conf.addResource(new Path(HADOOP_HOME+"/etc/hadoop/hdfs-site.xml"));
- try {
- FileSystem hdfs = FileSystem.get(conf);
- if (!hdfs.exists(inputPath)) {
- System.out.println("input path no exist!");
- }
- if (hdfs.exists(outputPath)) {
- System.out.println("output path:"+outputPath.toString()+ " exist, deleting this path...");
- hdfs.delete(outputPath,true);
- }
- if (hdfs.exists(tmpPath)) {
- System.out.println("tmp path:"+tmpPath.toString()+ " exist, deleting this path...");
- hdfs.delete(tmpPath,true);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- public static void main(String[] args) throws IOException {
- if (args.length != 2) {
- System.out.println("usage: <input dir> <output dir>");
- System.exit(1);
- }
- OperatingFiles(args[0], args[1]);
- Configuration job1Conf = new Configuration();
- Job job1 = Job.getInstance(job1Conf, "job1");
- job1.setJarByClass(wikiDriver.class);
- job1.setMapperClass(WikipediaToItemPrefsMapper.class);
- job1.setReducerClass(WikipediaToUserVectorReducer.class);
- job1.setMapOutputKeyClass(VarLongWritable.class);
- job1.setMapOutputValueClass(VarLongWritable.class);
-
- // write job1's output as a SequenceFile so that job2 can read it directly
- job1.setOutputFormatClass(SequenceFileOutputFormat.class);
- job1.setOutputKeyClass(VarLongWritable.class);
- job1.setOutputValueClass(VectorWritable.class);
- FileInputFormat.addInputPath(job1, new Path(args[0]));
- FileOutputFormat.setOutputPath(job1, new Path(TMP_PATH));
-
- Configuration job2Conf = new Configuration();
- Job job2 = Job.getInstance(job2Conf, "job2");
- job2.setJarByClass(wikiDriver.class);
- job2.setMapperClass(UserVectorToCooccurrenceMapper.class);
- job2.setReducerClass(UserVectorToCooccurrenceReducer.class);
- job2.setMapOutputKeyClass(IntWritable.class);
- job2.setMapOutputValueClass(IntWritable.class);
- job2.setOutputKeyClass(IntWritable.class);
- job2.setOutputValueClass(VectorWritable.class);
-
- // job2 reads its input as a SequenceFile (the format job1 wrote)
- job2.setInputFormatClass(SequenceFileInputFormat.class);
- FileInputFormat.addInputPath(job2, new Path(TMP_PATH));
- FileOutputFormat.setOutputPath(job2, new Path(args[1]));
-
- ControlledJob ctrlJob1 = new ControlledJob(job1.getConfiguration());
- ctrlJob1.setJob(job1);
-
- ControlledJob ctrlJob2 = new ControlledJob(job2.getConfiguration());
- ctrlJob2.setJob(job2);
-
- ctrlJob2.addDependingJob(ctrlJob1);
-
- JobControl JC = new JobControl("wiki job");
- JC.addJob(ctrlJob1);
- JC.addJob(ctrlJob2);
-
- Thread thread = new Thread(JC);
- thread.start();
- while (true) {
- if(JC.allFinished()) {
- System.out.println(JC.getSuccessfulJobList());
- JC.stop();
- System.exit(0);
- }
-
- if (JC.getFailedJobList().size() > 0) {
- System.out.println(JC.getFailedJobList());
- JC.stop();
- System.exit(1);
- }
- }
- }
- }
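One possible improvement (my own sketch, not part of the original project): the run logs further below contain the warning "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner". A minimal way to address it would be to wrap the driver in the Tool interface, moving the job1/job2/JobControl wiring from wikiDriver.main() into run() and building each Job from getConf(); the class name WikiTool here is hypothetical:
- package com.chenbiaolong.wiki;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
-
- public class WikiTool extends Configured implements Tool {
-     @Override
-     public int run(String[] args) throws Exception {
-         // move the job1/job2/JobControl setup from wikiDriver.main() here,
-         // creating each Job from getConf() so that -D options are honored
-         return 0;
-     }
-
-     public static void main(String[] args) throws Exception {
-         System.exit(ToolRunner.run(new Configuration(), new WikiTool(), args));
-     }
- }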
Package the wiki project into a jar and upload it to the Linux server. The packaging steps are covered in one of my earlier posts.
First, create a directory on HDFS:
hdfs dfs -mkdir /user/chenbiaolong/data
Next, upload the test data file to HDFS; it is best to use absolute paths here:
[hdfs@localhost wiki]$ hdfs dfs -put mywiki.txt /user/chenbiaolong/data
Run the jar:
hadoop jar wiki.jar com.chenbiaolong.wiki.wikiDriver /user/chenbiaolong/data/mywiki.txt /user/chenbiaolong/wiki_output
This command uses /user/chenbiaolong/data/mywiki.txt as the input file and /user/chenbiaolong/wiki_output as the output path. Inside the program, /user/chenbiaolong/tmp/ is used as the output path of job1 (the first pass). When I first ran this command, however, it failed with an error along the lines of "mkdir can not create .../META-INF/license". After some searching I found the cause: the packaged jar already contained a META-INF/LICENSE entry, which has to be deleted first:
zip -d '<the packaged jar>' META-INF/LICENSE
With that entry removed, re-running the command above works.
The run produces the following output:
- 15/04/07 13:58:33 INFO impl.TimelineClientImpl: Timeline service address: http://localhost:8188/ws/v1/timeline/
- 15/04/07 13:58:33 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8050
- 15/04/07 13:58:34 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
- 15/04/07 13:58:35 INFO input.FileInputFormat: Total input paths to process : 1
- 15/04/07 13:58:35 INFO mapreduce.JobSubmitter: number of splits:1
- 15/04/07 13:58:36 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1427799851545_0023
- 15/04/07 13:58:36 INFO impl.YarnClientImpl: Submitted application application_1427799851545_0023
- 15/04/07 13:58:36 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1427799851545_0023/
- 15/04/07 13:59:01 INFO impl.TimelineClientImpl: Timeline service address: http://localhost:8188/ws/v1/timeline/
- 15/04/07 13:59:01 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8050
- 15/04/07 13:59:01 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
- 15/04/07 13:59:03 INFO input.FileInputFormat: Total input paths to process : 1
- 15/04/07 13:59:03 INFO mapreduce.JobSubmitter: number of splits:1
- 15/04/07 13:59:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1427799851545_0024
- 15/04/07 13:59:03 INFO impl.YarnClientImpl: Submitted application application_1427799851545_0024
- 15/04/07 13:59:03 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1427799851545_0024/
- [job name: job1
- job id: wiki job0
- job state: SUCCESS
- job mapred id: job_1427799851545_0023
- job message: just initialized
- job has no depending job:
- , job name: job2
- job id: wiki job1
- job state: SUCCESS
- job mapred id: job_1427799851545_0024
- job message: just initialized
- job has 1 dependeng jobs:
- depending job 0: job1
- ]
Occasionally a MapReduce job may appear to hang while running. In my case this was caused by insufficient resource allocation, and restarting Hadoop fixed it.
Now check the results. First, job1's output in the temporary directory:
[hdfs@localhost wiki]$ hdfs dfs -cat /user/chenbiaolong/tmp/part-r-00000/part-r-00000
The result prints as unreadable binary. This is because job1's output format is SequenceFileOutputFormat, which is convenient for job2 to read but cannot be displayed as readable text with a plain cat.
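As a side note (my own addition, not something done in the original run): hdfs dfs -text can usually render a SequenceFile as readable key/value text; for this particular file the Mahout classes (VarLongWritable, VectorWritable) would also need to be on the classpath, e.g. via HADOOP_CLASSPATH, for the values to be deserialized:
hdfs dfs -text /user/chenbiaolong/tmp/part-r-00000/part-r-00000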
Check job2's final output:
- [hdfs@localhost wiki]$ hdfs dfs -cat /user/chenbiaolong/wiki_output/part-r-00000
- 100 {106:1.0,101:2.0,103:1.0,100:5.0,105:1.0}
- 101 {101:4.0,103:1.0,109:1.0,100:2.0,102:2.0}
- 102 {101:2.0,103:2.0,109:1.0,102:3.0}
- 103 {101:1.0,103:3.0,100:1.0,102:2.0}
- 105 {106:1.0,100:1.0,108:1.0,105:2.0}
- 106 {106:2.0,100:1.0,105:1.0}
- 107 {107:1.0,110:1.0}
- 108 {108:1.0,105:1.0}
- 109 {101:1.0,109:1.0,102:1.0}
- 110 {107:1.0,110:1.0}
As you can see, the two MapReduce passes correctly produce a co-occurrence vector for each itemID. For example, the row for item 100 contains 101:2.0 because items 100 and 101 appear together in exactly two users' lists (users 107 and 108), and 100:5.0 on the diagonal because item 100 appears in five users' lists in mywiki.txt. These vectors estimate how similar two itemIDs are, which sets up the next step: generating recommendations.
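As a pointer to that next step (a local, non-MapReduce sketch of my own, not code from the book or from this project): an item-based score for a candidate item can be computed by summing the co-occurrence counts between that candidate and the items the user already has, which amounts to multiplying the co-occurrence matrix by the user's binary preference vector.
- // Illustration only: score candidate items for one user from a co-occurrence matrix.
- // cooccurrence.get(itemA).get(itemB) = number of users whose lists contain both items.
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
-
- public class CooccurrenceScorer {
-     public static Map<Integer, Double> score(Map<Integer, Map<Integer, Double>> cooccurrence,
-                                               Set<Integer> userItems) {
-         Map<Integer, Double> scores = new HashMap<Integer, Double>();
-         for (Integer owned : userItems) {
-             Map<Integer, Double> row = cooccurrence.get(owned);
-             if (row == null) {
-                 continue;
-             }
-             for (Map.Entry<Integer, Double> e : row.entrySet()) {
-                 Integer candidate = e.getKey();
-                 if (userItems.contains(candidate)) {
-                     continue; // do not recommend items the user already has
-                 }
-                 Double old = scores.get(candidate);
-                 scores.put(candidate, (old == null ? 0.0 : old) + e.getValue());
-             }
-         }
-         return scores;
-     }
- }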