当前位置:   article > 正文

基于SparkSQL的网站日志分析实战_【spark实战】慕课网日志分析按照流量

【spark实战】慕课网日志分析按照流量

基于SparkSQL的网站日志分析实战

用户行为日志概述

          用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击...)
                                   用户行为轨迹、流量日志
  1. 为什么要记录用户访问行为日志
  • 网站页面的访问量
  • 网站的黏性
  • 推荐

           2.用户行为日志生成渠道

  • Nginx
  • Ajax
3.用户行为日志内容

日志数据内容:
1)访问的系统属性: 操作系统、浏览器等等
2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
3)访问信息:session_id、访问ip(访问城市)等
日志信息如下所示:
2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243   
4.用户行为日志分析的意义
  • 网站的眼睛(访问者来自什么地方,找什么东西,那些页面最受欢迎,访问者的入口地址是什么等)
  • 网站的神经(网站布局是否合理,导航层次是否清晰,功能是否存在问题,转换路径是否靠谱)
  • 网站的大脑(如何分析目标,如何分配广告预算(广告推广))

离线数据处理架构

数据处理流程
1)数据采集
Flume: web日志写入到HDFS


2)数据清洗
脏数据
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架  
清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)


3)数据处理
按照我们的需要进行相应业务的统计和分析
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架


4)处理结果入库
结果可以存放到RDBMS、NoSQL


5)数据的可视化
通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
ECharts、HUE、Zeppelin
离线数据处理架构

项目需求

  • 需求一:统计imooc主站最受欢迎的课程/手记的Top N访问次数
  • 需求二:按地市统计imooc主站最受欢迎的Top N课程
(1).根据IP地址提取出城市信息
(2).窗口函数在Spark SQL中的使用
  • 按流量统计imooc主站最受欢迎的Top N课程

功能实现

          网站主站日志介绍
  • 访问时间
  • 访问URL
  • 访问过程耗费流量
  • 访问IP地址
数据清洗
原始日志
10.100.0.1 - - [10/Nov/2016:00:01:02 +0800] "HEAD / HTTP/1.1" 301 0 "117.121.101.40" "-" - "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2" "-" - - - 0.000
 第一次清洗后(访问时间 主站URL 耗费的流量 ip地址)
2017-05-11 15:07:17     http://www.imooc.com/video/14322        245     202.96.134.133
2017-05-11 06:52:31     http://www.imooc.com/article/17891      535     222.129.235.182
2017-05-11 18:46:43     http://www.imooc.com/article/17898      807     218.75.35.226
代码实现参见gitee地址【https://gitee.com/robei/SparkSQLProject/blob/master/src/main/scala/com/imooc/log/SparkStatFormatJob.scala】
第二次数据清洗
输入:访问时间、访问URL、耗费的流量、访问IP地址信息
输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天

代码实现参见gitee地址【https://gitee.com/robei/SparkSQLProject/blob/master/src/main/scala/com/imooc/log/SparkStatCleanJob.scala】

根据ip地址解析城市信息

使用github上已有的开源项目
1)git clone https://github.com/wzhe06/ipdatabase.git
2)编译下载的项目:mvn clean package -DskipTests
3)安装jar包到自己的maven仓库
mvn install:install-file -Dfile=~/source/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

数据清洗存储到目标地址() 

