当前位置:   article > 正文

spark应用----统计分析电商网站的用户行为数据_用spark进行用户行为分析

用spark进行用户行为分析

目录

项目说明

题目一:Top5热门品类

题目二:Top5热门品类中每个品类的Top5活跃Session统计

 scala实现

 新建maven项目结构如下

 配置pom.xml文件

scala代码

python实现


项目说明

本项目的数据是采集电商网站的用户行为数据,主要包含用户的4种行为:搜索、点击、下单和支付。需求说明:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

数据格式如下:

下面是数据文件:
链接:https://pan.baidu.com/s/1w93qlqKGDp7bLbW569l2Aw?pwd=uvuw 

编号

字段名称

字段类型

字段含义

1

date

String

用户点击行为的日期

2

user_id

Long

用户的ID

3

session_id

String

Session的ID

4

page_id

Long

某个页面的ID

5

action_time

String

动作的时间点

6

search_keyword

String

用户搜索的关键词

7

click_category_id

Long

点击某一个商品品类的ID

8

click_product_id

Long

某一个商品的ID

9

order_category_ids

String

一次订单中所有品类的ID集合

10

order_product_ids

String

次订单中所有商品的ID集合

11

pay_category_ids

String

一次支付中所有品类的ID集合

12

pay_product_ids

String

次支付中所有商品的ID集合

13

city_id

Long

城市 id

题目一:Top5热门品类

需求:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。本项目需求为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。输出结果如下:

 

  1. 通过使用groupby实现,Top5热门品类,将聚合后的数据排序,取前5名
  2. 通过使用reduceByKey实现,Top5热门品类,将聚合后的数据排序,取前5名

题目二:Top5热门品类中每个品类的Top5活跃Session统计

需求:对于排名前5的品类,分别获取每个品类点击次数排名前5的sessionId。(注意: 这里我们只关注点击次数,不关心下单和支付次数)这个就是说,对于top5的品类,每一个都要获取对它点击次数排名前5的sessionId。这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的session的行为。

  1. 输出结果


 scala实现

 新建maven项目结构如下

 配置pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>sparkrdd-141</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <scala.version>2.11.8</scala.version>
  11. <spark.version>2.1.0</spark.version>
  12. </properties>
  13. <dependencies>
  14. <!-- Spark Core -->
  15. <dependency>
  16. <groupId>org.apache.spark</groupId>
  17. <artifactId>spark-core_2.11</artifactId>
  18. <version>${spark.version}</version>
  19. </dependency>
  20. <!-- Scala 标准库 -->
  21. <dependency>
  22. <groupId>org.scala-lang</groupId>
  23. <artifactId>scala-library</artifactId>
  24. <version>${scala.version}</version>
  25. </dependency>
  26. </dependencies>
  27. </project>

