当前位置:   article > 正文

Hadoop实战——MapReduce实现主播的播放量等数据的统计及TopN排序(第二篇)_mapreduce项目实战案例

mapreduce项目实战案例

本次实战项目一共分三篇教学(第三篇下周更新)

第一篇:对主播文本数据的清洗,从大量数据中获取我们所需要的数据(如播放量,时长等)

第二篇:对清洗后的数据进行统计求和处理操作,按照主播id号依次整齐显示

第三篇:对统计好的数据进行TopN展示的操作,排序规则可自定义(如播放量,粉丝数量),N的大小也可以自定义

所有技术、实战教学文章总目录点击此处链接


更多MapReduce设计案例地址: https://github.com/yuanprogrammer/MapReduce-Case-Statistics

目录

前言:

一、流程简介

 二、创建Maven工程项目

 (1)新建maven

 (2)添加依赖

三、编写MapReduce程序

(1)自定义数据类

(2)Mapper类

(3)Reduce类

(4)主类(入口类)

四、编译打包jar上传

五、拷贝数据集

 六、执行Jar包程序

 Gitee仓库Hadoop项目下载地址

其他系列技术教学、实战开发


前言:

在第一篇教学我们已经将主播的数据进行了初步的提取,但是数据看起来杂乱无章,这篇文章教大家如何对提取出来的数据进一步处理

一、流程简介

第一篇提取好的数据如下:

 通过文本数据我们可以发现一个主播的id号有多条数据记录,那么我们要做的就是要把这些相同的id将他对应数据进行累计求和处理操作,同时对id进行一个升序操作,看起来更加整洁

这次涉及到统计求和、排序等操作,用到了Reduce,整体项目流程如下:

 二、创建Maven工程项目

 (1)新建maven

打开IDEA,新建一个项目,,在左侧一栏选择maven工程,点击下一步

 添加项目名称,点击完成

 创建之后,右下角会弹出提示,选择Auto自动导入依赖(没有也没关系,待会添加依赖也会弹出)

 (2)添加依赖

展开项目目录,编辑pom.xml配置文件,添加如下依赖

  1. <dependencies>
  2. <!-- Hadoop所需对应依赖包 -->
  3. <dependency>
  4. <groupId>org.apache.hadoop</groupId>
  5. <artifactId>hadoop-common</artifactId>
  6. <version>3.3.1</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.hadoop</groupId>
  10. <artifactId>hadoop-hdfs</artifactId>
  11. <version>3.3.1</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.hadoop</groupId>
  15. <artifactId>hadoop-client</artifactId>
  16. <version>3.3.1</version>
  17. </dependency>
  18. </dependencies>
  19. <build>
  20. <plugins>
  21. <!-- 编译打包项目的插件 -->
  22. <plugin>
  23. <groupId>org.apache.maven.plugins</groupId>
  24. <artifactId>maven-compiler-plugin</artifactId>
  25. <configuration>
  26. <source>6</source>
  27. <target>6</target>
  28. </configuration>
  29. </plugin>
  30. </plugins>
  31. </build>

 项目创建完毕

三、编写MapReduce程序

如果对Map和Reduce对应阶段的任务和处理结果不熟悉的可以看我之前那一篇 单词统计排序教程,有详细介绍

(1)自定义数据类

该类的作用:方便统计主播的指标数据,需要把这些字段整合到一个对象中,以便日后维护

新建一个VideoInfoWritable类继承Writable类,设置四个属性,自己补上get()和set()方法

  1. public class VideoInfoWritable extends Writable {
  2. private long gold; //金币
  3. private long watchnumpv; //观看时长
  4. private long follower; //粉丝数量
  5. private long length; //总播放时长
  6. }

复写Writable类的两个方法,一个readFileds输入数据,一个write输出数据

  1. @Override
  2. public void readFields(DataInput dataInput) throws IOException {
  3. this.gold = dataInput.readLong();
  4. this.watchnumpv = dataInput.readLong();
  5. this.follower = dataInput.readLong();
  6. this.length = dataInput.readLong();
  7. }
  8. @Override
  9. public void write(DataOutput dataOutput) throws IOException {
  10. dataOutput.writeLong(gold);
  11. dataOutput.writeLong(watchnumpv);
  12. dataOutput.writeLong(follower);
  13. dataOutput.writeLong(length);
  14. }

(2)Mapper类

该类的作用:对数据进行按行读取,切割获取对应字段数据,封装字段数据到(1)自定义的类对象中

新建一个videoInfoMap类继承Mapper类,复写mapper方法

  1. public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {
  2. @Override
  3. public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
  4. String line = v1.toString(); //读取清先之后的每一行数据
  5. String[] fields = line.split("\t"); //通过"\t"对数据进行切割
  6. String id = fields[0]; //获取主播的id
  7. /**
  8. * 获取主播的其他数据
  9. * gold-------->>金币
  10. * watchnumpv-->>播放量
  11. * follower---->>粉丝
  12. * length------>>开播时长
  13. */
  14. long gold = Long.parseLong(fields[1]);
  15. long watchnumpv = Long.parseLong(fields[2]);
  16. long follower = Long.parseLong(fields[3]);
  17. long length = Long.parseLong(fields[4]);
  18. Text k2 = new Text();
  19. k2.set(id);
  20. // 封装到自定义的VideoInfoWritable类对象中
  21. VideoInfoWritable v2 = new VideoInfoWritable();
  22. v2.set(gold, watchnumpv, follower, length);
  23. context.write(k2, v2);
  24. }
  25. }

