Task: for each phone number, compute the sum of upstream packets, the sum of downstream packets, the total upstream traffic, and the total downstream traffic.

Analysis: use the phone number as the key and the four fields (upstream packets, downstream packets, upstream traffic, downstream traffic) as the value; this key/value pair is the output of the map phase and the input of the reduce phase.
The key/value formats are as follows:
1. Map output:
key: phone number (msisdn)
value: the original input line
2. Reduce output:
key: phone number (msisdn)
value: the accumulated sums of the four fields upPackNum, downPackNum, upPayLoad, downPayLoad
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * JavaBean representing one flow record
 */
public class Flow implements WritableComparable<Flow> {
    private String phoneNum;          // phone number
    private Long upPackNum;           // upstream packet count
    private Long downPackNum;         // downstream packet count
    private Long upPayLoad;           // upstream traffic
    private Long downPayLoad;         // downstream traffic
    private Long totalUpPackNum;      // upstream packet count, summed
    private Long totalDownPackNum;    // downstream packet count, summed
    private Long totalUpPayLoad;      // upstream traffic, summed
    private Long totalDownPayLoad;    // downstream traffic, summed

    public Flow() {
    }

    public Flow(Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) {
        this.totalUpPackNum = totalUpPackNum;
        this.totalDownPackNum = totalDownPackNum;
        this.totalUpPayLoad = totalUpPayLoad;
        this.totalDownPayLoad = totalDownPayLoad;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    // ... getters and setters omitted

    @Override
    public String toString() {
        return totalUpPackNum + "\t" + totalDownPackNum + "\t" + totalUpPayLoad + "\t" + totalDownPayLoad;
    }

    @Override
    public int compareTo(Flow o) {
        // Flow is only used as a reduce output value here, so no ordering is required
        return 0;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // serialize the four totals in a fixed order
        out.writeLong(totalUpPackNum);
        out.writeLong(totalDownPackNum);
        out.writeLong(totalUpPayLoad);
        out.writeLong(totalDownPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialize in the same order as write()
        this.totalUpPackNum = in.readLong();
        this.totalDownPackNum = in.readLong();
        this.totalUpPayLoad = in.readLong();
        this.totalDownPayLoad = in.readLong();
    }
}
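The write/readFields pair must serialize and deserialize the fields in the same fixed order. That contract can be sketched with plain java.io streams, with no Hadoop dependency (the class and method names here are illustrative only, not part of the Hadoop API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableSketch {
    // Serialize four totals in a fixed order, mirroring Flow.write()
    static byte[] write(long a, long b, long c, long d) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeLong(a);
            out.writeLong(b);
            out.writeLong(c);
            out.writeLong(d);
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize in the same order, mirroring Flow.readFields()
    static long[] read(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            return new long[]{in.readLong(), in.readLong(), in.readLong(), in.readLong()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] back = read(write(33, 24, 2034, 5892));
        System.out.println(back[0] + "\t" + back[1] + "\t" + back[2] + "\t" + back[3]);
        // prints: 33	24	2034	5892
    }
}
```

If write and readFields disagree on field order or count, deserialization silently produces garbage values, which is why both methods must be kept in sync.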
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Example1Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // fields[1] is the phone number (msisdn); use it as the key and pass the whole line through as the value
        context.write(new Text(fields[1]), value);
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Example1Reducer extends Reducer<Text, Text, Text, Flow> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        long totalUpPackNum = 0L;    // sum of upstream packet counts
        long totalDownPackNum = 0L;  // sum of downstream packet counts
        long totalUpPayLoad = 0L;    // sum of upstream traffic
        long totalDownPayLoad = 0L;  // sum of downstream traffic
        for (Text flow : values) {
            String[] fields = flow.toString().split("\t");
            totalUpPackNum += Long.parseLong(fields[6]);
            totalDownPackNum += Long.parseLong(fields[7]);
            totalUpPayLoad += Long.parseLong(fields[8]);
            totalDownPayLoad += Long.parseLong(fields[9]);
        }
        Flow flowOut = new Flow(totalUpPackNum, totalDownPackNum, totalUpPayLoad, totalDownPayLoad);
        context.write(key, flowOut);
    }
}
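The reducer's per-key accumulation can be sketched without Hadoop as a plain map-and-sum over tab-separated lines. The two sample records below are hypothetical, constructed only to illustrate the field positions the reducer relies on (fields[1] = phone number, fields[6..9] = the four counters):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlowSumSketch {
    // Accumulate the four counters per phone number, as the reducer does
    static Map<String, long[]> sum(String[] lines) {
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            long[] t = totals.computeIfAbsent(f[1], k -> new long[4]);
            for (int i = 0; i < 4; i++) {
                t[i] += Long.parseLong(f[6 + i]);   // fields 6..9
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical records: two lines for the same phone number
        String[] lines = {
            "1\t13560439658\ta\tb\tc\td\t10\t20\t100\t200",
            "2\t13560439658\ta\tb\tc\td\t23\t4\t1934\t5692",
        };
        long[] t = sum(lines).get("13560439658");
        System.out.println(t[0] + "\t" + t[1] + "\t" + t[2] + "\t" + t[3]);
        // prints: 33	24	2034	5892
    }
}
```

The shuffle phase performs the grouping for us in the real job; this sketch only shows the summation the reducer applies within each group.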
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // 1. Create the Job
        Job job = Job.getInstance(super.getConf(), "Example1_job");
        // 2. Set the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("D:\\devDoc\\hadoop\\datas\\example1"));
        // 3. Set the Mapper class and the map output types
        job.setMapperClass(Example1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 4. Set the Reducer class and the reduce output types
        job.setReducerClass(Example1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);
        // 5. Set the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\devDoc\\hadoop\\datas\\example1_result"));
        // 6. Submit the job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        // exit code convention: 0 on success, non-zero on failure
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        int run = 0;
        try {
            run = ToolRunner.run(new Configuration(), new MainJob(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(run);
    }
}
Counter output
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
	File System Counters
		FILE: Number of bytes read=11298
		FILE: Number of bytes written=984294
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=23        <-- mapper read 23 records
		Map output records=23       <-- mapper emitted 23 records
		Map output bytes=2830
		Map output materialized bytes=2882
		Input split bytes=112
		Combine input records=0
		Combine output records=0
		Reduce input groups=21
		Reduce shuffle bytes=2882
		Reduce input records=23     <-- reducer received 23 records
		Reduce output records=21    <-- reducer emitted 21 records, one per distinct phone number (confirms the result)
		Spilled Records=46
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=0
		Total committed heap usage (bytes)=382730240
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=2583
	File Output Format Counters
		Bytes Written=572
File output
13480253104 3 3 180 180
13502468823 57 102 7335 110349
13560439658 33 24 2034 5892
13600217502 37 266 2257 203704 <-- verified correct
13602846565 15 12 1938 2910
......