scala代码

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.storage.StorageLevel
  4. object Main {
  5. def main(args: Array[String]): Unit = {
  6. Logger.getLogger("org").setLevel(Level.WARN)
  7. val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountHelloWorld")
  8. val sc = new SparkContext(conf)
  9. // 加载数据
  10. val rdd01 = sc.textFile("src/main/resources/user_visit_action.txt")
  11. val rdd02 = rdd01.map(_.split("_"))
  12. val rdd03 = rdd02.map(x => x.slice(6, x.length)) // ['-1', '-1', 'null', 'null', 'null', 'null', '18'], ['9', '51', 'null', 'null', 'null', 'null', '6']
  13. // 过滤点击、订单和支付数据
  14. val click_RDD = rdd03.filter(_(0) != "-1")
  15. val order_RDD = rdd03.filter(_(2) != "null")
  16. val pay_RDD = rdd03.filter(_(4) != "null")
  17. // ==================================================================================================================
  18. // 题目1
  19. // 使用groupBy
  20. val click_RDD11 = click_RDD.map(x => (x(0), 1))
  21. val click_RDD12 = click_RDD11.groupBy(_._1)
  22. val click_RDD13 = click_RDD12.mapValues(_.map(_._2).sum)
  23. // println(click_RDD13.collect())
  24. val order_RDD11 = order_RDD.flatMap(x => x(2).split(','))
  25. val order_RDD12 = order_RDD11.map(x => (x, 1))
  26. val order_RDD13 = order_RDD12.groupBy(_._1)
  27. val order_RDD14 = order_RDD13.mapValues(_.map(_._2).sum)
  28. // println(order_RDD14.collect())
  29. val pay_RDD11 = pay_RDD.flatMap(x => x(4).split(','))
  30. val pay_RDD12 = pay_RDD11.map(x => (x, 1))
  31. val pay_RDD13 = pay_RDD12.groupBy(_._1)
  32. val pay_RDD14 = pay_RDD13.mapValues(_.map(_._2).sum)
  33. // println(pay_RDD14.collect())
  34. val end_RDD10 = click_RDD13.join(order_RDD14).join(pay_RDD14) // ('16', ((5928, 1782), 1233)), ('19', ((6044, 1722), 1158)),
  35. val end_RDD11 = end_RDD10.map(x => (x._1.toInt, x._2._1._1, x._2._1._2, x._2._2))
  36. // 缓存一下
  37. // end_RDD11.cache()
  38. end_RDD11.persist(StorageLevel.DISK_ONLY)
  39. // 法1 使用top
  40. val end_RDD_top = end_RDD11.top(5)(Ordering[(Int, Int, Int)].on(x => (x._2, x._3, x._4)))
  41. println('\n' + end_RDD_top.mkString("\n"))
  42. // 法2 使用sortBy
  43. val end_RDD_sort = end_RDD11.sortBy(x => (-x._2, -x._3, -x._4))
  44. println('\n' + end_RDD_sort.take(5).mkString("\n"))
  45. // ==================================================================================================================
  46. // 题目2
  47. // reduceByKey实现
  48. // 点击
  49. val click_RDD01 = click_RDD.map(x => (x(0), 1))
  50. val click_RDD02 = click_RDD01.reduceByKey(_ + _)
  51. // println('\n' + click_RDD02.collect())
  52. // 订单
  53. val order_RDD01 = order_RDD.flatMap(x => x(2).split(','))
  54. val order_RDD02 = order_RDD01.map(x => (x, 1))
  55. val order_RDD03 = order_RDD02.reduceByKey(_ + _)
  56. // println('\n' + order_RDD03.collect())
  57. // 支付
  58. val pay_RDD01 = pay_RDD.flatMap(x => x(4).split(','))
  59. val pay_RDD02 = pay_RDD01.map(x => (x, 1))
  60. val pay_RDD03 = pay_RDD02.reduceByKey(_ + _)
  61. // println('\n' + pay_RDD03.collect())
  62. val end_RDD = click_RDD02.join(order_RDD03).join(pay_RDD03)
  63. val end_RDD01 = end_RDD.map(x => (x._1.toInt, x._2._1._1, x._2._1._2, x._2._2))
  64. // 缓存一下
  65. // end_RDD01.cache()
  66. end_RDD01.persist(StorageLevel.DISK_ONLY)
  67. // 法1 使用top
  68. val end_RDD02 = end_RDD01.top(5)(Ordering[(Int, Int, Int)].on(x => (x._2, x._3, x._4)))
  69. println('\n' + end_RDD02.mkString("\n"))
  70. // 法2 使用sortBy
  71. val end_RDD03 = end_RDD01.sortBy(x => (-x._2, -x._3, -x._4))
  72. println('\n' + end_RDD03.take(5).mkString("\n"))
  73. // ==================================================================================================================
  74. // 题目3
  75. // 过滤出top5点击品类id 中的会话id
  76. val list_clickID_top = end_RDD_top.map(_._1).toList
  77. println(list_clickID_top)
  78. val session_RDD = rdd02.map(x => (x(6), x(2)))
  79. val session_RDD01 = session_RDD.filter(x => list_clickID_top.contains(x._1.toInt))
  80. // 构造 (clickid_sessionid,1)
  81. val session_RDD02 = session_RDD01.map(x => (x._1 + "_" + x._2, 1))
  82. // 聚合
  83. val session_RDD03 = session_RDD02.reduceByKey(_ + _)
  84. // 构造 (clickid,(sessionid,sum))
  85. val session_RDD04 = session_RDD03.map(x => (x._1.split("_"), x._2)).map(x => (x._1(0), (x._1(1), x._2)))
  86. // 根据clickid分组
  87. val session_RDD05 = session_RDD04.groupBy(_._1)
  88. // 排序取Top5热门品类中每个品类的Top5活跃Session统计
  89. val sorted_rdd = session_RDD05.mapValues(x => x.toList.sortBy(_._2._2).reverse.take(5))
  90. val sorted_rdd2 = sorted_rdd.mapValues(x => x.map(_._2))
  91. sorted_rdd2.foreach(x => println("(" + x._1 + ", " + x._2.toList + ")"))
  92. // println(sorted_rdd.collect())
  93. // val session_RDD06 = session_RDD05.map(x => (x._1, x._2.toList)).mapValues(x => (x.sorted(Ordering.by((_: (String, Int))._2).reverse).take(5))) // [('12', ('6502cdc9-cf95-4b08-8854-f03a25baa917', 1)),
  94. // println(sorted_rdd.collect())
  95. }
  96. }

