赞
踩
(1)数据样例
- 13726238888 2481 24681
- 13560436666 1116 954
- 13726230503 2481 24681
- 13826544101 264 0
- 13926435656 132 1512
- 13926251106 240 0
- 18211575961 1527 2106
(2)字段释义
字段中文释义 | 字段英文释义 | 数据类型 |
---|---|---|
手机号 | phone | String |
上行流量 | upflow | Long |
下行流量 | downflow | Long |
(3)项目需求二
得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序。
期望输出数据格式:
- 13502468823 101663100 1529437140 1631100240
- 13925057413 153263880 668647980 821911860
- 13726238888 34386660 342078660 376465320
- ...
(4)项目解析
基本思路:实现自定义的 bean 来封装流量信息,并将 bean 作为 Map 输出的 key 来传输。
MapReduce 程序在处理数据的过程中会对数据排序(Map 输出的 kv 对传输到 Reduce 之前,会排序),排序的依据是 Map 输出的 key, 所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到 key 中。
排序是 MapReduce 框架中最重要的操作之一。 MapTask 和 ReduceTak 均会对数据按照 key 进行排序。该操作属于 Hadoop 的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序。
对于 MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于 RecduceTak,它从每个 MapTask 上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。
2.1 排序分类
(1)部分排序
MapReduce 根据输入记录的键对数据集排序,保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。
但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了 MapReduce 所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。
应用于:在接收 key 为 bean 对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce()
方法时,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果 compareTo()
方法中的判断条件为两个即为二次排序。
2.2 自定义排序 WritableComparable
自定义 bean 对象将其作为 Map 输出的 key 来传输,需要实现 WritableComparable 接口重写 compareTo()
方法,就可以实现排序。
compareTo()
方法解析:
int compareTo(T o)
参数:o表示要比较的对象。
作用:比较此对象与指定对象的顺序。
返回值:负整数、零或正整数,根据此对象是小于、等于还是大于指定对象。
实现 WritableComparable 接口重写 compareTo()
方法的具体实现:
- public class FlowBeanSort implements WritableComparable<FlowBeanSort> {
- private long upFlow; // 上行总流量
- private long downFlow; // 下行总流量
- private long sumFlow; // 总流量
- ...
-
- // 自定义排序规则,按照总流量倒序排序
- @Override
- public int compareTo(FlowBeanSort o) {
- // 自定义降序排列
- return this.sumFlow > o.getSumFlow() ? -1 : 1;
- }
- }
也就是说当语句return this.sumFlow > o.getSumFlow() ? 1 : -1;
的返回值为1时,即当this的值大于o的值时,compareTo
是按照升序(由小到大)排序的。
当返回值为-1时,也就是说this的值小于o的值时,compareTo
是按照降序(由大到小)排序的。
完整的 bean 程序如下所示:
- package com.hongyaa.mr.sort;
-
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- import org.apache.hadoop.io.WritableComparable;
-
- /**
- * 自定义一个 bean 类,实现 WritableComparable 接口重写 compareTo()方法,实现自定义排序
- *
- * @author Administrator
- *
- */
- public class FlowBeanSort implements WritableComparable<FlowBeanSort> {
- private long upFlow; // 总上行流量
- private long downFlow; // 总下行流量
- private long sumFlow; // 总流量
-
- // 无参构造方法必须有,目的是为了在反序列化操作创建对象实例时调用无参构造器
- public FlowBeanSort() {
- super();
- }
-
- // 序列化方法,目的是为了对象的初始化
- public FlowBeanSort(long upFlow, long downFlow, long sumFlow) {
- super();
- this.upFlow = upFlow;
- this.downFlow = downFlow;
- this.sumFlow = sumFlow;
- }
-
- public long getUpFlow() {
- return upFlow;
- }
-
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
-
- public long getDownFlow() {
- return downFlow;
- }
-
- public void setDownFlow(long downFlow) {
- this.downFlow = downFlow;
- }
-
- public long getSumFlow() {
- return sumFlow;
- }
-
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
-
- // 序列化方法,将对象的字段信息写入输出流
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(upFlow);
- out.writeLong(downFlow);
- out.writeLong(sumFlow);
- }
-
- // 反序列化方法,从输入流中读取各个字段信息
- // 注意:字段的反序列化顺序需要和序列化的顺序保持一致,而且字段的类型和个数也要保持一致
- @Override
- public void readFields(DataInput in) throws IOException {
- this.upFlow = in.readLong();
- this.downFlow = in.readLong();
- this.sumFlow = in.readLong();
- }
-
- // 实现自定义排序,按照总流量倒序排序
- @Override
- public int compareTo(FlowBeanSort o) {
- // 自定义降序排列
- return this.sumFlow > o.getSumFlow() ? -1 : 1;
- }
-
- // 重写toString()方法
- @Override
- public String toString() {
- return upFlow + "\t" + downFlow + "\t" + sumFlow;
- }
- }

