赞
踩
常见的需求如QQ中的推荐好友,例如下图:
我们想给如花推荐好友,途中相邻连线的人之间是彼此直接好友的关系,那么我们推荐的规则是同一对”好友的好友”(简称FOF)出现的次数,比如:如花的好友的好友有“小明”“李刚”“凤姐”,而FOF关系如下:
如花 小明 李刚 凤姐
那么对于如花来说,小明,李刚,凤姐三者之间都是以如花为中心的FOF好友的好友关系。
接下来,我们使用mapreduce来计算应该给每个用户推荐哪些好友?
在这个例子中,我们会有两个mapTask和reduceTask:
第一组mapTask和reducetask:
根据直接好友关系的输入文件,计算出每组FOF关系及出现总次数
第二组mapTask和reducetask:
根据第一组输出的FOF及总次数,计算出给每个用户推荐的好友顺序。
代码:
- package com.jeff.mr.friend;
-
- import org.apache.hadoop.io.Text;
-
- public class Fof extends Text{
-
- public Fof(){
- super();
- }
-
- public Fof(String a,String b){
- /**
- * 主要:
- * 此行代码保证了无论a和b,还是b和a的顺序传进来,这两对组合在reduce端洗牌时都被分到一组去
- */
- super(getFof(a, b));
- }
-
- /**
- * 定义a和b,b和a传进来的顺序不同,是两组不一样的数据
- * @param a
- * @param b
- * @return
- */
- public static String getFof(String a,String b){
- int r =a.compareTo(b);
- if(r<0){
- return a+"\t"+b;
- }else{
- return b+"\t"+a;
- }
- }
- }
- package com.jeff.mr.friend;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.StringUtils;
-
- public class RunJob {
-
- public static void main(String[] args) {
- Configuration config =new Configuration();
- config.set("fs.defaultFS", "hdfs://node4:8020");
- config.set("yarn.resourcemanager.hostname", "node4");
- // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
- // config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
- if(run1(config)){
- run2(config);
- }
- }
-
- public static boolean run1(Configuration config) {
- try {
- FileSystem fs =FileSystem.get(config);
- Job job =Job.getInstance(config);
- job.setJarByClass(RunJob.class);
- job.setJobName("friend");
- job.setMapperClass(FofMapper.class);
- job.setReducerClass(FofReducer.class);
- job.setMapOutputKeyClass(Fof.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- job.setInputFormatClass(KeyValueTextInputFormat.class);
-
- FileInputFormat.addInputPath(job, new Path("/usr/input/friend"));
-
- Path outpath =new Path("/usr/output/f1");
- if(fs.exists(outpath)){
- fs.delete(outpath, true);
- }
- FileOutputFormat.setOutputPath(job, outpath);
-
- boolean f= job.waitForCompletion(true);
- return f;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }
-
- public static void run2(Configuration config) {
- try {
- FileSystem fs =FileSystem.get(config);
- Job job =Job.getInstance(config);
- job.setJarByClass(RunJob.class);
-
- job.setJobName("fof2");
-
- job.setMapperClass(SortMapper.class);
- job.setReducerClass(SortReducer.class);
- job.setSortComparatorClass(FoFSort.class);
- job.setGroupingComparatorClass(FoFGroup.class);
- job.setMapOutputKeyClass(User.class);
- job.setMapOutputValueClass(User.class);
-
- job.setInputFormatClass(KeyValueTextInputFormat.class);
-
- //设置MR执行的输入文件
- FileInputFormat.addInputPath(job, new Path("/usr/output/f1"));
-
- //该目录表示MR执行之后的结果数据所在目录,必须不能存在
- Path outputPath=new Path("/usr/output/f2");
- if(fs.exists(outputPath)){
- fs.delete(outputPath, true);
- }
- FileOutputFormat.setOutputPath(job, outputPath);
-
- boolean f =job.waitForCompletion(true);
- if(f){
- System.out.println("job 成功执行");
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 定义MapTask
- * 第一阶段洗牌,将数据输出为:<FOF,出现次数>
- *
- * @desc 输入文件中每一行的key是第一个字符串就代表了用户,之后的所有以\t隔开的字符串表示了用户对应的直接好友
- *
- * @author jeffSheng
- * 2018年9月23日
- */
- static class FofMapper extends Mapper<Text, Text, Fof, IntWritable>{
- protected void map(Text key, Text value,
- Context context)
- throws IOException, InterruptedException {
- //输入文件中的每一行的key仍然是按照第一个制表符\t拆分
- String user =key.toString();
- //按照\t拆分为user的所有直接好友,他们彼此之间是fof关系(不重复计算)
- String[] friends =StringUtils.split(value.toString(), '\t');
- for (int i = 0; i < friends.length; i++) {
- String f1 = friends[i];
- /**
- * 为了防止出现FOF关系中的两个用户其实是直接好友的关系,我们将<用户,好友>这种关系也加入到输出键值对中,以0来标识这种关系,在计算阶段可以用来剔除去重
- */
- Fof ofof =new Fof(user, f1);
- context.write(ofof, new IntWritable(0));
- for (int j = i+1; j < friends.length; j++) {
- String f2 = friends[j];
- Fof fof =new Fof(f1, f2);
- context.write(fof, new IntWritable(1));
- }
- }
- }
- }
-
- /**
- * 定义ReduceTask(第一个REDUCE),计算FOF关系及出现的次数
- * 第二阶段洗牌:key为FOF,value是每个FOF出现的次数可能是0和1:0标识直接好友,1标识FOF关系
- *
- * @author jeffSheng
- * 2018年9月23日
- */
- static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable>{
- protected void reduce(Fof fof, Iterable<IntWritable> iterable,
- Context context)
- throws IOException, InterruptedException {
- int sum =0;
- boolean f =true;
- /**
- * 迭代IntWritable,每组FOF中的value值,0或者1,
- * 是0则FOF为直接好友,终止当前组的计算循环,设置f为false不输出结果
- * 是1则累计次数sum,最后输出FOF的次数
- */
- for(IntWritable i: iterable){
- if(i.get()==0){
- f=false;
- break;
- }else{
- sum=sum+i.get();
- }
- }
- if(f){
- context.write(fof, new IntWritable(sum));
- }
- }
- }
-
-
- /**
- * 1 以第一个Reduce分区输出的结果文件为第二个mapTask的输入数据
- * 2 输入数据的key是FOF关系的第一个值,比如: 老王 如花 3,则key是老王,value是如花 3
- * 3 mapTask输出数据格式为<User, User>,第一个User是要推荐的用户,第二个是推荐的好友是谁,一个FOF输出相互推荐,context.write两次
- * @author jeffSheng
- * 2018年9月23日
- */
- static class SortMapper extends Mapper<Text, Text, User, User>{
-
-
- protected void map(Text key, Text value,Context context)
- throws IOException, InterruptedException {
- /**
- * 以老王 如花 3为例,args为如花 3,other就是如花,key就是老王了,friendsCount就是3
- */
- String[] args=StringUtils.split(value.toString(),'\t');
- String other=args[0];
- int friendsCount =Integer.parseInt(args[1]);
- /**
- * key是比如老王 如花 3为例的老王,other是如花,那么老王和如花是FOF关系,他们关系出现的次数是friendsCount
- * 输出数据的时候,推荐给老王一个用户如花,推荐给如花一个用户老王。
- */
- context.write(new User(key.toString(),friendsCount), new User(other,friendsCount));
- context.write(new User(other,friendsCount), new User(key.toString(),friendsCount));
- }
- }
-
-
- static class SortReducer extends Reducer<User, User, Text, Text>{
- protected void reduce(User arg0, Iterable<User> arg1,
- Context arg2)
- throws IOException, InterruptedException {
- String user = arg0.getUname();
- StringBuffer sb = new StringBuffer();
- for(User u: arg1 ){
- sb.append(u.getUname()+":"+u.getFriendsCount());
- sb.append(",");
- }
- arg2.write(new Text(user), new Text(sb.toString()));
- }
- }
- }
- package com.jeff.mr.friend;
-
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.io.WritableComparator;
-
- /**
- * 自定义分组
- * @author jeffSheng
- * 2018年9月24日
- */
- public class FoFGroup extends WritableComparator{
-
- public FoFGroup() {
- super(User.class,true);
- }
-
- /**
- * 就根据姓名分组
- */
- public int compare(WritableComparable a, WritableComparable b) {
- User u1 =(User) a;
- User u2=(User) b;
-
- return u1.getUname().compareTo(u2.getUname());
- }
- }
- package com.jeff.mr.friend;
-
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.io.WritableComparator;
-
- /**
- * 自定义排序
- * @author jeffSheng
- * 2018年9月24日
- */
- public class FoFSort extends WritableComparator{
-
- public FoFSort() {
- super(User.class,true);
- }
-
- /**
- * 推荐规则,先根据姓名字典排序,如果姓名相同则根据FOF出现次数排序
- */
- public int compare(WritableComparable a, WritableComparable b) {
- User u1 =(User) a;
- User u2=(User) b;
- int result =u1.getUname().compareTo(u2.getUname());
- if(result==0){
- return -Integer.compare(u1.getFriendsCount(), u2.getFriendsCount());
- }
- return result;
- }
- }
- package com.jeff.mr.friend;
-
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- import org.apache.hadoop.io.WritableComparable;
-
- public class User implements WritableComparable<User>{
-
- private String uname;
- private int friendsCount;
-
- public String getUname() {
- return uname;
- }
- public void setUname(String uname) {
- this.uname = uname;
- }
- public int getFriendsCount() {
- return friendsCount;
- }
- public void setFriendsCount(int friendsCount) {
- this.friendsCount = friendsCount;
- }
-
- public User() {
- // TODO Auto-generated constructor stub
- }
-
- public User(String uname,int friendsCount){
- this.uname=uname;
- this.friendsCount=friendsCount;
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeUTF(uname);
- out.writeInt(friendsCount);
- }
-
-
- public void readFields(DataInput in) throws IOException {
- this.uname=in.readUTF();
- this.friendsCount=in.readInt();
- }
-
- public int compareTo(User o) {
- int result = this.uname.compareTo(o.getUname());
- if(result==0){
- return Integer.compare(this.friendsCount, o.getFriendsCount());
- }
- return result;
- }
-
-
-
- }
上传输入文件friend,表示的是直接好友关系表
文件内容,就是文章开始那幅图的数字化表示。
- 小明 老王 如花 林志玲
- 老王 小明 凤姐
- 如花 小明 李刚 凤姐
- 林志玲 小明 李刚 凤姐 郭美美
- 李刚 如花 凤姐 林志玲
- 郭美美 凤姐 林志玲
- 凤姐 如花 老王 林志玲 郭美美
执行Runjob.java:
我们开始执行时发现报错,原因是我们设置的是node1作为config,但是node1并不是active二是standby,所以我们需要改成node1.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
因为node4是active状态,所以要改成node4
Configuration config =new Configuration(); config.set("fs.defaultFS", "hdfs://node4:8020"); config.set("yarn.resourcemanager.hostname", "node4"); |
任务1和任务2执行结束:
输出文件计算结果:
第一组MapTask和reduceTask计算结果:列出了每一组FOF出现的总次数
第二组MapTask和reduceTask计算结果:
如“给李刚推荐的顺序就是如花(3)、老王(2)”,可以展示在QQ推荐面板上!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。