当前位置:   article > 正文

编程实现Hadoop按日期统计访问次数_hadoop按照日期统计访问次数

hadoop按照日期统计访问次数

一、实训目标

(1)编程实现按日期统计访问次数

二、实训环境

(1)使用CentOSd Linux操作系统搭建的3个节点

(2)使用JDK

(3)使用Hadoop

三、实训内容

(1)统计用户在2016年度每个自然日的总访问次数,数据格式如图,第一列为用户名,第二列为登录的日期。

四、实例步骤

4.1分析思路与处理逻辑

数据总共有两列,第一列为用户名,第二列为登录的日期,想要统计每个自然日,也就是每一天的访问次数,可以转换为对日期值的词频统计,只要统计出每个日期出现的次数,就可以知道对应日期的日访问次数。将思路转化为MapReduce编程逻辑,需要从以下3个模块考虑。

(1)输入输出格式

(2)Mapper要实现 的计算逻辑

(3)Reducer要实现的计算逻辑

接下来依次分析这几个模块的解决思路与处理逻辑。

1.定义输出格式

通过统计日期的词频来统计每个自然日的访问次数,那么Map的输出就是<访问日期,1>,Reduce输出就是<访问日期,访问次数>。

社区网站用户的访问日期,在格式上都属于文本格式,访问次数为整型数值格式。组成的键值对为<访问日期,访问次数>,因此Map的输出与Reduce的输出都选用Text类与IntWritable类。

2.Mapper类的逻辑实现

Mapper类中最主要的部分就是map函数。map函数的主要任务是读取用户访问文件中的数据,输出所有访问日期与初始次数的键值对。因为访问日期是数据文件中的第2列,所以先定义一个数组后,再提取第2个元素,与初始次数1一起构成要输出的键值对,即<访问日期,1>。

以下为伪代码来编写Mapper的处理逻辑,代码如下

3.Reducer类的逻辑实现

Reducer类中最主要的部分就是reduce函数,reduce函数的主要任务就是读取Mapper输出的键值对<访问日期,1>。这一部分的处理逻辑与官方示例wordcount中的Reducer完全相同。如图

4.2编写核心模块代码

(1)编写Mapper模块代码

  1. package cn.demo.myfriend.data;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. /**
  8. *
  9. * @author zhongyulin
  10. * LongWritable 输入的偏移量
  11. * Text 输入的数据
  12. * Text 输出的key
  13. * IntWritable 输出的value
  14. */
  15. public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  16. private final static IntWritable one=new IntWritable(1);
  17. @Override
  18. protected void map(LongWritable key, Text value, Context context)
  19. throws IOException, InterruptedException {
  20. String line=value.toString();
  21. //按规则拆分成数组
  22. String[] arry=line.split(",");
  23. String keyout=arry[1];
  24. context.write(new Text(keyout),one);
  25. }
  26. }

(2)编写Reducer模块代码

  1. package cn.demo.myfriend.data;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. /**
  7. * Reducer模块
  8. * @author zhongyulin
  9. *
  10. */
  11. public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  12. private IntWritable result=new IntWritable();
  13. @Override
  14. protected void reduce(Text key, Iterable<IntWritable> values,
  15. Context context) throws IOException, InterruptedException {
  16. int sum=0;
  17. for (IntWritable val : values) {
  18. sum +=val.get();
  19. }
  20. result.set(sum);
  21. context.write(key, result);
  22. }
  23. }

(3)编写Dirver模块

  1. //编写Driver
  2. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  3. //1.初始化相应的hadoop配置
  4. Configuration conf = new Configuration();
  5. //收集异常信息
  6. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  7. if(otherArgs.length<2){
  8. System.err.println("需要两个参数,第一个参数是输入文件路径,第二个参数是输出文件路径");
  9. System.exit(2);
  10. }
  11. //2.新建job并且设置主类,这里的job实例需要把configuraction的实例传入,后面的“word count”是该mapreduce任务的名字
  12. Job job = Job.getInstance(conf,"Daily Access Aount");
  13. //3.设置jar包名 通过类型名生成
  14. job.setJarByClass(DailyAccessCount.class);
  15. job.setMapperClass(MyMapper.class);//TODO
  16. //4.里面类名为实际任务的Mapper
  17. //设置combiner类,可选 优化处理
  18. job.setReducerClass(MyReducer.class);//TODO
  19. //5.里面的类名是为实际任务的reducer
  20. job.setMapOutputKeyClass(Text.class);
  21. job.setMapOutputValueClass(IntWritable.class);
  22. job.setOutputKeyClass(Text.class);
  23. job.setOutputValueClass(IntWritable.class);
  24. //6.设置输出键值对类型 ,如果map和reducer输出类型一样,只需要设置总输出
  25. //设置读取的文件路径
  26. //hadoop jar ...jar wordcount /路径1 /路径2
  27. for (int i = 0; i < otherArgs.length-1; i++) {
  28. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  29. }
  30. //设置输出的文件路径
  31. for (String str : otherArgs) {
  32. FileOutputFormat.setOutputPath(job, new Path(str));
  33. }
  34. System.exit(job.waitForCompletion(true)?0:1);
  35. //7.提交mapreduce任务运行(固定写法),并等待任务运行结束
  36. }

4.3任务实现

在本地上创建一个user_login.txt内容如下

  1. Nehru,2016-01-01
  2. Dane,2016-01-01
  3. Walter,2016-01-01
  4. Gloria,2016-01-01
  5. Clarke,2016-01-01
  6. Madeline,2016-01-01
  7. Kevyn,2016-01-01
  8. Rebecca,2016-01-01
  9. Calista,2016-01-01
  10. Lana,2016-01-01
  11. Phoebe,2016-01-01
  12. Clayton,2016-01-01
  13. Kimberly,2016-01-01
  14. Drew,2016-01-01
  15. Giselle,2016-01-01
  16. Nolan,2016-01-01
  17. Madeson,2016-01-01
  18. Janna,2016-01-01
  19. Raja,2016-01-01
  20. Aurelia,2016-02-01
  21. Wynter,2016-02-01
  22. Mari,2016-02-01
  23. Molly,2016-02-01
  24. Marshall,2016-02-01
  25. Brynne,2016-02-01
  26. Hannah,2016-02-01
  27. Whilemina,2016-02-01
  28. Gage,2016-02-01
  29. Wallace,2016-03-15
  30. Penelope,2016-03-15
  31. Ursa,2016-03-15
  32. Cassidy,2016-03-15
  33. Venus,2016-03-15
  34. Ethan,2016-03-15
  35. Regina,2016-03-15
  36. Orla,2016-03-15
  37. Avram,2016-03-15
  38. Barry,2016-03-15
  39. Dalton,2016-03-15
  40. Rhea,2016-03-15
  41. Patrick,2016-03-15
  42. Unity,2016-03-15
  43. Zachary,2016-03-15
  44. Hedley,2016-03-15
  45. Sasha,2016-03-15

在集群上创建一个文件夹user

(1)上传文件/opt/user_login.txt到/user

hdfs dfs -put /opt/user_login.txt /user