要完成上述的需求,还需要完成三个程序分别是一个 Mapper 类、一个 Reducer 类和一个用于连接整个过程的驱动 Driver 主程序。
分析:以需求一的输出结果作为排序的输入数据,自定义FlowBeanSort,以 FlowBeanSort 为 Map 输出的 key,以手机号作为 Map 输出的 value,因为 MapReduce 程序会对 Map 阶段输出 的key 进行排序。具体实现如下所示:
- // 输入数据是上一个统计程序的输出结果,已经是各个手机号的总流量信息
- public class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBeanSort, Text> {
-
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBeanSort, Text>.Context context)
- throws IOException, InterruptedException {
- // (1)获取一行文本的内容,并将其转换为String类型,之后按照分隔符“\t”进行切分
- String[] splits = value.toString().split("\t");
- // (2)取出手机号
- String telephone = splits[0];
- // (3)封装对象
- FlowBeanSort fbs = new FlowBeanSort();
- fbs.setUpFlow(Long.parseLong(splits[1]));
- fbs.setDownFlow(Long.parseLong(splits[2]));
- fbs.setSumFlow(Long.parseLong(splits[3]));
- // (4)将封装的fbs对象作为key,将手机号作为value,分发给Reduce端
- context.write(fbs, new Text(telephone));
- }
- }

3.2:Reduce 端程序编写
分析:这里的 reduce()
方法只需要实现将 Map 端输出的 key-value 调换后输出即可。
- public class FlowSumSortReducer extends Reducer<FlowBeanSort, Text, Text, FlowBeanSort> {
- /*
- * <FlowBeanSort,电话号> ===> <电话号,FlowBeanSort>
- */
- @Override
- protected void reduce(FlowBeanSort key, Iterable<Text> values,
- Reducer<FlowBeanSort, Text, Text, FlowBeanSort>.Context context) throws IOException, InterruptedException {
- // 遍历集合
- for (Text tele : values) {
- // 将手机号作为key,将封装好的流量信息作为value,作为最终的输出结果
- context.write(new Text(tele), key);
- }
- }
- }
Driver 端为该 FlowSumSortDemo 程序运行的入口,相当于 YARN 集群(分配运算资源)的客户端,需要创建一个 Job 类对象来管理 MapReduce 程序运行时需要的相关运行参数,最后将该 Job 类对象提交给 YARN。
Job对象指定作业执行规范,我们可以用它来控制整个作业的运行。接下来,我们来看一下作业从提交到执行的整个过程。
FlowSumSortDemo.java 的完整代码如下所示:
- public class FlowSumSortDemo {
- /**
- * 得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序;
- */
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // (1)获取配置信息类
- Configuration conf = new Configuration();
- // 指定mapreduce程序运行的hdfs的相关运行参数
- conf.set("fs.defaultFS", "hdfs://localhost:9000");
-
- // (2)新建一个Job对象
- Job job = Job.getInstance(conf);
-
- // (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)
- job.setJarByClass(FlowSumSortDemo.class);
-
- // (4)指定 Mapper 类和 Reducer 类
- job.setMapperClass(FlowSumSortMapper.class);
- job.setReducerClass(FlowSumSortReducer.class);
-
- // (5)指定 MapTask 的输出key-value类型
- job.setMapOutputKeyClass(FlowBeanSort.class);
- job.setMapOutputValueClass(Text.class);
-
- // (6)指定 ReduceTask 的输出key-value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBeanSort.class);
-
- // (7)指定该 mapreduce 程序数据的输入和输出路径
- Path inPath = new Path("/flow/output_sum");
- Path outPath = new Path("/flow/output_sort");
-
- // 获取fs对象
- FileSystem fs = FileSystem.get(conf);
- if (fs.exists(outPath)) {
- fs.delete(outPath, true);
- }
-
- FileInputFormat.setInputPaths(job, inPath);
- FileOutputFormat.setOutputPath(job, outPath);
-
- // (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出
- boolean waitForCompletion = job.waitForCompletion(true);
- System.exit(waitForCompletion ? 0 : 1);
- }
- }

执行结果如下所示:
- 13502468823 101663100 1529437140 1631100240
- 13925057413 153263880 668647980 821911860
- 13726238888 34386660 342078660 376465320
- 13726230503 34386660 342078660 376465320
- 18320173382 132099660 33430320 165529980
- 13560439658 28191240 81663120 109854360
- 13660577991 96465600 9563400 106029000
- 15013685858 50713740 49036680 99750420
- 13922314466 41690880 51559200 93250080
- 15920133257 43742160 40692960 84435120
- 84138413 57047760 19847520 76895280
- 13602846565 26860680 40332600 67193280
- 18211575961 21164220 29189160 50353380
- 15989002119 26860680 2494800 29355480
- 13560436666 15467760 13222440 28690200
- 13926435656 1829520 20956320 22785840
- 13480253104 2494800 2494800 4989600
- 13826544101 3659040 0 3659040
- 13926251106 3326400 0 3326400
- 13760778710 1663200 1663200 3326400
- 13719199419 3326400 0 3326400

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。