当前位置:   article > 正文

hadoop离线阶段(第十一节)MapReduce求取共同好友、互相好友、实现文件中单词计数、实现文件合并和分目录输出_mapreduce统计互粉好友对

mapreduce统计互粉好友对

MapReduce求取共同好友

原始数据如图:
在这里插入图片描述
目的:求每两个用户(冒号前的是用户名,冒号后是他的好友列表)之间有哪些共同的好友,最终结果形如:在这里插入图片描述
制表符前是两个用户的对,指标符后是他们之间共同拥有的好友。

实现方法

需要分为两个MapReduce程序来完成。
1、第一个MapReduce程序:
①首先,在Map阶段将每个用户的每个好友作为key2,而用户本身作为value2,输出到Reduce阶段
②在Reduce阶段,将key2作为value3,value2的集合则写成A-B-C-E的形式输出为key3
③将第一个MapReduce程序的结果输出为文件
2、第二个MapReduce程序:
①读取第一个MapReduce程序的结果文件
②在Map阶段将形如A-B-C-E的用户列表字符串按“-”分隔为字符串数组,并且将数组排序,后将数组每两个元素写成形如A-B的用户对,作为key2,而第一个MapReduce程序的结果文件的共同好友则写到value2中
③在Reduce阶段,将共同好友的集合写成形如C-D-E的形式作为value3,key2续写为key3。

代码实现

第一个MapReduce程序

自定义Mapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step1Mapper extends Mapper<LongWritable, Text,Text,Text>
{
   
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
   
        String userNmae=value.toString().split(":")[0];
        String firents=value.toString().split(":")[1];
        for (String f:firents.split(","))
        {
   
            context.write(new Text(f),new Text(userNmae));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

自定义Reducer

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Step1Reducer extends Reducer<Text,Text,Text,Text>
{
   
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
   
        String userNames="";
        for (Text value : values)
        {
   
            userNames+=value.toString()+"-";
        }
		//去除因为循环产生的多余的-
        userNames=userNames.substring(0,(userNames.length()-1));
        
        context.write(new Text(userNames),key);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

主类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Step1Run extends Configured implements Tool
{
   

    @Override
    public int run(String[] strings) throws Exception
    {
   
        Job job=Job.getInstance(super.getConf(),"STEP1");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("E:\\共同好友\\input"));

        job.setMapperClass(Step1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(Step1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("E:\\共同好友\\my_s1_output"));


        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception
    {
   
        System.exit(ToolRunner.run(new Configuration(),new Step1Run(),args));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

第二个MapReduce程序

自定义Mapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

public class Step2Mapper extends Mapper<LongWritable, Text,Text,Text>
{
   
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
   
        String[] userNames=value.toString().split("\t")[0].split("-");
        String friend=value.toString().split("\t")[1];
        //避免出现A-B和B-A被作为不同的key
        Arrays.sort(userNames);
        String userPair="";
        for (int i = 0; i < userNames.length-1; i++)
        {
   
            for (int j = i+1; j < userNames.length; j++)
            {
   
                userPair=userNames[i]+"-"+userNames[j];
                context.write(new Text(userPair),new Text(friend));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

自定义Reducer

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Step2Reducer extends Reducer<Text,Text,Text,Text>
{
   
    @Override
    protected void reduce(Text key, Iterable<Text
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/855219
推荐阅读
相关标签
  

闽ICP备14008679号