当前位置:   article > 正文

通过Java编写MapReduce程序,统计每一个手机号耗费的总上行流量、下行流量、总流量,并且按照不同的手机号输出到不同的文件中_用mapreduce程序 求以下数据中上行流量总和、下行流量总和、及总流量

用mapreduce程序 求以下数据中上行流量总和、下行流量总和、及总流量

题目
通过Java编写MapReduce程序,统计每一个手机号耗费的总上行流量、下行流量、总流量,并且按照不同的手机号输出到不同的文件中
本题声明:
1.采用Linux系统
2.已搭建好的hadoop集群
3.使用java编写MapReduce程序
题目分析:
1.编写MapReduce程序
2.hadoop调用MapReduce程序的数据源要上传至集群当中
项目流程:
我将该项目分成了两步来做
(1) 通过数据源文件,首先求出每个手机号的总上行流量、下行流量、总流量。
(2) 根据(1)的手机号流量汇总结果再按照不同的手机号进行分组输出到不同的文件中
项目实施:
(1) 通过数据源文件,首先求出每个手机号的上行流量、下行流量、总流量。
一、首先将该题的数据源上传至集群
我已经将数据源复制到了Linux系统当中
在这里插入图片描述

数据源文件内容为:
在这里插入图片描述

(1)在集群创建空目录,用于存放该题的数据源文件
命名:hadoop fs -mkdir -p /zs/java/input/input8/
在这里插入图片描述

集群中的空目录创建成功
在这里插入图片描述

(2)将该题的数据源上传至创建的空目录中
(就在该数据源的文件夹内使用hadoop命名)
命名:
hadoop fs -put phone_data.txt /zs/java/input/input8/
在这里插入图片描述

数据源文件上传成功
在这里插入图片描述

这道题的数据源已经上传成功了,接下来就是编写MapReduce程序

二、创建java项目编写MapReduce程序
(1)创建java项目并导入连接hadoop的jar包
我创建的是一个名为Eight的java项目(项目名可自行设置)
为该项目导入连接hadoop的jar包
File --> Project Structure --> Modules --> Dependencies
选择右边的+号点击:JARs or directories,找到已准备好的lib包并选中点击ok
(没有lib包的联系我,我发给你,lib包一般存放在 “ /opt/software/ ” 下)
在这里插入图片描述
在这里插入图片描述

(2)使用java编写MapReduce程序
这次项目的MapReduce程序包含以下几个类:
FlowBean、FlowDriver、FlowMapper、FlowReducer

定义一个FlowBean.java类
代码如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// bean对象要实例化
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * 序列化方法
     *
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法
     注意反序列化的顺序和序列化的顺序完全一致
     *
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86

