当前位置:   article > 正文

MapReduce计算手机流量案例_根据提示,在右侧编辑器补充代码,计算出每个手机号码的一年总流量。 main 方法已给

根据提示,在右侧编辑器补充代码,计算出每个手机号码的一年总流量。 main 方法已给

准备数据

我们的需求是:
统计每个手机号的上行流量之和,下行流量之和,以及每个手机号的总流量之和,下面的数据有7列,以制表符隔开,倒数第三列是上行,倒数第二列是下行。

在这里插入图片描述

自定义Bean对象

为什么要自定义Bean对象?
因为Mapper<?, ?, ?, ?>这个里面的泛型数量只有四个,然后input_key和input_value的类型是固定的,但是经过mapper之后的output_key和output_value应当是(手机号,(上行,下行。。))的这种形式,因为output_value只能放一个类型,所以我要放入一个对象,将上行下行的放到那个对象里封装,并且该对象要实现序列化接口。

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * 
 */
public class FlowBean implements Writable{
	
	private long upFlow;
	private long downFlow;
	private long sumFlow;
	
	public FlowBean() {
		
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	// 序列化   在写出属性时,如果为引用数据类型,属性不能为null
	@Override
	public void write(DataOutput out) throws IOException {
		
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
		
		
	}

	//反序列化   序列化和反序列化的顺序要一致
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow=in.readLong();
		downFlow=in.readLong();
		sumFlow=in.readLong();
		
	}

	@Override
	public String toString() {
		return  upFlow + "\t" + downFlow + "\t" + sumFlow;
	}
	
	

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

构建Mapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * 1. 统计手机号(String)的上行(long,int),下行(long,int),总流量(long,int)
 * 
 * 手机号为key,Bean{上行(long,int),下行(long,int),总流量(long,int)}为value
 * 		
 * 
 * 
 * 
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
	
	private Text out_key=new Text();
	private FlowBean out_value=new FlowBean();
	
	// (0,1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200)
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		
		//封装手机号
		out_key.set(words[1]);
		// 封装上行,倒数第三列
		out_value.setUpFlow(Long.parseLong(words[words.length-3]));
		// 封装下行,倒数第二列
		out_value.setDownFlow(Long.parseLong(words[words.length-2]));
		
		context.write(out_key, out_value);
	
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

构建Reducer

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
	
	private FlowBean out_value=new FlowBean();
	
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		long sumUpFlow=0;
		long sumDownFlow=0;
		
		for (FlowBean flowBean : values) {
			
			sumUpFlow+=flowBean.getUpFlow();
			sumDownFlow+=flowBean.getDownFlow();
			
		}
		
		out_value.setUpFlow(sumUpFlow);
		out_value.setDownFlow(sumDownFlow);
		out_value.setSumFlow(sumDownFlow+sumUpFlow);
		
		context.write(key, out_value);
		
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

构建Driver

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * 1.一旦启动这个线程,运行Job
 * 
 * 2.本地模式主要用于测试程序是否正确!
 * 
 * 3. 报错:
 * 	ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control
 */
public class FlowBeanDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/flowbean");
		Path outputPath=new Path("e:/mroutput/flowbean");
		
		//作为整个Job的配置
		Configuration conf = new Configuration();
		
		//保证输出目录不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		// ①创建Job
		Job job = Job.getInstance(conf);
		
		// ②设置Job
		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);
		
		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// ③运行Job
		job.waitForCompletion(true);
		
		
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

统计结果

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/653680
推荐阅读
相关标签
  

闽ICP备14008679号