【https://gitee.com/robei/SparkSQLProject/blob/e60788c9e0f3c2ffc446d9aa8acaa5a66ac006fc/src/main/scala/com/imooc/log/SparkStatCleanJob.scala】
需求一的实现:统计主站最受欢迎的课程/手记的Top N访问次数
  1. /**
  2. * 需求一:主站最受欢迎的TopN课程统计
  3. *
  4. * @param spark
  5. * @param cleanDF
  6. */
  7. def videoAccessTopNStat(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
  8. //------------------使用DataFrame API完成统计操作--------------------------------------------
  9. /* import spark.implicits._
  10. val videoAccessTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
  11. .groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)*/
  12. // videoAccessTopNDF.printSchema()
  13. /**
  14. * root
  15. * |-- day: string (nullable = true)
  16. * |-- cmsId: long (nullable = true)
  17. * |-- times: long (nullable = false)
  18. */
  19. // videoAccessTopNDF.show(10,false)
  20. /**
  21. * +--------+-----+------+
  22. * |day |cmsId|times |
  23. * +--------+-----+------+
  24. * |20170511|14540|111027|
  25. * |20170511|4000 |55734 |
  26. * |20170511|14704|55701 |
  27. * |20170511|14390|55683 |
  28. * |20170511|14623|55621 |
  29. * |20170511|4600 |55501 |
  30. * |20170511|4500 |55366 |
  31. * |20170511|14322|55102 |
  32. * +--------+-----+------+
  33. */
  34. //-------------------------使用SQL API完成操作-------------------------
  35. cleanDF.createOrReplaceTempView("access_logs")
  36. //创建临时表 access_logs
  37. val videoAccessTopNDF = spark.sql("select day,cmsId,count(1) as times from access_logs" +
  38. " where day="+day+" and cmsType='video'" +
  39. " group by day,cmsId order by times desc")
  40. videoAccessTopNDF.show(10, false)
  41. /**
  42. * +--------+-----+------+
  43. * |day |cmsId|times |
  44. * +--------+-----+------+
  45. * |20170511|14540|111027|
  46. * |20170511|4000 |55734 |
  47. * |20170511|14704|55701 |
  48. * |20170511|14390|55683 |
  49. * |20170511|14623|55621 |
  50. * |20170511|4600 |55501 |
  51. * |20170511|4500 |55366 |
  52. * |20170511|14322|55102 |
  53. * +--------+-----+------+
  54. */
  55. //-------------------将统计结果写入数据库-------------------
  56. try {
  57. videoAccessTopNDF.foreachPartition(partitionOfRecords => {
  58. val list = new ListBuffer[DayVideoAccessStat]
  59. partitionOfRecords.foreach(info => {
  60. val day = info.getAs[String]("day")
  61. val cmsId = info.getAs[Long]("cmsId")
  62. val times = info.getAs[Long]("times")
  63. list.append(DayVideoAccessStat(day, cmsId, times))
  64. })
  65. StatDAO.insertDayVideoAccessTopN(list)
  66. })
  67. }catch {
  68. case e:Exception => e.printStackTrace()
  69. }
  70. /**
  71. * 在mysql中创建day_video_access_topn_stat,主站最受欢迎的Top N课程
  72. * create table day_video_access_topn_stat (
  73. * day varchar(8) not null,
  74. * cms_id bigint(10) not null,
  75. * times bigint(10) not null,
  76. * primary key (day, cms_id)
  77. * );
  78. */
  79. }
