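Using records of users' mobile internet activity, design a program on the MapReduce programming model that computes the total traffic used by each phone number. The fields of each record are described below.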
Index | Field | Type | Description |
---|---|---|---|
0 | reportTime | long | Timestamp when the record was reported |
1 | msisdn | String | Phone number |
2 | apmac | String | AP MAC address |
3 | acmac | String | AC MAC address |
4 | host | String | Visited website |
5 | siteType | String | Website category |
6 | upPackNum | long | Number of upstream packets |
7 | downPackNum | long | Number of downstream packets |
8 | upPayLoad | long | Total upstream traffic, in bytes (mind unit conversions) |
9 | downPayLoad | long | Total downstream traffic, in bytes (mind unit conversions) |
10 | httpStatus | String | HTTP response status |
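The contents of the data file are as follows. There is one record per line, and the fields are separated by whitespace.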

    1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 iface.qiyi.co 视频网站 24 27 2481 24681 200
    1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 s19.cnzz.com 站点统计 4 0 264 0 200
    1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 rank.ie.sogou.com 搜索引擎 2 4 132 1512 200
    1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 4 0 240 0 200
    1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
    1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 sug.so.360.cn 信息安全 18 15 1116 954 200
    1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
    1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 input.shouji.sogou.com 搜索引擎 4 0 240 0 200
    1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
    1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
    1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
    1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 rank.ie.sogou.com 搜索引擎 15 9 918 4938 200
    1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 sug.so.360.cn 信息安全 3 3 180 180 200
    1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
    1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 站点统计 12 12 3008 3720 200
    1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
    1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
    1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
    1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 t3.baidu.com 搜索引擎 2 2 120 120 200
    1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 t3.baidu.com 搜索引擎 6 3 360 180 200
    1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 18 138 1080 186852 200

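For each user, we need the total traffic across all of that user's requests. That is the sum of the upstream traffic (index 8) and the downstream traffic (index 9) over every request. The result is written to a separate output file.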
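As a worked check of the expected output, take phone number 13560439658. It appears in two of the records above, with (upPayLoad, downPayLoad) = (1116, 954) and (918, 4938). For a phone number $p$ the job computes $S_p$, which for this number works out as follows:

```latex
\begin{aligned}
S_p &= \sum_{i:\ \mathrm{msisdn}_i = p} \left(\mathrm{upPayLoad}_i + \mathrm{downPayLoad}_i\right) \\
S_{13560439658} &= (1116 + 918) + (954 + 4938) = 2034 + 5892 = 7926 \text{ bytes}
\end{aligned}
```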
Add the following dependencies to the `<dependencies>` section of pom.xml:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>

As shown in the figure:
After adding them, remember to press Ctrl+S to save the file.
Write the following code in the Flow class:

    package com.njupt.flowsum;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class Flow implements Writable {

        private String phone; // phone number
        private long up;      // upstream traffic (bytes)
        private long down;    // downstream traffic (bytes)
        private long sum;     // total traffic (bytes)

        // No-arg constructor: Hadoop creates the object reflectively, then calls readFields()
        public Flow() {
        }

        public Flow(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }

        // Serialize the fields; readFields() must read them back in the same order
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.phone);
            out.writeLong(this.up);
            out.writeLong(this.down);
            out.writeLong(this.sum);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.phone = in.readUTF();
            this.up = in.readLong();
            this.down = in.readLong();
            this.sum = in.readLong();
        }

        // Written after the key by TextOutputFormat: up \t down \t sum
        @Override
        public String toString() {
            return this.up + "\t" + this.down + "\t" + this.sum;
        }

        public String getPhone() { return phone; }
        public void setPhone(String phone) { this.phone = phone; }

        public long getUp() { return up; }
        // Keep sum consistent when up changes
        public void setUp(long up) { this.up = up; this.sum = this.up + this.down; }

        public long getDown() { return down; }
        // Keep sum consistent when down changes
        public void setDown(long down) { this.down = down; this.sum = this.up + this.down; }

        public long getSum() { return sum; }
    }

As shown in the figure:
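Hadoop builds a `Writable` with its no-arg constructor and then calls `readFields()`. For that to work, `write()` and `readFields()` must handle the fields in exactly the same order. The sketch below copies `Flow`'s serialization logic into plain Java so the round trip can be checked without a cluster. `FlowRoundTrip` and `FlowRecord` are hypothetical names. It uses the same `java.io.DataOutput`/`DataInput` interfaces that Hadoop's `Writable` is defined on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java copy of Flow's serialization logic (no Hadoop dependency),
// used only to check that write() and readFields() agree on the field order.
class FlowRoundTrip {

    static class FlowRecord {
        String phone;
        long up;
        long down;
        long sum;

        FlowRecord() {
        }

        FlowRecord(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }

        // Same order as Flow.write(): phone, up, down, sum
        void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeLong(up);
            out.writeLong(down);
            out.writeLong(sum);
        }

        // Reads the fields back in exactly the order write() produced them
        void readFields(DataInput in) throws IOException {
            phone = in.readUTF();
            up = in.readLong();
            down = in.readLong();
            sum = in.readLong();
        }
    }

    // Serializes a record to bytes, then deserializes it into a fresh instance
    // created with the no-arg constructor (as Hadoop does).
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            FlowRecord copy = new FlowRecord();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord copy = roundTrip(new FlowRecord("13726230503", 2481, 24681));
        // Prints the four fields separated by tabs; sum is 2481 + 24681 = 27162
        System.out.println(copy.phone + "\t" + copy.up + "\t" + copy.down + "\t" + copy.sum);
    }
}
```

If you swap the order of two `readLong()` calls, `up` and `down` come back exchanged with no error. That silent failure is why the order matters.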
Write the following code in the FlowSumMapper class:

    package com.njupt.flowsum;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FlowSumMapper extends Mapper<LongWritable, Text, Text, Flow> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // skip blank lines
            }
            // Split into fields on any run of tabs or spaces, so copied data works either way
            String[] fields = line.split("\\s+");
            if (fields.length < 10) {
                return; // skip malformed records that lack the payload fields
            }
            // Fields we need: phone number (1), upPayLoad (8), downPayLoad (9)
            String phone = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            // Emit <phone, flow>
            context.write(new Text(phone), new Flow(phone, up, down));
        }
    }

As shown in the figure:
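The mapper reads `fields[1]`, `fields[8]` and `fields[9]` by position, so it helps to test the parsing step on its own. The sketch below reproduces that step without Hadoop. `FlowLineParser` and `Parsed` are hypothetical names. Splitting on `\\s+` assumes that a copy of data.txt may use either tabs or spaces between fields.

```java
// Hypothetical stand-alone version of FlowSumMapper's parsing step (no Hadoop needed).
class FlowLineParser {

    // Fields the mapper needs from one record
    static final class Parsed {
        final String phone;
        final long up;
        final long down;

        Parsed(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
        }
    }

    // Splits on any run of whitespace, so both tab- and space-separated lines work.
    // Returns null for blank lines, lines with fewer than 10 fields,
    // or lines whose traffic fields are not numeric.
    static Parsed parse(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        String[] fields = trimmed.split("\\s+");
        if (fields.length < 10) {
            return null; // indices 1, 8 and 9 must exist
        }
        try {
            return new Parsed(fields[1], Long.parseLong(fields[8]), Long.parseLong(fields[9]));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Second record of data.txt, tab-separated; the siteType field is written
        // with Unicode escapes to keep this source file ASCII-only.
        String line = String.join("\t", "1363157995052", "13826544101",
                "5C-0E-8B-C7-F1-E0:CMCC", "120.197.40.4", "s19.cnzz.com",
                "\u7ad9\u70b9\u7edf\u8ba1", "4", "0", "264", "0", "200");
        Parsed p = parse(line);
        System.out.println(p.phone + " up=" + p.up + " down=" + p.down); // 13826544101 up=264 down=0
    }
}
```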
Create the FlowSumReducer class.
Write the following code in FlowSumReducer:

    package com.njupt.flowsum;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlowSumReducer extends Reducer<Text, Flow, Text, Flow> {

        @Override
        protected void reduce(Text key, Iterable<Flow> values, Context context)
                throws IOException, InterruptedException {
            // Input: <phone, {flow, flow, flow, ...}>
            // Iterate over the values, accumulate the traffic, then emit the totals
            long up = 0;
            long down = 0;
            for (Flow flow : values) {
                up += flow.getUp();
                down += flow.getDown();
            }
            context.write(key, new Flow(key.toString(), up, down));
        }
    }

As shown in the figure:
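The following in-memory sketch (plain Java, no Hadoop) shows what the shuffle/sort and reduce phases do with the mapper output. It is an analogy, not Hadoop's implementation. A `TreeMap` stands in for the framework's sort-and-group by key, and `reduce` applies the same summing logic as `FlowSumReducer`. The class name `LocalFlowSum` and the `long[]` value layout are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory analogy of the shuffle/sort and reduce phases (plain Java, no Hadoop).
class LocalFlowSum {

    // Shuffle/sort stand-in: group each mapper value {up, down} under its phone number.
    // TreeMap keeps keys sorted; for ASCII digit strings this matches Text's byte order.
    static TreeMap<String, List<long[]>> shuffle(List<Map.Entry<String, long[]>> mapped) {
        TreeMap<String, List<long[]>> groups = new TreeMap<>();
        for (Map.Entry<String, long[]> kv : mapped) {
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return groups;
    }

    // Same summing logic as FlowSumReducer; returns {up, down, up + down}.
    static long[] reduce(List<long[]> values) {
        long up = 0;
        long down = 0;
        for (long[] v : values) {
            up += v[0];
            down += v[1];
        }
        return new long[] {up, down, up + down};
    }

    public static void main(String[] args) {
        // (phone, {upPayLoad, downPayLoad}) pairs from three records of data.txt
        List<Map.Entry<String, long[]>> mapped = List.of(
                Map.entry("13560439658", new long[] {1116, 954}),
                Map.entry("13726230503", new long[] {2481, 24681}),
                Map.entry("13560439658", new long[] {918, 4938}));
        for (Map.Entry<String, List<long[]>> group : shuffle(mapped).entrySet()) {
            long[] r = reduce(group.getValue());
            System.out.println(group.getKey() + "\t" + r[0] + "\t" + r[1] + "\t" + r[2]);
        }
    }
}
```

Running `main` prints 13560439658 with 2034, 5892, 7926 and then 13726230503 with 2481, 24681, 27162. Real Hadoop differs in one respect: during `reduce`, the `values` iterable refills the same `Flow` instance for every record. That is why `FlowSumReducer` copies the primitive `up`/`down` values instead of keeping references to the objects.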
Create the FlowSumRunner class.
Write the following code in the FlowSumRunner class:

    package com.njupt.flowsum;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class FlowSumRunner extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: FlowSumRunner <input path> <output path>");
                return 2;
            }
            // Use the Configuration prepared by ToolRunner (it includes any -D options)
            Configuration conf = getConf();
            Job job = Job.getInstance(conf);

            job.setJarByClass(FlowSumRunner.class);
            job.setMapperClass(FlowSumMapper.class);
            job.setReducerClass(FlowSumReducer.class);

            // Output key/value types of the map phase
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Flow.class);

            // Output key/value types of the job (reduce phase)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Flow.class);

            // Input data path, e.g. /flow/input
            FileInputFormat.setInputPaths(job, new Path(args[0]));

            // Delete the output path if it already exists; otherwise the job refuses to start
            Path output = new Path(args[1]);
            FileSystem fs = output.getFileSystem(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            // Output data path, e.g. /flow/output
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int status = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
            System.exit(status);
        }
    }

As shown in the figure:
Check that pom.xml contains this line:

    <packaging>jar</packaging>

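As shown in the figure:
Package the flowsum project, for example with Maven's `package` goal.
Packaging has succeeded once BUILD SUCCESS appears in the terminal, as shown in the figure: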
Refresh the flowsum project.
The packaged jar now appears in the target folder.
Rename the jar to fs.jar and put it in a folder you can easily find.
Create a text file named "data.txt".
Copy the data above into it and save it, as shown in the figure:
Start HDFS and YARN with the following commands:

    start-dfs.sh
    start-yarn.sh

As shown in the figure:
Open the HDFS NameNode web UI at http://192.168.198.130:9870 in a browser. Replace 192.168.198.130 with the IP address of your own node1.
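Change into the directory that holds fs.jar and data.txt (the author placed them in the Hadoop MapReduce share directory), then create the input directory on HDFS:

    cd /export/server/hadoop-3.3.0/share/hadoop/mapreduce
    hdfs dfs -mkdir -p /flow/input
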
As shown in the figure:
You can check in the browser that the directory was created.
Upload data.txt to HDFS:

    hdfs dfs -put data.txt /flow/input

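As shown in the figure:
You can check the upload in the browser:
Submit the job. The two arguments are the HDFS input and output paths:

    hadoop jar fs.jar com.njupt.flowsum.FlowSumRunner /flow/input /flow/output
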
As shown in the figure:
View the result:

    hdfs dfs -text /flow/output/part-r-00000

As shown in the figure:
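With the default `TextOutputFormat`, each line of part-r-00000 is the key, a tab, and then `Flow.toString()`. Each line is therefore `phone`, `up`, `down` and `sum`, separated by tabs. The hypothetical helper below (`OutputLine` is an illustrative name, not part of the project) builds such a line and checks that the sum column equals up + down. This is a quick sanity test to apply to the result file.

```java
// Hypothetical helper for the part-r-00000 layout: key, tab, then Flow.toString(),
// which gives phone, up, down and sum separated by tabs on each line.
class OutputLine {

    // Builds one output line the way TextOutputFormat would for (Text phone, Flow value)
    static String format(String phone, long up, long down) {
        return phone + "\t" + up + "\t" + down + "\t" + (up + down);
    }

    // Returns true if the line has four tab-separated columns and sum == up + down
    static boolean isConsistent(String line) {
        String[] cols = line.split("\t");
        if (cols.length != 4) {
            return false;
        }
        try {
            long up = Long.parseLong(cols[1]);
            long down = Long.parseLong(cols[2]);
            long sum = Long.parseLong(cols[3]);
            return sum == up + down;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String line = format("13560439658", 2034, 5892);
        System.out.println(line);               // 13560439658, 2034, 5892, 7926 separated by tabs
        System.out.println(isConsistent(line)); // true
    }
}
```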
You can also download the result file from the browser: