当前位置:   article > 正文

【Spark实训】--竞赛网站访问日志分析_某竞赛网站每年开展数据挖掘的竞赛,在竞赛期间网站会有大量人群访问,生成了大量用

某竞赛网站每年开展数据挖掘的竞赛,在竞赛期间网站会有大量人群访问,生成了大量用

 目录

一. 训练要点

二.需求说明

三.关键实现思路及步骤

 四、LogCount.scala文件完整代码实现:

五、运行过程与结果截图:

​ 六、具体实现步骤

 七、相关知识点 

 1、过滤出访问次数在 50 次以上的用户记录

 2、统计访问 50 次以上的用户主要访问的前 5 类网页

 3. 合并部分网页

 4.根据访问时间加入对应时段:


实训题目:竞赛网站访问日志分析

一. 训练要点

(1)搭建Spurk工程环境。

(2) Spark编程。

(3)通过spark-submit提交应用。

二.需求说明

     某竞赛网站每年都会开展数据挖据的竞赛,在竞赛期间网站会有大量人群访问,生成了大量的用户访向记录。现在提供2016年10月到2017年6月的部分脱敏访问日志数据。日志数据的基本内容如图所示,仅提供以下6个字段。

属性名称

属性解析

Id

序号

Content_id

网页ID

Page_path

网址

Userid

用户ID

Sessionid

缓存生成ID

Date_time

访问时间

     要求根据提供的用户访问日志数据,利用Spark技术统计访向的用户数、被访问的不同网页个数以及每月的访问量,并将结果保存到HDFS上。

文章所用文档以及目录等等说明:

(点击可免费下载)访问日志数据:    jc_content_viewlog.txt

IDEA内实现代码存储路径与名字:LogCount.scala

  jc_content_viewlog.txt   内部分数据如下图:

三.关键实现思路及步骤

(1)配置好Spark的IntelliJ IDEA开发环境。

(2)启动IntelliJ IDEA,并进行Spark编程。

(3)对访向记录中的网页去重,统计本周期内被访问网页的个数。

  1. val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
  2. val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
  3. val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))

 (4) userid为用户注册登录的标识,对userid去重,统计登录用户的数量。

  1. val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
  2. val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))

 (5)按月统计访问记录数。

  1. val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
  2. val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)

 (6)将结果保存到不同文件中。

  1. wy_count.repartition(1).saveAsTextFile(args(1))
  2. user_count.repartition(1).saveAsTextFile(args(2))
  3. ny_count.repartition(1).saveAsTextFile(args(3))

 (7)打包Spark工程,在集群提交应用程序。

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

注:jc.jar是上面文件生成的jar包改名并上传而来;

hdfs://node1:8020/user/root/jc_content_viewlog.txt  是hdfs里面jc_content_viewlog.txt存储路径,也需要自己上传,目录自己决定;

hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3  是设置它的输出存储路径,因为会输出三个不同数据,需要三个目录,不然会报错。

 四、LogCount.scala文件完整代码实现:

  1. package net
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object LogCount {
  5. def main(args: Array[String]): Unit = {
  6. if(args.length < 2){
  7. println("请指定input和output")
  8. System.exit(1)//0表示非正常退出程序
  9. }
  10. //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
  11. val conf: SparkConf = new SparkConf().setAppName("wc")
  12. val sc: SparkContext = new SparkContext(conf)
  13. sc.setLogLevel("WARN")
  14. //TODO 2.source/读取数据
  15. //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
  16. //RDD[就是一行行的数据]
  17. val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
  18. //TODO 3.transformation/数据操作/转换
  19. //对访问记录中的网页去重,统计本周期内被访问网页的个数
  20. val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
  21. val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
  22. //userid为用户注册登录的标识,对userid去重,统计登录用户的数量
  23. val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
  24. val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
  25. //按月统计访问记录数
  26. val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
  27. val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
  28. //TODO 4.sink/输出
  29. //输出到指定path(可以是文件/夹)
  30. wy_count.repartition(1).saveAsTextFile(args(1))
  31. user_count.repartition(1).saveAsTextFile(args(2))
  32. ny_count.repartition(1).saveAsTextFile(args(3))
  33. //为了便于查看Web-UI可以让程序睡一会
  34. Thread.sleep(1000 * 60)
  35. //TODO 5.关闭资源
  36. sc.stop()
  37. }
  38. //获取年月,时间段作为输入参数
  39. def date_time(date:String):String={
  40. val nianye =date.trim.substring(0,7)
  41. nianye
  42. }
  43. }

五、运行过程与结果截图:

 


 六、具体实现步骤

1、修改打包好的jar名字,并把jar上传到node1结点

2、开启一系列集群:

start-dfs.sh   //一键开启
start-yarn.sh  //开启
cd /myserver/
 mr-jobhistory-daemon.sh start historyserver
 /myserver/spark301/sbin/start-history-server.sh
 jps  //查看

这里不再具体说明如何开启。

3、上传jc_content_viewlog.txt到node1节点,并上传到hdfs

  1. [root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt  /user/root/

 4、在集群提交应用程序

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

 七、相关知识点 

进入spark-shell

[root@node1 bin]# /myserver/spark301/bin/spark-shell

 1、过滤出访问次数在 50 次以上的用户记录

(1)统计用户访问次数并筛选出访问次数在50次以上的用户ID

scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}

data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24

scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect

 (2)根据过滤后的用户ID,在原数据中筛选出这一部分用户的访问记录

scala> val valib_data=data.filter(x=>userid.contains(x(3)))

valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27

scala> valib_data.take(2)   //查看

res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))

 2、统计访问 50 次以上的用户主要访问的前 5 类网页

 scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)

web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25

scala> web.take(5)

res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))

 3. 合并部分网页

(URL 后面带有_1、_2 字样的翻页网址,统一为一个网址)通过字符串截取的方法,对网页网址字符串进行截取,只截取“_”前面的字符串

 scala> val data2=data.filter(_.length>=6).map{

    x=>

      var page="";

      if(x(2).contains("_"))

        { page=x(2).substring(0,x(2).lastIndexOf("_")) }

      else

        { page=x(2) };

      (x(0),x(1),page,x(3),x(4),x(5))

      }

data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25

 4.根据访问时间加入对应时段:

6:30~11:30 为上午,11:30~14:00 为中午,14:00~17:30为下午,17:30~19:00 为傍晚,19:00~23:00 为晚上,23:00~6:30 为深夜,统计所有用户各时段访问情况

(1)首先定义一个函数,用于匹配时间段并返回相应的字段值

  1. scala> def date_time(date:String):String={
  2. val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
  3. val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
  4. if(hour<6 && hour>=23) "深夜"
  5. else if(hour==6 && min<=30) "深夜"
  6. else if(hour<11 && hour>=6) "上午"
  7. else if(hour==11 && min<=30) "上午"
  8. else if(hour<14 && hour>=11) "中午"
  9. else if(hour>=14 && hour<17) "下午"
  10. else if(hour==17 && hour<=30) "下午"
  11. else if(hour>=17 && hour<19) "傍晚"
  12. else if(hour==19 && min<=30) "傍晚"
  13. else "晚上"
  14. }
  15. date_time: (date: String)String

(2)通过map方法对每一条记录的时间进行匹配,增加一个时间段的值到记录中

 scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}

data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27

(3)将时段值作为键,值为1,利用reduceByKey的方法统计各时段访问情况 

scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)

date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25

scala> date_count.take(10)

res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/300962
推荐阅读
相关标签
  

闽ICP备14008679号