当前位置:   article > 正文

spark项目实战-电商分析平台

spark项目

第1章 项目概述

电商分析平台是对用户访问电商平台的行为进行分析。

1.1 项目简介

  本项目主要讲解一个大型电商网站后台的企业级大数据统计分析平台,该平台以 Spark 为主,对电商网站的流量进行离线和实时的分析。
  该大数据分析平台对电商网站的各类用户行为(访问行为、购物行为、广告点击行为等)进行复杂的分析。用统计分析出来的数据,辅助公司中的 PM(产品经理)、数据分析师以及管理人员分析现有产品的状况,并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务。最终达到用大数据技术来帮助提高公司的业绩、营业额以及市场占有率的目标。
  项目主要使用了 Spark 技术生态栈中最经常使用的三个技术框架,Spark CoreSpark SQL 和 Spark Streaming,进行离线计算和实时计算业务模块的开发。实现了包括用户访问 session 分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计 4 个业务模块。
  项目中全部的业务功能模块都是直接从实际企业项目中抽取出来的,业务复杂度没有任何缩水,经过合理的将实际业务模块进行技术整合与改造,该项目几乎彻底涵盖了 Spark Core、Spark SQL 和 Spark Streaming 这三个技术框架中大部分的功能点、知识点。mysql

1.2 项目目标

  一、掌握电商系统中 Spark 的主要使用场景以及建设流程。
  二、掌握企业级的 Spark 项目的复杂性能调优、线上故障解决经验、数据倾斜全套处理方案。
  三、经过项目实战,彻底将 Spark 全部技术点和知识点都应用在项目中,掌握如何灵活应用 Spark 各项技术来实现各类复杂业务需求。nginx

1.3 业务需求简介

1.3.1 用户访问 session 统计

  用户在电商网站上,一般会有不少的访问行为,一般都是进入首页,而后可能点击首页上的一些商品,点击首页上的一些品类,也可能随时在搜索框里面搜索关键词,还可能将一些商品加入购物车,对购物车中的多个商品下订单,最后对订单中的多个商品进行支付。
  用户的每一次操做,其实能够理解为一个 action,在本项目中,咱们关注点击搜索下单支付这四个用户行为。
  用户 session,是在电商平台的角度定义的会话概念,指的就是,从用户第一次进入首页,session 就开始了。而后在必定时间范围内,直到最后操做完(可能作了几十次、甚至上百次操做),离开网站,关闭浏览器,或者长时间没有作操做,那么 session 就结束了。
  以上用户在网站内的访问过程,就称之为一次 session。简单理解,session 就是某一天某一个时间段内,某个用户对网站从打开/进入,到作了大量操做,到最后关闭浏览器的过程,就叫作 session。
  session 实际上就是一个电商网站中最基本的数据。那么面向消费者/用户端的大数据分析(C端),最基本的就是面向用户访问行为/用户访问 session 的分析。
  该模块主要是对用户访问 session 进行统计分析,包括 session 的聚合指标计算、按时间比例随机抽取 session、获取天天点击、下单和购买排名前 10 的品类、并获取 top10 品类的点击量排名前 10 的 session。该模块可让产品经理、数据分析师以及企业管理层形象地看到各类条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略作出调整。主要使用 Spark Core 实现。web

1.3.2 页面单跳转化率统计

  页面单跳转化率是一个很是有用的统计数据。
  产品经理,能够根据这个指标,去尝试分析整个网站/产品,各个页面的表现怎么样,是否是须要去优化产品的布局;吸引用户最终能够进入最后的支付页面。
  数据分析师,能够基于此数据,作更深一步的计算和分析。
  企业管理层,能够看到整个公司的网站,各个页面的之间的跳转的表现如何,作到内心有数,能够适当调整公司的经营战略或策略。
  该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用 Spark Core 实现。算法

1.3.3 区域热门商品离线统计

  该模块主要实现天天统计出各个区域的 top3 热门商品
  咱们认为,不一样地区的经济发展水平不一样,地理环境及气候不一样,人们的风土人情和风俗习惯不一样,所以对于不一样商品的需求不一样,根据区域热门商品的统计,可让公司决策层更好的对不一样类型商品进行布局,使商品进入最须要他的区域。
  该模块可让企业管理层看到公司售卖的商品的总体状况,从而对公司的商品相关的战略进行调整。主要使用 Spark SQL 实现。sql

1.3.4 广告流量实时统计

  网站/app 中常常会给第三方平台作广告,这也是一些互联网公司的核心收入来源;当广告位招商完成后,广告会在 网站/app 的某个广告位发布出去,当用户访问 网站/app 的时候,会看到相应位置的广告,此时,有些用户可能就会去点击那个广告。
  咱们要获取用户点击广告的行为,并针对这一行为进行计算和统计。
  用户每次点击一个广告之后,会产生相应的埋点日志;在大数据实时统计系统中,会经过某些方式将数据写入到分布式消息队列中(Kafka)。
  日志发送给后台 web 服务器(nginx),nginx 将日志数据负载均衡到多个 Tomcat 服务器上,Tomcat 服务器会不断将日志数据写入 Tomcat 日志文件中,写入后,就会被日志采集客户端(好比 Flume Agent)所采集,随后写入到消息队列中(Kafka),咱们的实时计算程序会从消息队列中( Kafka)去实时地拉取数据,而后对数据进行实时的计算和统计。
Kafka这个模块的意义在于,让产品经理、高管能够实时地掌握到公司打的各类广告的投放效果。以便于后期持续地对公司的广告投放相关的战略和策略,进行调整和优化;以指望得到最好的广告收益。
  该模块负责实时统计公司的广告流量,包括广告展示流量和广告点击流量。实现动态黑名单机制,以及黑名单过滤实现滑动窗口内的各城市的广告展示流量和广告点击流量的统计实现每一个区域每一个广告的点击流量实时统计实现每一个区域 top3 点击量的广告的统计。主要使用 Spark Streaming 实现。数据库

第2章 项目主体架构

2.1 项目架构

  用户行为数据在网站上最简单的存在形式就是日志。网站在运行过程当中会产生大量的原始日志 RAW LOG,将其存储在文件系统中,企业会将多种原始日志按照用户行为汇总成会话日志 SESSION LOG,每个会话日志表示用户的一种反馈。
  本项目分为离线分析系统实时分析系统两大模块。
  在离线分析系统中,咱们将模拟业务数据写入 Hive 表中,离线分析系统从 Hive 中获取数据,并根据实际需求(用户访问 Session 分析、页面单跳转化率分析、各区域热门商品统计) 对数据进行处理,最终将分析完毕的统计数据存储到 MySQL 的对应表格中。
  在实时分析系统中,咱们将模拟业务数据写入 Kafka 集群中, 实时分析系统从 Kafka broker 中获取数据,经过 Spark Streaming 的流式处理对广告点击流量进行实时分析,最终将统计结果存储到 MySQL 的对应表格中。express

2.2 离线日志采集宏观流程(参考)

2.3 实时日志采集宏观流程(参考)

2.4 离线/实时日志采集框架

  上图是一个企业级的日志处理框架,这一框架实现了对日志信息进行采集、汇总、清洗、聚合、分析的完整过程,并将日志数据分别存储到了离线和实时数据处理模块中,使得分析系统能够经过离线和实时两个角度对数据进行分析统计,并根据统计结果指导业务平台的改良和优化。apache

第3章 模拟业务数据源

3.1 离线数据

举例

3.1.1 数据模型与数听说明

一、user_visit_action
user_visit_action 表,存放网站或者 APP 天天的点击流数据。通俗地讲,就是用户对 网站/APP 每点击一下,就会产生一条存放在这个表里面的数据。


user_visit_action 表中的字段解析以下所示:

  1. 字段名称            说明
  2. date                日期,表明这个用户点击行为是在哪一天发生的
  3. user_id             用户 ID,惟一地标识某个用户
  4. session_id          Session ID,惟一地标识某个用户的一个访问 session
  5. page_id             页面 ID,点击了某些商品/品类,也多是搜索了某个关键词,而后进入了某个页面,页面的 id
  6. action_time         动做时间,这个点击行为发生的时间点
  7. search_keyword      搜索关键词,若是用户执行的是一个搜索行为,好比说在 网站/app 中,搜索了某个关键词,而后会跳转到商品列表页面
  8. click_category_id   点击品类 ID,多是在网站首页,点击了某个品类(美食、电子设备、电脑)
  9. click_product_id    点击商品 ID,多是在网站首页,或者是在商品列表页,点击了某个商品(好比呷哺呷哺火锅 XX 路店 3 人套餐、iphone 6s)
  10. order_category_ids  下单品类 ID,表明了可能将某些商品加入了购物车,而后一次性对购物车中的商品下了一个订单,这就表明了某次下单的行为中,有哪些商品品类,可能有 6 个商品,可是就对应了 2 个品类,好比有 3 根火腿肠(食品品类),3 个电池(日用品品类)
  11. order_product_ids   下单商品 ID,某次下单,具体对哪些商品下的订单
  12. pay_category_ids    付款品类 ID,对某个订单,或者某几个订单,进行了一次支付的行为,对应了哪些品类
  13. pay_product_ids     付款商品 ID,支付行为下,对应的哪些具体的商品
  14. city_id             城市 ID,表明该用户行为发生在哪一个城市

二、user_info
user_info 表,是一张普通的用户基本信息表;这张表中存放了 网站/APP 全部注册用户的基本信息。


user_info 表中的字段解析以下所示:

  1. 字段名称            说明
  2. user_id             用户 ID,惟一地标识某个用户
  3. username            用户登陆名
  4. name                用户昵称或真实姓名
  5. age                 用户年龄
  6. professional        用户职业
  7. city                用户所在城市
  8. sex                 用户性别

三、product_info
product_info 表,是一张普通的商品基本信息表;这张表中存放了 网站/APP 全部商品的基本信息。


product_info 表中的字段解析以下所示:

  1. 字段名称            说明
  2. proudct_id          商品 ID,惟一地标识某个商品
  3. product_name        商品名称
  4. extend_info         额外信息,例如商品为自营商品仍是第三方商品

3.2 实时数据

3.2.1 数据模型与数听说明

程序每 5 秒向 Kafka 集群写入数据,格式以下:
格式 :timestamp province city userid adid
在线数据的字段解析以下所示:

  1. 字段名称            取值范围
  2. timestamp           当前时间毫秒
  3. userId              0 – 99
  4. provice/city        1 – 9((0L," 北京"," 北京"),(1L," 上海"," 上海"),(2L," 南京"," 江苏省"),(3L,"广州","广东省"),(4L,"三亚","海南省"),(5L,"武汉","湖北省"),(6L,"长沙","湖南省"),(7L,"西安","陕西省"),(8L,"成都","四川省"),(9L,"哈尔滨","东北省"))
  5. adid                0 - 19

第4章 程序框架解析

新建一个 maven 工程 commerce_basic 做为父 maven 工程,引入依赖 pom.xml
pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>com.atguigu</groupId>
  7.     <artifactId>commerce</artifactId>
  8.     <packaging>pom</packaging>
  9.     <version>1.0-SNAPSHOT</version>
  10.     <modules>
  11.         <module>commons</module>
  12.         <module>mock</module>
  13.     </modules>
  14.     <!-- 声明子项目公用的配置属性 -->
  15.     <properties>
  16.         <spark.version>2.1.1</spark.version>
  17.         <scala.version>2.11.8</scala.version>
  18.         <log4j.version>1.2.17</log4j.version>
  19.         <slf4j.version>1.7.22</slf4j.version>
  20.     </properties>
  21.     <!-- 声明并引入子项目共有的依赖 -->
  22.     <dependencies>
  23.         <!-- 全部子项目的日志框架 -->
  24.         <dependency>
  25.             <groupId>org.slf4j</groupId>
  26.             <artifactId>jcl-over-slf4j</artifactId>
  27.             <version>${slf4j.version}</version>
  28.         </dependency>
  29.         <dependency>
  30.             <groupId>org.slf4j</groupId>
  31.             <artifactId>slf4j-api</artifactId>
  32.             <version>${slf4j.version}</version>
  33.         </dependency>
  34.         <dependency>
  35.             <groupId>org.slf4j</groupId>
  36.             <artifactId>slf4j-log4j12</artifactId>
  37.             <version>${slf4j.version}</version>
  38.         </dependency>
  39.         <!-- 具体的日志实现 -->
  40.         <dependency>
  41.             <groupId>log4j</groupId>
  42.             <artifactId>log4j</artifactId>
  43.             <version>${log4j.version}</version>
  44.         </dependency>
  45.         <!-- Logging End -->
  46.     </dependencies>
  47.     <dependencyManagement>
  48.         <dependencies>
  49.             <!-- 引入 Spark 相关的 Jar 包 -->
  50.             <dependency>
  51.                 <groupId>org.apache.spark</groupId>
  52.                 <artifactId>spark-core_2.11</artifactId>
  53.                 <version>${spark.version}</version>
  54.                 <!-- provider 若是存在,那么运行时该 Jar 包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
  55.                 <!--<scope>provided</scope>-->
  56.             </dependency>
  57.             <dependency>
  58.                 <groupId>org.apache.spark</groupId>
  59.                 <artifactId>spark-sql_2.11</artifactId>
  60.                 <version>${spark.version}</version>
  61.                 <!--<scope>provided</scope>-->
  62.             </dependency>
  63.             <dependency>
  64.                 <groupId>org.apache.spark</groupId>
  65.                 <artifactId>spark-streaming_2.11</artifactId>
  66.                 <version>${spark.version}</version>
  67.                 <!--<scope>provided</scope>-->
  68.             </dependency>
  69.             <dependency>
  70.                 <groupId>org.apache.spark</groupId>
  71.                 <artifactId>spark-mllib_2.11</artifactId>
  72.                 <version>${spark.version}</version>
  73.                 <!--<scope>provided</scope>-->
  74.             </dependency>
  75.             <dependency>
  76.                 <groupId>org.apache.spark</groupId>
  77.                 <artifactId>spark-graphx_2.11</artifactId>
  78.                 <version>${spark.version}</version>
  79.                 <!--<scope>provided</scope>-->
  80.             </dependency>
  81.             <dependency>
  82.                 <groupId>org.scala-lang</groupId>
  83.                 <artifactId>scala-library</artifactId>
  84.                 <version>${scala.version}</version>
  85.                 <!--<scope>provided</scope>-->
  86.             </dependency>
  87.             <dependency>
  88.                 <groupId>org.apache.spark</groupId>
  89.                 <artifactId>spark-hive_2.11</artifactId>
  90.                 <version>${spark.version}</version>
  91.                 <!--<scope>provided</scope>-->
  92.             </dependency>
  93.         </dependencies>
  94.     </dependencyManagement>
  95.     <!-- 声明构建信息 -->
  96.     <build>
  97.         <!-- 声明并引入子项目共有的插件:插件就是负责到 Maven 各个声明周期的具体实现 -->
  98.         <plugins>
  99.             <plugin>
  100.                 <groupId>org.apache.maven.plugins</groupId>
  101.                 <artifactId>maven-compiler-plugin</artifactId>
  102.                 <version>3.6.1</version>
  103.                 <!-- 全部的编译都依照 JDK1.8 来搞 -->
  104.                 <configuration>
  105.                     <source>1.8</source>
  106.                     <target>1.8</target>
  107.                 </configuration>
  108.             </plugin>
  109.         </plugins>
  110.         <!-- 仅声明子项目共有的插件,若是子项目须要此插件,那么子项目须要声明 -->
  111.         <pluginManagement>
  112.             <plugins>
  113.                 <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
  114.                 <plugin>
  115.                     <groupId>net.alchim31.maven</groupId>
  116.                     <artifactId>scala-maven-plugin</artifactId>
  117.                     <version>3.2.2</version>
  118.                     <executions>
  119.                         <execution>
  120.                             <!-- 声明绑定到 maven 的 compile 阶段 -->
  121.                             <goals>
  122.                                 <goal>compile</goal>
  123.                                 <goal>testCompile</goal>
  124.                             </goals>
  125.                         </execution>
  126.                     </executions>
  127.                 </plugin>
  128.                 <!-- 用于项目的打包插件 -->
  129.                 <plugin>
  130.                     <groupId>org.apache.maven.plugins</groupId>
  131.                     <artifactId>maven-assembly-plugin</artifactId>
  132.                     <version>3.0.0</version>
  133.                     <executions>
  134.                         <execution>
  135.                             <id>make-assembly</id>
  136.                             <phase>package</phase>
  137.                             <goals>
  138.                                 <goal>single</goal>
  139.                             </goals>
  140.                         </execution>
  141.                     </executions>
  142.                 </plugin>
  143.             </plugins>
  144.         </pluginManagement>
  145.     </build>
  146. </project>

4.1 mock 模块(模拟数据产生模块)

新建一个模块 maven 工程 mock 做为子 maven 工程,引入依赖 pom.xml
pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <parent>
  7.         <artifactId>commerce</artifactId>
  8.         <groupId>com.atguigu</groupId>
  9.         <version>1.0-SNAPSHOT</version>
  10.     </parent>
  11.     <artifactId>mock</artifactId>
  12.     <dependencies>
  13.         <dependency>
  14.             <groupId>com.atguigu</groupId>
  15.             <artifactId>commons</artifactId>
  16.             <version>1.0-SNAPSHOT</version>
  17.         </dependency>
  18.         <!-- Spark 的依赖引入 -->
  19.         <dependency>
  20.             <groupId>org.apache.spark</groupId>
  21.             <artifactId>spark-core_2.11</artifactId>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.spark</groupId>
  25.             <artifactId>spark-sql_2.11</artifactId>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.spark</groupId>
  29.             <artifactId>spark-hive_2.11</artifactId>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.apache.kafka</groupId>
  33.             <artifactId>kafka-clients</artifactId>
  34.             <version>0.10.2.1</version>
  35.         </dependency>
  36.     </dependencies>
  37.     <build>
  38.         <plugins>
  39.             <plugin>
  40.                 <groupId>net.alchim31.maven</groupId>
  41.                 <artifactId>scala-maven-plugin</artifactId>
  42.             </plugin>
  43.         </plugins>
  44.     </build>
  45. </project>

类(包)名称

类(包)结构图

MockDataGenerate.scala

  1. import java.util.UUID
  2. import commons.model.{ProductInfo, UserInfo, UserVisitAction}
  3. import commons.utils.{DateUtils, StringUtils}
  4. import org.apache.spark.SparkConf
  5. import org.apache.spark.sql.{DataFrame, SparkSession}
  6. import scala.collection.mutable.ArrayBuffer
  7. import scala.util.Random
  8. /**
  9.   * 离线模拟数据的生成
  10.   *
  11.   * date:是当前日期
  12.   * age: 0 - 59
  13.   * professionals: professional[0 - 99]
  14.   * cities: 0 - 99
  15.   * sex: 0 - 1
  16.   * keywords: ("火锅""蛋糕""重庆辣子鸡""重庆小面""呷哺呷哺""新辣道鱼火锅""国贸大厦""太古商场""日本料理""温泉")
  17.   * categoryIds: 0 - 99
  18.   * ProductId: 0 - 99
  19.   */
  20. object MockDataGenerate {
  21.   /**
  22.     * 模拟用户的行为信息
  23.     *
  24.     * @return
  25.     */
  26.   private def mockUserVisitActionData(): Array[UserVisitAction] = {
  27.     val rows = ArrayBuffer[UserVisitAction]()
  28.     val random = new Random()
  29.     val searchKeywords = Array("华为手机""联想笔记本""小龙虾""卫生纸""吸尘器""Lamer""机器学习""苹果""洗面奶""保温杯")
  30.     // yyyy-MM-dd
  31.     val date = DateUtils.getTodayDate()
  32.     // 关注四个行为:搜索、点击、下单、支付
  33.     val actions = Array("search""click""order""pay")
  34.     // 一共 100 个用户(有重复)
  35.     for (i <0 until 100) {
  36.       val userid = random.nextInt(100)
  37.       // 每一个用户产生 10 个 session
  38.       for (j <0 until 10) {
  39.         // 不可变的,全局的,独一无二的 128bit 长度的标识符,用于标识一个 session,体现一次会话产生的 sessionId 是独一无二的
  40.         val sessionid = UUID.randomUUID().toString().replace("-""")
  41.         // 在 yyyy-MM-dd 后面添加一个随机的小时时间(0-23
  42.         val baseActionTime = date + " " + random.nextInt(23// 2019-05-30 12
  43.         // 每一个 (userid + sessionid) 生成 0-100 条用户访问数据
  44.         for (k <0 to random.nextInt(100)) {
  45.           val pageid = random.nextInt(10)
  46.           // 在 yyyy-MM-dd HH 后面添加一个随机的分钟时间和秒时间,2019-05-30 12:25:30
  47.           val actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)))
  48.           var searchKeyword: String = null
  49.           var clickCategoryId: Long = -1L
  50.           var clickProductId: Long = -1L
  51.           var orderCategoryIds: String = null
  52.           var orderProductIds: String = null
  53.           var payCategoryIds: String = null
  54.           var payProductIds: String = null
  55.           val cityid = random.nextInt(10).toLong
  56.           // 随机肯定用户在当前 session 中的行为
  57.           val action = actions(random.nextInt(4))
  58.           // 根据随机产生的用户行为 action 决定对应字段的值
  59.           action match {
  60.             case "search" => searchKeyword = searchKeywords(random.nextInt(10))
  61.             case "click" => clickCategoryId = random.nextInt(100).toLong
  62.               clickProductId = String.valueOf(random.nextInt(100)).toLong
  63.             case "order" => orderCategoryIds = random.nextInt(100).toString
  64.               orderProductIds = random.nextInt(100).toString
  65.             case "pay" => payCategoryIds = random.nextInt(100).toString
  66.               payProductIds = random.nextInt(100).toString
  67.           }
  68.           rows += UserVisitAction(date, userid, sessionid,
  69.             pageid, actionTime, searchKeyword,
  70.             clickCategoryId, clickProductId,
  71.             orderCategoryIds, orderProductIds,
  72.             payCategoryIds, payProductIds, cityid)
  73.         }
  74.       }
  75.     }
  76.     rows.toArray
  77.   }
  78.   /**
  79.     * 模拟用户信息表
  80.     *
  81.     * @return
  82.     */
  83.   private def mockUserInfo(): Array[UserInfo] = {
  84.     val rows = ArrayBuffer[UserInfo]()
  85.     val sexes = Array("male""female")
  86.     val random = new Random()
  87.     // 随机产生 100 个用户的我的信息
  88.     for (i <0 until 100) {
  89.       val userid = i
  90.       val username = "user" + i
  91.       val name = "name" + i
  92.       val age = random.nextInt(60)
  93.       val professional = "professional" + random.nextInt(100)
  94.       val city = "city" + random.nextInt(100)
  95.       val sex = sexes(random.nextInt(2))
  96.       rows += UserInfo(userid, username, name, age, professional, city, sex)
  97.     }
  98.     rows.toArray
  99.   }
  100.   /**
  101.     * 模拟产品数据表
  102.     *
  103.     * @return
  104.     */
  105.   private def mockProductInfo(): Array[ProductInfo] = {
  106.     val rows = ArrayBuffer[ProductInfo]()
  107.     val random = new Random()
  108.     val productStatus = Array(01)
  109.     // 随机产生 100 个产品信息
  110.     for (i <0 until 100) {
  111.       val productId = i
  112.       val productName = "product" + i
  113.       val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}" // 注意这里是 json 串
  114.       rows += ProductInfo(productId, productName, extendInfo)
  115.     }
  116.     rows.toArray
  117.   }
  118.   /**
  119.     * 将 DataFrame 插入到 Hive 表中
  120.     *
  121.     * @param spark     SparkSQL 客户端
  122.     * @param tableName 表名
  123.     * @param dataDF    DataFrame
  124.     */
  125.   private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
  126.     spark.sql("DROP TABLE IF EXISTS " + tableName)
  127.     dataDF.write.saveAsTable(tableName)
  128.   }
  129.   val USER_VISIT_ACTION_TABLE = "user_visit_action"
  130.   val USER_INFO_TABLE = "user_info"
  131.   val PRODUCT_INFO_TABLE = "product_info"
  132.   /**
  133.     * 主入口方法
  134.     *
  135.     * @param args 启动参数
  136.     */
  137.   def main(args: Array[String]): Unit = {
  138.     // 建立 Spark 配置
  139.     val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]")
  140.     // 建立 Spark SQL 客户端
  141.     val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  142.     // 模拟数据
  143.     val userVisitActionData = this.mockUserVisitActionData()
  144.     val userInfoData = this.mockUserInfo()
  145.     val productInfoData = this.mockProductInfo()
  146.     // 将模拟数据转换为 RDD
  147.     val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
  148.     val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
  149.     val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)
  150.     // 加载 SparkSQL 的隐式转换支持
  151.     import spark.implicits._
  152.     // 将用户访问数据转换为 DF 保存到 Hive 表中
  153.     val userVisitActionDF = userVisitActionRdd.toDF()
  154.     insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)
  155.     // 将用户信息数据转换为 DF 保存到 Hive 表中
  156.     val userInfoDF = userInfoRdd.toDF()
  157.     insertHive(spark, USER_INFO_TABLE, userInfoDF)
  158.     // 将产品信息数据转换为 DF 保存到 Hive 表中
  159.     val productInfoDF = productInfoRdd.toDF()
  160.     insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)
  161.     spark.close
  162.   }
  163. }