定义一个FlowDriver.java类
代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowDriver {

    public static void main(String[] args) throws Exception {

        //1.获取配置信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2.获取jar包信息
        job.setJarByClass(FlowDriver.class);

        //3.配置mapper、reducer类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4.配置mapper输出key、value值
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //5.配置输出key、value值
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

//		//设置分区
//		job.setPartitionerClass(FlowPartitioner.class);
//
//		//设置Reducenum,依据是看flowpartitioner里分了几个区
//		job.setNumReduceTasks(5);

        //6.配置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7.提交
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

定义一个FlowMapper.java类
代码如下:


import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    //不能在map方法中new对象,map方法执行频率高,内存消耗大。这也就是需要在bean对象中要有一个空构造方法的原因
    FlowBean bean = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        //1.获取一行数据()
        String line = value.toString();

        //2.截取字段(1、2可归结为一大步:对输入数据的处理)
        String[] fields = line.split("\t");

        //3.封装bean对象,获取电话号码(第二大步:具体的业务逻辑)
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        //在map方法中new对象是不好的,因为在输入数据时,每读一行数据,会执行以下map方法,这一造成内存消耗很大
        //FlowBean bean = new FlowBean(upFlow,downFlow);
        bean.set(upFlow, downFlow);
        k.set(phoneNum);

        //4.写出去(第三大步:将数据输出出去,key和value分别是什么,规定清楚)
        //context.write(new Text(phoneNum), bean);
        context.write(k, bean);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

定义一个FlowReducer.java类
代码如下:


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        //1.计算总的流量
        long sum_upFlow = 0;
        long sum_downFlow = 0;
        for(FlowBean bean : values){
            sum_upFlow += bean.getUpFlow();
            sum_downFlow += bean.getDownFlow();
        }

        //2.输出
        context.write(key, new FlowBean(sum_upFlow,sum_downFlow));
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

MapReduce的所有java类已创建成功

在这里插入图片描述

三、将编写的MapReduce程序打包并上传至,启动集群的Linux系统中
我这里是:hadoop111,hadoop112
Idea中打包 java程序:
File --> Project Structure --> Artifacts --> + -->JAR --> From modules with dependencies
随后在Main Class中选择 WordcountDriver,随后点击OK
在这里插入图片描述

点击Ok后,再点击OK
在这里插入图片描述

点击ok后,选择: Build --> Build Artifacts --> Build
在这里插入图片描述

然后耐心等待一会
在左侧会自动生成一个out的文件,点击: out —> artifacts —> Eight_ jar
即可查看是否打包成功
在这里插入图片描述

打包后在Linux系统中的 /root/ 下面找到 ideaProjects
在这里插入图片描述
然后点击 IdeaProjects --> Eight --> out --> artifacts -->Eight_jar
找到已打好的jar包

在该目录下,右击打开终端输入scp命名,将jar包上传至启动集群的hadoop111的Linux系统中"/opt/software/" 文件夹下
SCP 命名:scp Eight.jar root@hadoop111:/opt/software/
在这里插入图片描述

查看文件是否传输成功
在这里插入图片描述

执行zip命名对该包进行删除两个会使程序出错的文件
ZIP命名:
zip -d Eight.jar META-INF/.RSA META-INF/.SF
在这里插入图片描述

就在该目录下使用hadoop调用该包,通过数据源文件,求出每个手机号的总上行流量、下行流量、总流量。
hadoop命名:
hadoop jar Eight.jar /zs/java/input/input8/phone_data.txt /zs/java/output/output8
(/zs/java/input/input8/phone_data.txt 代表的是这道题的数据源,数据源在前面已经上传)
(/zs/java/output/output8 代表hadop调用jar包将数据源文件处理后的文件存放位置)
在这里插入图片描述

执行结果,执行结果在集群中查看
查看位置为 hadoop调用jar包将数据源文件处理后的文件存放位置
(/zs/java/output/output8)
hadoop调用jar包执行效果为:

在这里插入图片描述
到这儿(1)的要求做好了,求出了每个手机号的流量数据
接下来我们继续做(2)

(2) 根据(1)的手机号流量汇总结果再按照不同的手机号进行分组输出到不同的文件中
一、创建java项目编写MapReduce程序**
(1)创建java项目并导入连接hadoop的jar包
我创建的是一个名为Eight_Group的java项目(项目名可自行设置)
为该项目导入连接hadoop的jar包
File --> Project Structure --> Modules --> Dependencies
选择右边的+号点击:JARs or directories,找到已准备好的lib包并选中点击ok
(没有lib包的联系我,我发给你,lib包一般存放在 “ /opt/software/ ” 下)
在这里插入图片描述

在这里插入图片描述

(2)使用java编写MapReduce程序
这道题的MapReduce程序包含以下几个类:
FlowBean、FlowDriver、FlowMapper、FlowReducer、ProvincePartitioner

定义一个FlowBean.java类
代码如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// bean对象要实例化
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * 序列化方法
     *
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法
     注意反序列化的顺序和序列化的顺序完全一致
     *
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

定义一个FlowDriver.java类
代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowDriver {

    public static void main(String[] args) throws Exception {

        //1.获取配置信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2.获取jar包信息
        job.setJarByClass(FlowDriver.class);

        //3.配置mapper、reducer类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4.配置mapper输出key、value值
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //5.配置输出key、value值
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

		//设置分区
		job.setPartitionerClass(ProvincePartitioner.class);

		//设置Reducenum,依据是看flowpartitioner里分了几个区
		job.setNumReduceTasks(4);

        //6.配置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7.提交
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

定义一个FlowMapper.java类
代码如下:


import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    //不能在map方法中new对象,map方法执行频率高,内存消耗大。这也就是需要在bean对象中要有一个空构造方法的原因
    FlowBean bean = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        //1.获取一行数据()
        String line = value.toString();

        //2.截取字段(1、2可归结为一大步:对输入数据的处理)
        String[] fields = line.split("\t");

        //3.封装bean对象,获取电话号码(第二大步:具体的业务逻辑)
        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        //在map方法中new对象是不好的,因为在输入数据时,每读一行数据,会执行以下map方法,这一造成内存消耗很大
        //FlowBean bean = new FlowBean(upFlow,downFlow);
        bean.set(upFlow, downFlow);
        k.set(phoneNum);

        //4.写出去(第三大步:将数据输出出去,key和value分别是什么,规定清楚)
        //context.write(new Text(phoneNum), bean);
        context.write(k, bean);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

定义一个FlowReducer.java类
代码如下:


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        //1.计算总的流量
        long sum_upFlow = 0;
        long sum_downFlow = 0;
        for(FlowBean bean : values){
            sum_upFlow += bean.getUpFlow();
            sum_downFlow += bean.getDownFlow();
        }

        //2.输出
        context.write(key, new FlowBean(sum_upFlow,sum_downFlow));
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

定义一个ProvincePartitioner.java类
代码如下:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2 V2 对应的是map输出kv类型
 * @author Administrator
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
        String preNum = key.toString().substring(0, 3);

        int partition = 4;

        // 2 判断是哪个省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else  {
            partition = 3;
        }
        return partition;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

MapReduce的所有java类已创建成功

在这里插入图片描述

三、将编写的MapReduce程序打包并上传至,启动集群的Linux系统中
我这里是:hadoop111,hadoop112
Idea中打包 java程序:
File --> Project Structure --> Artifacts --> + -->JAR --> From modules with dependencies
随后在Main Class中选择 WordcountDriver,随后点击OK
在这里插入图片描述
点击Ok后,再点击OK
在这里插入图片描述

点击ok后,选择: Build --> Build Artifacts --> Build
在这里插入图片描述

然后耐心等待一会
在左侧会自动生成一个out的文件,点击: out —> artifacts —> Eight_Group_ jar
即可查看是否打包成功
在这里插入图片描述

打包后在Linux系统中的 /root/ 下面找到 ideaProjects
在这里插入图片描述
然后点击 IdeaProjects --> Eight_Group --> out --> artifacts -->Eight_Group
找到已打好的jar包

在该目录下,右击打开终端输入scp命名,将jar包上传至启动集群的hadoop111的Linux系统中"/opt/software/" 文件夹下
SCP 命名:scp Eight_Group.jar root@hadoop111:/opt/software/
在这里插入图片描述

查看文件是否传输成功
在这里插入图片描述

执行zip命名对该包进行删除两个会使程序出错的文件
ZIP命名:
zip -d Eight_Group.jar META-INF/.RSA META-INF/.SF
在这里插入图片描述

就在该目录下使用hadoop调用该包,根据(1)的手机号流量汇总结果再按照手机号不同归属地进行分组输出到不同的文件中
hadoop命名:
hadoop jar Eight_Group.jar /zs/java/output/output8/part-r-00000 /zs/java/output/output8_Group
(/zs/java/output/output8/part-r-00000 是(1)统计手机流量汇总的结果)
(/zs/java/output/output8_Group 代表hadop调用jar包将(1)的结果数据文件处理后的文件存放位置)
在这里插入图片描述

执行结果,执行结果在集群中查看
查看位置为 hadoop调用jar包将数据源文件处理后的文件存放位置
(/zs/java/output/output8_Group)
hadoop调用jar包执行效果为:

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/780803
推荐阅读
相关标签
  

闽ICP备14008679号