赞
踩
原始数据如图:
目的:求每两个用户(冒号前的是用户名,冒号后是他的好友列表)之间有哪些共同的好友,最终结果形如:
制表符前是两个用户的对,制表符后是他们之间共同拥有的好友。
需要分为两个MapReduce程序来完成。
1、第一个MapReduce程序:
①首先,在Map阶段将每个用户的每个好友作为key2,而用户本身作为value2,输出到Reduce阶段
②在Reduce阶段,将key2作为value3,value2的集合则写成A-B-C-E的形式输出为key3
③将第一个MapReduce程序的结果输出为文件
2、第二个MapReduce程序:
①读取第一个MapReduce程序的结果文件
②在Map阶段将形如A-B-C-E的用户列表字符串按“-”分隔为字符串数组,并且将数组排序,后将数组每两个元素写成形如A-B的用户对,作为key2,而第一个MapReduce程序的结果文件的共同好友则写到value2中
③在Reduce阶段,将共同好友的集合写成形如C-D-E的形式作为value3,key2则不变,继续作为key3输出。
自定义Mapper
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class Step1Mapper extends Mapper<LongWritable, Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String userNmae=value.toString().split(":")[0]; String firents=value.toString().split(":")[1]; for (String f:firents.split(",")) { context.write(new Text(f),new Text(userNmae)); } } }
自定义Reducer
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class Step1Reducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String userNames=""; for (Text value : values) { userNames+=value.toString()+"-"; } //去除因为循环产生的多余的- userNames=userNames.substring(0,(userNames.length()-1)); context.write(new Text(userNames),key); } }
主类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Step1Run extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { Job job=Job.getInstance(super.getConf(),"STEP1"); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("E:\\共同好友\\input")); job.setMapperClass(Step1Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(Step1Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("E:\\共同好友\\my_s1_output")); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(),new Step1Run(),args)); } }
自定义Mapper
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.Arrays; public class Step2Mapper extends Mapper<LongWritable, Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] userNames=value.toString().split("\t")[0].split("-"); String friend=value.toString().split("\t")[1]; //避免出现A-B和B-A被作为不同的key Arrays.sort(userNames); String userPair=""; for (int i = 0; i < userNames.length-1; i++) { for (int j = i+1; j < userNames.length; j++) { userPair=userNames[i]+"-"+userNames[j]; context.write(new Text(userPair),new Text(friend)); } } } }
自定义Reducer
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Step2Reducer extends Reducer<Text,Text,Text,Text>
{
@Override
protected void reduce(Text key, Iterable<Text
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。