
MapReduce case study: computing each phone number's total upstream traffic, downstream traffic, and total traffic

Task: given the phone traffic data, write a MapReduce program that computes the total traffic used by each phone number over the year.

1. Requirement:

Compute the total upstream traffic, downstream traffic, and total traffic consumed by each phone number.

2. Data preparation:

2.1 Input data format:

timestamp, phone number, base station MAC address, IP of the visited site, website domain, packets sent, packets received, upstream (upload) traffic, downstream (download) traffic, response code

2.2 Final output data format:

phone number       upstream traffic        downstream traffic           total traffic
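For reference, one input record looks like the sample quoted in the Mapper code below (shown here with spaces for readability; the actual fields are tab-separated). Only the phone number and the two traffic columns are used in this case:

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200

In this record, 13726230503 is the phone number, 2481 the upstream traffic, and 24681 the downstream traffic.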

3. Basic approach:

3.1 Map phase:

(1) Read one line of input and convert it to a String

(2) Split the line into fields

(3) Extract the phone number, the upstream traffic, and the downstream traffic

(4) Use the phone number as the key and a bean object (upstream traffic, downstream traffic, total traffic) as the value

(5) Write the pair out, i.e. context.write(phone number, bean)

3.2 Reduce phase:

(1) Iterate over the beans grouped under each phone number, accumulate the upstream and downstream traffic, and add the two sums to get the total traffic

(2) Wrap the accumulated values in the custom bean that encapsulates the traffic fields and write out (phone number, bean)

(3) Note that the MR framework sorts the data while processing it (the key/value pairs emitted by the map are sorted before they reach the reduce), and the sort is based on the map output key, which here is the phone number

A worked example of the whole map/shuffle/reduce flow is shown below.
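As a purely illustrative walk-through (the first record is the sample above; the second record is made up for the example), suppose phone number 13726230503 appears in two input lines. The map phase emits one (key, value) pair per record:

  (13726230503, up=2481, down=24681, sum=27162)
  (13726230503, up=100,  down=200,   sum=300)      <- hypothetical second record

After the shuffle, both beans reach the same reduce call under the key 13726230503, which accumulates them:

  totalup   = 2481 + 100  = 2581
  totaldown = 24681 + 200 = 24881
  output line: 13726230503    2581    24881    27462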

4. Code implementation

4.1 The bean object for the traffic statistics -- FlowBean.java

package com.jike.hdfs;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
 * 1. Implement the Writable interface
 * 2. Override the serialization and deserialization methods
 * 3. Provide a no-arg constructor
 * 4. Provide a toString method
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // no-arg constructor, required by the Hadoop serialization framework
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // deserialization: read the fields back in exactly the same order as write()
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
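The only contract FlowBean has to honor is that readFields() reads the fields back in exactly the order write() wrote them. As a minimal sanity-check sketch (not part of the original article; the class name FlowBeanRoundTrip is made up for illustration), the round trip can be verified with plain Java I/O streams:

package com.jike.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // fill a bean with the values from the sample record
        FlowBean in = new FlowBean();
        in.setUpFlow(2481);
        in.setDownFlow(24681);
        in.setSumFlow();

        // serialize it the same way Hadoop would
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        // deserialize into a fresh bean and print it
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out);   // expected: 2481    24681   27162
    }
}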

4.2 The Map phase -- FlowBeanMapper.java

package com.jike.hdfs;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line of input
        String line = value.toString();

        // 2. Split the line
        // Fields: timestamp, phone number, base station MAC address, IP of the visited site,
        //         website domain, packets sent, packets received, upstream traffic, downstream traffic, response code
        // Example: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
        String[] split = line.split("\t");

        // 3. Extract the phone number and the traffic columns (third- and second-from-last)
        String phoneno = split[1];
        String upflow = split[split.length - 3];
        String downflow = split[split.length - 2];

        // 4. Wrap them into the output key and value
        outK.set(phoneno);
        outV.setUpFlow(Long.parseLong(upflow));
        outV.setDownFlow(Long.parseLong(downflow));
        outV.setSumFlow();

        // 5. Write the pair out
        context.write(outK, outV);
    }
}
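One detail worth calling out (an observation, not something stated in the original article): the mapper reuses a single Text and a single FlowBean instance across all map() calls. This is safe because context.write() serializes the key and value right away, so the same objects can be refilled for the next record without corrupting pairs that have already been emitted, and it avoids allocating two new objects per input line.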

4.3 The Reduce phase -- FlowBeanReducer.java

package com.jike.hdfs;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the beans for this phone number and accumulate the traffic
        long totalup = 0;
        long totaldown = 0;
        for (FlowBean value : values) {
            totalup += value.getUpFlow();
            totaldown += value.getDownFlow();
        }

        // 2. Wrap the totals into the output bean
        outV.setUpFlow(totalup);
        outV.setDownFlow(totaldown);
        outV.setSumFlow();

        // 3. Write the result out
        context.write(key, outV);
    }
}
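Because the reduce logic is a plain sum, which is commutative and associative, the same class could optionally also be registered as a combiner to cut down the data shuffled between map and reduce. This is an optional tweak not shown in the original article; it would be a single extra line in the driver:

    job.setCombinerClass(FlowBeanReducer.class);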

4.4 The Driver -- FlowBeanDriver.java

package com.jike.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/* import org.apache.hadoop.mapred.FileInputFormat;
   import org.apache.hadoop.mapred.FileOutputFormat; */
// The two imports above belong to the old, deprecated API; the new classes
// under org.apache.hadoop.mapreduce.lib must be used instead.

import java.io.IOException;

public class FlowBeanDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(FlowBeanDriver.class);

        // 3. Attach the Mapper and Reducer
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);

        // 4. Set the map output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\BigData\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\BigData\\output1"));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
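The input and output paths above are hard-coded for a local run on Windows. As an optional variant (not part of the original article; the jar name flowcount.jar and the HDFS paths are made up for illustration), the driver could read the paths from the command line instead, which also makes it easy to package the classes into a jar and submit the job to a cluster:

    // hypothetical replacement for the two hard-coded paths
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // then, for example:
    //   hadoop jar flowcount.jar com.jike.hdfs.FlowBeanDriver /flow/input /flow/output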

5. Run results
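With the paths configured above, the aggregated result is written to D:\BigData\output1 (typically in a file named part-r-00000), one line per phone number in the form

    phone number <TAB> upstream traffic <TAB> downstream traffic <TAB> total traffic

since the default TextOutputFormat separates the key and value with a tab and FlowBean.toString() joins its three fields with tabs.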

Reference article: https://blog.csdn.net/zhao2chen3/article/details/110201664