python实现

  1. from pyspark import SparkConf, SparkContext, StorageLevel
  2. if __name__ == '__main__':
  3. conf = SparkConf().setSparkHome('spark_hw').setMaster("local[*]")
  4. sc = SparkContext(conf=conf)
  5. rdd01 = sc.textFile(r"D:\TASK\PythonProject\pythonSpark\01_RDD\user_visit_action.txt")
  6. rdd02 = rdd01.map(lambda x: x.split('_'))
  7. rdd03 = rdd02.map(lambda x: x[
  8. 6:]) # ['-1', '-1', 'null', 'null', 'null', 'null', '18'], ['9', '51', 'null', 'null', 'null', 'null', '6']]
  9. click_RDD = rdd03.filter(lambda x: x[0] != '-1')
  10. order_RDD = rdd03.filter(lambda x: x[2] != 'null')
  11. pay_RDD = rdd03.filter(lambda x: x[4] != 'null')
  12. # ==================================================================================================================
  13. # reduceByKey实现
  14. # 点击
  15. click_RDD01 = click_RDD.map(lambda x: (x[0], 1))
  16. click_RDD02 = click_RDD01.reduceByKey(lambda sum1, x: sum1 + x)
  17. # print('\n', click_RDD02.collect())
  18. # 订单
  19. order_RDD01 = order_RDD.flatMap(lambda x: x[2].split(','))
  20. order_RDD02 = order_RDD01.map(lambda x: (x, 1))
  21. order_RDD03 = order_RDD02.reduceByKey(lambda sum1, x: sum1 + x)
  22. # print('\n', order_RDD03.collect())
  23. # 支付
  24. pay_RDD01 = pay_RDD.flatMap(lambda x: x[4].split(','))
  25. pay_RDD02 = pay_RDD01.map(lambda x: (x, 1))
  26. pay_RDD03 = pay_RDD02.reduceByKey(lambda sum1, x: sum1 + x)
  27. # print('\n', pay_RDD03.collect())
  28. end_RDD = click_RDD02.join(order_RDD03).join(
  29. pay_RDD03) # ('16', ((5928, 1782), 1233)), ('19', ((6044, 1722), 1158)),
  30. end_RDD01 = end_RDD.map(lambda x: (int(x[0]), x[1][0][0], x[1][0][1], x[1][1]))
  31. # 缓存一下
  32. # end_RDD01.cache()
  33. end_RDD01.persist(StorageLevel.DISK_ONLY)
  34. # 法1 使用top
  35. end_RDD02 = end_RDD01.top(5, key=lambda x: (x[1], x[2], x[3]))
  36. print('\n', end_RDD02)
  37. # 法2 使用sortBy
  38. end_RDD02 = end_RDD01.sortBy(lambda x: (-x[1], -x[2], -x[3]))
  39. print('\n', end_RDD02.take(5))
  40. # ==================================================================================================================
  41. # 使用sortBy
  42. click_RDD11 = click_RDD.map(lambda x: (x[0], 1))
  43. click_RDD12 = click_RDD11.groupBy(lambda x: x[0])
  44. click_RDD13 = click_RDD12.mapValues(lambda x: sum(i for j, i in x))
  45. # print(click_RDD13.collect())
  46. order_RDD11 = order_RDD.flatMap(lambda x: x[2].split(','))
  47. order_RDD12 = order_RDD11.map(lambda x: (x, 1))
  48. order_RDD13 = order_RDD12.groupBy(lambda x: x[0])
  49. order_RDD14 = order_RDD13.mapValues(lambda x: sum(i for j, i in x))
  50. # print(order_RDD14.collect())
  51. pay_RDD11 = pay_RDD.flatMap(lambda x: x[4].split(','))
  52. pay_RDD12 = pay_RDD11.map(lambda x: (x, 1))
  53. pay_RDD13 = pay_RDD12.groupBy(lambda x: x[0])
  54. pay_RDD14 = pay_RDD13.mapValues(lambda x: sum(i for j, i in x))
  55. # print(pay_RDD14.collect())
  56. end_RDD10 = click_RDD13.join(order_RDD14).join(
  57. pay_RDD14) # ('16', ((5928, 1782), 1233)), ('19', ((6044, 1722), 1158)),
  58. end_RDD11 = end_RDD10.map(lambda x: (int(x[0]), x[1][0][0], x[1][0][1], x[1][1]))
  59. # 缓存一下
  60. # end_RDD11.cache()
  61. end_RDD11.persist(StorageLevel.DISK_ONLY)
  62. # 法1 使用top
  63. end_RDD_top = end_RDD11.top(5, key=lambda x: (x[1], x[2], x[3]))
  64. print('\n', end_RDD_top)
  65. # 法2 使用sortBy
  66. end_RDD_sort = end_RDD11.sortBy(lambda x: (-x[1], -x[2], -x[3]))
  67. print('\n', end_RDD_sort.take(5))
  68. # ==================================================================================================================
  69. # 过滤出top5点击品类id 中的会话id
  70. list_clickID_top = [x[0] for x in end_RDD_top]
  71. print(list_clickID_top)
  72. session_RDD = rdd02.map(lambda x: (x[6], x[2]))
  73. session_RDD01 = session_RDD.filter(lambda x: int(x[0]) in list_clickID_top)
  74. # 构造 (clickid_sessionid,1)
  75. session_RDD02 = session_RDD01.map(lambda x: (x[0] + '_' + x[1], 1))
  76. # 聚合
  77. session_RDD03 = session_RDD02.reduceByKey(lambda sum1, x: sum1 + x)
  78. # 构造 (clickid,(sessionid,sum))
  79. session_RDD04 = session_RDD03.map(lambda x: (x[0].split('_'), x[1])).map(lambda x: (x[0][0], (x[0][1], x[1])))
  80. # 根据 clcikid分组
  81. session_RDD05 = session_RDD04.groupBy(lambda x: x[0])
  82. session_RDD05.cache()
  83. session_RDD05.foreach(print)
  84. # for i in session_RDD05.collect():
  85. # for j in i:
  86. # print(list(j))
  87. # session_RDD05.saveAsTextFile("../my.txt")
  88. # x为迭代器对象 y为迭代器对象中的每一个元素
  89. sorted_rdd = session_RDD05.mapValues(lambda x: sorted(x, key=lambda y: y[1][1], reverse=True)[:5])
  90. sorted_rdd = sorted_rdd.mapValues(lambda x: (j for i, j in x))
  91. # modified_rdd = sorted_rdd.mapValues(lambda values: [y for _, y in values])
  92. sorted_rdd.foreach(lambda x: print("(", x[0], list(x[1]), ")"))
  93. # print(sorted_rdd.collect())
  94. # session_RDD06 = session_RDD05.map(lambda x: (x[0], list(x[1]))).mapValues(lambda x: (sorted(-i[1][1] for i in x)[:5])) # [('12', ('6502cdc9-cf95-4b08-8854-f03a25baa917', 1)),
  95. # print(sorted_rdd.collect())

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/813493
推荐阅读
相关标签
  

闽ICP备14008679号