Our requirement:
For each phone number, compute the sum of its upstream traffic, the sum of its downstream traffic, and its total traffic. Each line of the input data has 7 tab-separated columns; the third column from the end is the upstream traffic and the second column from the end is the downstream traffic.
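The data file itself is not reproduced here; based on the sample record quoted in the Mapper below, one input line might look like this (columns are tab-separated, and the specific values are just that one illustrative record):

1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200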
Why define a custom bean object?
Because Mapper<?, ?, ?, ?> has only four generic type parameters, and the types of input_key and input_value are fixed by the input format. After the map phase, however, the output should take the form (phone number, (upstream, downstream, ...)). Since output_value can hold only a single type, I wrap the upstream and downstream values in one object, and that object must implement Hadoop's serialization interface, Writable.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * Custom value type: implements Writable so Hadoop can serialize it
 * between the map and reduce phases.
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Serialization: when writing out fields, a field of a reference type must not be null
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
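Because readFields must read the fields in exactly the order write wrote them, it is worth checking the round trip locally. A minimal sketch (the FlowBeanRoundTrip class name and the traffic values are made up for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean();
        original.setUpFlow(2481);
        original.setDownFlow(24681);
        original.setSumFlow(27162);

        // Serialize the bean into a byte buffer, as Hadoop would between map and reduce
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean; the fields come back in write order
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(restored); // prints: 2481	24681	27162
    }
}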
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * 1. Count the upstream (long), downstream (long), and total traffic (long)
 *    per phone number (String).
 *
 * The phone number is the key; a FlowBean {upstream, downstream, total} is the value.
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text out_key = new Text();
    private FlowBean out_value = new FlowBean();

    // Sample input: key = 0 (byte offset),
    // value = "1\t13736230513\t192.196.100.1\twww.atguigu.com\t2481\t24681\t200"
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {

        String[] words = value.toString().split("\t");

        // The phone number is the second column
        out_key.set(words[1]);

        // Upstream traffic: third column from the end
        out_value.setUpFlow(Long.parseLong(words[words.length - 3]));

        // Downstream traffic: second column from the end
        out_value.setDownFlow(Long.parseLong(words[words.length - 2]));

        context.write(out_key, out_value);
    }
}
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean out_value = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
            Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {

        long sumUpFlow = 0;
        long sumDownFlow = 0;

        // Sum the upstream and downstream traffic of all records for this phone number
        for (FlowBean flowBean : values) {
            sumUpFlow += flowBean.getUpFlow();
            sumDownFlow += flowBean.getDownFlow();
        }

        out_value.setUpFlow(sumUpFlow);
        out_value.setDownFlow(sumDownFlow);
        out_value.setSumFlow(sumUpFlow + sumDownFlow);

        context.write(key, out_value);
    }
}
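One subtlety worth noting (general Hadoop behavior, not spelled out in the original post): the framework typically reuses a single FlowBean instance across the values iterator, so this reducer is safe only because it copies the primitive fields out on each iteration. If you ever needed to keep the beans themselves, you would have to copy them explicitly, roughly like this (a fragment inside reduce; assumes java.util.List and java.util.ArrayList are imported):

// Hypothetical sketch: collecting values requires explicit copies,
// because the iterator may hand back the same reused FlowBean object each time
List<FlowBean> copies = new ArrayList<>();
for (FlowBean b : values) {
    FlowBean copy = new FlowBean();
    copy.setUpFlow(b.getUpFlow());
    copy.setDownFlow(b.getDownFlow());
    copy.setSumFlow(b.getSumFlow());
    copies.add(copy);
}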
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * 1. Starting this process runs the Job.
 *
 * 2. Local mode is mainly used to test whether the program is correct!
 *
 * 3. Possible error:
 *    ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control
 */
public class FlowBeanDriver {

    public static void main(String[] args) throws Exception {

        Path inputPath = new Path("e:/mrinput/flowbean");
        Path outputPath = new Path("e:/mroutput/flowbean");

        // Configuration for the whole Job
        Configuration conf = new Configuration();

        // Make sure the output directory does not already exist
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        // ① Create the Job
        Job job = Job.getInstance(conf);

        // ② Configure the Job:
        // set the Mapper and Reducer classes and the key-value types they output
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);

        // The Job uses the declared output key-value types to pick serializers
        // that serialize and deserialize the emitted key-value pairs.
        // If the Mapper and Reducer output the same key-value types,
        // setting the Job's final output types is enough.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the input and output directories
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // ③ Run the Job
        job.waitForCompletion(true);
    }
}
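For the single sample record shown earlier, the output file (e.g. part-r-00000 under e:/mroutput/flowbean) would contain one line per phone number, formatted by FlowBean.toString() as phone, upstream, downstream, and total (2481 + 24681 = 27162):

13736230513	2481	24681	27162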