需求二:按地市统计主站最受欢迎的Top N课程
  1. /**
  2. * 需求二:按地市统计主站最受欢迎的Top N课程
  3. * @param spark
  4. * @param cleanDF
  5. */
  6. def cityAccessTopSata(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
  7. //------------------使用DataFrame API完成统计操作--------------------------------------------
  8. import spark.implicits._
  9. val cityAccessTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
  10. .groupBy("day","city","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
  11. // cityAccessTopNDF.printSchema()
  12. /**
  13. * root
  14. * |-- day: string (nullable = true)
  15. * |-- city: string (nullable = true)
  16. * |-- cmsId: long (nullable = true)
  17. * |-- times: long (nullable = false)
  18. */
  19. // cityAccessTopNDF.show(false)
  20. /**
  21. * +--------+----+-----+-----+
  22. * |day |city|cmsId|times|
  23. * +--------+----+-----+-----+
  24. * |20170511|浙江省 |14540|22435|
  25. * |20170511|北京市 |14540|22270|
  26. * |20170511|安徽省 |14540|22149|
  27. * |20170511|广东省 |14540|22115|
  28. * |20170511|上海市 |14540|22058|
  29. * |20170511|北京市 |4600 |11271|
  30. * |20170511|安徽省 |14390|11229|
  31. * |20170511|广东省 |14623|11226|
  32. * |20170511|上海市 |14704|11219|
  33. * |20170511|广东省 |14704|11216|
  34. * |20170511|广东省 |4600 |11215|
  35. * |20170511|上海市 |4000 |11182|
  36. * |20170511|北京市 |14390|11175|
  37. * |20170511|广东省 |4000 |11169|
  38. * |20170511|上海市 |4500 |11167|
  39. * |20170511|安徽省 |14704|11162|
  40. * |20170511|北京市 |4000 |11156|
  41. * |20170511|浙江省 |14322|11151|
  42. * |20170511|上海市 |14623|11149|
  43. * |20170511|广东省 |4500 |11136|
  44. * +--------+----+-----+-----+
  45. */
  46. //-----------Window函数在Spark SQL中的使用--------------------
  47. val cityTop3DF = cityAccessTopNDF.select(
  48. cityAccessTopNDF("day"),
  49. cityAccessTopNDF("city"),
  50. cityAccessTopNDF("cmsId"),
  51. cityAccessTopNDF("times"),
  52. row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
  53. .orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
  54. ).filter("times_rank <= 3").orderBy($"city".desc,$"times_rank".asc)
  55. cityTop3DF.show(false)//展示每个地市的Top3
  56. /**
  57. * +--------+----+-----+-----+----------+
  58. * |day |city|cmsId|times|times_rank|
  59. * +--------+----+-----+-----+----------+
  60. * |20170511|浙江省 |14540|22435|1 |
  61. * |20170511|浙江省 |14322|11151|2 |
  62. * |20170511|浙江省 |14390|11110|3 |
  63. * |20170511|广东省 |14540|22115|1 |
  64. * |20170511|广东省 |14623|11226|2 |
  65. * |20170511|广东省 |14704|11216|3 |
  66. * |20170511|安徽省 |14540|22149|1 |
  67. * |20170511|安徽省 |14390|11229|2 |
  68. * |20170511|安徽省 |14704|11162|3 |
  69. * |20170511|北京市 |14540|22270|1 |
  70. * |20170511|北京市 |4600 |11271|2 |
  71. * |20170511|北京市 |14390|11175|3 |
  72. * |20170511|上海市 |14540|22058|1 |
  73. * |20170511|上海市 |14704|11219|2 |
  74. * |20170511|上海市 |4000 |11182|3 |
  75. * +--------+----+-----+-----+----------+
  76. */
  77. //-------------------将统计结果写入数据库-------------------
  78. try {
  79. cityTop3DF.foreachPartition(partitionOfRecords => {
  80. val list = new ListBuffer[DayCityVideoAccessStat]
  81. partitionOfRecords.foreach(info => {
  82. val day = info.getAs[String]("day")
  83. val cmsId = info.getAs[Long]("cmsId")
  84. val city = info.getAs[String]("city")
  85. val times = info.getAs[Long]("times")
  86. val timesRank = info.getAs[Int]("times_rank")
  87. list.append(DayCityVideoAccessStat(day, cmsId,city, times,timesRank))
  88. })
  89. StatDAO.insertDayCityVideoAccessTopN(list)
  90. })
  91. }catch {
  92. case e:Exception => e.printStackTrace()
  93. }
  94. /**
  95. * create table day_video_city_access_topn_stat (
  96. * day varchar(8) not null,
  97. * cms_id bigint(10) not null,
  98. * city varchar(20) not null,
  99. * times bigint(10) not null,
  100. * times_rank int not null,
  101. * primary key (day, cms_id, city)
  102. * );
  103. */
  104. }
需求三:按照流量统计主站最受欢迎的Top N课程
  1. /**
  2. * 需求三:按照流量统计主站最受欢迎的Top N课程
  3. * @param spark
  4. * @param cleanDF
  5. */
  6. def videoTraffsTopStat(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
  7. //------------------使用DataFrame API完成统计操作--------------------------------------------
  8. import spark.implicits._
  9. val trafficsTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
  10. .groupBy("day","cmsId").agg(sum("traffic").as("traffics")).orderBy($"traffics".desc)
  11. trafficsTopNDF.show()
  12. /**
  13. * +--------+-----+--------+
  14. * | day|cmsId|traffics|
  15. * +--------+-----+--------+
  16. * |20170511|14540|55454898|
  17. * |20170511|14390|27895139|
  18. * |20170511| 4500|27877433|
  19. * |20170511| 4000|27847261|
  20. * |20170511|14623|27822312|
  21. * |20170511| 4600|27777838|
  22. * |20170511|14704|27737876|
  23. * |20170511|14322|27592386|
  24. * +--------+-----+--------+
  25. */
  26. //-------------------将统计结果写入数据库-------------------
  27. try {
  28. trafficsTopNDF.foreachPartition(partitionOfRecords => {
  29. val list = new ListBuffer[DayVideoTrafficsStat]
  30. partitionOfRecords.foreach(info => {
  31. val day = info.getAs[String]("day")
  32. val cmsId = info.getAs[Long]("cmsId")
  33. val traffics = info.getAs[Long]("traffics")
  34. list.append(DayVideoTrafficsStat(day, cmsId,traffics))
  35. })
  36. StatDAO.insertDayVideoTrafficsTopN(list)
  37. })
  38. }catch {
  39. case e:Exception => e.printStackTrace()
  40. }
  41. /**
  42. * create table day_video_traffics_topn_stat (
  43. * day varchar(8) not null,
  44. * cms_id bigint(10) not null,
  45. * traffics bigint(20) not null,
  46. * primary key (day, cms_id)
  47. * );
  48. */
  49. }
【https://gitee.com/robei/SparkSQLProject】
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/300955
推荐阅读
相关标签
  

闽ICP备14008679号