Big Data Primer -- Hadoop (4) -- MapReduce Case Studies

Using MapReduce to count how many times each product in a supermarket was purchased in one month

Case 1: Reduce Join

Scenario:
Suppose we have two tables: a product table and an order table.
Order table: order ID, product ID, quantity purchased

001 01 2 
002 01 1
003 02 1
004 03 2
005 04 1

Product table: product ID, product name, unit price

01 华为Mate40 5999
02 华为笔记本 6999
03 小米笔记本 3999

The task is to perform a left join between the two tables.
Approach:
Work backwards: in the Reduce stage all records with the same product ID must fall into one group, so the shuffle sort has to be ordered by product ID.
Map stage: read both the product file and the order file; sort first by product ID, and within the same product ID put the product record before the order records.
Before the Reduce stage: a custom grouping comparator groups records by product ID.
Reduce stage: emit each order record joined with its product record.
Mapper output key: ProductOrderBean, value: NullWritable. (Note: even though the grouping is by product ID, a plain Text key will not do. A Text key would only give an ordering by product ID, whereas we also need a secondary ordering: within the same product ID the product record has to sort ahead of the order records.)
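For reference, assuming a single reduce task and the reducer logic shown further below, the joined output for the sample tables should look roughly like this (orderId, productId, productName, number, price, totalPrice; order 005 refers to product 04, which is missing from the product table, so its product fields stay empty):

001  01  华为Mate40  2  5999.0  11998.0
002  01  华为Mate40  1  5999.0  5999.0
003  02  华为笔记本  1  6999.0  6999.0
004  03  小米笔记本  2  3999.0  7998.0
005  04              1  0.0     0.0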

ProductOrderBean.java

package com.xbz.study.bigdata.hadoop.mr.reducejoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ProductOrderBean implements WritableComparable<ProductOrderBean> {
    // product fields
    private String productId;
    private String productName;
    private Float price;

    // order fields
    private String orderId;
    private int number;
    private float totalPrice;


    @Override
    public int compareTo(ProductOrderBean o) {
        int productIdOrder = this.getProductId().compareTo(o.getProductId());
        if(productIdOrder == 0 ){
            // Same product ID: compare product names in reverse so the product record (non-empty name) sorts before the order records (empty name)
            return o.getProductName().compareTo(this.getProductName());
        }
        return productIdOrder;
    }

    @Override
    public String toString() {
        return this.orderId + "\t"
                + this.productId +"\t"
                + this.productName+"\t"
                + this.number +"\t"
                + this.price +"\t"
                + this.totalPrice +"\t"
                ;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(productId);
        out.writeUTF(productName);
        out.writeFloat(price);

        out.writeUTF(orderId);
        out.writeInt(number);
        out.writeFloat(totalPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        productId = in.readUTF();
        productName = in.readUTF();
        price = in.readFloat();

        orderId = in.readUTF();
        number = in.readInt();
        totalPrice = in.readFloat();
    }
    // getters and setters omitted
}

GroupingComparator.java/Reducer.java/Mapper.java/Driver.java

package com.xbz.study.bigdata.hadoop.mr.reducejoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Driver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        job.setMapOutputKeyClass(ProductOrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(ProductOrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\hadoop-study\\reducejoin\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop-study\\reducejoin\\output"));

        job.setGroupingComparatorClass(ProductIdGroupingComparator.class);
        boolean b = job.waitForCompletion(true);
        System.exit( b ? 0 : 1);
    }

    private static class ReduceJoinMapper extends Mapper<LongWritable, Text,ProductOrderBean,NullWritable> {
        private FileSplit fs;
        private ProductOrderBean k = new ProductOrderBean();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            fs = (FileSplit)context.getInputSplit();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String name = fs.getPath().getName();
            String oneLine = value.toString();
            String[] s = oneLine.split(" ");
            if("production.txt".equals(name)){
                k.setProductId(s[0]);
                k.setProductName(s[1]);
                k.setPrice(Float.valueOf(s[2]));
                // Important: overwrite the fields this record type does not use (the same bean object is reused for every line)
                k.setOrderId("");
                k.setNumber(0);
            }else if("order.txt".equals(name)){
                k.setOrderId(s[0]);
                k.setProductId(s[1]);
                k.setNumber(Integer.valueOf(s[2]));
                // Important: overwrite the fields this record type does not use (the same bean object is reused for every line)
                k.setProductName("");
                k.setPrice(0f);
            }
            context.write(k, NullWritable.get());
        }
    }

    private static class ReduceJoinReducer extends Reducer<ProductOrderBean,NullWritable,ProductOrderBean,NullWritable> {

        @Override
        protected void reduce(ProductOrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // The product record sorts first within the group, so at this point the key holds the
            // product fields; capture them before the iterator advances (each iterator.next()
            // re-deserializes the key into the same object).
            String productName = key.getProductName();
            Float price = key.getPrice();
            Iterator<NullWritable> iterator = values.iterator();
            while(iterator.hasNext()){
                iterator.next();
                if(StringUtils.isNotBlank(key.getOrderId())){
                    key.setProductName(productName);
                    key.setPrice(price);
                    key.setTotalPrice(key.getNumber()*price);
                    context.write(key,NullWritable.get());
                }
            }
        }
    }

    private static class ProductIdGroupingComparator extends WritableComparator {
        public ProductIdGroupingComparator(){
            super(ProductOrderBean.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            ProductOrderBean pa = (ProductOrderBean) a;
            ProductOrderBean pb = (ProductOrderBean) b;
            return pa.getProductId().compareTo(pb.getProductId());
        }
    }
}


Data skew

With more than one ReduceTask, an uneven key distribution can cause data skew, and it is mainly the partitioning done during the shuffle that produces it. Skew shows up in the Reduce stage rather than the Map stage, because the number and size of map tasks are driven by the input splits.
One way to sidestep it is a map-side join (MapJoin), but MapJoin is limited to joins between one large table and one table small enough to hold in memory.
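As an aside (not part of the original example): which reduce task a key is sent to is decided by the Partitioner during the shuffle. With the default single reduce task this does not matter, but if the reduce-side join above were run with several reducers, the partitioning would also have to be based on the product ID alone, otherwise records of the same product could land in different reducers. A minimal sketch under that assumption (ProductIdPartitioner is a hypothetical name):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProductIdPartitioner extends Partitioner<ProductOrderBean, NullWritable> {
    @Override
    public int getPartition(ProductOrderBean key, NullWritable value, int numPartitions) {
        // Route records by product ID only, so every record of one product reaches the same reduce task.
        return (key.getProductId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered with job.setPartitionerClass(ProductIdPartitioner.class). Note that a partitioner only controls where keys go; it does not by itself cure skew caused by a handful of very hot product IDs, which is why the text turns to MapJoin next.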

MapJoin

The case is the same as above.
Assume the product table is the small table. When the job starts, the small table's data file is added to the job's cache files; the Mapper loads it in setup() and completes the join by itself, emitting the joined bean directly, so no Reduce stage is needed.

Mapper.java/Driver.java

package com.xbz.study.bigdata.hadoop.mr.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class Driver {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Driver.class);

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(ProductOrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(ProductOrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\hadoop-study\\mapjoin\\input\\order.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop-study\\mapjoin\\output"));

        // add the small (product) table as a cache file
        job.addCacheFile(URI.create("file:///D:/hadoop-study/mapjoin/input/production.txt"));

        boolean b = job.waitForCompletion(true);
        System.exit( b ? 0 : 1);

    }
    static class MyMapper extends Mapper<LongWritable, Text,ProductOrderBean, NullWritable>{
        private HashMap<String,ProductOrderBean> productions = new HashMap<>();
        private ProductOrderBean k = new ProductOrderBean();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            URI productFileUri = cacheFiles[0];
            FileSystem fileSystem = FileSystem.get(productFileUri, context.getConfiguration());
            FSDataInputStream inputStream = fileSystem.open(new Path(productFileUri));
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String oneLine = null;
            while((oneLine = bufferedReader.readLine())!=null){
                String[] s = oneLine.split(" ");
                ProductOrderBean bean = new ProductOrderBean();
                bean.setProductId(s[0]);
                bean.setProductName(s[1]);
                bean.setPrice(Float.valueOf(s[2]));
                productions.put(bean.getProductId(),bean);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String oneLine = value.toString();
            String[] s = oneLine.split(" ");
            k.setOrderId(s[0]);
            k.setProductId(s[1]);
            k.setNumber(Integer.valueOf(s[2]));
            ProductOrderBean productBean = productions.get(k.getProductId());
            if(productBean != null){
                k.setProductName(productBean.getProductName());
                k.setPrice(productBean.getPrice());
                k.setTotalPrice(k.getPrice()*k.getNumber());
            }else{
                // Important: reset the product fields when no matching product exists (the same bean object is reused)
                k.setProductName("");
                k.setPrice(0f);
                k.setTotalPrice(0f);
            }
            context.write(k,NullWritable.get());
        }
    }

}
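One small refinement not in the listing above: since the join is completed entirely in the Mapper, the job can be declared map-only, which skips the shuffle and the (otherwise default identity) reduce stage. In main:

        // MapJoin is map-only: mapper output is written directly, with no shuffle or reduce
        job.setNumReduceTasks(0);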


TOP N

Case
Given the score sheet below, find the top three entries.
Key point: produce the result as quickly as possible. For instance, with 100 million records in total, the program should emit the top three and avoid doing further useless work.
Score sheet

张三 60
李四 70
王五 90
赵六 75
孙七 88

An inefficient version:

/**
 * Use a bean as the key and implement WritableComparable so the sorting is done
 * during the map -> reduce shuffle.
 */
@Data
public class ScoreBean implements WritableComparable<ScoreBean> {
    private String name;
    private Float score;
    @Override
    public int compareTo(ScoreBean o) {
        return o.getScore().compareTo(this.getScore());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeFloat(score);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        score = in.readFloat();
    }
}
// The map stage only assembles the bean and emits it
public static class IMapper extends Mapper<LongWritable, Text,ScoreBean, NullWritable> {

    private ScoreBean k = new ScoreBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String oneLine = value.toString();
        String[] s = oneLine.split(" ");
        k.setName(s[0]);
        k.setScore(Float.valueOf(s[1]));
        context.write(k, NullWritable.get());
    }
}
// The reduce stage appears to take only the first three keys, i.e. the TOP N --> no problem???
// In reality, if the map side emits 100 million records, reduce() is still invoked 100 million times,
// but only the first N invocations do useful work.
public static class IReducer extends Reducer<ScoreBean,NullWritable,ScoreBean,NullWritable>{
    private int i = 0;
    @Override
    protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        if(i++<3){
            context.write(key,NullWritable.get());
        }
    }
}

Analysis: when the data volume is large, the Reduce stage still has to iterate over every key before the job can finish, even though only the top 3 are needed -- wasted effort.
Idea: since the waste comes from reduce() being called once per key, can we arrange for it to be called only once, i.e. treat all records as a single group?
Improved version
Don't forget to register the custom grouping comparator on the job (see the fragment after the code below).

    public static class IReducer2 extends Reducer<ScoreBean,NullWritable,ScoreBean,NullWritable>{
        @Override
        protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            Iterator<NullWritable> iterator = values.iterator();
            // All records fall into one group, and each iterator.next() re-deserializes the key,
            // so `key` always refers to the record currently being iterated.
            for (int i = 0; i < 3; i++) {
                if(iterator.hasNext()){
                    iterator.next();
                    context.write(key,NullWritable.get());
                }
            }
        }
    }

    public static class AllOneGroupingComparator extends WritableComparator{
        public AllOneGroupingComparator(){
            super(ScoreBean.class,true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return 0;
        }
    }
// register the custom grouping comparator in the driver
job.setGroupingComparatorClass(AllOneGroupingComparator.class);
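One more caveat besides the grouping comparator: the top 3 computed this way is per reduce task. The default is a single reducer, so the code works as-is, but if more reducers were configured, each would emit its own top 3 and those partial results would still have to be merged. To make the assumption explicit in the driver:

// a global TOP N needs all records to reach a single reduce task
job.setNumReduceTasks(1);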

Inverted index

Case
Parse the words in a set of files and build an inverted index.
a.txt

hello world
every body like bigdata
but me only like money

b.txt

hello everyone
hello ladies and gentlemen
ladies are beautiful

The final output should contain entries of the form
[word]-><[file name],[occurrence count]>
for example:
hello-><a.txt,1> <b.txt,2>

Analysis
The problem can be split into two MR jobs.
The first job counts words per file:
key: file name + word
val: word count
The second job:
the Mapper splits the previous key, producing
key: word
val: file name + occurrence count
and the Reducer aggregates the results by key.
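Concretely, with a.txt and b.txt above, the first job produces (key, value) pairs such as (a.txt--hello, 1), (a.txt--like, 2), (b.txt--hello, 2) and (b.txt--ladies, 2), written out as a SequenceFile, and the second job regroups them by word into lines such as:

hello   <a.txt,1> <b.txt,2>
like    <a.txt,2>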

Full code

package com.xbz.study.bigdata.hadoop.mr.reverseindex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class Driver {
    public static void main(String[] args) throws Exception {
        // Job 1: count how many times each word appears in each file
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);

        job1.setJarByClass(Driver.class);

        job1.setMapperClass(FirstMapper.class);
        job1.setReducerClass(FirstReducer.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job1,new Path("D:\\hadoop-study\\reverseindex\\input"));
        FileOutputFormat.setOutputPath(job1,new Path("D:\\hadoop-study\\reverseindex\\output"));

        // write job 1's output as a SequenceFile
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        boolean b = job1.waitForCompletion(true);
        if(!b){
            System.err.println("Job 1 failed");
            System.exit(1);
            return;
        }
        // Job 2: split the file name from the word and aggregate the final result
        Job job2 = Job.getInstance(conf);

        job2.setJarByClass(Driver.class);

        job2.setMapperClass(SecondMapper.class);
        job2.setReducerClass(SecondReducer.class);

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job2,new Path("D:\\hadoop-study\\reverseindex\\output"));
        FileOutputFormat.setOutputPath(job2,new Path("D:\\hadoop-study\\reverseindex\\output2"));

        // read job 1's SequenceFile output
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        boolean b2 = job2.waitForCompletion(true);
        if(!b2){
            System.err.println("Job 2 failed");
        }
        System.exit(b2 ? 0 : 1);
    }

    static class FirstMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        private FileSplit fs;
        private Text k = new Text();
        private IntWritable v = new IntWritable(1);
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            fs = (FileSplit) context.getInputSplit();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String fileName = fs.getPath().getName();
            String oneLine = value.toString();
            String[] split = oneLine.split("\\s+");
            for (String word : split) {
                k.set(fileName+"--"+ word);
                context.write(k,v);
            }
        }
    }

    static class FirstReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        private IntWritable v = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            AtomicInteger count = new AtomicInteger();
            values.forEach((i->{
                count.addAndGet(i.get());
            }));
            v.set(count.get());
            context.write(key,v);
        }
    }

    private static class SecondMapper extends Mapper<Text,IntWritable,Text,Text> {
        private Text k = new Text();
        private Text v = new Text();
        @Override
        protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
            String keyStr = key.toString();
            String[] split = keyStr.split("--");
            k.set(split[1]);
            v.set(split[0]+","+value.get());
            context.write(k,v);
        }
    }

    private static class SecondReducer extends Reducer<Text,Text,Text,Text> {
        private Text v = new Text();
        private StringBuilder sb = new StringBuilder();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            sb.delete(0,sb.length());
            Iterator<Text> iterator = values.iterator();
            while(iterator.hasNext()){
                Text next = iterator.next();
                sb.append("<").append(next.toString()).append("> ");
            }
            v.set(sb.toString());
            context.write(key,v);
        }
    }
}


Finding common friends

Sample data

A->C,U,H,I,J,K,O
B->A,D,G,H,I,K,L,M,N,P,Q,R,S,T,U,V,X,Z
C->L
D->Q,R,C,S,F,Z,L
E->Q,B,S,T,U,W,X,H,Y,N,O
F->A,G,H,Z,M,N
G->P,A,S,E,W,K,M,N
I->D,V,K,N
J->A,B,E,F,G,H,L,M,N,O,Q,S,T,U,W,X,Y,Z
K->B,D,E,F,H,J,L,M,O,R,U,W,X,Z
L->Q,A,Z,J,K
M->B,G,H,I,J,K,L,N,P,Q,R,S,T,V,W,X,Z
N->B,C,D,F,H,J,K,L,M,O,P,Q,S,T,X,Z
O->C,D,E,F,G,H,K,L,P,Q,V,W,X,Y,Z
P->A,C,E,G,H,I,J,K,L,N,R,V,X
Q->P,R,D,T,U,W,H,I,J,L,N,O
R->P,Q,S,F,V,H,I,M
S->Q
T->D,F,G,I,L,M,N,Q,R,S,U,W,Z
U->P,B,C,T,E,F,H,X,K,M,N
V->A,C,E,H,J,K,M,R,T,U,W,X,Y
W->B,C,D,I,J,K,L,M,N,O,R,S,X,Y
Y->B,R,C,D,T,E,X,I,K,N
Z->Q,A,T,U,E,F,V,G,I,L

Approach:
First reverse the relationship.
For example, A->B,C,D is the forward direction: user A's friends are B, C and D. The reverse question is: who lists B as a friend?
Step 1: for each person, find everyone who lists them as a friend, i.e.
Map: key -> friend, val -> current user
Reduce: key -> friend, val -> the set of all users who list that key as a friend
Step 2: take step 1's output as input and iterate over the value list with a nested loop, emitting key: user1-user2 (the two users put into a fixed order so each pair appears only once), val: the friend (step 1's key).
Map: user1-user2 pairs
Reduce: the set of common friends of user1 and user2
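As a small worked example of the two steps, take the A->B,C,D relationship mentioned above plus a hypothetical second user E->B,C. Step 1's Map emits (B,A), (C,A), (D,A), (B,E), (C,E), and its Reduce groups them into B->A E, C->A E, D->A. Step 2's Map then turns the first two lines into (<A,E>, B) and (<A,E>, C), and its Reduce produces <A,E> -> B C: users A and E have B and C as common friends.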

package com.xbz.study.bigdata.hadoop.mr.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

public class Driver {
    public static void main(String[] args) throws Exception {
        // Job 1: for each user, find everyone who lists them as a friend
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);

        job1.setJarByClass(Driver.class);

        job1.setMapperClass(FirstMapper.class);
        job1.setReducerClass(FirstReducer.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job1,new Path("D:\\hadoop-study\\friends\\input"));
        FileOutputFormat.setOutputPath(job1,new Path("D:\\hadoop-study\\friends\\output"));

        // write job 1's output as a SequenceFile
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        boolean b = job1.waitForCompletion(true);
        if(!b){
            System.err.println("Job 1 failed");
            System.exit(1);
            return;
        }
        // Job 2: build user pairs and collect their common friends
        Job job2 = Job.getInstance(conf);

        job2.setJarByClass(Driver.class);

        job2.setMapperClass(SecondMapper.class);
        job2.setReducerClass(SecondReducer.class);

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job2,new Path("D:\\hadoop-study\\friends\\output"));
        FileOutputFormat.setOutputPath(job2,new Path("D:\\hadoop-study\\friends\\output2"));

        // read job 1's SequenceFile output
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        boolean b2 = job2.waitForCompletion(true);
        if(!b2){
            System.err.println("Job 2 failed");
        }
        System.exit(b2 ? 0 : 1);
    }

    static class FirstMapper extends Mapper<LongWritable, Text,Text, Text> {
        private Text k = new Text();
        private Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String oneLine = value.toString();
            String[] split = oneLine.split("->");
            String[] split1 = split[1].split(",");
            for (String friend :split1) {
                k.set(friend);
                v.set(split[0]);
                context.write(k,v);
            }
        }
    }

    static class FirstReducer extends Reducer<Text,Text, Text,Text> {
        private Text v = new Text();
        private StringBuilder sb = new StringBuilder();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            sb.delete(0,sb.length());
            Iterator<Text> iterator = values.iterator();
            while(iterator.hasNext()){
                Text next = iterator.next();
                sb.append(next.toString()+" ");
            }
            v.set(sb.toString());
            context.write(key,v);
        }
    }

    private static class SecondMapper extends Mapper<Text,Text,Text,Text> {
        private Text k = new Text();
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String oneLine = value.toString();
            String[] s = oneLine.split(" ");
            String userA = null;
            String userB = null;
            for (int i = 0; i < s.length; i++) {
                userA = s[i];
                for (int j = i+1; j < s.length; j++) {
                    userB = s[j];
                    if(userA.compareTo(userB)>0){
                      k.set("<"+userB + ","+userA+">");
                    }else{
                        k.set("<"+userA + ","+userB+">");
                    }
                    context.write(k,key);
                }
            }
        }
    }

    private static class SecondReducer extends Reducer<Text,Text,Text,Text> {
        private Text v = new Text();
        private StringBuilder sb = new StringBuilder();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            sb.delete(0,sb.length());
            values.forEach((value)->{
                sb.append(value).append(" ");
            });
            v.set(sb.toString());
            context.write(key,v);
        }
    }
}