(3)Reduce类

该类的作用:遍历具有相同主播id的VideoInfoWritable类对象,获取对应的四个字段值,进行求和

新建一个videoInfoReduce类,继承Reduce类,复写reduce方法

  1. public class VideoInfoReduce extends Reducer<Text,VideoInfoWritable, Text,VideoInfoWritable> {
  2. @Override
  3. protected void reduce(Text k2,Iterable<VideoInfoWritable> v2s,Context context) throws IOException,InterruptedException{
  4. // 从v2s中把相同的k2的value取出来,进行遍历,进行累加求和。
  5. long goldsum=0;
  6. long watchnumpvsum=0;
  7. long followersum=0;
  8. long lengthsum=0;
  9. /**
  10. * v2s:相同主播id的对应对象集合(VideoInfoWritable类对象,有四个属性)
  11. * 遍历具有相同id的对象,获取对应四个字段值,进行求和
  12. */
  13. for(VideoInfoWritable v2:v2s){
  14. goldsum+=v2.getGold();
  15. watchnumpvsum+=v2.getWatchnumpv();
  16. followersum+=v2.getFollower();
  17. lengthsum+=v2.getLength();
  18. }
  19. // 将求和统计好的封装进来,写入context中,交由Job主类打印输出
  20. Text k3=k2;
  21. VideoInfoWritable v3=new VideoInfoWritable();
  22. v3.set(goldsum,watchnumpvsum,followersum,lengthsum);
  23. context.write(k3,v3);
  24. }
  25. }

(4)主类(入口类)

这个没什么好说的了,运行jar包程序的入口类,对应代码注释我也标上了

  1. public class VideoInfoJob {
  2. public static void main(String[] args) {
  3. try {
  4. // 运行jar包程序指令输入错误,直接退出程序
  5. if (args.length != 2) {
  6. System.exit(100);
  7. }
  8. Configuration conf = new Configuration();//job需要的配置参数
  9. Job job = Job.getInstance(conf);//创建一个job作业
  10. job.setJarByClass(VideoInfoJob.class);//设置入口类
  11. FileInputFormat.setInputPaths(job, new Path(args[0]));//指定输入路径(可以是文件,也可以是目录)
  12. FileOutputFormat.setOutputPath(job, new Path(args[1]));//指定输出路径(只能是指定一个不存在的目录)
  13. // 指定Mapper阶段的相关类
  14. job.setMapperClass(VideoInfoMap.class);
  15. // 指定K2的输出数据类型
  16. job.setMapOutputKeyClass(Text.class);
  17. // 指定v2的输出数据类型
  18. job.setMapOutputValueClass(VideoInfoWritable.class);
  19. // 指定Reduce阶段的相关类
  20. job.setReducerClass(VideoInfoReduce.class);
  21. // 指定K3的输出数据类型
  22. job.setOutputKeyClass(Text.class);
  23. // 指定V3的输出数据类型
  24. job.setOutputValueClass(VideoInfoWritable.class);
  25. //提交作业job
  26. job.waitForCompletion(true);
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }

四、编译打包jar上传

点击右侧的maven,展开Lifecycle,双击clean清理一下,再双击package生成jar包

 运行结束,在项目目录会生成一个target文件夹,展开可以看到有一个jar包,右键复制jar包并且通过winscp连接虚拟机拷贝到虚拟机里 (不会的看第一篇)

我已经成功拷贝到虚拟机了

五、拷贝数据集

在第一篇文章里,我们已经初步处理好了数据,并且把结果数据集输出到了HDFS文件系统,现在我们将这个结果集拷贝到data目录,并且重新命名

输入指令,将part-m-00000文件拷贝到/zhubo/data/文件夹下,并命名为zhuboClean.log

hadoop fs -cp /zhubo/resultClean/part-m-00000 /zhubo/data/zhuboClean.log

 六、执行Jar包程序

最后我们执行jar包运行一下

zhuboCensus-1.0.jar要执行的jar包名称
VideoInfoJob 主类(入口类)类名
/zhubo/data/zhuboClean.log输入路径(数据所在位置)
/zhubo/resultCensus/输出路径(结果输出路径,要为不存在的文件夹)
hadoop jar zhuboCensus-1.0.jar VideoInfoJob /zhubo/data/zhuboClean.log /zhubo/resultCensus/

 刷新浏览器,将输出的结果数据集下载下来

以记事本方式打开,可以看到已经统计完毕了,并且自动按照id依次显示

 Gitee仓库Hadoop项目下载地址

Gitee仓库地址(Hadoop实战项目源码集合)

Hadoop实战项目源码集合: https://blog.csdn.net/weixin_47971206CSDN文章教学中的源码汇总集合

其他系列技术教学、实战开发

各大技术基础教学、实战开发教学(最新更新时间2021-11-28)_Bald programmer的博客-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/599509
推荐阅读
相关标签
  

闽ICP备14008679号