赞
踩
目录
实训题目:竞赛网站访问日志分析
(1)搭建Spurk工程环境。
(2) Spark编程。
(3)通过spark-submit提交应用。
某竞赛网站每年都会开展数据挖据的竞赛,在竞赛期间网站会有大量人群访问,生成了大量的用户访向记录。现在提供2016年10月到2017年6月的部分脱敏访问日志数据。日志数据的基本内容如图所示,仅提供以下6个字段。
属性名称 | 属性解析 |
Id | 序号 |
Content_id | 网页ID |
Page_path | 网址 |
Userid | 用户ID |
Sessionid | 缓存生成ID |
Date_time | 访问时间 |
要求根据提供的用户访问日志数据,利用Spark技术统计访向的用户数、被访问的不同网页个数以及每月的访问量,并将结果保存到HDFS上。
文章所用文档以及目录等等说明:
(点击可免费下载)访问日志数据: jc_content_viewlog.txt
IDEA内实现代码存储路径与名字:LogCount.scala
jc_content_viewlog.txt 内部分数据如下图:
(1)配置好Spark的IntelliJ IDEA开发环境。
(2)启动IntelliJ IDEA,并进行Spark编程。
(3)对访向记录中的网页去重,统计本周期内被访问网页的个数。
- val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
- val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
- val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
(4) userid为用户注册登录的标识,对userid去重,统计登录用户的数量。
- val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
- val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
(5)按月统计访问记录数。
- val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
- val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
(6)将结果保存到不同文件中。
- wy_count.repartition(1).saveAsTextFile(args(1))
- user_count.repartition(1).saveAsTextFile(args(2))
- ny_count.repartition(1).saveAsTextFile(args(3))
(7)打包Spark工程,在集群提交应用程序。
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
注:jc.jar是上面文件生成的jar包改名并上传而来;
hdfs://node1:8020/user/root/jc_content_viewlog.txt 是hdfs里面jc_content_viewlog.txt存储路径,也需要自己上传,目录自己决定;
hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3 是设置它的输出存储路径,因为会输出三个不同数据,需要三个目录,不然会报错。
- package net
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object LogCount {
- def main(args: Array[String]): Unit = {
-
- if(args.length < 2){
- println("请指定input和output")
- System.exit(1)//非0表示非正常退出程序
- }
-
- //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
- val conf: SparkConf = new SparkConf().setAppName("wc")
- val sc: SparkContext = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- //TODO 2.source/读取数据
- //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
- //RDD[就是一行行的数据]
- val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
- //TODO 3.transformation/数据操作/转换
- //对访问记录中的网页去重,统计本周期内被访问网页的个数
- val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
- val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
- //userid为用户注册登录的标识,对userid去重,统计登录用户的数量
- val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
- val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
- //按月统计访问记录数
- val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
- val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
-
- //TODO 4.sink/输出
- //输出到指定path(可以是文件/夹)
- wy_count.repartition(1).saveAsTextFile(args(1))
- user_count.repartition(1).saveAsTextFile(args(2))
- ny_count.repartition(1).saveAsTextFile(args(3))
- //为了便于查看Web-UI可以让程序睡一会
- Thread.sleep(1000 * 60)
-
- //TODO 5.关闭资源
- sc.stop()
- }
- //获取年月,时间段作为输入参数
- def date_time(date:String):String={
- val nianye =date.trim.substring(0,7)
- nianye
- }
-
- }
1、修改打包好的jar名字,并把jar上传到node1结点
2、开启一系列集群:
start-dfs.sh //一键开启
start-yarn.sh //开启
cd /myserver/
mr-jobhistory-daemon.sh start historyserver
/myserver/spark301/sbin/start-history-server.sh
jps //查看
这里不再具体说明如何开启。
3、上传jc_content_viewlog.txt到node1节点,并上传到hdfs
-
- [root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt /user/root/
4、在集群提交应用程序
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
进入spark-shell
[root@node1 bin]# /myserver/spark301/bin/spark-shell
(1)统计用户访问次数并筛选出访问次数在50次以上的用户ID
scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}
data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24
scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect
(2)根据过滤后的用户ID,在原数据中筛选出这一部分用户的访问记录
scala> val valib_data=data.filter(x=>userid.contains(x(3)))
valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27
scala> valib_data.take(2) //查看
res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))
scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)
web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25
scala> web.take(5)
res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))
(URL 后面带有_1、_2 字样的翻页网址,统一为一个网址)通过字符串截取的方法,对网页网址字符串进行截取,只截取“_”前面的字符串
scala> val data2=data.filter(_.length>=6).map{
x=>
var page="";
if(x(2).contains("_"))
{ page=x(2).substring(0,x(2).lastIndexOf("_")) }
else
{ page=x(2) };
(x(0),x(1),page,x(3),x(4),x(5))
}
data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25
6:30~11:30 为上午,11:30~14:00 为中午,14:00~17:30为下午,17:30~19:00 为傍晚,19:00~23:00 为晚上,23:00~6:30 为深夜,统计所有用户各时段访问情况
(1)首先定义一个函数,用于匹配时间段并返回相应的字段值
- scala> def date_time(date:String):String={
- val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
- val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
- if(hour<6 && hour>=23) "深夜"
- else if(hour==6 && min<=30) "深夜"
- else if(hour<11 && hour>=6) "上午"
- else if(hour==11 && min<=30) "上午"
- else if(hour<14 && hour>=11) "中午"
- else if(hour>=14 && hour<17) "下午"
- else if(hour==17 && hour<=30) "下午"
- else if(hour>=17 && hour<19) "傍晚"
- else if(hour==19 && min<=30) "傍晚"
- else "晚上"
- }
- date_time: (date: String)String
(2)通过map方法对每一条记录的时间进行匹配,增加一个时间段的值到记录中
scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}
data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27
(3)将时段值作为键,值为1,利用reduceByKey的方法统计各时段访问情况
scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)
date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25
scala> date_count.take(10)
res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。