(2)在工程src下创建一个test包,创建类dailyAccessCount.java ,类中完整内容如下

  1. package dailyAccessCount;
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.examples.WordCount;
  6. import org.apache.hadoop.examples.WordCount.IntSumReducer;
  7. import org.apache.hadoop.examples.WordCount.TokenizerMapper;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.IntWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import org.apache.hadoop.mapreduce.Mapper.Context;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.util.GenericOptionsParser;
  18. /**
  19. * 任务目标是统计用户在2019年每个自然日的总访问次数
  20. * 原始数据文件中提供了用户名称和访问日期
  21. * @author student
  22. * 想要的数据格式:2019-11-06 3
  23. * oax,2019-11-06
  24. * map任务输出格式:2019-11-06,1
  25. * reduce任务输出格式:2019-11-06,3
  26. */
  27. public class DailyAccessCount {
  28. //extends Mapper 变成一个map模块
  29. //1.继承Mapper
  30. //2.设置输入/输出键值对类型 tips:输出对类型需要和Driver中设置的mapper输出的键值对类型保持一致
  31. public static class MyMapper extends Mapper<Object,Text,Text,IntWritable>{
  32. private final static IntWritable one = new IntWritable(1) ;
  33. //3.编写map方法,针对每条输入键值对执行函数中定义的逻辑处理,并按照规定的键值对格式输出。
  34. @Override
  35. public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
  36. throws IOException, InterruptedException {
  37. String array[] = value.toString().split(",");
  38. //4.mapper输出内容
  39. context.write(new Text(array[1]), one);
  40. }
  41. }
  42. //extends Reducer 变成一个reducer 模块
  43. //1.继承reducer类
  44. //2.设置输入\输出格式 (reducer输入格式是是mapper的输出格式,reducer输出格式要个driver中设置reducer输出格式保持一致)
  45. public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
  46. //3.编写reduce 对shuffle处理后的map数据进行处理
  47. @Override
  48. public void reduce(Text key, Iterable<IntWritable> values,
  49. Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  50. int sum = 0 ;
  51. for (IntWritable value : values) {
  52. sum = sum + value.get();
  53. }
  54. //4.输出内容
  55. context.write(key, new IntWritable(sum));
  56. }
  57. }
  58. //编写Driver
  59. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  60. //1.初始化相应的hadoop配置
  61. Configuration conf = new Configuration();
  62. //收集异常信息
  63. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  64. if(otherArgs.length<2){
  65. System.err.println("需要两个参数,第一个参数是输入文件路径,第二个参数是输出文件路径");
  66. System.exit(2);
  67. }
  68. //2.新建job并且设置主类,这里的job实例需要把configuraction的实例传入,后面的“word count”是该mapreduce任务的名字
  69. Job job = Job.getInstance(conf,"Daily Access Aount");
  70. //3.设置jar包名 通过类型名生成
  71. job.setJarByClass(DailyAccessCount.class);
  72. job.setMapperClass(MyMapper.class);//TODO
  73. //4.里面类名为实际任务的Mapper
  74. //设置combiner类,可选 优化处理
  75. job.setReducerClass(MyReducer.class);//TODO
  76. //5.里面的类名是为实际任务的reducer
  77. job.setMapOutputKeyClass(Text.class);
  78. job.setMapOutputValueClass(IntWritable.class);
  79. job.setOutputKeyClass(Text.class);
  80. job.setOutputValueClass(IntWritable.class);
  81. //6.设置输出键值对类型 ,如果map和reducer输出类型一样,只需要设置总输出
  82. //设置读取的文件路径
  83. //hadoop jar ...jar wordcount /路径1 /路径2
  84. for (int i = 0; i < otherArgs.length-1; i++) {
  85. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  86. }
  87. //设置输出的文件路径
  88. for (String str : otherArgs) {
  89. FileOutputFormat.setOutputPath(job, new Path(str));
  90. }
  91. System.exit(job.waitForCompletion(true)?0:1);
  92. //7.提交mapreduce任务运行(固定写法),并等待任务运行结束
  93. }
  94. }

(3)编译打包程序,生成JAR文件

dailyAccessCount.jar。右键单击dailyAccessCount类,选择“Export” a "java" a "jar file" ,单击“next”在所示界面填写jar文件名称和jar文件存放路径,单击“finish”

将dailyAccessCount.java生成jar包上传到hadoop的/opt目录下

(4)在jar文件所在目录,以hadoop jar命令提交任务,具体命令如下

hadoop jar dailyAccessCount.jar test.dailyAccessCount /user/user_login.txt /user/AccessCount

(5)检查输出结果类似如下

1.在HDFS中/user/AccessCount/part-r-00000下即可查看结果

2.集群监控查看如下

结论比较:数据量比较低,节点比较少

  1. /***
  2. * ,%%%%%%%%,
  3. * ,%%/\%%%%/\%%
  4. * ,%%%\c "" J/%%%
  5. * %. %%%%/ o o \%%%
  6. * `%%. %%%% _ |%%%
  7. * `%% `%%%%(__Y__)%%'
  8. * // ;%%%%`\-/%%%'
  9. * (( / `%%%%%%%'
  10. * \\ .' |
  11. * \\ / \ | |
  12. * \\/ ) | |
  13. * \ /_ | |__
  14. * (___________))))))) 攻城湿
  15. */

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/337604
推荐阅读
  

闽ICP备14008679号