MockRealTimeData.scala

  1. import java.util.Properties
  2. import commons.conf.ConfigurationManager
  3. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  4. import scala.collection.mutable.ArrayBuffer
  5. import scala.util.Random
  6. object MockRealTimeData {
  7.   def main(args: Array[String]): Unit = {
  8.     // 获取配置文件 commerce.properties 中的 Kafka 配置参数
  9.     val broker = ConfigurationManager.config.getString("kafka.broker.list")
  10.     val topic = ConfigurationManager.config.getString("kafka.topics")
  11.     // 建立 Kafka 生产者
  12.     val kafkaProducer = createKafkaProducer(broker)
  13.     while (true) {
  14.       // 随机产生实时数据并经过 Kafka 生产者发送到 Kafka 集群中
  15.       for (item <- generateMockData()) {
  16.         kafkaProducer.send(new ProducerRecord[StringString](topic, item))
  17.       }
  18.       Thread.sleep(5000)
  19.     }
  20.   }
  21.   /**
  22.     * 实时模拟数据的生成
  23.     *
  24.     * 时间点: 当前时间毫秒
  25.     * userId: 0 - 99
  26.     * 省份、城市 ID 相同: 1 - 9
  27.     * adid: 0 - 19
  28.     * ((0L,"北京","北京"),(1L,"上海","上海"),(2L,"南京","江苏省"),(3L,"广州","广东省"),(4L,"三亚","海南省"),(5L,"武汉","湖北省"),(6L,"长沙","湖南省"),(7L,"西安","陕西省"),(8L,"成都","四川省"),(9L,"哈尔滨","东北省"))
  29.     *
  30.     * 格式 :timestamp province city userid adid
  31.     *       某个时间点 某个省份 某个城市 某个用户 某个广告
  32.     */
  33.   def generateMockData(): Array[String= {
  34.     val array = ArrayBuffer[String]()
  35.     val random = new Random()
  36.     // 模拟实时数据:timestamp province city userid adid
  37.     for (i <0 until 50) {
  38.       val timestamp = System.currentTimeMillis()
  39.       val province = random.nextInt(10)
  40.       val city = province
  41.       val adid = random.nextInt(20)
  42.       val userid = random.nextInt(100)
  43.       // 拼接实时数据
  44.       array += timestamp + " " + province + " " + city + " " + userid + " " + adid
  45.     }
  46.     array.toArray
  47.   }
  48.   def createKafkaProducer(broker: String): KafkaProducer[StringString= {
  49.     // 建立配置对象
  50.     val prop = new Properties()
  51.     // 添加配置
  52.     prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
  53.     prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  54.     prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  55.     // 根据配置建立 Kafka 生产者
  56.     new KafkaProducer[StringString](prop)
  57.   }
  58. }

4.2 commons 模块(公共模块)

新建一个模块 maven 工程 commons 做为子 maven 工程,引入依赖 pom.xml
pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>commerce</artifactId>
  7.         <groupId>com.atguigu</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>commons</artifactId>
  12.     <dependencies>
  13.         <dependency>
  14.             <groupId>net.sf.json-lib</groupId>
  15.             <artifactId>json-lib</artifactId>
  16.             <version>2.4</version>
  17.             <classifier>jdk15</classifier>
  18.         </dependency>
  19.         <dependency>
  20.             <groupId>org.apache.commons</groupId>
  21.             <artifactId>commons-pool2</artifactId>
  22.             <version>2.4.2</version>
  23.         </dependency>
  24.         <dependency>
  25.             <groupId>org.apache.commons</groupId>
  26.             <artifactId>commons-configuration2</artifactId>
  27.             <version>2.2</version>
  28.         </dependency>
  29.         <dependency>
  30.             <groupId>commons-beanutils</groupId>
  31.             <artifactId>commons-beanutils</artifactId>
  32.             <version>1.9.3</version>
  33.         </dependency>
  34.         <dependency>
  35.             <groupId>org.scala-lang</groupId>
  36.             <artifactId>scala-library</artifactId>
  37.         </dependency>
  38.         <dependency>
  39.             <groupId>mysql</groupId>
  40.             <artifactId>mysql-connector-java</artifactId>
  41.             <version>5.1.6</version>
  42.         </dependency>
  43.         <dependency>
  44.             <groupId>joda-time</groupId>
  45.             <artifactId>joda-time</artifactId>
  46.             <version>2.9.9</version>
  47.         </dependency>
  48.     </dependencies>
  49.     <build>
  50.         <plugins>
  51.             <plugin>
  52.                 <groupId>net.alchim31.maven</groupId>
  53.                 <artifactId>scala-maven-plugin</artifactId>
  54.             </plugin>
  55.         </plugins>
  56.     </build>
  57. </project>

类(包)名称

工具类名称

类(包)结构图

ConfigurationManager.scala

  1. package commons.conf
  2. import org.apache.commons.configuration2.{FileBasedConfiguration, PropertiesConfiguration}
  3. import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder
  4. import org.apache.commons.configuration2.builder.fluent.Parameters
  5. /**
  6.   * 配置工具类:新的读取配置文件信息的方式
  7.   */
  8. object ConfigurationManager {
  9.   // 建立用于初始化配置生成器实例的参数对象
  10.   private val params = new Parameters()
  11.   // FileBasedConfigurationBuilder : 产生一个传入的类的实例对象
  12.   // FileBasedConfiguration : 融合 FileBased 与 Configuration 的接口
  13.   // PropertiesConfiguration : 从一个或者多个文件读取配置的标准配置加载器
  14.   // configure() : 经过 params 实例初始化配置生成器
  15.   // 向 FileBasedConfigurationBuilder() 中传入一个标准配置加载器类,生成一个加载器类的实例对象,而后经过 params 参数对其初始化
  16.   private val builder = new FileBasedConfigurationBuilder[FileBasedConfiguration](classOf[PropertiesConfiguration])
  17.     .configure(params.properties().setFileName("commerce.properties"))
  18.   // 经过 getConfiguration 获取配置对象
  19.   val config = builder.getConfiguration()
  20. }

Constants.scala

  1. package commons.constant
  2. /**
  3.   * 常量接口
  4.   */
  5. object Constants {
  6.   /**
  7.     * 项目配置相关的常量
  8.     */
  9.   val JDBC_DATASOURCE_SIZE = "jdbc.datasource.size"
  10.   val JDBC_URL = "jdbc.url"
  11.   val JDBC_USER = "jdbc.user"
  12.   val JDBC_PASSWORD = "jdbc.password"
  13.   val KAFKA_BROKERS = "kafka.broker.list"
  14.   val KAFKA_TOPICS = "kafka.topics"
  15.   /**
  16.     * Spark 做业相关的常量
  17.     */
  18.   val SPARK_APP_NAME_SESSION = "UserVisitSessionAnalyzeSpark"
  19.   val SPARK_APP_NAME_PAGE = "PageOneStepConvertRateSpark"
  20.   /**
  21.     * user_visit_action、user_info、product_info 表中字段对应的字段名常量
  22.     */
  23.   val FIELD_SESSION_ID = "sessionid"
  24.   val FIELD_SEARCH_KEYWORDS = "searchKeywords"
  25.   val FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds"
  26.   val FIELD_AGE = "age"
  27.   val FIELD_PROFESSIONAL = "professional"
  28.   val FIELD_CITY = "city"
  29.   val FIELD_SEX = "sex"
  30.   val FIELD_VISIT_LENGTH = "visitLength"
  31.   val FIELD_STEP_LENGTH = "stepLength"
  32.   val FIELD_START_TIME = "startTime"
  33.   val FIELD_CLICK_COUNT = "clickCount"
  34.   val FIELD_ORDER_COUNT = "orderCount"
  35.   val FIELD_PAY_COUNT = "payCount"
  36.   val FIELD_CATEGORY_ID = "categoryId"
  37.   /**
  38.     * Spark 累加器 Key 名称常量
  39.     */
  40.   val SESSION_COUNT = "session_count"
  41.   val TIME_PERIOD_1s_3= "1s_3s"
  42.   val TIME_PERIOD_4s_6= "4s_6s"
  43.   val TIME_PERIOD_7s_9= "7s_9s"
  44.   val TIME_PERIOD_10s_30= "10s_30s"
  45.   val TIME_PERIOD_30s_60= "30s_60s"
  46.   val TIME_PERIOD_1m_3= "1m_3m"
  47.   val TIME_PERIOD_3m_10= "3m_10m"
  48.   val TIME_PERIOD_10m_30= "10m_30m"
  49.   val TIME_PERIOD_30= "30m"
  50.   val STEP_PERIOD_1_3 = "1_3"
  51.   val STEP_PERIOD_4_6 = "4_6"
  52.   val STEP_PERIOD_7_9 = "7_9"
  53.   val STEP_PERIOD_10_30 = "10_30"
  54.   val STEP_PERIOD_30_60 = "30_60"
  55.   val STEP_PERIOD_60 = "60"
  56.   /**
  57.     * task.params.json 中限制条件对应的常量字段
  58.     */
  59.   val TASK_PARAMS = "task.params.json"
  60.   val PARAM_START_DATE = "startDate"
  61.   val PARAM_END_DATE = "endDate"
  62.   val PARAM_START_AGE = "startAge"
  63.   val PARAM_END_AGE = "endAge"
  64.   val PARAM_PROFESSIONALS = "professionals"
  65.   val PARAM_CITIES = "cities"
  66.   val PARAM_SEX = "sex"
  67.   val PARAM_KEYWORDS = "keywords"
  68.   val PARAM_CATEGORY_IDS = "categoryIds"
  69.   val PARAM_TARGET_PAGE_FLOW = "targetPageFlow"
  70. }

DataModel.scala

  1. package commons.model
  2. //***************** 输入表 *********************
  3. /**
  4.   * 用户访问动做表
  5.   *
  6.   * @param date               用户点击行为的日期
  7.   * @param user_id            用户的 ID
  8.   * @param session_id         Session 的 ID
  9.   * @param page_id            某个页面的 ID
  10.   * @param action_time        点击行为的时间点
  11.   * @param search_keyword     用户搜索的关键词
  12.   * @param click_category_id  某一个商品品类的 ID
  13.   * @param click_product_id   某一个商品的 ID
  14.   * @param order_category_ids 一次订单中全部品类的 ID 集合
  15.   * @param order_product_ids  一次订单中全部商品的 ID 集合
  16.   * @param pay_category_ids   一次支付中全部品类的 ID 集合
  17.   * @param pay_product_ids    一次支付中全部商品的 ID 集合
  18.   * @param city_id            城市 ID
  19.   */
  20. case class UserVisitAction(dateString,
  21.                            user_id: Long,
  22.                            session_id: String,
  23.                            page_id: Long,
  24.                            action_timeString,
  25.                            search_keyword: String,
  26.                            click_category_id: Long,
  27.                            click_product_id: Long,
  28.                            order_category_ids: String,
  29.                            order_product_ids: String,
  30.                            pay_category_ids: String,
  31.                            pay_product_ids: String,
  32.                            city_id: Long)
  33. /**
  34.   * 用户信息表
  35.   *
  36.   * @param user_id      用户的 ID
  37.   * @param username     用户的名称
  38.   * @param name         用户的名字
  39.   * @param age          用户的年龄
  40.   * @param professional 用户的职业
  41.   * @param city         用户所在的城市
  42.   * @param sex          用户的性别
  43.   */
  44. case class UserInfo(user_id: Long,
  45.                     username: String,
  46.                     name: String,
  47.                     age: Int,
  48.                     professional: String,
  49.                     city: String,
  50.                     sex: String)
  51. /**
  52.   * 产品表
  53.   *
  54.   * @param product_id   商品的 ID
  55.   * @param product_name 商品的名称
  56.   * @param extend_info  商品额外的信息
  57.   */
  58. case class ProductInfo(product_id: Long,
  59.                        product_name: String,
  60.                        extend_info: String)

PooledMySqlClientFactory.scala

  1. package commons.pool
  2. import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
  6. import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
  7. // 建立用于处理 MySQL 查询结果的类的抽象接口
  8. trait QueryCallback {
  9.   def process(rs: ResultSet)
  10. }
  11. /**
  12.   * MySQL 客户端代理对象
  13.   *
  14.   * @param jdbcUrl      MySQL URL
  15.   * @param jdbcUser     MySQL 用户
  16.   * @param jdbcPassword MySQL 密码
  17.   * @param client       默认客户端实现
  18.   */
  19. case class MySqlProxy(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None) {
  20.   // 获取客户端链接对象
  21.   private val mysqlClient = client getOrElse {
  22.     DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
  23.   }
  24.   /**
  25.     * 执行增删改 SQL 语句
  26.     *
  27.     * @param sql
  28.     * @param params
  29.     * @return 影响的行数
  30.     */
  31.   def executeUpdate(sql: String, params: Array[Any]): Int = {
  32.     var rtn = 0
  33.     var pstmt: PreparedStatement = null
  34.     try {
  35.       // 第一步:关闭自动提交
  36.       mysqlClient.setAutoCommit(false)
  37.       // 第二步:根据传入的 sql 语句建立 prepareStatement
  38.       pstmt = mysqlClient.prepareStatement(sql)
  39.       // 第三步:为 prepareStatement 中的每一个参数填写数值
  40.       if (params != null && params.length > 0) {
  41.         for (i <0 until params.length) {
  42.           pstmt.setObject(i + 1, params(i))
  43.         }
  44.       }
  45.       // 第四步:执行增删改操做
  46.       rtn = pstmt.executeUpdate()
  47.       // 第五步:手动提交
  48.       mysqlClient.commit()
  49.     } catch {
  50.       case e: Exception => e.printStackTrace
  51.     }
  52.     rtn
  53.   }
  54.   /**
  55.     * 执行查询 SQL 语句
  56.     *
  57.     * @param sql
  58.     * @param params
  59.     */
  60.   def executeQuery(sql: String, params: Array[Any], queryCallback: QueryCallback) {
  61.     var pstmt: PreparedStatement = null
  62.     var rs: ResultSet = null
  63.     try {
  64.       // 第一步:根据传入的 sql 语句建立 prepareStatement
  65.       pstmt = mysqlClient.prepareStatement(sql)
  66.       // 第二步:为 prepareStatement 中的每一个参数填写数值
  67.       if (params != null && params.length > 0) {
  68.         for (i <0 until params.length) {
  69.           pstmt.setObject(i + 1, params(i))
  70.         }
  71.       }
  72.       // 第三步:执行查询操做
  73.       rs = pstmt.executeQuery()
  74.       // 第四步:处理查询后的结果
  75.       queryCallback.process(rs)
  76.     } catch {
  77.       case e: Exception => e.printStackTrace
  78.     }
  79.   }
  80.   /**
  81.     * 批量执行 SQL 语句
  82.     *
  83.     * @param sql
  84.     * @param paramsList
  85.     * @return 每条SQL语句影响的行数
  86.     */
  87.   def executeBatch(sql: String, paramsList: Array[Array[Any]]): Array[Int] = {
  88.     var rtn: Array[Int] = null
  89.     var pstmt: PreparedStatement = null
  90.     try {
  91.       // 第一步:关闭自动提交
  92.       mysqlClient.setAutoCommit(false)
  93.       pstmt = mysqlClient.prepareStatement(sql)
  94.       // 第二步:为 prepareStatement 中的每一个参数填写数值
  95.       if (paramsList != null && paramsList.length > 0) {
  96.         for (params <- paramsList) {
  97.           for (i <0 until params.length) {
  98.             pstmt.setObject(i + 1, params(i))
  99.           }
  100.           pstmt.addBatch()
  101.         }
  102.       }
  103.       // 第三步:执行批量的 SQL 语句
  104.       rtn = pstmt.executeBatch()
  105.       // 第四步:手动提交
  106.       mysqlClient.commit()
  107.     } catch {
  108.       case e: Exception => e.printStackTrace
  109.     }
  110.     rtn
  111.   }
  112.   // 关闭 MySQL 客户端
  113.   def shutdown(): Unit = mysqlClient.close()
  114. }
  115. /**
  116.   * 扩展知识:将 MySqlProxy 实例视为对象,MySqlProxy 实例的建立使用对象池进行维护
  117.   *
  118.   * 建立自定义工厂类,继承 BasePooledObjectFactory 工厂类,负责对象的建立、包装和销毁
  119.   *
  120.   * @param jdbcUrl
  121.   * @param jdbcUser
  122.   * @param jdbcPassword
  123.   * @param client
  124.   */
  125. class PooledMySqlClientFactory(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None)
  126.   extends BasePooledObjectFactory[MySqlProxy] with Serializable {
  127.   // 用于池来建立对象
  128.   override def create(): MySqlProxy = MySqlProxy(jdbcUrl, jdbcUser, jdbcPassword, client)
  129.   // 用于池来包装对象
  130.   override def wrap(obj: MySqlProxy): PooledObject[MySqlProxy] = new DefaultPooledObject(obj)
  131.   // 用于池来销毁对象
  132.   override def destroyObject(p: PooledObject[MySqlProxy]): Unit = {
  133.     p.getObject.shutdown()
  134.     super.destroyObject(p)
  135.   }
  136. }
  137. /**
  138.   * 建立 MySQL 池工具类
  139.   */
  140. object CreateMySqlPool {
  141.   // 加载 JDBC 驱动,只须要一次
  142.   Class.forName("com.mysql.jdbc.Driver")
  143.   // 在 org.apache.commons.pool2.impl 中预设了三个能够直接使用的对象池:GenericObjectPool、GenericKeyedObjectPool 和 SoftReferenceObjectPool
  144.   // 建立 genericObjectPool 为 GenericObjectPool
  145.   // GenericObjectPool 的特色是能够设置对象池中的对象特征,包括 LIFO 方式、最大空闲数、最小空闲数、是否有效性检查等等
  146.   private var genericObjectPool: GenericObjectPool[MySqlProxy] = null
  147.   // 伴生对象经过 apply 完成对象的建立
  148.   def apply(): GenericObjectPool[MySqlProxy] = {
  149.     // 单例模式
  150.     if (this.genericObjectPool == null) {
  151.       this.synchronized {
  152.         // 获取 MySQL 配置参数
  153.         val jdbcUrl = ConfigurationManager.config.getString(Constants.JDBC_URL)
  154.         val jdbcUser = ConfigurationManager.config.getString(Constants.JDBC_USER)
  155.         val jdbcPassword = ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)
  156.         val size = ConfigurationManager.config.getInt(Constants.JDBC_DATASOURCE_SIZE)
  157.         val pooledFactory = new PooledMySqlClientFactory(jdbcUrl, jdbcUser, jdbcPassword)
  158.         val poolConfig = {
  159.           // 建立标准对象池配置类的实例
  160.           val c = new GenericObjectPoolConfig
  161.           // 设置配置对象参数
  162.           // 设置最大对象数
  163.           c.setMaxTotal(size)
  164.           // 设置最大空闲对象数
  165.           c.setMaxIdle(size)
  166.           c
  167.         }
  168.         // 对象池的建立须要工厂类和配置类
  169.         // 返回一个 GenericObjectPool 对象池
  170.         this.genericObjectPool = new GenericObjectPool[MySqlProxy](pooledFactory, poolConfig)
  171.       }
  172.     }
  173.     genericObjectPool
  174.   }
  175. }

Utils.scala

  1. package commons.utils
  2. import java.util.Date
  3. import net.sf.json.JSONObject
  4. import org.joda.time.DateTime
  5. import org.joda.time.format.DateTimeFormat
  6. import scala.collection.mutable
  7. /**
  8.   * 日期时间工具类
  9.   * 使用 joda 实现,若是使用 Java 提供的 Date 会存在线程安全问题
  10.   */
  11. object DateUtils {
  12.   val DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd")
  13.   val TIME_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
  14.   val DATE_KEY_FORMAT = DateTimeFormat.forPattern("yyyyMMdd")
  15.   val DATE_TIME_FORMAT = DateTimeFormat.forPattern("yyyyMMddHHmm")
  16.   /**
  17.     * 判断一个时间是否在另外一个时间以前
  18.     *
  19.     * @param time1 第一个时间
  20.     * @param time2 第二个时间
  21.     * @return 判断结果
  22.     */
  23.   def before(time1Stringtime2String): Boolean = {
  24.     if (TIME_FORMAT.parseDateTime(time1).isBefore(TIME_FORMAT.parseDateTime(time2))) {
  25.       return true
  26.     }
  27.     false
  28.   }
  29.   /**
  30.     * 判断一个时间是否在另外一个时间以后
  31.     *
  32.     * @param time1 第一个时间
  33.     * @param time2 第二个时间
  34.     * @return 判断结果
  35.     */
  36.   def after(time1Stringtime2String): Boolean = {
  37.     if (TIME_FORMAT.parseDateTime(time1).isAfter(TIME_FORMAT.parseDateTime(time2))) {
  38.       return true
  39.     }
  40.     false
  41.   }
  42.   /**
  43.     * 计算时间差值(单位为秒)
  44.     *
  45.     * @param time1 时间1
  46.     * @param time2 时间2
  47.     * @return 差值
  48.     */
  49.   def minus(time1Stringtime2String): Int = {
  50.     return (TIME_FORMAT.parseDateTime(time1).getMillis - TIME_FORMAT.parseDateTime(time2).getMillis) / 1000 toInt
  51.   }
  52.   /**
  53.     * 获取年月日和小时
  54.     *
  55.     * @param datetime 时间(yyyy-MM-dd HH:mm:ss)
  56.     * @return 结果(yyyy-MM-dd_HH)
  57.     */
  58.   def getDateHour(datetime: String): String = {
  59.     val date = datetime.split(" ")(0)
  60.     val hourMinuteSecond = datetime.split(" ")(1)
  61.     val hour = hourMinuteSecond.split(":")(0)
  62.     date + "_" + hour
  63.   }
  64.   /**
  65.     * 获取当天日期(yyyy-MM-dd)
  66.     *
  67.     * @return 当天日期
  68.     */
  69.   def getTodayDate(): String = {
  70.     DateTime.now().toString(DATE_FORMAT)
  71.   }
  72.   /**
  73.     * 获取昨天的日期(yyyy-MM-dd)
  74.     *
  75.     * @return 昨天的日期
  76.     */
  77.   def getYesterdayDate(): String = {
  78.     DateTime.now().minusDays(1).toString(DATE_FORMAT)
  79.   }
  80.   /**
  81.     * 格式化日期(yyyy-MM-dd)
  82.     *
  83.     * @param date Date对象
  84.     * @return 格式化后的日期
  85.     */
  86.   def formatDate(dateDate): String = {
  87.     new DateTime(date).toString(DATE_FORMAT)
  88.   }
  89.   /**
  90.     * 格式化时间(yyyy-MM-dd HH:mm:ss)
  91.     *
  92.     * @param date Date对象
  93.     * @return 格式化后的时间
  94.     */
  95.   def formatTime(dateDate): String = {
  96.     new DateTime(date).toString(TIME_FORMAT)
  97.   }
  98.   /**
  99.     * 解析时间字符串
  100.     *
  101.     * @param time 时间字符串
  102.     * @return Date
  103.     */
  104.   def parseTime(timeString): Date = {
  105.     TIME_FORMAT.parseDateTime(time).toDate
  106.   }
  107.   def main(args: Array[String]): Unit = {
  108.     print(DateUtils.parseTime("2017-10-31 20:27:53")) // Tue Oct 31 20:27:53 CST 2017
  109.   }
  110.   /**
  111.     * 格式化日期 key
  112.     * yyyyMMdd
  113.     *
  114.     * @param date
  115.     * @return
  116.     */
  117.   def formatDateKey(dateDate): String = {
  118.     new DateTime(date).toString(DATE_KEY_FORMAT)
  119.   }
  120.   /**
  121.     * 解析日期 key
  122.     *
  123.     * @return
  124.     */
  125.   def parseDateKey(datekey: String): Date = {
  126.     DATE_KEY_FORMAT.parseDateTime(datekey).toDate
  127.   }
  128.   /**
  129.     * 格式化时间,保留到分钟级别
  130.     * yyyyMMddHHmm
  131.     *
  132.     * @param date
  133.     * @return
  134.     */
  135.   def formatTimeMinute(dateDate): String = {
  136.     new DateTime(date).toString(DATE_TIME_FORMAT)
  137.   }
  138. }
  139. /**
  140.   * 数字格式化工具类
  141.   */
  142. object NumberUtils {
  143.   /**
  144.     * 格式化小数
  145.     *
  146.     * @param scale 四舍五入的位数
  147.     * @return 格式化小数
  148.     */
  149.   def formatDouble(num: Double, scale: Int): Double = {
  150.     val bd = BigDecimal(num)
  151.     bd.setScale(scale, BigDecimal.RoundingMode.HALF_UP).doubleValue()
  152.   }
  153. }
  154. /**
  155.   * 参数工具类
  156.   */
  157. object ParamUtils {
  158.   /**
  159.     * 从 JSON 对象中提取参数
  160.     *
  161.     * @param jsonObject JSON对象
  162.     * @return 参数
  163.     */
  164.   def getParam(jsonObject: JSONObject, field: String): String = {
  165.     jsonObject.getString(field)
  166.   }
  167. }
  168. /**
  169.   * 字符串工具类
  170.   *
  171.   */
  172. object StringUtils {
  173.   /**
  174.     * 判断字符串是否为空
  175.     *
  176.     * @param str 字符串
  177.     * @return 是否为空
  178.     */
  179.   def isEmpty(str: String): Boolean = {
  180.     str == null || "".equals(str)
  181.   }
  182.   /**
  183.     * 判断字符串是否不为空
  184.     *
  185.     * @param str 字符串
  186.     * @return 是否不为空
  187.     */
  188.   def isNotEmpty(str: String): Boolean = {
  189.     str != null && !"".equals(str)
  190.   }
  191.   /**
  192.     * 截断字符串两侧的逗号
  193.     *
  194.     * @param str 字符串
  195.     * @return 字符串
  196.     */
  197.   def trimComma(str: String): String = {
  198.     var result = ""
  199.     if (str.startsWith(",")) {
  200.       result = str.substring(1)
  201.     }
  202.     if (str.endsWith(",")) {
  203.       result = str.substring(0, str.length() - 1)
  204.     }
  205.     result
  206.   }
  207.   /**
  208.     * 补全两位数字
  209.     *
  210.     * @param str
  211.     * @return
  212.     */
  213.   def fulfuill(str: String): String = {
  214.     if (str.length() == 2) {
  215.       str
  216.     } else {
  217.       "0" + str
  218.     }
  219.   }
  220.   /**
  221.     * 从拼接的字符串中提取字段
  222.     *
  223.     * @param str       字符串
  224.     * @param delimiter 分隔符
  225.     * @param field     字段
  226.     * @return 字段值
  227.     */
  228.   def getFieldFromConcatString(str: StringdelimiterString, field: String): String = {
  229.     try {
  230.       val fields = str.split(delimiter);
  231.       for (concatField <- fields) {
  232.         if (concatField.split("=").length == 2) {
  233.           val fieldName = concatField.split("=")(0)
  234.           val fieldValue = concatField.split("=")(1)
  235.           if (fieldName.equals(field)) {
  236.             return fieldValue
  237.           }
  238.         }
  239.       }
  240.     } catch {
  241.       case e: Exception => e.printStackTrace()
  242.     }
  243.     null
  244.   }
  245.   /**
  246.     * 从拼接的字符串中给字段设置值
  247.     *
  248.     * @param str           字符串
  249.     * @param delimiter     分隔符
  250.     * @param field         字段名
  251.     * @param newFieldValue 新的field值
  252.     * @return 字段值
  253.     */
  254.   def setFieldInConcatString(str: StringdelimiterString, field: String, newFieldValue: String): String = {
  255.     val fieldsMap = new mutable.HashMap[StringString]()
  256.     for (fileds <- str.split(delimiter)) {
  257.       val arra = fileds.split("=")
  258.       if (arra(0).compareTo(field) == 0)
  259.         fieldsMap += (field -> newFieldValue)
  260.       else
  261.         fieldsMap += (arra(0) -> arra(1))
  262.     }
  263.     fieldsMap.map(item => item._1 + "=" + item._2).mkString(delimiter)
  264.   }
  265. }
  266. /**
  267.   * 校验工具类
  268.   */
  269. object ValidUtils {
  270.   /**
  271.     * 校验数据中的指定字段,是否在指定范围内(范围区间)
  272.     *
  273.     * @param data            数据
  274.     * @param dataField       数据字段
  275.     * @param parameter       参数
  276.     * @param startParamField 起始参数字段
  277.     * @param endParamField   结束参数字段
  278.     * @return 校验结果
  279.     */
  280.   def between(dataString, dataField: String, parameter: String, startParamField: String, endParamField: String): Boolean = {
  281.     val startParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", startParamField)
  282.     val endParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", endParamField)
  283.     if (startParamFieldStr == null || endParamFieldStr == null) {
  284.       return true
  285.     }
  286.     val startParamFieldValue = startParamFieldStr.toInt
  287.     val endParamFieldValue = endParamFieldStr.toInt
  288.     val dataFieldStr = StringUtils.getFieldFromConcatString(data"\\|", dataField)
  289.     if (dataFieldStr != null) {
  290.       val dataFieldValue = dataFieldStr.toInt
  291.       if (dataFieldValue >= startParamFieldValue && dataFieldValue <= endParamFieldValue) {
  292.         return true
  293.       } else {
  294.         return false
  295.       }
  296.     }
  297.     false
  298.   }
  299.   /**
  300.     * 校验数据中的指定字段,是否有值与参数字段的值相同(多选一)
  301.     *
  302.     * @param data       数据
  303.     * @param dataField  数据字段
  304.     * @param parameter  参数
  305.     * @param paramField 参数字段
  306.     * @return 校验结果
  307.     */
  308.   def in(dataString, dataField: String, parameter: String, paramField: String): Boolean = {
  309.     val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
  310.     if (paramFieldValue == null) {
  311.       return true
  312.     }
  313.     val paramFieldValueSplited = paramFieldValue.split(",")
  314.     val dataFieldValue = StringUtils.getFieldFromConcatString(data"\\|", dataField)
  315.     if (dataFieldValue != null && dataFieldValue != "-1") {
  316.       val dataFieldValueSplited = dataFieldValue.split(",")
  317.       for (singleDataFieldValue <- dataFieldValueSplited) {
  318.         for (singleParamFieldValue <- paramFieldValueSplited) {
  319.           if (singleDataFieldValue.compareTo(singleParamFieldValue) == 0) {
  320.             return true
  321.           }
  322.         }
  323.       }
  324.     }
  325.     false
  326.   }
  327.   /**
  328.     * 校验数据中的指定字段,是否在指定范围内(二选一)
  329.     *
  330.     * @param data       数据
  331.     * @param dataField  数据字段
  332.     * @param parameter  参数
  333.     * @param paramField 参数字段
  334.     * @return 校验结果
  335.     */
  336.   def equal(dataString, dataField: String, parameter: String, paramField: String): Boolean = {
  337.     val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
  338.     if (paramFieldValue == null) {
  339.       return true
  340.     }
  341.     val dataFieldValue = StringUtils.getFieldFromConcatString(data"\\|", dataField)
  342.     if (dataFieldValue != null) {
  343.       if (dataFieldValue.compareTo(paramFieldValue) == 0) {
  344.         return true
  345.       }
  346.     }
  347.     false
  348.   }
  349. }

commerce.properties

  1. # jbdc 配置
  2. jdbc.datasource.size=10
  3. jdbc.url=jdbc:mysql://localhost:3306/commerce?useUnicode=true&characterEncoding=utf8
  4. jdbc.user=root
  5. jdbc.password=root
  6. # 筛选条件的配置
  7. # 可使用的属性以下:
  8. #      startDate:       格式: yyyy-MM-DD   [必选]
  9. #      endDate:         格式: yyyy-MM-DD   [必选]
  10. #      startAge:        范围: 0 - 59
  11. #      endAge:          范围: 0 - 59
  12. #      professionals:   范围:professionals[0 - 99]
  13. #      cities:          0 - 99  ((0,"北京","华北"),(1,"上海","华东"),(2,"南京","华东"),(3,"广州","华南"),(4,"三亚","华南"),(5,"武汉","华中"),(6,"长沙","华中"),(7,"西安","西北"),(8,"成都","西南"),(9,"哈尔滨","东北"),...)
  14. #      sex:             范围: 0 - 1
  15. #      keywords:        范围: ("火锅""蛋糕""重庆辣子鸡""重庆小面""呷哺呷哺""新辣道鱼火锅""国贸大厦""太古商场""日本料理""温泉")
  16. #      categoryIds:     0 - 99,以逗号分隔
  17. #      targetPageFlow:  0 - 99, 以逗号分隔
  18. task.params.json={\
  19.   startDate:"2019-06-01", \
  20.   endDate:"2019-06-30", \
  21.   startAge: 20, \
  22.   endAge: 50, \
  23.   professionals: "",  \
  24.   cities: "", \
  25.   sex:"", \
  26.   keywords:"", \
  27.   categoryIds:"", \
  28.   targetPageFlow:"1,2,3,4,5,6,7"}
  29. # Kafka 配置
  30. kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
  31. kafka.topics=AdRealTimeLog1

log4j.properties

  1. log4j.rootLogger=info, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n
  5. log4j.appender.R=org.apache.log4j.RollingFileAppender
  6. log4j.appender.R.File=../log/agent.log
  7. log4j.appender.R.MaxFileSize=1024KB
  8. log4j.appender.R.MaxBackupIndex=1
  9. log4j.appender.R.layout=org.apache.log4j.PatternLayout
  10. log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n

4.3 analyse 模块(数据分析模块)

新建一个模块 maven 工程 analyse 做为子 maven 工程,删除掉 src 目录,引入依赖 pom.xml,添加对 scala 框架的支持。
注意:在该子模块中有不少子模块。即具体需求实现的模块。
pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>commerce</artifactId>
  7.         <groupId>com.atguigu</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>analyse</artifactId>
  12. </project>

analyse 模块是需求的具体实现模块, 咱们将会在第 5 章中进行详细解析。

第5章 需求解析

5.1 需求一:Session 各范围访问步长、访问时长占比统计

5.1.1 需求解析

  需求一:要统计出符合筛选条件的 session 中,访问时长在 1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m 以上各个范围内的 session 占比;访问步长在 1~三、4~六、7~九、10~30、30~60、60 以上各个范围内的 session 占比,并将结果保存到 MySQL 数据库中。
  在计算以前须要根据查询条件筛选 session,查询条件好比搜索过某些关键词的用户、访问时间在某个时间段内的用户、年龄在某个范围内的用户、职业在某个范围内的用户、所在某个城市的用户,发起的 session。找到对应的这些用户的 session,并进行统计,之因此须要有筛选主要是可让使用者,对感兴趣的和关系的用户群体,进行后续各类复杂业务逻辑的统计和分析,那么拿到的结果数据,就是只是针对特殊用户群体的分析结果;而不是对全部用户进行分析的泛泛的分析结果。好比说,如今某个企业高层,就是想看到用户群体中,28~35 岁的老师职业的群体,对应的一些统计和分析的结果数据,从而辅助高管进行公司战略上的决策制定。
  session 访问时长,也就是说一个 session 对应的开始的 action 到结束的 action 之间的时间范围;还有,就是访问步长,指的是,一个 session 执行期间内,依次点击过多少个页面,好比说,一次 session 维持了 1 分钟,那么访问时长就是 1m,而后在这 1 分钟内,点击了 10 个页面,那么 session 的访问步长,就是 10。
  好比说,符合第一步筛选出来的 session 的数量大概是有 1000 万个。那么里面,咱们要计算出,访问时长在 1s~3s 内的 session 的数量,并除以符合条件的总 session 数量(好比 1000 万),好比是 100 万/1000 万,那么 1s~3s 内的 session 占比就是 10%。依次类推,这里说的统计,就是这个意思。
  这个功能可让人从全局的角度看到,符合某些条件的用户群体使用咱们的产品的一些习惯。好比大多数人,究竟是会在产品中停留多长时间,大多数人,会在一次使用产品的过程当中,访问多少个页面。那么对于使用者来讲, 有一个全局和清晰的认识。

5.1.2 数据源解析

5.1.3 数据结构解析

一、UserVisitAction 样例类

  1. /**
  2.   * 用户访问动做表
  3.   *
  4.   * @param date               用户点击行为的日期
  5.   * @param user_id            用户的 ID
  6.   * @param session_id         Session 的 ID
  7.   * @param page_id            某个页面的 ID
  8.   * @param action_time        点击行为的时间点
  9.   * @param search_keyword     用户搜索的关键词
  10.   * @param click_category_id  某一个商品品类的 ID
  11.   * @param click_product_id   某一个商品的 ID
  12.   * @param order_category_ids 一次订单中全部品类的 ID 集合
  13.   * @param order_product_ids  一次订单中全部商品的 ID 集合
  14.   * @param pay_category_ids   一次支付中全部品类的 ID 集合
  15.   * @param pay_product_ids    一次支付中全部商品的 ID 集合
  16.   * @param city_id            城市 ID
  17.   */
  18. case class UserVisitAction(dateString,
  19.                            user_id: Long,
  20.                            session_id: String,
  21.                            page_id: Long,
  22.                            action_timeString,
  23.                            search_keyword: String,
  24.                            click_category_id: Long,
  25.                            click_product_id: Long,
  26.                            order_category_ids: String,
  27.                            order_product_ids: String,
  28.                            pay_category_ids: String,
  29.                            pay_product_ids: String,
  30.                            city_id: Long
  31.                           )

二、UserInfo 样例类

  1. /**
  2.   * 用户信息表
  3.   *
  4.   * @param user_id      用户的 ID
  5.   * @param username     用户的名称
  6.   * @param name         用户的名字
  7.   * @param age          用户的年龄
  8.   * @param professional 用户的职业
  9.   * @param city         用户所在的城市
  10.   * @param sex          用户的性别
  11.   */
  12. case class UserInfo(user_id: Long,
  13.                     username: String,
  14.                     name: String,
  15.                     age: Int,
  16.                     professional: String,
  17.                     city: String,
  18.                     sex: String
  19.                    )

为何联立用户表?
  用户表中记录了用户详细的我的信息,包括年龄、职业、城市、性别等,在实际的业务场景中,咱们可能会在一段时间关注某一个群体的用户的行为,好比在某一段时间关注北京的白领们的购物行为,那么咱们就能够经过联立用户表,让咱们的统计数据中具备用户属性,而后根据用户属性对统计信息进行过滤,将不属于咱们所关注的用户群体的用户所产生的行为数据过滤掉,这样就能够实现对指定人群的精准分析。

5.1.4 需求实现简要流程

5.1.5 需求实现详细流程

5.1.6 MySQL 存储结构解析

MySQL 写入数据格式
session_aggr_stat

  1. -- ----------------------------
  2. --  Table structure for `session_aggr_stat`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `session_aggr_stat`;
  5. CREATE TABLE `session_aggr_stat` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `session_count` int(11DEFAULT NULL,
  8.   `visit_length_1s_3s_ratio` double DEFAULT NULL,
  9.   `visit_length_4s_6s_ratio` double DEFAULT NULL,
  10.   `visit_length_7s_9s_ratio` double DEFAULT NULL,
  11.   `visit_length_10s_30s_ratio` double DEFAULT NULL,
  12.   `visit_length_30s_60s_ratio` double DEFAULT NULL,
  13.   `visit_length_1m_3m_ratio` double DEFAULT NULL,
  14.   `visit_length_3m_10m_ratio` double DEFAULT NULL,
  15.   `visit_length_10m_30m_ratio` double DEFAULT NULL,
  16.   `visit_length_30m_ratio` double DEFAULT NULL,
  17.   `step_length_1_3_ratio` double DEFAULT NULL,
  18.   `step_length_4_6_ratio` double DEFAULT NULL,
  19.   `step_length_7_9_ratio` double DEFAULT NULL,
  20.   `step_length_10_30_ratio` double DEFAULT NULL,
  21.   `step_length_30_60_ratio` double DEFAULT NULL,
  22.   `step_length_60_ratio` double DEFAULT NULL,
  23.   KEY `idx_task_id` (`taskid`)
  24. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.1.7 代码解析

在模块 analyse 新建一个模块 session,引入 pom 文件,修改 src 目录名称为 scala,同时添加 scala 框架的支持。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>analyse</artifactId>
  7.         <groupId>com.atguigu</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>session</artifactId>
  12.     <dependencies>
  13.         <dependency>
  14.             <groupId>com.atguigu</groupId>
  15.             <artifactId>commons</artifactId>
  16.             <version>1.0-SNAPSHOT</version>
  17.         </dependency>
  18.         <!-- Spark 的依赖引入 -->
  19.         <dependency>
  20.             <groupId>org.apache.spark</groupId>
  21.             <artifactId>spark-core_2.11</artifactId>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.spark</groupId>
  25.             <artifactId>spark-sql_2.11</artifactId>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.spark</groupId>
  29.             <artifactId>spark-hive_2.11</artifactId>
  30.         </dependency>
  31.         <!-- 引入 Scala -->
  32.         <dependency>
  33.             <groupId>org.scala-lang</groupId>
  34.             <artifactId>scala-library</artifactId>
  35.         </dependency>
  36.     </dependencies>
  37.     <build>
  38.         <plugins>
  39.             <plugin>
  40.                 <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
  41.                 <groupId>net.alchim31.maven</groupId>
  42.                 <artifactId>scala-maven-plugin</artifactId>
  43.             </plugin>
  44.             <plugin>
  45.                 <groupId>org.apache.maven.plugins</groupId>
  46.                 <artifactId>maven-assembly-plugin</artifactId>
  47.                 <configuration>
  48.                     <archive>
  49.                         <manifest>
  50.                             <mainClass>com.atguigu.session.UserVisitSessionAnalyze</mainClass>
  51.                         </manifest>
  52.                     </archive>
  53.                     <descriptorRefs>
  54.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  55.                     </descriptorRefs>
  56.                 </configuration>
  57.             </plugin>
  58.         </plugins>
  59.     </build>
  60. </project>

代码实现示例以下:
SessionStat.scala

  1. package com.atguigu.session
  2. import java.util.{Date, UUID}
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import commons.model.{UserInfo, UserVisitAction}
  6. import commons.utils._
  7. import net.sf.json.JSONObject
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.rdd.RDD
  10. import org.apache.spark.sql.{SaveMode, SparkSession}
  11. import scala.collection.mutable
  12. object SessionStat {
  13.   def main(args: Array[String]): Unit = {
  14.     // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
  15.     val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  16.     // 获取过滤条件对应的 JsonObject 对象
  17.     val taskParam = JSONObject.fromObject(jsonStr)
  18.     // 建立全局惟一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不一样任务,做为写入 MySQL 数据库中那张表的主键
  19.     val taskUUID = UUID.randomUUID().toString
  20.     // 建立 sparkConf
  21.     val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
  22.     // 建立 sparkSession(包含 sparkContext)
  23.     val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  24.     // 获取原始的动做表数据(带有过滤条件)
  25.     // actionRDD: RDD[UserVisitAction]
  26.     val actionRDD = getOriActionRDD(sparkSession, taskParam)
  27.     // 将用户行为信息转换为 K-V 结构,sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  28.     val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
  29.     // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
  30.     val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一块儿,获得斧子形数据
  31.     // 将数据进行内存缓存
  32.     session2GroupActionRDD.cache()
  33.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
  34.     val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
  35.     // 建立自定义累加器对象
  36.     val sessionStatisticAccumulator = new SessionStatisticAccumulator
  37.     // 在 sparkSession 中注册自定义累加器,这样后面就能够用了
  38.     sparkSession.sparkContext.register(sessionStatisticAccumulator)
  39.     // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操做,即过滤掉不符合条件的数据,并根据自定义累加器 统计不一样范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
  40.     val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
  41.     // 必须引入任意一个 action 的算子,才能启动
  42.     seeionId2FilterRDD.foreach(println(_))
  43.     // 计算各个 session 的占比
  44.     getSessionRatio(sparkSession,taskUUID, sessionStatisticAccumulator.value)
  45.   }
  46.   def getSessionRatio(sparkSession: SparkSession, taskUUID: Stringvalue: mutable.HashMap[String, Int]): Unit = {
  47.     val session_count = value.getOrElse(Constants.SESSION_COUNT1).toDouble
  48.     // 先获取各个值
  49.     val visit_length_1s_3= value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)
  50.     val visit_length_4s_6= value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)
  51.     val visit_length_7s_9= value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)
  52.     val visit_length_10s_30= value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)
  53.     val visit_length_30s_60= value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)
  54.     val visit_length_1m_3= value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)
  55.     val visit_length_3m_10= value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)
  56.     val visit_length_10m_30= value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)
  57.     val visit_length_30= value.getOrElse(Constants.TIME_PERIOD_30m, 0)
  58.     val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_30)
  59.     val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_60)
  60.     val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_90)
  61.     val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_300)
  62.     val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_600)
  63.     val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_600)
  64.     // 计算比例
  65.     val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3/ session_count2)
  66.     val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6/ session_count2)
  67.     val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9/ session_count2)
  68.     val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30/ session_count2)
  69.     val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60/ session_count2)
  70.     val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3/ session_count2)
  71.     val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10/ session_count2)
  72.     val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30/ session_count2)
  73.     val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30/ session_count2)
  74.     val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count2)
  75.     val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count2)
  76.     val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count2)
  77.     val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count2)
  78.     val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count2)
  79.     val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count2)
  80.     // 封装数据
  81.     val stat = SessionAggrStat(taskUUID, session_count.toInt,
  82.       visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,
  83.       visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,
  84.       visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,
  85.       step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,
  86.       step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)
  87.     // 样例类实例 -> 数组 -> RDD
  88.     val sessionRatioRDD = sparkSession.sparkContext.makeRDD(Array(stat))
  89.     // 写入 MySQL 数据库中
  90.     import sparkSession.implicits._
  91.     sessionRatioRDD.toDF().write
  92.       .format("jdbc")
  93.       .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  94.       .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  95.       .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  96.       .option("dbtable""session_aggr_stat")
  97.       .mode(SaveMode.Append) // 表存在就追加,表不存在就新建
  98.       .save()
  99.   }
  100.   def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionStatisticAccumulator) = {
  101.     if (visitLength >= 1 && visitLength <= 3) {
  102.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
  103.     } else if (visitLength >= 4 && visitLength <= 6) {
  104.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
  105.     } else if (visitLength >= 7 && visitLength <= 9) {
  106.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
  107.     } else if (visitLength >= 10 && visitLength <= 30) {
  108.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
  109.     } else if (visitLength > 30 && visitLength <= 60) {
  110.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
  111.     } else if (visitLength > 60 && visitLength <= 180) {
  112.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
  113.     } else if (visitLength > 180 && visitLength <= 600) {
  114.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
  115.     } else if (visitLength > 600 && visitLength <= 1800) {
  116.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
  117.     } else if (visitLength > 1800) {
  118.       sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
  119.     }
  120.   }
  121.   def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionStatisticAccumulator) = {
  122.     if (stepLength >= 1 && stepLength <= 3) {
  123.       sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
  124.     } else if (stepLength >= 4 && stepLength <= 6) {
  125.       sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
  126.     } else if (stepLength >= 7 && stepLength <= 9) {
  127.       sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
  128.     } else if (stepLength >= 10 && stepLength <= 30) {
  129.       sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
  130.     } else if (stepLength > 30 && stepLength <= 60) {
  131.       sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
  132.     } else if (stepLength > 60) {
  133.       sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
  134.     }
  135.   }
  136.   def getSessionFilterRDD(taskParam: JSONObject,
  137.                           sessionId2FullAggrInfoRDD: RDD[(StringString)],
  138.                           sessionStatisticAccumulator: SessionStatisticAccumulator) = {
  139.     // 先获取所用到的过滤条件:
  140.     val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
  141.     val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
  142.     val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
  143.     val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
  144.     val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
  145.     val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
  146.     val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)
  147.     // 拼装过滤条件的字符串:
  148.     var filterInfo = (if (startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else ""+
  149.       (if (endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else ""+
  150.       (if (professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else ""+
  151.       (if (cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else ""+
  152.       (if (sex != null) Constants.PARAM_SEX + "=" + sex + "|" else ""+
  153.       (if (keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else ""+
  154.       (if (categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")
  155.     // 去除过滤条件字符串末尾的 "|"
  156.     if (filterInfo.endsWith("\\|"))
  157.       filterInfo = filterInfo.substring(0, filterInfo.length - 1)
  158.     // 进行过滤操做(过滤自带遍历功能)
  159.     sessionId2FullAggrInfoRDD.filter {
  160.       case (sessionId, fullAggrInfo) =>
  161.         var success = true
  162.         // 若是 age 不在过滤条件范围以内,则当前 sessionId 对应的 fullAggrInfo 数据被过滤掉
  163.         if (!ValidUtils.between(fullAggrInfo, Constants.FIELD_AGE, filterInfo, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) { // 范围用 between
  164.           success = false
  165.         } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS)) {
  166.           success = false
  167.         } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES)) {
  168.           success = false
  169.         } else if (!ValidUtils.equal(fullAggrInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX)) { // 二选一用 equal
  170.           success = false
  171.         } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS)) { // 多选一用 in
  172.           success = false
  173.         } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, filterInfo, Constants.PARAM_CATEGORY_IDS)) {
  174.           success = false
  175.         }
  176.         // 自定义累加器,统计不一样范围的 访问时长 和 访问步长 的个数 以及 总的 session 个数
  177.         if (success) {
  178.           sessionStatisticAccumulator.add(Constants.SESSION_COUNT// 总的 session 个数
  179.           // 获取当前 sessionId 对应的 访问时长 和 访问步长
  180.           val visitLength = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
  181.           val stepLength = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
  182.           // 统计不一样范围的 访问时长 和 访问步长 的个数
  183.           calculateVisitLength(visitLength, sessionStatisticAccumulator)
  184.           calculateStepLength(stepLength, sessionStatisticAccumulator)
  185.         }
  186.         success
  187.     }
  188.   }
  189.   def getSessionFullAggrInfo(sparkSession: SparkSession,
  190.                              session2GroupActionRDD: RDD[(String, Iterable[UserVisitAction])]) = {
  191.     // userId2PartAggrInfoRDD: RDD[(userId, partAggrInfo)]
  192.     val userId2PartAggrInfoRDD = session2GroupActionRDD.map {
  193.       // 使用模式匹配:当结果是 KV 对的时候尽可能使用 case 模式匹配,这样更清楚,更简洁直观
  194.       case (sessionId, iterableAction) =>
  195.         var userId = -1L
  196.         var startTime: Date = null
  197.         var endTime: Date = null
  198.         var stepLength = 0 // 有多少个 action
  199.         val searchKeywords = new StringBuffer(""// 搜索行为
  200.         val clickCategories = new StringBuffer(""// 点击行为
  201.         for (action <- iterableAction) {
  202.           if (userId == -1) {
  203.             userId = action.user_id
  204.           }
  205.           val actionTime = DateUtils.parseTime(action.action_time// action_time = "2019-05-30 18:17:11" 是字符串类型
  206.           if (startTime == null || startTime.after(actionTime)) { // startTime 在 actionTime 的后面   正常区间:[startTime, actionTime, endTime]
  207.             startTime = actionTime
  208.           }
  209.           if (endTime == null || endTime.before(actionTime)) { // endTime 在 actionTime 的前面
  210.             endTime = actionTime
  211.           }
  212.           val searchKeyword = action.search_keyword
  213.           if (StringUtils.isNotEmpty(searchKeyword) && !searchKeywords.toString.contains(searchKeyword)) {
  214.             searchKeywords.append(searchKeyword + ",")
  215.           }
  216.           val clickCategoryId = action.click_category_id
  217.           if (clickCategoryId != -1 && !clickCategories.toString.contains(clickCategoryId)) {
  218.             clickCategories.append(clickCategoryId + ",")
  219.           }
  220.           stepLength += 1
  221.         }
  222.         // searchKeywords.toString.substring(0, searchKeywords.toString.length - 1// 等价于下面
  223.         val searchKw = StringUtils.trimComma(searchKeywords.toString) // 去除最后一个逗号
  224.         val clickCg = StringUtils.trimComma(clickCategories.toString) // 去除最后一个逗号
  225.         val visitLength = (endTime.getTime - startTime.getTime) / 1000
  226.         // 拼装聚合数据的字符串:
  227.         // (31,sessionid=7291cc307f96432f8da9d926fd7d88e5|searchKeywords=洗面奶,小龙虾,机器学习,苹果,华为手机|clickCategoryIds=11,93,36,66,
  228.         // 60|visitLength=3461|stepLength=43|startTime=2019-05-30 14:01:01)
  229.         val partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
  230.           Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
  231.           Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
  232.           Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
  233.           Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
  234.           Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime) // 格式化时间为字符串类型
  235.         (userId, partAggrInfo)
  236.     }
  237.     // user_visit_action 表联立 user_info 表,让咱们的统计数据中具备用户属性
  238.     val sql = "select * from user_info"
  239.     import sparkSession.implicits._
  240.     // userId2InfoRDD: RDD[(userId, UserInfo)]
  241.     val userId2InfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))
  242.     val sessionId2FullAggrInfoRDD = userId2PartAggrInfoRDD.join(userId2InfoRDD).map {
  243.       case (userId, (partAggrInfo, userInfo)) =>
  244.         val age = userInfo.age
  245.         val professional = userInfo.professional
  246.         val sex = userInfo.sex
  247.         val city = userInfo.city
  248.         // 拼装最终的聚合数据字符串:
  249.         val fullAggrInfo = partAggrInfo + "|" +
  250.           Constants.FIELD_AGE + "=" + age + "|" +
  251.           Constants.FIELD_PROFESSIONAL + "=" + professional + "|" +
  252.           Constants.FIELD_SEX + "=" + sex + "|" +
  253.           Constants.FIELD_CITY + "=" + city
  254.         val seesionId = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
  255.         (seesionId, fullAggrInfo)
  256.     }
  257.     sessionId2FullAggrInfoRDD
  258.   }
  259.   def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
  260.     // 先获取所用到的过滤条件:开始日期 和 结束日期
  261.     val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
  262.     val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
  263.     // 把全部的时间范围在 startDate 和 endDate 之间的数据查询出来
  264.     val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"
  265.     // 在对 DataFrame 和 Dataset 进行许多操做都须要这个包进行支持
  266.     import sparkSession.implicits._
  267.     sparkSession.sql(sql).as[UserVisitAction].rdd // DataFrame(Row类型) -> DataSet(样例类类型) -> rdd(样例类)
  268.   }
  269. }

自定义累加器 SessionStatisticAccumulator 代码以下:

  1. package com.atguigu.session
  2. import org.apache.spark.util.AccumulatorV2
  3. import scala.collection.mutable
  4. /**
  5.   * 自定义累加器
  6.   */
  7. class SessionStatisticAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]]() {
  8.   // 自定义累加器:要求要在类的里面维护一个 mutable.HashMap 结构
  9.   val countMap = new mutable.HashMap[String, Int]()
  10.   // 判断累加器是否为空
  11.   override def isZero: Boolean = {
  12.     this.countMap.isEmpty
  13.   }
  14.   // 复制一个如出一辙的累加器
  15.   override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
  16.     val acc = new SessionStatisticAccumulator
  17.     acc.countMap ++= this.countMap // 将两个 Map 拼接在一块儿
  18.     acc
  19.   }
  20.   // 重置累加器
  21.   override def reset(): Unit = {
  22.     this.countMap.clear()
  23.   }
  24.   // 向累加器中添加 KV 对(K 存在,V 累加1,K 不存在,从新建立)
  25.   override def add(k: String): Unit = {
  26.     if (!this.countMap.contains(k)) {
  27.       this.countMap += (k -> 0)
  28.     }
  29.     this.countMap.update(k, this.countMap(k) + 1)
  30.   }
  31.   // 两个累加器进行合并(先判断两个累加器是不是同一类型的,再将两个 Map 进行合并(是个小难点))
  32.   override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
  33.     other match {
  34.       // (1 : 100).foldLeft(0) 等价于 (0 : (1 to 100))(_+_)  又等价于 { case (int1, int2=> int1 + int2 }
  35.       // acc.countMap.foldLeft(this.countMap) 等价于 this.countMap : acc.countMap  又等价于 this.countMap 和 acc.countMap 的每个 KV 作操做
  36.       case acc: SessionStatisticAccumulator => acc.countMap.foldLeft(this.countMap) {
  37.         case (map, (k, v)) => map += (k -> (map.getOrElse(k, 0+ v))
  38.       }
  39.     }
  40.   }
  41.   override def value: mutable.HashMap[String, Int] = {
  42.     this.countMap
  43.   }
  44. }

数据模型代码以下:

  1. package com.atguigu.session
  2. //***************** 输出表 *********************
  3. /**
  4.   * Session 聚合统计表
  5.   *
  6.   * @param taskid                     当前计算批次的 ID
  7.   * @param session_count              全部 Session 的总和
  8.   * @param visit_length_1s_3s_ratio   1-3s Session 访问时长占比
  9.   * @param visit_length_4s_6s_ratio   4-6s Session 访问时长占比
  10.   * @param visit_length_7s_9s_ratio   7-9s Session 访问时长占比
  11.   * @param visit_length_10s_30s_ratio 10-30s Session 访问时长占比
  12.   * @param visit_length_30s_60s_ratio 30-60s Session 访问时长占比
  13.   * @param visit_length_1m_3m_ratio   1-3m Session 访问时长占比
  14.   * @param visit_length_3m_10m_ratio  3-10m Session 访问时长占比
  15.   * @param visit_length_10m_30m_ratio 10-30m Session 访问时长占比
  16.   * @param visit_length_30m_ratio     30m Session 访问时长占比
  17.   * @param step_length_1_3_ratio      1-3 步长占比
  18.   * @param step_length_4_6_ratio      4-6 步长占比
  19.   * @param step_length_7_9_ratio      7-9 步长占比
  20.   * @param step_length_10_30_ratio    10-30 步长占比
  21.   * @param step_length_30_60_ratio    30-60 步长占比
  22.   * @param step_length_60_ratio       大于 60 步长占比
  23.   */
  24. case class SessionAggrStat(taskid: String,
  25.                            session_count: Long,
  26.                            visit_length_1s_3s_ratio: Double,
  27.                            visit_length_4s_6s_ratio: Double,
  28.                            visit_length_7s_9s_ratio: Double,
  29.                            visit_length_10s_30s_ratio: Double,
  30.                            visit_length_30s_60s_ratio: Double,
  31.                            visit_length_1m_3m_ratio: Double,
  32.                            visit_length_3m_10m_ratio: Double,
  33.                            visit_length_10m_30m_ratio: Double,
  34.                            visit_length_30m_ratio: Double,
  35.                            step_length_1_3_ratio: Double,
  36.                            step_length_4_6_ratio: Double,
  37.                            step_length_7_9_ratio: Double,
  38.                            step_length_10_30_ratio: Double,
  39.                            step_length_30_60_ratio: Double,
  40.                            step_length_60_ratio: Double)
  41. /**
  42.   * Session 随机抽取表
  43.   *
  44.   * @param taskid           当前计算批次的 ID
  45.   * @param sessionid        抽取的 Session 的 ID
  46.   * @param startTime        Session 的开始时间
  47.   * @param searchKeywords   Session 的查询字段
  48.   * @param clickCategoryIds Session 点击的类别 id 集合
  49.   */
  50. case class SessionRandomExtract(taskid: String,
  51.                                 sessionid: String,
  52.                                 startTime: String,
  53.                                 searchKeywords: String,
  54.                                 clickCategoryIds: String)
  55. /**
  56.   * Session 随机抽取详细表
  57.   *
  58.   * @param taskid           当前计算批次的 ID
  59.   * @param userid           用户的 ID
  60.   * @param sessionid        Session的 ID
  61.   * @param pageid           某个页面的 ID
  62.   * @param actionTime       点击行为的时间点
  63.   * @param searchKeyword    用户搜索的关键词
  64.   * @param clickCategoryId  某一个商品品类的 ID
  65.   * @param clickProductId   某一个商品的 ID
  66.   * @param orderCategoryIds 一次订单中全部品类的 ID 集合
  67.   * @param orderProductIds  一次订单中全部商品的 ID 集合
  68.   * @param payCategoryIds   一次支付中全部品类的 ID 集合
  69.   * @param payProductIds    一次支付中全部商品的 ID 集合
  70.   **/
  71. case class SessionDetail(taskid: String,
  72.                          userid: Long,
  73.                          sessionid: String,
  74.                          pageid: Long,
  75.                          actionTime: String,
  76.                          searchKeyword: String,
  77.                          clickCategoryId: Long,
  78.                          clickProductId: Long,
  79.                          orderCategoryIds: String,
  80.                          orderProductIds: String,
  81.                          payCategoryIds: String,
  82.                          payProductIds: String)
  83. /**
  84.   * 品类 Top10 表
  85.   *
  86.   * @param taskid
  87.   * @param categoryid
  88.   * @param clickCount
  89.   * @param orderCount
  90.   * @param payCount
  91.   */
  92. case class Top10Category(taskid: String,
  93.                          categoryid: Long,
  94.                          clickCount: Long,
  95.                          orderCount: Long,
  96.                          payCount: Long)
  97. /**
  98.   * Top10 Session
  99.   *
  100.   * @param taskid
  101.   * @param categoryid
  102.   * @param sessionid
  103.   * @param clickCount
  104.   */
  105. case class Top10Session(taskid: String,
  106.                         categoryid: Long,
  107.                         sessionid: String,
  108.                         clickCount: Long)

5.1.8 需求一实现思路整理

5.2 需求二:Session 随机抽取

5.2.1 需求解析

  在符合条件的 session 中,按照时间比例随机抽取 1000 个 session。
  这个按照时间比例是什么意思呢?随机抽取自己是很简单的,可是按照时间比例,就很复杂了。好比说,这一天总共有 1000 万的 session。那么我如今总共要从这 1000 万 session 中,随机抽取出来 1000 个 session。可是这个随机不是那么简单的。须要作到以下几点要求:首先,若是这一天的 12:00~13:00 的 session 数量是 100万,那么这个小时的 session 占比就是 1/10,那么这个小时中的 100 万的 session,咱们就要抽取 1/10 * 1000 = 100 个。即从这个小时的 100 万 session 中,随机抽取出 100 个 session。以此类推,其余小时的抽取也是这样作。
  这个功能的做用是说,可让使用者,可以对于符合条件的 session,按照时间比例均匀的随机采样出 1000 个 session,而后观察每一个 session 具体的点击流/行为,   好比先进入了首页、而后点击了食品品类、而后点击了雨润火腿肠商品、而后搜索了火腿肠罐头的关键词、接着对王中王火腿肠下了订单、最后对订单作了支付。
  之因此要作到按时间比例随机采用抽取,就是要作到,观察样本的公平性。
  抽取完毕以后,须要将 Session 的相关信息和详细信息保存到 MySQL 数据库中。

5.2.2 数据源解析

  本需求的数据源来自于需求一中获取的 Session 聚合数据(fullAggrInfo)。

5.2.3 数据结构解析

SessionRandomExtract 样例类

  1. /**
  2.   * Session 随机抽取表
  3.   *
  4.   * @param taskid           当前计算批次的 ID
  5.   * @param sessionid        抽取的 Session 的 ID
  6.   * @param startTime        Session 的开始时间
  7.   * @param searchKeywords   Session 的查询字段
  8.   * @param clickCategoryIds Session 点击的类别 id 集合
  9.   */
  10. case class SessionRandomExtract(taskid: String,
  11.                                 sessionid: String,
  12.                                 startTime: String,
  13.                                 searchKeywords: String,
  14.                                 clickCategoryIds: String)

5.2.4 需求实现简要流程

5.2.5 需求实现详细流程

5.2.6 MySQL 存储结构解析

MySQL 写入数据格式
session_detail

  1. -- ----------------------------
  2. --  Table structure for `session_detail`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `session_detail`; 
  5. CREATE TABLE `session_detail` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `userid` int(11DEFAULT NULL,
  8.   `sessionid` varchar(255DEFAULT NULL,
  9.   `pageid` int(11DEFAULT NULL,
  10.   `actionTime` varchar(255DEFAULT NULL,
  11.   `searchKeyword` varchar(255DEFAULT NULL,
  12.   `clickCategoryId` int(11DEFAULT NULL,
  13.   `clickProductId` int(11DEFAULT NULL,
  14.   `orderCategoryIds` varchar(255DEFAULT NULL,
  15.   `orderProductIds` varchar(255DEFAULT NULL,
  16.   `payCategoryIds` varchar(255DEFAULT NULL,
  17.   `payProductIds` varchar(255DEFAULT NULLKEY `idx_task_id` (`taskid`),
  18.   KEY `idx_session_id` (`sessionid`)
  19. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

session_random_extract

  1. -- ----------------------------
  2. --  Table structure for `session_random_extract`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `session_random_extract`; 
  5. CREATE TABLE `session_random_extract` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `sessionid` varchar(255DEFAULT NULL,
  8.   `startTime` varchar(50DEFAULT NULL,
  9.   `searchKeywords` varchar(255DEFAULT NULL,
  10.   `clickCategoryIds` varchar(255DEFAULT NULL
  11.   KEY `idx_task_id` (`taskid`)
  12. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.2.7 代码解析

  1. package com.atguigu.session
  2. import java.util.{DateRandom, UUID}
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import commons.model.{UserInfo, UserVisitAction}
  6. import commons.utils._
  7. import net.sf.json.JSONObject
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.rdd.RDD
  10. import org.apache.spark.sql.{SaveMode, SparkSession}
  11. import scala.collection.mutable
  12. import scala.collection.mutable.{ArrayBuffer, ListBuffer}
  13. object SessionStat {
  14.   def main(args: Array[String]): Unit = {
  15.     // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
  16.     val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  17.     // 获取过滤条件对应的 JsonObject 对象
  18.     val taskParam = JSONObject.fromObject(jsonStr)
  19.     // 建立全局惟一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不一样任务,做为写入 MySQL 数据库中那张表的主键
  20.     val taskUUID = UUID.randomUUID().toString
  21.     // 建立 sparkConf
  22.     val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
  23.     // 建立 sparkSession(包含 sparkContext)
  24.     val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  25.     // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  26.     // 获取原始的动做表数据(带有过滤条件)
  27.     // actionRDD: RDD[UserVisitAction]
  28.     val actionRDD = getOriActionRDD(sparkSession, taskParam)
  29.     // 将用户行为信息转换为 K-V 结构,sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  30.     val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
  31.     // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
  32.     val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一块儿,获得斧子形数据
  33.     // 将数据进行内存缓存
  34.     session2GroupActionRDD.cache()
  35.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
  36.     val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
  37.     // 建立自定义累加器对象
  38.     val sessionStatisticAccumulator = new SessionStatisticAccumulator
  39.     // 在 sparkSession 中注册自定义累加器,这样后面就能够用了
  40.     sparkSession.sparkContext.register(sessionStatisticAccumulator)
  41.     // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操做,即过滤掉不符合条件的数据,并根据自定义累加器 统计不一样范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
  42.     val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
  43.     // 必须引入任意一个 action 的算子,才能启动
  44.     seeionId2FilterRDD.foreach(println(_))
  45.     // 计算各个 session 的占比
  46.     getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)
  47.     // ******************** 需求二:Session 随机抽取 ********************
  48.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到这里一个 sessionId 对应一条数据,也就是一个 fullAggrInfo
  49.     sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)
  50.   }
  51.   // ******************** 需求二:Session 随机抽取 ********************
  52.   /**
  53.     * Session 随机抽取
  54.     *
  55.     * @param sparkSession
  56.     * @param taskUUID
  57.     * @param seeionId2FilterRDD
  58.     */
  59.   def sessionRandomExtract(sparkSession: SparkSession, taskUUID: String, seeionId2FilterRDD: RDD[(StringString)]): Unit = {
  60.     // 因为是按照 时间 为 key 进行聚合,因此先将 seeionId2FilterRDD 的 key 转化为 时间
  61.     // dateHour2FullAggrInfoRDD: RDD[(dateHour, fullAggrInfo)]
  62.     val dateHour2FullAggrInfoRDD = seeionId2FilterRDD.map {
  63.       case (sessionId, fullAggrInfo) =>
  64.         // 先从 fullAggrInfo 中提取出来 startTime
  65.         val startTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
  66.         // 获得的 startTime = "2019-05-30 18:17:11" 是字符串类型,须要转换成咱们须要的格式:yyyy-MM-dd_HH
  67.         val dateHour = DateUtils.getDateHour(startTime)
  68.         (dateHour, fullAggrInfo)
  69.     }
  70.     // hourCountMap: Map[(dateHour, count)],示例:(yyyy-MM-dd_HH, 20)
  71.     val hourCountMap = dateHour2FullAggrInfoRDD.countByKey()
  72.     // dateHourCountMap: Map[data, Map[(hour, count)]],示例:(yyyy-MM-dd, (HH, 20))
  73.     val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()
  74.     for ((dateHour, count<- hourCountMap) {
  75.       val date = dateHour.split("_")(0// yyyy-MM-dd_HH
  76.       val hour = dateHour.split("_")(1// HH
  77.       dateHourCountMap.get(date) match { // Map[(hour, count)
  78.         case None =>
  79.           dateHourCountMap(date= new mutable.HashMap[String, Long]() // 先建立 1 个空的 HashMap
  80.           dateHourCountMap(date+= (hour -> count// 再给 HashMap 赋值
  81.         case Some(map) =>
  82.           dateHourCountMap(date+= (hour -> count// 直接给 HashMap 赋值
  83.       }
  84.     }
  85.     // 解决问题一:
  86.     //   一共有多少天:dateHourCountMap.size
  87.     //   一天抽取多少条:1000 / dateHourCountMap.size
  88.     val extractPerDay = 1000 / dateHourCountMap.size
  89.     // 解决问题二:
  90.     //   一共有多少个:session:dateHourCountMap(date).values.sum
  91.     //   一个小时有多少个:session:dateHourCountMap(date)(hour)
  92.     val dateHourExtractIndexListMap = new mutable.HashMap[String, mutable.HashMap[String, ListBuffer[Int]]]()
  93.     // dateHourCountMap: Map[data, Map[(hour, count)]],示例:(yyyy-MM-dd, (HH, 20))
  94.     // hourCountMap: Map[(hour, count)],示例:(HH, 20) ,注意:这里面的 hourCountMap 含义发生变化了,要跟上面的最开始的 hourCountMap 区别开来
  95.     for ((date, hourCountMap) <- dateHourCountMap) {
  96.       // 一天共有多少个 session
  97.       val dataCount = hourCountMap.values.sum
  98.       dateHourExtractIndexListMap.get(date) match {
  99.         case None =>
  100.           dateHourExtractIndexListMap(date= new mutable.HashMap[String, mutable.ListBuffer[Int]]()
  101.           generateRandomIndexList(extractPerDay, dataCount, hourCountMap, dateHourExtractIndexListMap(date))
  102.         case Some(map) =>
  103.           generateRandomIndexList(extractPerDay, dataCount, hourCountMap, dateHourExtractIndexListMap(date))
  104.       }
  105.     }
  106.     // 到此为止,咱们得到了每一个小时要抽取的 session 的 index
  107.     // 以后在算子中使用 dateHourExtractIndexListMap 这个 Map,因为这个 Map 可能会很大,因此涉及到 广播大变量 的问题
  108.     // 广播大变量,提高任务 task 的性能
  109.     val dateHourExtractIndexListMapBroadcastVar = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)
  110.     // dateHour2FullAggrInfoRDD: RDD[(dateHour, fullAggrInfo)]
  111.     // dateHour2GroupRDD: RDD[(dateHour, Iterable[fullAggrInfo])]
  112.     val dateHour2GroupRDD = dateHour2FullAggrInfoRDD.groupByKey()
  113.     // extractSessionRDD: RDD[SessionRandomExtract]
  114.     val extractSessionRDD = dateHour2GroupRDD.flatMap {
  115.       case (dateHour, iterableFullAggrInfo) =>
  116.         val date = dateHour.split("_")(0)
  117.         val hour = dateHour.split("_")(1)
  118.         val extractIndexList = dateHourExtractIndexListMapBroadcastVar.value.get(date).get(hour)
  119.         // 建立一个容器存储抽取的 session
  120.         val extractSessionArrayBuffer = new ArrayBuffer[SessionRandomExtract]()
  121.         var index = 0
  122.         for (fullAggrInfo <- iterableFullAggrInfo) {
  123.           if (extractIndexList.contains(index)) {
  124.             // 提取数据,封装成所须要的样例类,并追加进 ArrayBuffer 中
  125.             val sessionId = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
  126.             val startTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
  127.             val searchKeywords = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
  128.             val clickCategoryIds = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)
  129.             val sessionRandomExtract = SessionRandomExtract(taskUUID, sessionId, startTime, searchKeywords, clickCategoryIds)
  130.             extractSessionArrayBuffer += sessionRandomExtract
  131.           }
  132.           index += 1
  133.         }
  134.         extractSessionArrayBuffer
  135.     }
  136.     // 将抽取后的数据保存到 MySQL
  137.     import sparkSession.implicits._
  138.     extractSessionRDD.toDF().write
  139.       .format("jdbc")
  140.       .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  141.       .option("dbtable""session_random_extract")
  142.       .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  143.       .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  144.       .mode(SaveMode.Append)
  145.       .save()
  146.   }
  147.   /**
  148.     * 根据每一个小时应该抽取的数量,来产生随机值
  149.     *
  150.     * @param extractPerDay 一天抽取的 seesion 个数
  151.     * @param dataCount     当天全部的 seesion 总数
  152.     * @param hourCountMap  每一个小时的session总数
  153.     * @param hourListMap   主要用来存放生成的随机值
  154.     */
  155.   def generateRandomIndexList(extractPerDay: Long,
  156.                               dataCount: Long,
  157.                               hourCountMap: mutable.HashMap[String, Long],
  158.                               hourListMap: mutable.HashMap[String, mutable.ListBuffer[Int]]): Unit = {
  159.     // 先遍历 hourCountMap,hourCountMap: Map[(hour, count)],示例:(HH, 20) ,注意:这里面的 hourCountMap 含义发生变化了,要跟上面的最开始的 hourCountMap 区别开来
  160.     for ((hour, count<- hourCountMap) {
  161.       // 计算一个小时抽取多少个 session
  162.       var hourExtractCount = ((count / dataCount.toDouble) * extractPerDay).toInt
  163.       // 避免一个小时要抽取的数量超过这个小时的总数
  164.       if (hourExtractCount > count) {
  165.         hourExtractCount = count.toInt
  166.       }
  167.       val random = new Random()
  168.       hourListMap.get(hour) match {
  169.         case None =>
  170.           hourListMap(hour) = new mutable.ListBuffer[Int] // 没有 List,须要新建一个 List
  171.           for (i <0 until hourExtractCount) {
  172.             var index = random.nextInt(count.toInt) // 生成 index
  173.             while (hourListMap(hour).contains(index)) { // 若是 index 已存在
  174.               index = random.nextInt(count.toInt) // 则从新生成 index
  175.             }
  176.             // 将生成的 index 放入到 hourListMap 中
  177.             hourListMap(hour).append(index)
  178.           }
  179.         case Some(list) =>
  180.           for (i <0 until hourExtractCount) {
  181.             var index = random.nextInt(count.toInt) // 生成 index
  182.             while (hourListMap(hour).contains(index)) { // 若是 index 已存在
  183.               index = random.nextInt(count.toInt) // 则从新生成 index
  184.             }
  185.             // 将生成的 index 放入到 hourListMap 中
  186.             hourListMap(hour).append(index)
  187.           }
  188.       }
  189.     }
  190.   }
  191.   // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  192.   // ......
  193. }

5.2.8 需求二实现思路整理

5.3 需求三:Top10 热门品类统计

5.3.1 需求解析

  在符合条件的 session 中,获取点击、下单和支付数量排名前 10 的品类。
  数据中的每一个 session 可能都会对一些品类的商品进行点击、下单和支付等等行为,那么如今就须要获取这些 session 点击、下单和支付数量排名前 10 的最热门的品类。也就是说,要计算出全部这些 session 对各个品类的点击、下单和支付的次数, 而后按照这三个属性进行排序,获取前 10 个品类。
  这个功能很重要,可让咱们明白,符合条件的用户,他最感兴趣的商品是什么种类。这个可让公司里的人,清晰地了解到不一样层次、不一样类型的用户的心理和喜爱。
  计算完成以后,将数据保存到 MySQL 数据库中。

5.3.2 数据源解析

  1. sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  2. seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
  3. seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]

5.3.3 数据结构解析

  1. /**
  2.   * 品类 Top10 表
  3.   *
  4.   * @param taskid
  5.   * @param categoryid
  6.   * @param clickCount
  7.   * @param orderCount
  8.   * @param payCount
  9.   */
  10. case class Top10Category(taskid: String,
  11.                          categoryid: Long,
  12.                          clickCount: Long,
  13.                          orderCount: Long,
  14.                          payCount: Long)

5.3.4 需求实现简要流程

5.3.5 需求实现详细流程

5.3.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `top10_category`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `top10_category`;
  5. CREATE TABLE `top10_category` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `categoryid` int(11DEFAULT NULL,
  8.   `clickCount` int(11DEFAULT NULL,
  9.   `orderCount` int(11DEFAULT NULL,
  10.   `payCount` int(11DEFAULT NULL,
  11.   KEY `idx_task_id` (`taskid`)
  12. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.3.7 代码解析

  1. package com.atguigu.session
  2. import java.util.{DateRandom, UUID}
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import commons.model.{UserInfo, UserVisitAction}
  6. import commons.utils._
  7. import net.sf.json.JSONObject
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.rdd.RDD
  10. import org.apache.spark.sql.{SaveMode, SparkSession}
  11. import scala.collection.mutable
  12. import scala.collection.mutable.{ArrayBuffer, ListBuffer}
  13. object SessionStat {
  14.   def main(args: Array[String]): Unit = {
  15.     // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
  16.     val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  17.     // 获取过滤条件对应的 JsonObject 对象
  18.     val taskParam = JSONObject.fromObject(jsonStr)
  19.     // 建立全局惟一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不一样任务,做为写入 MySQL 数据库中那张表的主键
  20.     val taskUUID = UUID.randomUUID().toString
  21.     // 建立 sparkConf
  22.     val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
  23.     // 建立 sparkSession(包含 sparkContext)
  24.     val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  25.     // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  26.     // 获取原始的动做表数据(带有过滤条件)
  27.     // actionRDD: RDD[UserVisitAction]
  28.     val actionRDD = getOriActionRDD(sparkSession, taskParam)
  29.     // 将用户行为信息转换为 K-V 结构
  30.     // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  31.     val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
  32.     // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
  33.     val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一块儿,获得斧子形数据
  34.     // 将数据进行内存缓存
  35.     session2GroupActionRDD.cache()
  36.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
  37.     val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
  38.     // 建立自定义累加器对象
  39.     val sessionStatisticAccumulator = new SessionStatisticAccumulator
  40.     // 在 sparkSession 中注册自定义累加器,这样后面就能够用了
  41.     sparkSession.sparkContext.register(sessionStatisticAccumulator)
  42.     // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操做,即过滤掉不符合条件的数据,并根据自定义累加器 统计不一样范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
  43.     // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
  44.     val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
  45.     // 必须引入任意一个 action 的算子,才能启动
  46.     seeionId2FilterRDD.foreach(println(_))
  47.     // 计算各个 session 的占比
  48.     getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)
  49.     // ******************** 需求二:Session 随机抽取 ********************
  50.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到这里一个 sessionId 对应一条数据,也就是一个 fullAggrInfo
  51.     sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)
  52.     // ******************** 需求三:Top10 热门品类统计 ********************
  53.     // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  54.     // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
  55.     // join 默认是内链接,即不符合条件的不显示(即被过滤掉)
  56.     // 获取全部符合过滤条件的原始的 UserVisitAction 数据
  57.     val seeionId2ActionFilterRDD = sessionId2ActionRDD.join(seeionId2FilterRDD).map {
  58.       case (sessionId, (userVisitAction, fullAggrInfo)) =>
  59.         (sessionId, userVisitAction)
  60.     }
  61.     val top10CategoryArray = top10PopularCategories(sparkSession, taskUUID, seeionId2ActionFilterRDD)
  62.   }
  63.   /**
  64.     * Top10 热门品类统计
  65.     *
  66.     * @param sparkSession
  67.     * @param taskUUID
  68.     * @param seeionId2ActionFilterRDD 全部符合过滤条件的原始的 UserVisitAction 数据
  69.     */
  70.   def top10PopularCategories(sparkSession: SparkSession, taskUUID: String, seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
  71.     // 第一步:获取全部发生过点击、下单、付款的 categoryId,注意:其中被点击的 categoryId 只有一个,被下单和被付款的 categoryId 有多个,categoryId 之间使用逗号隔开的
  72.     var cid2CidRDD = seeionId2ActionFilterRDD.flatMap {
  73.       case (sessionId, userVisitAction) =>
  74.         val categoryIdBuffer = new ArrayBuffer[(Long, Long)]()
  75.         // 提取出数据填充 ArrayBuffer
  76.         if (userVisitAction.click_category_id != -1) { // 点击行为
  77.           categoryIdBuffer += ((userVisitAction.click_category_id, userVisitAction.click_category_id)) // 只有第一个 key 有用,第二个 value 任何值均可以,可是不能够没有
  78.         } else if (userVisitAction.order_category_ids != null) { // 下单行为
  79.           for (order_category_id <- userVisitAction.order_category_ids.split(",")) {
  80.             categoryIdBuffer += ((order_category_id.toLong, order_category_id.toLong))
  81.           }
  82.         } else if (userVisitAction.pay_category_ids != null) { // 付款行为
  83.           for (pay_category_id <- userVisitAction.pay_category_ids.split(",")) {
  84.             categoryIdBuffer += ((pay_category_id.toLong, pay_category_id.toLong))
  85.           }
  86.         }
  87.         categoryIdBuffer
  88.     }
  89.     // 第二步:进行去重操做
  90.     cid2CidRDD = cid2CidRDD.distinct()
  91.     // 第三步:统计各品类 被点击的次数、被下单的次数、被付款的次数
  92.     val cid2ClickCountRDD = getClickCount(seeionId2ActionFilterRDD)
  93.     val cid2OrderCountRDD = getOrderCount(seeionId2ActionFilterRDD)
  94.     val cid2PayCountRDD = getPayCount(seeionId2ActionFilterRDD)
  95.     // 第四步:获取各个 categoryId 的点击次数、下单次数、付款次数,并进行拼装
  96.     // cid2FullCountRDD: RDD[(cid, aggrCountInfo)]
  97.     // (81,categoryId=81|clickCount=68|orderCount=64|payCount=72)
  98.     val cid2FullCountRDD = getFullCount(cid2CidRDD, cid2ClickCountRDD, cid2OrderCountRDD, cid2PayCountRDD)
  99.     // 第五步:根据点击次数、下单次数、付款次数依次排序,会用到 【二次排序】,实现自定义的二次排序的 key
  100.     // 第六步:封装 SortKey
  101.     val sortKey2FullCountRDD = cid2FullCountRDD.map {
  102.       case (cid, fullCountInfo) =>
  103.         val clickCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CLICK_COUNT).toLong
  104.         val orderCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_ORDER_COUNT).toLong
  105.         val payCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_PAY_COUNT).toLong
  106.         val sortKey = SortKey(clickCount, orderCount, payCount)
  107.         (sortKey, fullCountInfo)
  108.     }
  109.     // 第七步:降序排序,取出 top10 热门品类
  110.     val top10CategoryArray = sortKey2FullCountRDD.sortByKey(false).take(10)
  111.     // 第八步:将 Array 结构转化为 RDD,封装 Top10Category
  112.     val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map {
  113.       case (sortKey, fullCountInfo) =>
  114.         val categoryid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
  115.         val clickCount = sortKey.clickCount
  116.         val orderCount = sortKey.orderCount
  117.         val payCount = sortKey.payCount
  118.         Top10Category(taskUUID, categoryid, clickCount, orderCount, payCount)
  119.     }
  120.     // 第九步:写入 MySQL 数据库
  121.     import sparkSession.implicits._
  122.     top10CategoryRDD.toDF().write
  123.       .format("jdbc")
  124.       .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  125.       .option("dbtable""top10_category")
  126.       .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  127.       .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  128.       .mode(SaveMode.Append)
  129.       .save()
  130.     top10CategoryArray
  131.   }
  132.   /**
  133.     *
  134.     * @param cid2CidRDD
  135.     * @param cid2ClickCountRDD
  136.     * @param cid2OrderCountRDD
  137.     * @param cid2PayCountRDD
  138.     * @return
  139.     */
  140.   def getFullCount(cid2CidRDD: RDD[(Long, Long)],
  141.                    cid2ClickCountRDD: RDD[(Long, Long)],
  142.                    cid2OrderCountRDD: RDD[(Long, Long)],
  143.                    cid2PayCountRDD: RDD[(Long, Long)]) = {
  144.     // 左外链接:不符合添加显示为空(null
  145.     // 4.1 全部品类id 和 被点击的品类 作左外链接
  146.     val cid2ClickInfoRDD = cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map {
  147.       case (cid, (categoryId, option)) =>
  148.         val clickCount = if (option.isDefined) option.get else 0
  149.         val aggrCountInfo = Constants.FIELD_CATEGORY_ID + "=" + cid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount
  150.         (cid, aggrCountInfo)
  151.     }
  152.     // 4.2 4.1 的结果 和 被下单的品类 作左外链接
  153.     val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map {
  154.       case (cid, (clickInfo, option)) =>
  155.         val orderCount = if (option.isDefined) option.get else 0
  156.         val aggrCountInfo = clickInfo + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount
  157.         (cid, aggrCountInfo)
  158.     }
  159.     // 4.3 4.2 的结果 和 被付款的品类 作左外链接
  160.     val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map {
  161.       case (cid, (orderInfo, option)) =>
  162.         val payCount = if (option.isDefined) option.get else 0
  163.         val aggrCountInfo = orderInfo + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount
  164.         (cid, aggrCountInfo)
  165.     }
  166.     cid2PayInfoRDD
  167.   }
  168.   /**
  169.     * 统计各品类被点击的次数
  170.     *
  171.     * @param seeionId2ActionFilterRDD
  172.     */
  173.   def getClickCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
  174.     // 方式一:把发生过点击的 action 过滤出来
  175.     val clickActionFilterRDD = seeionId2ActionFilterRDD.filter {
  176.       case (sessionId, userVisitAction) =>
  177.         userVisitAction.click_category_id != 1L
  178.     }
  179.     // 方式二:把发生点击的 action 过滤出来,两者等价
  180.     // val clickActionFilterRDD2 = seeionId2ActionFilterRDD.filter(item => item._2.click_category_id != -1L)
  181.     // 获取每种类别的点击次数
  182.     val clickNumRDD = clickActionFilterRDD.map {
  183.       case (sessionId, userVisitAction) =>
  184.         (userVisitAction.click_category_id, 1L)
  185.     }
  186.     // 计算各个品类的点击次数
  187.     clickNumRDD.reduceByKey(_ + _)
  188.   }
  189.   /**
  190.     * 统计各品类被下单的次数
  191.     *
  192.     * @param seeionId2ActionFilterRDD
  193.     */
  194.   def getOrderCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
  195.     // 把发生过下单的 action 过滤出来
  196.     val orderActionFilterRDD = seeionId2ActionFilterRDD.filter {
  197.       case (sessionId, userVisitAction) =>
  198.         userVisitAction.order_category_ids != null
  199.     }
  200.     // 获取每种类别的下单次数
  201.     val orderNumRDD = orderActionFilterRDD.flatMap {
  202.       case (sessionId, userVisitAction) =>
  203.         userVisitAction.order_category_ids.split(",").map(item => (item.toLong, 1L))
  204.     }
  205.     // 计算各个品类的下单次数
  206.     orderNumRDD.reduceByKey(_ + _)
  207.   }
  208.   /**
  209.     * 统计各品类被付款的次数
  210.     *
  211.     * @param seeionId2ActionFilterRDD
  212.     */
  213.   def getPayCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
  214.     // 把发生过付款的 action 过滤出来
  215.     val payActionFilterRDD = seeionId2ActionFilterRDD.filter {
  216.       case (sessionId, userVisitAction) =>
  217.         userVisitAction.pay_category_ids != null
  218.     }
  219.     // 获取每种类别的支付次数
  220.     val payNumRDD = payActionFilterRDD.flatMap {
  221.       case (sessionId, userVisitAction) =>
  222.         userVisitAction.pay_category_ids.split(",").map(item => (item.toLong, 1L))
  223.     }
  224.     // 计算各个品类的支付次数
  225.     payNumRDD.reduceByKey(_ + _)
  226.   }
  227.   // ******************** 需求二:Session 随机抽取 ********************
  228.   // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  229. }

5.3.8 需求三实现思路整理

5.4 需求四:Top10 热门品类的 Top10 活跃 Session 统计

5.4.1 需求解析

  对于排名前 10 的品类,分别获取其点击次数排名前 10 的 session。
  这个就是说,对于 top10 的品类,每个都要获取对它点击次数排名前 10 的 session。
  这个功能,可让咱们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。
  计算完成以后,将数据保存到 MySQL 数据库中。

5.4.2 数据源解析

  1. seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
  2. top10CategoryArray: Array[(sortKey, fullCountInfo)]

5.4.3 数据结构解析

  1. /**
  2.   * Top10 Session
  3.   *
  4.   * @param taskid
  5.   * @param categoryid
  6.   * @param sessionid
  7.   * @param clickCount
  8.   */
  9. case class Top10Session(taskid: String,
  10.                         categoryid: Long,
  11.                         sessionid: String,
  12.                         clickCount: Long)
  13. /**
  14.   * Session 随机抽取详细表
  15.   *
  16.   * @param taskid           当前计算批次的 ID
  17.   * @param userid           用户的 ID
  18.   * @param sessionid        Session 的 ID
  19.   * @param pageid           某个页面的 ID
  20.   * @param actionTime       点击行为的时间点
  21.   * @param searchKeyword    用户搜索的关键词
  22.   * @param clickCategoryId  某一个商品品类的 ID
  23.   * @param clickProductId   某一个商品的 ID
  24.   * @param orderCategoryIds 一次订单中全部品类的 ID 集合
  25.   * @param orderProductIds  一次订单中全部商品的 ID 集合
  26.   * @param payCategoryIds   一次支付中全部品类的 ID 集合
  27.   * @param payProductIds    一次支付中全部商品的 ID 集合
  28.   **/
  29. case class SessionDetail(taskid: String,
  30.                          userid: Long,
  31.                          sessionid: String,
  32.                          pageid: Long,
  33.                          actionTime: String,
  34.                          searchKeyword: String,
  35.                          clickCategoryId: Long,
  36.                          clickProductId: Long,
  37.                          orderCategoryIds: String,
  38.                          orderProductIds: String,
  39.                          payCategoryIds: String,
  40.                          payProductIds: String)

5.4.4 需求实现简要流程

5.4.5 需求实现详细流程

5.4.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `top10_session`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `top10_session`;
  5. CREATE TABLE `top10_session` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `categoryid` int(11DEFAULT NULL,
  8.   `sessionid` varchar(255DEFAULT NULL,
  9.   `clickCount` int(11DEFAULT NULLKEY `idx_task_id` (`taskid`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.4.7 代码解析

  1. package com.atguigu.session
  2. import java.util.{DateRandom, UUID}
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import commons.model.{UserInfo, UserVisitAction}
  6. import commons.utils._
  7. import net.sf.json.JSONObject
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.rdd.RDD
  10. import org.apache.spark.sql.{SaveMode, SparkSession}
  11. import scala.collection.mutable
  12. import scala.collection.mutable.{ArrayBuffer, ListBuffer}
  13. object SessionStat {
  14.   def main(args: Array[String]): Unit = {
  15.     // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
  16.     val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  17.     // 获取过滤条件对应的 JsonObject 对象
  18.     val taskParam = JSONObject.fromObject(jsonStr)
  19.     // 建立全局惟一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不一样任务,做为写入 MySQL 数据库中那张表的主键
  20.     val taskUUID = UUID.randomUUID().toString
  21.     // 建立 sparkConf
  22.     val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
  23.     // 建立 sparkSession(包含 sparkContext)
  24.     val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  25.     // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  26.     // 获取原始的动做表数据(带有过滤条件)
  27.     // actionRDD: RDD[UserVisitAction]
  28.     val actionRDD = getOriActionRDD(sparkSession, taskParam)
  29.     // 将用户行为信息转换为 K-V 结构
  30.     // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  31.     val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
  32.     // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
  33.     val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一块儿,获得斧子形数据
  34.     // 将数据进行内存缓存
  35.     session2GroupActionRDD.cache()
  36.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
  37.     val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
  38.     // 建立自定义累加器对象
  39.     val sessionStatisticAccumulator = new SessionStatisticAccumulator
  40.     // 在 sparkSession 中注册自定义累加器,这样后面就能够用了
  41.     sparkSession.sparkContext.register(sessionStatisticAccumulator)
  42.     // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操做,即过滤掉不符合条件的数据,并根据自定义累加器 统计不一样范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
  43.     // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
  44.     val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
  45.     // 必须引入任意一个 action 的算子,才能启动
  46.     seeionId2FilterRDD.foreach(println(_))
  47.     // 计算各个 session 的占比
  48.     getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)
  49.     // ******************** 需求二:Session 随机抽取 ********************
  50.     // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到这里一个 sessionId 对应一条数据,也就是一个 fullAggrInfo
  51.     sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)
  52.     // ******************** 需求三:Top10 热门品类统计 ********************
  53.     // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  54.     // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
  55.     // join 默认是内链接,即不符合条件的不显示(即被过滤掉)
  56.     // 获取全部符合过滤条件的原始的 UserVisitAction 数据
  57.     // seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
  58.     val seeionId2ActionFilterRDD = sessionId2ActionRDD.join(seeionId2FilterRDD).map {
  59.       case (sessionId, (userVisitAction, fullAggrInfo)) =>
  60.         (sessionId, userVisitAction)
  61.     }
  62.     val top10CategoryArray = top10PopularCategories(sparkSession, taskUUID, seeionId2ActionFilterRDD)
  63.     // ******************** 需求四:Top10 热门品类的 Top10 活跃 Session 统计 ********************
  64.     // seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
  65.     // top10CategoryArray: Array[(sortKey, fullCountInfo)]
  66.     top10ActiveSession(sparkSession, taskUUID, seeionId2ActionFilterRDD, top10CategoryArray)
  67.   }
  68.   // ******************** 需求四:Top10 热门品类的 Top10 活跃 Session 统计 ********************
  69.   /**
  70.     * Top10 热门品类的 Top10 活跃 Session 统计
  71.     *
  72.     * @param sparkSession
  73.     * @param taskUUID
  74.     * @param seeionId2ActionFilterRDD
  75.     * @param top10CategoryArray
  76.     */
  77.   def top10ActiveSession(sparkSession: SparkSession,
  78.                          taskUUID: String,
  79.                          seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)],
  80.                          top10CategoryArray: Array[(SortKey, String)]): Unit = {
  81.     // 第一步:获取全部点击过 Top10 热门品类的 UserVisitAction
  82.     // 第一种方法:Join 方法,该方式须要引发 Shuffle,比较麻烦
  83.     /*
  84.     // 将 top10CategoryArray 转化为 RDD,而后将其 key sortKey 转化为 cid
  85.     val cid2FullCountInfoRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map {
  86.       case (sortKey, fullCountInfo) =>
  87.         // 取出 categoryId
  88.         val cid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
  89.         // 返回所需的 RDD
  90.         (cid, fullCountInfo)
  91.     }
  92.     // 将 seeionId2ActionFilterRDD 的 key sessionId 转化为 cid,对其使用 map 操做便可
  93.     val cid2ActionRDD = seeionId2ActionFilterRDD.map {
  94.       case (sessionId, userVisitAction) =>
  95.         val cid = userVisitAction.click_category_id
  96.         (cid, userVisitAction)
  97.     }
  98.     // joinn 操做(即内链接):两边都有的才留下,不然过滤掉
  99.     cid2FullCountInfoRDD.join(cid2ActionRDD).map {
  100.       case (cid, (fullCountInfo, userVisitAction)) =>
  101.         val sid = userVisitAction.session_id
  102.         (sid, userVisitAction)
  103.     }*/
  104.     // 第二种方法:使用 filter
  105.     // cidArray: Array[Long] 包含了 Top10 热门品类的 id
  106.     val cidArray = top10CategoryArray.map {
  107.       case (sortKey, fullCountInfo) =>
  108.         val cid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
  109.         cid
  110.     }
  111.     // 全部符合过滤条件的,而且点击过 Top10 热门品类的 UserVisitAction
  112.     val seeionId2ActionRDD = seeionId2ActionFilterRDD.filter {
  113.       case (sessionId, userVisitAction) =>
  114.         cidArray.contains(userVisitAction.click_category_id)
  115.     }
  116.     // 第二步:先对 全部符合过滤条件的,而且点击过 Top10 热门品类的 UserVisitAction 按照 sessionId 进行聚合
  117.     val seeionId2GroupRDD = seeionId2ActionRDD.groupByKey()
  118.     // 第三步:统计 每个 sessionId 对于点击过的每个品类的点击次数
  119.     // cid2SessionCountRDD: RDD[(cid, sessionN=sessionCount)]
  120.     val cid2SessionCountRDD = seeionId2GroupRDD.flatMap {
  121.       case (sessionId, iterableUserVisitAction) =>
  122.         // 建立 Map,用于保存当前每个 sessionId 对于点击过的每个品类的点击次数
  123.         val categoryCountMap = new mutable.HashMap[Long, Long]()
  124.         for (userVisitAction <- iterableUserVisitAction) {
  125.           val cid = userVisitAction.click_category_id
  126.           if (!categoryCountMap.contains(cid))
  127.             categoryCountMap += (cid -> 0)
  128.           categoryCountMap.update(cid, categoryCountMap(cid) + 1)
  129.         }
  130.         // 该 Map 记录了一个 session 对于它全部点击过的品类的点击次数
  131.         // categoryCountMap
  132.         for ((cid, sessionCount) <- categoryCountMap)
  133.           yield (cid, sessionId + "=" + sessionCount)
  134.     }
  135.     // 第四步:对 cid2SessionCountRDD 进行聚合
  136.     // cid2GroupRDD: RDD[(cid, Iterable[sessionN=sessionCount]))]
  137.     // cid2GroupRDD 的每一条数据都是一个 cid 和它对应的全部点击过它的 sessionId 对它的点击次数
  138.     val cid2GroupRDD = cid2SessionCountRDD.groupByKey()
  139.     // 第五步:取出 top10SessionRDD: RDD[Top10Session]
  140.     val top10SessionRDD = cid2GroupRDD.flatMap {
  141.       case (cid, iterablesSessionCount) =>
  142.         val sortList = iterablesSessionCount.toList.sortWith((item1, item2=> { // true: item1 放在前面
  143.           item1.split("=")(1).toLong > item2.split("=")(1).toLong // item1: sessionCount 字符串类型 sessionIdN=count
  144.         }).take(10)
  145.         // 封装数据,准备写入 MySQL 数据库
  146.         val top10Session = sortList.map {
  147.           case item => {
  148.             val categoryid = cid
  149.             val sessionid = item.split("=")(0)
  150.             val clickCount = item.split("=")(1).toLong
  151.             Top10Session(taskUUID, categoryid, sessionid, clickCount)
  152.           }
  153.         }
  154.         top10Session
  155.     }
  156.     // 写入 MySQL 数据库
  157.     import sparkSession.implicits._
  158.     top10SessionRDD.toDF().write
  159.       .format("jdbc")
  160.       .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  161.       .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  162.       .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  163.       .option("dbtable""top10_session")
  164.       .mode(SaveMode.Append)
  165.       .save()
  166.   }
  167.   // ******************** 需求三:Top10 热门品类统计 ********************
  168.   // ******************** 需求二:Session 随机抽取 ********************
  169.   // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  170. }

5.4.8 需求3、四实现思路整理

5.5 需求五:页面单跳转化率统计

5.5.1 需求解析

  计算页面单跳转化率 什么是页面单跳转换率 好比一个用户在一次 Session 过程当中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的几率,好比: 计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,而后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率,咱们记为 C;那么页面 5-7 的转化率怎么求呢?先须要求出符合条件的 Session 中访问页面 5 又紧接着访问了页面 7 的次数为 D,那么 D/B 即为 5-7 的单跳转化率。
  产品经理,能够根据这个指标,去尝试分析整个网站、产品各个页面的表现怎么样,是否是须要去优化产品的布局;吸引用户最终能够进入最后的支付页面。
  数据分析师,能够此数据作更深一步的计算和分析。
  企业管理层, 能够看到整个公司的网站,各个页面的之间的跳转的表现如何,能够适当调整公司的经营战略或策略。
  在如下模块中,须要根据查询对象中设置的 Session 过滤条件,先将对应的 Session 过滤出来,而后根据查询对象中设置的页面路径,计算页面单跳转化率,好比查询的页面路径为:三、五、七、8,那么就要计算 3-五、5-七、7-8 的页面单跳转化率。须要注意的一点是,页面的访问是有前后的。

5.5.2 数据源解析

动做表

5.5.3 数据结构解析

  1. /**
  2.   * 用户访问动做表
  3.   *
  4.   * @param date               用户点击行为的日期
  5.   * @param user_id            用户的 ID
  6.   * @param session_id         Session 的 ID
  7.   * @param page_id            某个页面的 ID
  8.   * @param action_time        点击行为的时间点
  9.   * @param search_keyword     用户搜索的关键词
  10.   * @param click_category_id  某一个商品品类的 ID
  11.   * @param click_product_id   某一个商品的 ID
  12.   * @param order_category_ids 一次订单中全部品类的 ID 集合
  13.   * @param order_product_ids  一次订单中全部商品的 ID 集合
  14.   * @param pay_category_ids   一次支付中全部品类的 ID 集合
  15.   * @param pay_product_ids    一次支付中全部商品的 ID 集合
  16.   * @param city_id            城市 ID
  17.   */
  18. case class UserVisitAction(dateString,
  19.                            user_id: Long,
  20.                            session_id: String,
  21.                            page_id: Long,
  22.                            action_timeString,
  23.                            search_keyword: String,
  24.                            click_category_id: Long,
  25.                            click_product_id: Long,
  26.                            order_category_ids: String,
  27.                            order_product_ids: String,
  28.                            pay_category_ids: String,
  29.                            pay_product_ids: String,
  30.                            city_id: Long)

5.5.4 需求实现简要流程

举例

如何作

5.5.5 需求实现详细流程

5.5.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `page_split_convert_rate`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `page_split_convert_rate`;
  5. CREATE TABLE `page_split_convert_rate` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `convertRate` varchar(255DEFAULT NULL
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.5.7 代码解析

在 analyse 中新建子模块 page,配置 pom.xml 文件,添加 scala 框架的支持

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>analyse</artifactId>
  7.         <groupId>com.atguigu</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>page</artifactId>
  12.     <dependencies>
  13.         <dependency>
  14.             <groupId>com.atguigu</groupId>
  15.             <artifactId>commons</artifactId>
  16.             <version>1.0-SNAPSHOT</version>
  17.         </dependency>
  18.         <!-- Spark 的依赖引入 -->
  19.         <dependency>
  20.             <groupId>org.apache.spark</groupId>
  21.             <artifactId>spark-core_2.11</artifactId>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.spark</groupId>
  25.             <artifactId>spark-sql_2.11</artifactId>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.spark</groupId>
  29.             <artifactId>spark-hive_2.11</artifactId>
  30.         </dependency>
  31.         <!-- 引入 Scala -->
  32.         <dependency>
  33.             <groupId>org.scala-lang</groupId>
  34.             <artifactId>scala-library</artifactId>
  35.         </dependency>
  36.     </dependencies>
  37.     <build>
  38.         <plugins>
  39.             <plugin>
  40.                 <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
  41.                 <groupId>net.alchim31.maven</groupId>
  42.                 <artifactId>scala-maven-plugin</artifactId>
  43.             </plugin>
  44.             <plugin>
  45.                 <groupId>org.apache.maven.plugins</groupId>
  46.                 <artifactId>maven-assembly-plugin</artifactId>
  47.                 <configuration>
  48.                     <archive>
  49.                         <manifest>
  50.                             <mainClass>com.atguigu.page.PageOneStepConvertRate</mainClass>
  51.                         </manifest>
  52.                     </archive>
  53.                     <descriptorRefs>
  54.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  55.                     </descriptorRefs>
  56.                 </configuration>
  57.             </plugin>
  58.         </plugins>
  59.     </build>
  60. </project>

示例代码:

  1. package com.atguigu.page
  2. import java.util.UUID
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import commons.model.UserVisitAction
  6. import commons.utils.{DateUtils, ParamUtils}
  7. import net.sf.json.JSONObject
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.sql.{SaveMode, SparkSession}
  10. import org.apache.spark.storage.StorageLevel
  11. import scala.collection.mutable
  12. object PageConvertStat {
  13.   def main(args: Array[String]): Unit = {
  14.     // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
  15.     val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  16.     // 获取过滤条件对应的 JsonObject 对象
  17.     val taskParam = JSONObject.fromObject(jsonStr)
  18.     // 建立全局惟一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不一样任务,做为写入 MySQL 数据库中那张表的主键
  19.     val taskUUID = UUID.randomUUID().toString
  20.     // 建立 sparkConf
  21.     val sparkConf = new SparkConf().setAppName("pageConvert").setMaster("local[*]")
  22.     // 建立 sparkSession(包含 sparkContext)
  23.     val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  24.     // ******************** 需求五:页面单跳转化率统计 ********************
  25.     // 获取原始的动做表数据(带有过滤条件)
  26.     // actionRDD: RDD[UserVisitAction]
  27.     val actionRDD = getOriActionRDD(sparkSession, taskParam)
  28.     // 将用户行为信息转换为 K-V 结构
  29.     // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  30.     val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
  31.     // 将数据进行内存缓存
  32.     sessionId2ActionRDD.persist(StorageLevel.MEMORY_ONLY)
  33.     // 目标页面切片:将页面流路径转换为页面切片
  34.     // targetPageFlowStr:"1,2,3,4,5,6,7"
  35.     val targetPageFlowStr = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
  36.     // targetPageFlowArray: Array[Long][1,2,3,4,5,6,7]
  37.     val targetPageFlowArray = targetPageFlowStr.split(",")
  38.     // targetPageFlowArray.slice(0, targetPageFlowArray.length - 1): [1,2,3,4,5,6]
  39.     // targetPageFlowArray.tail: [2,3,4,5,6,7]
  40.     // targetPageFlowArray.slice(0, targetPageFlowArray.length - 1).zip(targetPageFlowArray.tail): [(1,2),(2,3),(3,4),(4,5),(5,6),(6,7)]
  41.     val targetPageSplit = targetPageFlowArray.slice(0, targetPageFlowArray.length - 1).zip(targetPageFlowArray.tail).map {
  42.       case (page1page2=>
  43.         (page1 + "_" + page2)
  44.     }
  45.     // 获取实际页面切片
  46.     // 对 <sessionId,访问行为> RDD,作一次 groupByKey 操做,生成页面切片
  47.     val session2GroupActionRDD = sessionId2ActionRDD.groupByKey()
  48.     // realPageSplitNumRDD: RDD[(String1L)]
  49.     val realPageSplitNumRDD = session2GroupActionRDD.flatMap {
  50.       case (sessionId, iterableUserVisitAction) =>
  51.         // item1: UserVisitAction
  52.         // item2: UserVisitAction
  53.         // sortList: List[UserVisitAction] // 排好序的 UserVisitAction
  54.         val sortList = iterableUserVisitAction.toList.sortWith((item1, item2=> {
  55.           DateUtils.parseTime(item1.action_time).getTime < DateUtils.parseTime(item2.action_time).getTime
  56.         })
  57.         // 获取 page 信息
  58.         // pageList: List[Long]
  59.         val pageList = sortList.map {
  60.           case userVisitAction =>
  61.             userVisitAction.page_id
  62.         }
  63.         // pageList.slice(0, pageList.length - 1): List[1,2,3,...,N-1]
  64.         // pageList.tail: List[2,3,4,...,N]
  65.         // pageList.slice(0, pageList.length - 1).zip(pageList.tail): List[(1,2),(2,3),(3,4),...,(N-1,N)]
  66.         val realPageSplit = pageList.slice(0, pageList.length - 1).zip(pageList.tail).map {
  67.           case (page1page2=>
  68.             (page1 + "_" + page2)
  69.         }
  70.         // 过滤:留下存在于 targetPageSplit 中的页面切片
  71.         val realPageSplitFilter = realPageSplit.filter {
  72.           case realPageSplit =>
  73.             targetPageSplit.contains(realPageSplit)
  74.         }
  75.         realPageSplitFilter.map {
  76.           case realPageSplitFilter =>
  77.             (realPageSplitFilter, 1L)
  78.         }
  79.     }
  80.     // 聚合
  81.     // realPageSplitCountMap; Map[(page1_page2count)]
  82.     val realPageSplitCountMap = realPageSplitNumRDD.countByKey()
  83.     realPageSplitCountMap.foreach(println(_))
  84.     val startPage = targetPageFlowArray(0).toLong
  85.     val startPageCount = sessionId2ActionRDD.filter {
  86.       case (sessionId, userVisitAction) =>
  87.         userVisitAction.page_id == startPage.toLong
  88.     }.count()
  89.     println("哈啊哈"+ startPageCount)
  90.     // 获得最后的统计结果
  91.     getPageConvertRate(sparkSession, taskUUID, targetPageSplit, startPageCount, realPageSplitCountMap)
  92.   }
  93.   // ******************** 需求五:页面单跳转化率统计 ********************
  94.   /**
  95.     * 计算页面切片转化率
  96.     *
  97.     * @param sparkSession
  98.     * @param taskUUID
  99.     * @param targetPageSplit
  100.     * @param startPageCount
  101.     * @param realPageSplitCountMap
  102.     */
  103.   def getPageConvertRate(sparkSession: SparkSession,
  104.                          taskUUID: String,
  105.                          targetPageSplit: Array[String],
  106.                          startPageCount: Long,
  107.                          realPageSplitCountMap: collection.Map[String, Long]): Unit = {
  108.     val pageSplitRatioMap = new mutable.HashMap[String, Double]()
  109.     var lastPageCount = startPageCount.toDouble
  110.     // 1_2,2_3,3_4,...
  111.     for (pageSplit <- targetPageSplit) {
  112.       // 第一次循环:currentPageSplitCount: page1_page2   lastPageCount: page1
  113.       // 第二次循环:currentPageSplitCount: page2_page3   lastPageCount: page1_page2
  114.       val currentPageSplitCount = realPageSplitCountMap.get(pageSplit).get.toDouble
  115.       val rate = currentPageSplitCount / lastPageCount
  116.       pageSplitRatioMap.put(pageSplit, rate)
  117.       lastPageCount = currentPageSplitCount
  118.     }
  119.     val convertRateStr = pageSplitRatioMap.map {
  120.       case (pageSplit, rate) =>
  121.         pageSplit + "=" + rate
  122.     }.mkString("|")
  123.     // 封装数据
  124.     val pageSplitConvertRate = PageSplitConvertRate(taskUUID, convertRateStr)
  125.     val pageSplitConvertRateRDD = sparkSession.sparkContext.makeRDD(Array(pageSplitConvertRate))
  126.     // 写入到 MySQL
  127.     import sparkSession.implicits._
  128.     pageSplitConvertRateRDD.toDF().write
  129.       .format("jdbc")
  130.       .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  131.       .option("dbtable""page_split_convert_rate")
  132.       .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  133.       .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  134.       .mode(SaveMode.Append)
  135.       .save()
  136.   }
  137.   /**
  138.     * 根据日期范围获取对象的用户行为数据
  139.     *
  140.     * @param sparkSession
  141.     * @param taskParam
  142.     * @return
  143.     */
  144.   def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
  145.     // 先获取所用到的过滤条件:开始日期 和 结束日期
  146.     val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
  147.     val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
  148.     // 把全部的时间范围在 startDate 和 endDate 之间的数据查询出来
  149.     val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"
  150.     // 在对 DataFrame 和 Dataset 进行许多操做都须要这个包进行支持
  151.     import sparkSession.implicits._
  152.     sparkSession.sql(sql).as[UserVisitAction].rdd // DataFrame(Row类型) -> DataSet(样例类类型) -> rdd(样例类)
  153.   }
  154. }

5.5.8 需求五实现思路整理

5.6 需求六:各区域 Top3 商品统计

5.6.1 需求解析

根据用户指定的日期查询条件范围,统计各个区域下的最热门【点击】的 top3 商品,区域信息、各个城市的信息在项目中用固定值进行配置,由于不怎么变更。

  1. 1、查询 task,获取日期范围,经过 Spark SQL,查询 user_visit_action 表中的指定日期范围内的数据,过滤出商品点击行为,click_product_id is not null、click_product_id != 'NULL'、click_product_id != 'null'、city_id、click_product_id。
  2. 2、使用 Spark SQL 从 MySQL 中查询出来城市信息(city_id、city_name、area),用户访问行为数据要跟城市信息进行 join, city_id、city_name、area、product_id, 将 RDD 转换成 DataFrame,注册成一个临时表。
  3. 3、Spark SQL 内置函数(case when),对 area 打标记(华东大区,A 级;华中大区,B 级;东北大区,C 级;西北大区,D 级),area_level。
  4. 4、计算出来每一个区域下每一个商品的点击次数,group by area, product_id 保留每一个区域的城市名称列表;自定义 UDAF,group_concat_distinct()函数,聚合出来一个 city_names 字段,area、product_id、city_names、click_count
  5. 5、join 商品明细表,hive(product_id、product_name、extend_info),extend_info 是 json 类型;自定义 UDF,get_json_object() 函数,取出其中的 product_status 字段, if() 函数(Spark SQL 内置函数),判断: 0 自营,1 第三方(area、product_id、 city_names、click_count、product_name、product_status)。
  6. 6、开窗函数,根据 area 来聚合,获取每一个 area 下,click_count 排名前 3 的 product 信息:areaarea_level、product_id、city_names、click_count、product_name、product_status
  7. 7、结果写入 MySQL 表中。

5.6.2 数据源解析

商品信息表 + 用户访问行为表

5.6.3 数据结构解析

城市信息(城市 ID,城市名称,区域名称)

Array((0L, "北京""华北"), (1L, "上海""华东"), (2L, "南京""华东"), (3L, "广州""华南"), (4L, "三亚""华南"), (5L, "武汉""华中"), (6L, "长沙""华中"), (7L, "西安""西北"), (8L, "成都""西南"), (9L, "哈尔滨""东北"))

5.6.4 需求实现简要流程

5.6.5 需求实现详细流程

5.6.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `area_top3_product`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `area_top3_product`; 
  5. CREATE TABLE `area_top3_product` (
  6.   `taskid` varchar(255DEFAULT NULL,
  7.   `area` varchar(255DEFAULT NULL,
  8.   `areaLevel` varchar(255DEFAULT NULL,
  9.   `productid` int(11DEFAULT NULL,
  10.   `cityInfos` varchar(255DEFAULT NULL,
  11.   `clickCount` int(11DEFAULT NULL,
  12.   `productName` varchar(255DEFAULT NULL,
  13.   `productStatus` varchar(255DEFAULT NULL
  14. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.6.7 代码解析

在 analyse 中新建子模块 product,配置 pom.xml 文件,添加 scala 框架的支持

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>analyse</artifactId>
  7.         <groupId>com.atguigu</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>product</artifactId>
  12.     <dependencies>
  13.         <dependency>
  14.             <groupId>com.atguigu</groupId>
  15.             <artifactId>commons</artifactId>
  16.             <version>1.0-SNAPSHOT</version>
  17.         </dependency>
  18.         <!-- Spark 的依赖引入 -->
  19.         <dependency>
  20.             <groupId>org.apache.spark</groupId>
  21.             <artifactId>spark-core_2.11</artifactId>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.spark</groupId>
  25.             <artifactId>spark-sql_2.11</artifactId>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.spark</groupId>
  29.             <artifactId>spark-hive_2.11</artifactId>
  30.         </dependency>
  31.         <!-- 引入 Scala -->
  32.         <dependency>
  33.             <groupId>org.scala-lang</groupId>
  34.             <artifactId>scala-library</artifactId>
  35.         </dependency>
  36.     </dependencies>
  37.     <build>
  38.         <plugins>
  39.             <plugin>
  40.                 <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
  41.                 <groupId>net.alchim31.maven</groupId>
  42.                 <artifactId>scala-maven-plugin</artifactId>
  43.             </plugin>
  44.             <plugin>
  45.                 <groupId>org.apache.maven.plugins</groupId>
  46.                 <artifactId>maven-assembly-plugin</artifactId>
  47.                 <configuration>
  48.                     <archive>
  49.                         <manifest>
  50.                             <mainClass>com.atguigu.product.AreaTop3ProductApp</mainClass>
  51.                         </manifest>
  52.                     </archive>
  53.                     <descriptorRefs>
  54.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  55.                     </descriptorRefs>
  56.                 </configuration>
  57.             </plugin>
  58.         </plugins>
  59.     </build>
  60. </project>

示例代码:

  1. package com.atguigu.product
  2. import java.util.UUID
  3. import commons.conf.ConfigurationManager
  4. import commons.constant.Constants
  5. import commons.utils.ParamUtils
  6. import net.sf.json.JSONObject
  7. import org.apache.spark.SparkConf
  8. import org.apache.spark.rdd.RDD
  9. import org.apache.spark.sql.{SaveMode, SparkSession}
  10. object AreaTop3ProductStat {
  11.   def main(args: Array[String]): Unit = {
  12.     // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
  13.     val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  14.     // 获取过滤条件对应的 JsonObject 对象
  15.     val taskParam = JSONObject.fromObject(jsonStr)
  16.     // 建立全局惟一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不一样任务,做为写入 MySQL 数据库中那张表的主键
  17.     val taskUUID = UUID.randomUUID().toString
  18.     // 建立 sparkConf
  19.     val sparkConf = new SparkConf().setAppName("product").setMaster("local[*]")
  20.     // 建立 sparkSession(包含 sparkContext)
  21.     val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  22.     // ******************** 需求六:各区域 Top3 商品统计 ********************
  23.     // 获取 用户访问行为的数据 (cityId, clickProductId)
  24.     // cityId2ProductIdRDD: RDD[(cityId, clickProductId)]
  25.     val cityId2ProductIdRDD = getCityAndProductInfo(sparkSession, taskParam)
  26.     // 获取 城市信息 (城市 ID,城市名称,区域名称)
  27.     // cityId2AreaInfoRDD: RDD[(cityId, CityAreaInfo)]
  28.     val cityId2AreaInfoRDD = getCityAreaInfo(sparkSession)
  29.     // 作 Join 操做,获得 (city_id, city_name, area, click_product_id)
  30.     // 临时表 temp_area_product_info 中的一条数据就表明一次点击商品的行为
  31.     getAreaProductIdBasicInfoTable(sparkSession, cityId2ProductIdRDD, cityId2AreaInfoRDD)
  32.     // 自定义 UDF 函数:实现字符串带去重的拼接
  33.     sparkSession.udf.register("concat_long_string", (v1: Long, v2String, split: String=> {
  34.       v1 + split + v2
  35.     })
  36.     sparkSession.udf.register("group_concat_distinct", new GroupConcatDistinct)
  37.     // 统计 每个区域里每个商品被点击的次数,获得 (area, click_product_id, click_count, city_infos)
  38.     getAreaProductClickCountTable(sparkSession)
  39.     // 自定义 UDAF 函数:实现从 json 串中取出指定字段的值
  40.     sparkSession.udf.register("get_json_field", (json: String, field: String=> {
  41.       val jsonObject = JSONObject.fromObject(json)
  42.       jsonObject.getString(field)
  43.     })
  44.     // 将 temp_area_product_count 表 join 商品信息表 product_info
  45.     getAreaProductClickCountInfo(sparkSession)
  46.     // 获取 各区域 Top3 商品(使用到了开窗函数)
  47.     getAreaTop3Product(sparkSession, taskUUID)
  48.     // 测试
  49.     // sparkSession.sql("select * from temp_area_product_info").show
  50.     // sparkSession.sql("select * from temp_area_product_count").show
  51.     // sparkSession.sql("select * from temp_area_count_product_info").show
  52.     // sparkSession.sql("select * from temp_test").show
  53.   }
  54.   /**
  55.     * 获取 各区域 Top3 商品(使用了开窗函数)
  56.     *
  57.     * @param sparkSession
  58.     * @param taskUUID
  59.     * @return
  60.     */
  61.   def getAreaTop3Product(sparkSession: SparkSession, taskUUID: String= {
  62. //    val sql = "select area, city_infos, click_product_id, click_count, product_name, product_status, " +
  63. //      "row_number() over(partition by area order by click_count desc) row_number from temp_area_count_product_info"
  64. //    sparkSession.sql(sql).createOrReplaceTempView("temp_test"// 测试
  65.     val sql = "select area, " +
  66.       "case " +
  67.       "when area='华北' or area='华东' then 'A_Level' " +
  68.       "when area='华中' or area='华男' then 'B_Level' " +
  69.       "when area='西南' or area='西北' then 'C_Level' " +
  70.       "else 'D_Level' " +
  71.       "end area_level, " +
  72.       "city_infos, click_product_id, click_count, product_name, product_status from (" +
  73.       "select area, city_infos, click_product_id, click_count, product_name, product_status, " +
  74.       "row_number() over(partition by area order by click_count desc) row_number from temp_area_count_product_info) t where row_number <= 3"
  75.     val areaTop3ProductRDD = sparkSession.sql(sql).rdd.map {
  76.       case row =>
  77.         AreaTop3Product(taskUUID, row.getAs[String]("area"), row.getAs[String]("area_level"),
  78.           row.getAs[Long]("click_product_id"), row.getAs[String]("city_infos"),
  79.           row.getAs[Long]("click_count"), row.getAs[String]("product_name"),
  80.           row.getAs[String]("product_status"))
  81.     }
  82.     import sparkSession.implicits._
  83.     areaTop3ProductRDD.toDF().write
  84.       .format("jdbc")
  85.       .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  86.       .option("dbtable""area_top3_product")
  87.       .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  88.       .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  89.       .mode(SaveMode.Append)
  90.       .save()
  91.   }
  92.   /**
  93.     * 将 temp_area_product_count 表 join 商品信息表
  94.     *
  95.     * @param sparkSession
  96.     */
  97.   def getAreaProductClickCountInfo(sparkSession: SparkSession): Unit = {
  98.     // temp_area_product_count: (area, click_product_id, click_count, city_infos) tapc
  99.     // product_info: (product_id, product_name, extend_info)  pi
  100.     val sql = "select tapc.area, tapc.city_infos, tapc.click_product_id, tapc.click_count, pi.product_name, " +
  101.       "if (get_json_field(pi.extend_info, 'product_status') = '0', 'Self', 'Third Party') product_status" +
  102.       " from temp_area_product_count tapc join product_info pi on tapc.click_product_id = pi.product_id"
  103.     sparkSession.sql(sql).createOrReplaceTempView("temp_area_count_product_info")
  104.   }
  105.   /**
  106.     * 统计 每个区域里每个商品被点击的次数
  107.     */
  108.   def getAreaProductClickCountTable(sparkSession: SparkSession): Unit = {
  109.     val sql = "select area, click_product_id, count(*) click_count," +
  110.       " group_concat_distinct(concat_long_string(city_id, city_name, ':')) city_infos" +
  111.       " from temp_area_product_info group by area, click_product_id"
  112.     sparkSession.sql(sql).createOrReplaceTempView("temp_area_product_count")
  113.   }
  114.   /**
  115.     * 将 用户访问行为的数据 (cityId, clickProductId) 和 城市信息 (城市 ID,城市名称,区域名称) 作 join 操做,获得所需的临时表数据
  116.     *
  117.     * @param sparkSession
  118.     * @param cityId2ProductIdRDD
  119.     * @param cityId2AreaInfoRDD
  120.     */
  121.   def getAreaProductIdBasicInfoTable(sparkSession: SparkSession,
  122.                                      cityId2ProductIdRDD: RDD[(Long, Long)],
  123.                                      cityId2AreaInfoRDD: RDD[(Long, CityAreaInfo)]): Unit = {
  124.     val areaProductIdBasicInfoRDD = cityId2ProductIdRDD.join(cityId2AreaInfoRDD).map {
  125.       case (cityId, (clickProductId, cityAreaInfo)) =>
  126.         (cityId, cityAreaInfo.city_name, cityAreaInfo.area, clickProductId)
  127.     }
  128.     import sparkSession.implicits._
  129.     // 转换为临时表的时候须要指定字段的名称
  130.     areaProductIdBasicInfoRDD.toDF("city_id""city_name""area""click_product_id").createOrReplaceTempView("temp_area_product_info")
  131.   }
  132.   /**
  133.     * 获取 城市信息(城市 ID,城市名称,区域名称)
  134.     *
  135.     * @param sparkSession
  136.     */
  137.   def getCityAreaInfo(sparkSession: SparkSession) = {
  138.     val cityAreaInfoArray = Array((0L, "北京""华北"), (1L, "上海""华东"), (2L, "南京""华东"),
  139.       (3L, "广州""华南"), (4L, "三亚""华南"), (5L, "武汉""华中"),
  140.       (6L, "长沙""华中"), (7L, "西安""西北"), (8L, "成都""西南"),
  141.       (9L, "哈尔滨""东北"))
  142.     // RDD[(cityId, CityAreaInfo)]
  143.     sparkSession.sparkContext.makeRDD(cityAreaInfoArray).map {
  144.       case (cityId, cityName, area=>
  145.         (cityId, CityAreaInfo(cityId, cityName, area))
  146.     }
  147.   }
  148.   /**
  149.     * 获取 用户访问行为的数据 (city_id, click_product_id)
  150.     *
  151.     * @param sparkSession
  152.     * @param taskParam
  153.     * @return
  154.     */
  155.   def getCityAndProductInfo(sparkSession: SparkSession, taskParam: JSONObject) = {
  156.     val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
  157.     val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
  158.     // 只获取发生过点击的 action 的数据,获取到的一条 action 数据就表明一个点击行为
  159.     val sql = "select city_id, click_product_id from user_visit_action where date>='" + startDate +
  160.       "' and date<='" + endDate + "' and click_product_id != -1"
  161.     import sparkSession.implicits._
  162.     sparkSession.sql(sql).as[CityClickProduct].rdd.map {
  163.       case cityIdAndProductId =>
  164.         (cityIdAndProductId.city_id, cityIdAndProductId.click_product_id)
  165.     }
  166.   }
  167. }

自定义弱类型的 UDAF 函数

  1. package com.atguigu.product
  2. import org.apache.spark.sql.Row
  3. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  4. import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
  5. /**
  6.   * 自定义弱类型的 UDAF 函数
  7.   */
  8. class GroupConcatDistinct extends UserDefinedAggregateFunction {
  9.   // 设置 UDAF 函数的输入类型为 String
  10.   override def inputSchema: StructType = StructType(StructField("cityInfoInput", StringType) :: Nil)
  11.   // 设置 UDAF 函数的缓冲区类型为 String
  12.   override def bufferSchema: StructType = StructType(StructField("cityInfoBuffer", StringType) :: Nil)
  13.   // 设置 UDAF 函数的输出类型为 String
  14.   override def dataType: DataType = StringType
  15.   // 设置 UDAF 函数的输入数据和输出数据是一致的
  16.   override def deterministic: Boolean = true
  17.   // 初始化自定义的 UDAF 函数
  18.   override def initialize(buffer: MutableAggregationBuffer): Unit = {
  19.     buffer(0= ""
  20.   }
  21.   // 设置 UDAF 函数的缓冲区更新:实现一个字符串带去重的拼接
  22.   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  23.     var cityInfoBuffer = buffer.getString(0)
  24.     val cityInfoInput = input.getString(0)
  25.     if (!cityInfoBuffer.contains(cityInfoInput)) {
  26.       if ("".equals(cityInfoBuffer)) {
  27.         cityInfoBuffer += cityInfoInput
  28.       } else {
  29.         cityInfoBuffer += "," + cityInfoInput
  30.       }
  31.     }
  32.     buffer.update(0, cityInfoBuffer)
  33.   }
  34.   // 把两个自定义的 UDAF 函数的值合并在一块儿
  35.   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  36.     // cityInfoBuffer1: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ...
  37.     var cityInfoBuffer1 = buffer1.getString(0)
  38.     // cityInfoBuffer2: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ...
  39.     val cityInfoBuffer2 = buffer2.getString(0)
  40.     // 将 cityInfoBuffer2 中的数据带去重的加入到 cityInfoBuffer1 中
  41.     for (cityInfo <- cityInfoBuffer2.split(",")) {
  42.       if (!cityInfoBuffer1.contains(cityInfo)) {
  43.         if ("".equals(cityInfoBuffer1)) {
  44.           cityInfoBuffer1 += cityInfo
  45.         } else {
  46.           cityInfoBuffer1 += "," + cityInfo
  47.         }
  48.       }
  49.     }
  50.     buffer1.update(0, cityInfoBuffer1)
  51.   }
  52.   // 返回结果
  53.   override def evaluate(buffer: Row): Any = {
  54.     buffer.getString(0)
  55.   }
  56. }

AreaTop3Product 类

  1. /**
  2.   * 各省 top3 热门广告
  3.   *
  4.   * @author
  5.   *
  6.   */
  7. case class AdProvinceTop3(dateString,
  8.                           province: String,
  9.                           adid: Long,
  10.                           clickCount: Long)

5.6.8 需求六实现思路整理

5.7 需求七:广告点击黑名单实时统计

广告流量实时统计【概览】

5.7.1 需求解析

  实现实时的动态黑名单机制:将天天对某个广告点击超过 100 次的用户拉黑。

5.7.2 数据源解析

Kafka 数据:timestamp province city userid adid

5.7.3 数据结构解析

5.7.4 需求实现简要流程

5.7.5 需求实现详细流程

5.7.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `ad_blacklist`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `ad_blacklist`; 
  5. CREATE TABLE `ad_blacklist` (
  6.   `userid` int(11DEFAULT NULL
  7. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  8. -- ----------------------------
  9. --  Table structure for `ad_user_click_count`
  10. -- ----------------------------
  11. DROP TABLE IF EXISTS `ad_user_click_count`; 
  12. CREATE TABLE `ad_user_click_count` (
  13.   `datevarchar(30DEFAULT NULL,
  14.   `userid` int(11DEFAULT NULL,
  15.   `adid` int(11DEFAULT NULL,
  16.   `clickCount` int(11DEFAULT NULL
  17. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.7.7 代码解析

在 analyse 中新建子模块 advertising,配置 pom.xml 文件,添加 scala 框架的支持

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>analyse</artifactId>
  7.         <groupId>com.atguigu</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>advertising</artifactId>
  12.     <dependencies>
  13.         <dependency>
  14.             <groupId>com.atguigu</groupId>
  15.             <artifactId>commons</artifactId>
  16.             <version>1.0-SNAPSHOT</version>
  17.         </dependency>
  18.         <!-- Spark 的依赖引入 -->
  19.         <dependency>
  20.             <groupId>org.apache.spark</groupId>
  21.             <artifactId>spark-core_2.11</artifactId>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.spark</groupId>
  25.             <artifactId>spark-sql_2.11</artifactId>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.spark</groupId>
  29.             <artifactId>spark-streaming_2.11</artifactId>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.apache.spark</groupId>
  33.             <artifactId>spark-hive_2.11</artifactId>
  34.         </dependency>
  35.         <!-- 引入 Scala -->
  36.         <dependency>
  37.             <groupId>org.scala-lang</groupId>
  38.             <artifactId>scala-library</artifactId>
  39.         </dependency>
  40.         <dependency>
  41.             <groupId>org.apache.kafka</groupId>
  42.             <artifactId>kafka-clients</artifactId>
  43.             <version>0.10.2.1</version>
  44.         </dependency>
  45.         <dependency>
  46.             <groupId>org.apache.spark</groupId>
  47.             <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  48.             <version>${spark.version}</version>
  49.         </dependency>
  50.     </dependencies>
  51.     <build>
  52.         <plugins>
  53.             <plugin>
  54.                 <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
  55.                 <groupId>net.alchim31.maven</groupId>
  56.                 <artifactId>scala-maven-plugin</artifactId>
  57.             </plugin>
  58.             <plugin>
  59.                 <groupId>org.apache.maven.plugins</groupId>
  60.                 <artifactId>maven-assembly-plugin</artifactId>
  61.                 <configuration>
  62.                     <archive>
  63.                         <manifest>
  64.                             <mainClass>com.atguigu.stream.AdClickRealTimeStat</mainClass>
  65.                         </manifest>
  66.                     </archive>
  67.                     <descriptorRefs>
  68.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  69.                     </descriptorRefs>
  70.                 </configuration>
  71.             </plugin>
  72.         </plugins>
  73.     </build>
  74. </project>

准备工做:数据生成与数据消费测试

一、先启动 zookeeper 集群,再启动 kafka 集群

  1. [atguigu@hadoop102 ~]zkstart.sh
  2. [atguigu@hadoop102 ~]kafka-start.sh 

二、启动一个 kafka 消费者进程

  1. 老版本:消费者会将本身的 offset 文件保存在 zookeeper(低版本的kafka)。因此消费者链接的是 zookeeper。
  2. [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic AdRealTimeLog1
  3. 新版本:消费者会将本身的 offset 文件保存在 kafka 集群中(高版本的kafka)。因此消费者链接的是 kafka。这样作的好处是:提升了效率,减小了网络传输。
  4. [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic AdRealTimeLog1

三、启动数据生成程序 MockRealTimeData

  1. package com.atguigu.stream
  2. import commons.conf.ConfigurationManager
  3. import commons.constant.Constants
  4. import org.apache.kafka.common.serialization.StringDeserializer
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.sql.SparkSession
  7. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  8. import org.apache.spark.streaming.{Seconds, StreamingContext}
  9. object AdClickRealTimeStat {
  10.   def main(args: Array[String]): Unit = {
  11.     // 构建 Spark 上下文
  12.     val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")
  13.     // 建立 Spark 客户端
  14.     val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  15.     val sc = spark.sparkContext
  16.     val ssc = new StreamingContext(sc, Seconds(5))
  17.     // 设置检查点目录
  18.     ssc.checkpoint("./streaming_checkpoint")
  19.     // 获取 kafka 的配置
  20.     val kafka_brokers = ConfigurationManager.config.getString(Constants.KAFKA_BROKERS)
  21.     val kafka_topics = ConfigurationManager.config.getString(Constants.KAFKA_TOPICS)
  22.     // kafka 消费者参数配置
  23.     val kafkaParam = Map(
  24.       "bootstrap.servers" -> kafka_brokers, // 用于初始化链接到集群的地址
  25.       "key.deserializer" -> classOf[StringDeserializer],
  26.       "value.deserializer" -> classOf[StringDeserializer],
  27.       "group.id" -> "commerce-consumer-group"// 用于标识这个消费者属于哪一个消费团体(组)
  28.       "auto.offset.reset" -> "latest"// 若是没有初始化偏移量或者当前的偏移量不存在任何服务器上,可使用这个配置属性,latest 表示自动重置偏移量为最新的偏移量
  29.       "enable.auto.commit" -> (false: java.lang.Boolean// 若是是 true,则这个消费者的偏移量会在后台自动提交
  30.     )
  31.     // 建立 DStream,返回接收到的输入数据
  32.     // LocationStrategies:                  根据给定的主题和集群地址建立 consumer
  33.     // LocationStrategies.PreferConsistent: 持续的在全部 Executor 之间均匀分配分区(即把 Executor 当成 Kafka Consumer),经常使用该方式
  34.     // ConsumerStrategies:                  选择如何在 Driver 和 Executor 上建立和配置 Kafka Consumer
  35.     // ConsumerStrategies.Subscribe:        订阅一系列主题
  36.     // adRealTimeLogDStream: DStream[RDD, RDD, RDD, ...] -> RDD[Message] -> Message: key value
  37.     val adRealTimeLogDStream = KafkaUtils.createDirectStream[StringString](
  38.       ssc,
  39.       LocationStrategies.PreferConsistent,
  40.       ConsumerStrategies.Subscribe[StringString](Array(kafka_topics), kafkaParam))
  41.     // 取出 DStream 里面每一条数据的 value 值
  42.     // adRealTimeLogDStream: DStream[RDD, RDD, RDD, ...] -> RDD[String] -> String: timestamp province city userid adid
  43.     val adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
  44.     // 用于 Kafka Stream 的线程非安全问题,从新分区切断血统
  45.     // adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
  46.     // 根据黑名单进行数据过滤
  47.     // 刚刚接受到原始的用户点击行为日志以后
  48.     // 根据 mysql 中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)
  49.     // 使用 transform 算子(将 dstream 中的每一个 batch RDD 进行处理,转换为任意的其余 RDD,功能很强大)
  50.     val adRealTimeFilterDStream = adRealTimeValueDStream.transform {
  51.       consumerRecordRDD =>
  52.         // 首先从 MySQL 中查询全部黑名单用户
  53.         // adBlacklists: Array[AdBlacklist]  AdBlacklist: userId
  54.         val adBlacklistArray = AdBlacklistDAO.findAll()
  55.         // userIdArray: Array[Long]  [userId1, userId2, ...]
  56.         val userIdArray = adBlacklistArray.map(item => item.userid)
  57.         consumerRecordRDD.filter {
  58.           // consumerRecord: timestamp province city userid adid
  59.           case consumerRecord =>
  60.             val consumerRecordSplit = consumerRecord.split(" ")
  61.             val userId = consumerRecordSplit(3).toLong
  62.             !userIdArray.contains(userId)
  63.         }
  64.     }
  65.     adRealTimeFilterDStream.foreachRDD(rdd => rdd.foreach(println(_)))
  66.     ssc.start()
  67.     ssc.awaitTermination()
  68.   }
  69. }

四、启动数据消费程序 AdClickRealTimeStat

  1. import java.util.Properties
  2. import commons.conf.ConfigurationManager
  3. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  4. import scala.collection.mutable.ArrayBuffer
  5. import scala.util.Random
  6. object MockRealTimeData {
  7.   def main(args: Array[String]): Unit = {
  8.     // 获取配置文件 commerce.properties 中的 Kafka 配置参数
  9.     val broker = ConfigurationManager.config.getString("kafka.broker.list")
  10.     val topic = ConfigurationManager.config.getString("kafka.topics")
  11.     // 建立 Kafka 生产者
  12.     val kafkaProducer = createKafkaProducer(broker)
  13.     while (true) {
  14.       // 随机产生实时数据并经过 Kafka 生产者发送到 Kafka 集群中
  15.       for (item <- generateMockData()) {
  16.         kafkaProducer.send(new ProducerRecord[StringString](topic, item))
  17.       }
  18.       Thread.sleep(5000)
  19.     }
  20.   }
  21.   /**
  22.     * 实时模拟数据的生成
  23.     *
  24.     * 时间点: 当前时间毫秒
  25.     * userId: 0 - 99
  26.     * 省份、城市 ID 相同: 1 - 9
  27.     * adid: 0 - 19
  28.     * ((0L,"北京","北京"),(1L,"上海","上海"),(2L,"南京","江苏省"),(3L,"广州","广东省"),(4L,"三亚","海南省"),(5L,"武汉","湖北省"),(6L,"长沙","湖南省"),(7L,"西安","陕西省"),(8L,"成都","四川省"),(9L,"哈尔滨","东北省"))
  29.     *
  30.     * 格式 :timestamp province city userid adid
  31.     *       某个时间点 某个省份 某个城市 某个用户 某个广告
  32.     */
  33.   def generateMockData(): Array[String= {
  34.     val array = ArrayBuffer[String]()
  35.     val random = new Random()
  36.     // 模拟实时数据:timestamp province city userid adid
  37.     for (i <0 until 50) {
  38.       val timestamp = System.currentTimeMillis()
  39.       val province = random.nextInt(10)
  40.       val city = province
  41.       val adid = random.nextInt(20)
  42.       val userid = random.nextInt(100)
  43.       // 拼接实时数据
  44.       array += timestamp + " " + province + " " + city + " " + userid + " " + adid
  45.     }
  46.     array.toArray
  47.   }
  48.   def createKafkaProducer(broker: String): KafkaProducer[StringString= {
  49.     // 建立配置对象
  50.     val prop = new Properties()
  51.     // 添加配置
  52.     prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
  53.     prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  54.     prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  55.     // 根据配置建立 Kafka 生产者
  56.     new KafkaProducer[StringString](prop)
  57.   }
  58. }

五、查看控制台消费者的输出 以及 Linux 上的 kafka 消费者的输出,没有问题!下面实现具体的需求。

业务代码

  1.   /**
  2.     * 需求七:广告点击黑名单实时统计
  3.     *
  4.     * @param adRealTimeFilterDStream
  5.     */
  6.   def generateBlackListStat(adRealTimeFilterDStream: DStream[String]) = {
  7.     // adRealTimeFilterDStream: DStream[RDD, RDD, RDD, ...] -> RDD[String] -> String: timestamp province city userid adid
  8.     // key2NumDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key1L)]
  9.     val key2NumDStream = adRealTimeFilterDStream.map {
  10.       // consumerRecordRDD: timestamp province city userid adid
  11.       case consumerRecordRDD =>
  12.         val consumerRecordSplit = consumerRecordRDD.split(" ")
  13.         val timestamp = consumerRecordSplit(0).toLong
  14.         // dateKey: yyyyMMdd
  15.         val dateKey = DateUtils.formatDateKey(new Date(timestamp))
  16.         val userid = consumerRecordSplit(3).toLong
  17.         val adid = consumerRecordSplit(4).toLong
  18.         val key = dateKey + "_" + userid + "_" + adid // 组合 key
  19.         (key1L)
  20.     }
  21.     // key2CountDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key25)]
  22.     val key2CountDStream = key2NumDStream.reduceByKey(_ + _)
  23.     // 根据每个 RDD 里面的数据,更新 用户点击次数表 数据
  24.     key2CountDStream.foreachRDD {
  25.       rdd => rdd.foreachPartition {
  26.         items =>
  27.           val clickCountArray = new ArrayBuffer[AdUserClickCount]()
  28.           for ((keycount<- items) {
  29.             // 切割数据
  30.             val keySplit = key.split("_")
  31.             // 取出数据
  32.             val date = keySplit(0)
  33.             val userid = keySplit(1).toLong
  34.             val adid = keySplit(2).toLong
  35.             // 封装数据,并放入 ArrayBuffer 中
  36.             clickCountArray += AdUserClickCount(date, userid, adid, count)
  37.           }
  38.           // 更新 MySQl 数据库中表的数据
  39.           AdUserClickCountDAO.updateBatch(clickCountArray.toArray)
  40.       }
  41.     }
  42.     // 对 DStream 作 filter 操做:就是遍历 DStream 中的每个 RDD 中的每一条数据
  43.     // key2BlackListDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key150)]
  44.     val key2BlackListDStream = key2CountDStream.filter {
  45.       case (keycount=>
  46.         // 切割数据
  47.         val keySplit = key.split("_")
  48.         // 取出数据
  49.         val date = keySplit(0)
  50.         val userid = keySplit(1).toLong
  51.         val adid = keySplit(2).toLong
  52.         val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
  53.         if (clickCount > 100) {
  54.           true // 留下
  55.         } else {
  56.           false // 过滤掉
  57.         }
  58.     }
  59.     // key2BlackListDStream.map: DStream[RDD[userid]]
  60.     val userIdDStream = key2BlackListDStream.map {
  61.       case (keycount=>
  62.         key.split("_")(1).toLong
  63.     }.transform(rdd => rdd.distinct()) // 转换 key 并去重
  64.     // 将结果批量插入 MySQL 数据库中
  65.     userIdDStream.foreachRDD {
  66.       rdd => rdd.foreachPartition {
  67.         items =>
  68.           val userIdArray = new ArrayBuffer[AdBlacklist]()
  69.           for (userId <- items) {
  70.             userIdArray += AdBlacklist(userId)
  71.           }
  72.           AdBlacklistDAO.insertBatch(userIdArray.toArray)
  73.       }
  74.     }
  75.   }

AdBlacklist 类

  1. /**
  2.   * 广告黑名单
  3.   *
  4.   * @author
  5.   *
  6.   */
  7. case class AdBlacklist(userid: Long)
  8. /**
  9.   * 用户广告点击量
  10.   *
  11.   * @author
  12.   *
  13.   */
  14. case class AdUserClickCount(dateString,
  15.                             userid: Long,
  16.                             adid: Long,
  17.                             clickCount: Long)

5.7.8 需求七实现思路整理

5.8 需求八:各省各城市广告点击量实时统计

5.8.1 需求解析

5.8.2 数据源解析

Kafka 数据:timestamp province city userid adid

5.8.3 数据结构解析

5.8.4 需求实现简要流程

5.8.5 需求实现详细流程

5.8.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `ad_stat`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `ad_stat`; 
  5. CREATE TABLE `ad_stat` (
  6.   `datevarchar(30DEFAULT NULL,
  7.   `province` varchar(100DEFAULT NULL,
  8.   `city` varchar(100DEFAULT NULL,
  9.   `adid` int(11DEFAULT NULL,
  10.   `clickCount` int(11DEFAULT NULL
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.8.7 代码解析

业务代码

  1.   /**
  2.     * 需求八:各省各城市广告点击量实时统计(使用累计统计)
  3.     *
  4.     * @param adRealTimeFilterDStream
  5.     */
  6.   def provinceCityClickStat(adRealTimeFilterDStream: DStream[String]) = {
  7.     // adRealTimeFilterDStream: DStream[RDD, RDD, RDD, ...] -> RDD[String] -> String: timestamp province city userid adid
  8.     // key2NumDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key1L)]
  9.     val key2NumDStream = adRealTimeFilterDStream.map {
  10.       case consumerRecordRDD =>
  11.         val consumerRecordSplit = consumerRecordRDD.split(" ")
  12.         val timestamp = consumerRecordSplit(0).toLong
  13.         // dateKey: yyyyMMdd
  14.         val dateKey = DateUtils.formatDateKey(new Date(timestamp))
  15.         val province = consumerRecordSplit(1)
  16.         val city = consumerRecordSplit(2)
  17.         val adid = consumerRecordSplit(4)
  18.         val key = dateKey + "_" + province + "_" + city + "_" + adid // 组合 key
  19.         (key1L)
  20.     }
  21.     // 执行 updateStateByKey 操做(全局的累计性的操做)
  22.     val key2StateDStream = key2NumDStream.updateStateByKey[Long] {
  23.       (values: Seq[Long], state: Option[Long]) =>
  24.         var newValue = 0L
  25.         if (state.isDefined) {
  26.           newValue = state.get
  27.         }
  28.         for (value <values) {
  29.           newValue += value
  30.         }
  31.         Some(newValue)
  32.     }
  33.     // 将结果批量插入 MySQL 数据库中
  34.     key2StateDStream.foreachRDD {
  35.       rdd =>
  36.         rdd.foreachPartition {
  37.           items =>
  38.             val adStatArray = new ArrayBuffer[AdStat]()
  39.             for ((keycount<- items) {
  40.               // 切割数据
  41.               val keySplit = key.split("_")
  42.               // 取出数据
  43.               val date = keySplit(0)
  44.               val province = keySplit(1)
  45.               val city = keySplit(2)
  46.               val adid = keySplit(3).toLong
  47.               // 封装数据,并放入 ArrayBuffer 中
  48.               adStatArray += AdStat(date, province, city, adid, count)
  49.             }
  50.             AdStatDAO.updateBatch(adStatArray.toArray)
  51.         }
  52.     }
  53.   }

AdStat 类

  1. /**
  2.   * 广告实时统计
  3.   *
  4.   * @author
  5.   *
  6.   */
  7. case class AdStat(dateString,
  8.                   province: String,
  9.                   city: String,
  10.                   adid: Long,
  11.                   clickCount: Long)

5.8.8 需求八实现思路整理

5.9 需求九:天天每一个省份 Top3 热门广告

5.9.1 需求解析

5.9.2 数据源解析

数据来源于需求八 updateStateByKey 获得的 DStream。

5.9.3 数据结构解析

Dstream[(dateKey_province_city_adid, count)]

5.9.4 需求实现简要流程

5.9.5 需求实现详细流程

5.9.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `ad_province_top3`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `ad_province_top3`; 
  5. CREATE TABLE `ad_province_top3` (
  6.   `date` varchar(30DEFAULT NULL,
  7.   `province` varchar(100DEFAULT NULL,
  8.   `adid` int(11DEFAULT NULL,
  9.   `clickCount` int(11DEFAULT NULL
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.9.7 代码解析

业务代码

  1.   /**
  2.     * 需求九:天天每一个省份 Top3 热门广告
  3.     *
  4.     * @param spark
  5.     * @param key2ProvinceCityCountDStream
  6.     */
  7.   def provinceTop3AdverStat(spark: SparkSession, key2ProvinceCityCountDStream: DStream[(String, Long)]) = {
  8.     // key2ProvinceCityCountDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(keycount)]
  9.     // keydate_province_city_adid
  10.     // 转换 key
  11.     // key2ProvinceCountDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(newKey, count)]
  12.     // newKey: date_province_adid
  13.     val key2ProvinceCountDStream = key2ProvinceCityCountDStream.map {
  14.       case (keycount=>
  15.         val keySplit = key.split("_")
  16.         val date = keySplit(0)
  17.         val province = keySplit(1)
  18.         val adid = keySplit(3)
  19.         val newKey = date + "_" + province + "_" + adid // 组合 key
  20.         (newKey, count)
  21.     }
  22.     // 聚合新 key
  23.     val key2ProvinceAggrCountDStream = key2ProvinceCountDStream.reduceByKey(_ + _)
  24.     val top3DStream = key2ProvinceAggrCountDStream.transform {
  25.       rdd =>
  26.         // rdd: RDD[(newKey, count)]  newKey: date_province_adid
  27.         // 转化为新的 RDD
  28.         val basicDataRDD = rdd.map {
  29.           case (keycount=>
  30.             val keySplit = key.split("_")
  31.             val date = keySplit(0)
  32.             val province = keySplit(1)
  33.             val adid = keySplit(2).toLong
  34.             (date, province, adid, count)
  35.         }
  36.         // 将 RDD 转化为 DF
  37.         import spark.implicits._
  38.         basicDataRDD.toDF("date""province""adid""count").createOrReplaceTempView("temp_basic_info")
  39.         // 使用 Spark SQL 执行 SQL 语句,配合开窗函数,统计出各省份 top3 热门的广告
  40.         val sql = "select date, province, adid, count from (" +
  41.           "select date, province, adid, count, " +
  42.           "row_number() over(partition by date, province order by count desc) row_number from temp_basic_info) t " +
  43.           "where row_number <= 3"
  44.         spark.sql(sql).rdd
  45.     }
  46.     top3DStream.foreachRDD {
  47.       // rdd: RDD[Row]
  48.       rdd =>
  49.         rdd.foreachPartition {
  50.           // items: Row
  51.           items =>
  52.             val top3Array = new ArrayBuffer[AdProvinceTop3]()
  53.             for (item <- items) {
  54.               // 取出数据
  55.               val date = item.getAs[String]("date")
  56.               val province = item.getAs[String]("province")
  57.               val adid = item.getAs[Long]("adid")
  58.               val count = item.getAs[Long]("count")
  59.               // 封装数据
  60.               top3Array += AdProvinceTop3(date, province, adid, count)
  61.             }
  62.             // 写入 MySQL 数据库中
  63.             AdProvinceTop3DAO.updateBatch(top3Array.toArray)
  64.         }
  65.     }
  66.   }

AdProvinceTop3 类

  1. /**
  2.   * 各省 top3 热门广告
  3.   *
  4.   * @author
  5.   *
  6.   */
  7. case class AdProvinceTop3(dateString,
  8.                           province: String,
  9.                           adid: Long,
  10.                           clickCount: Long)

5.9.8 需求九实现思路整理

5.10 需求十:最近一小时广告点击量实时统计

5.10.1 需求解析

  统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量。

5.10.2 数据源解析

5.10.3 数据结构解析

5.10.4 需求实现简要流程

5.10.5 需求实现详细流程

5.10.6 MySQL 存储结构解析

  1. -- ----------------------------
  2. --  Table structure for `ad_click_trend`
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `ad_click_trend`; 
  5. CREATE TABLE `ad_click_trend` (
  6.   `datevarchar(30DEFAULT NULL,
  7.   `hourvarchar(30DEFAULT NULL,
  8.   `minutevarchar(30DEFAULT NULL,
  9.   `adid` int(11DEFAULT NULL,
  10.   `clickCount` int(11DEFAULT NULL
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.10.7 代码解析

业务代码

  1.   /**
  2.     * 需求十:计算最近 1 小时滑动窗口内的广告点击趋势
  3.     *
  4.     * @param adRealTimeFilterDStream
  5.     * @return
  6.     */
  7.   def recentHourAdverClickStat(adRealTimeFilterDStream: DStream[String]) = {
  8.     val key2TimeMinuteDStream = adRealTimeFilterDStream.map {
  9.       // consumerRecordRDD: timestamp province city userid adid
  10.       case consumerRecordRDD =>
  11.         val consumerRecordSplit = consumerRecordRDD.split(" ")
  12.         val timestamp = consumerRecordSplit(0).toLong
  13.         // timeMinute: yyyyMMddHHmm
  14.         val timeMinute = DateUtils.formatTimeMinute(new Date(timestamp))
  15.         val adid = consumerRecordSplit(4).toLong
  16.         val key = timeMinute + "_" + adid // 组合 key
  17.         (key1L)
  18.     }
  19.     // 第一个 Minutes 表示 窗口大小,第二个 Minutes 表示 窗口滑动步长
  20.     val key2WindowDStream = key2TimeMinuteDStream.reduceByKeyAndWindow((a: Long, b: Long) => (a + b), Minutes(60), Minutes(1))
  21.     key2WindowDStream.foreachRDD {
  22.       rdd => rdd.foreachPartition {
  23.         // items: (keycount)
  24.         items =>
  25.           val trendArray = ArrayBuffer[AdClickTrend]()
  26.           for ((keycount<- items) {
  27.             val keySplit = key.split("_")
  28.             // 切割数据
  29.             // timeMinute: yyyyMMddHHmm
  30.             val timeMinute = keySplit(0)
  31.             // 获取数据
  32.             val date = timeMinute.substring(08)   // 包头不包尾,注意是索引
  33.             val hour = timeMinute.substring(810)  // 包头不包尾,注意是索引
  34.             val minute = timeMinute.substring(10)   // 包头不包尾,注意是索引
  35.             val adid = keySplit(1).toLong
  36.             // 封装数据
  37.             trendArray += AdClickTrend(date, hour, minute, adid, count)
  38.           }
  39.           // 写入 MySQL 数据库中
  40.           AdClickTrendDAO.updateBatch(trendArray.toArray)
  41.       }
  42.     }
  43.   }

AdClickTrend 类

  1. /**
  2.   * 广告点击趋势
  3.   *
  4.   * @author
  5.   *
  6.   */
  7. case class AdClickTrend(dateString,
  8.                         hour: String,
  9.                         minute: String,
  10.                         adid: Long,
  11.                         clickCount: Long)

5.10.8 需求7、8、9、十实现思路整理

第6章 项目总结

  本项目经过 Spark 技术生态栈中的 Spark Core、Spark SQL 和 Spark Streaming三个技术框架,实现了对电商平台业务的离线和实时数据统计与分析,完成了包括用户访问 session 分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计 4 个业务模块的开发工做。
  本项目涵盖了 Spark Core、Spark SQL 和 Spark Streaming 三个技术框架中核心的知识点与技术点,对于同窗们真正的理解和掌握 Spark 技术生态栈有着良好的促进做用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/813514
推荐阅读
相关标签
  

闽ICP备14008679号