赞
踩
三个离线需求,一个实时需求。
涉及三张hive表,动作表、用户表、物品表
动作表详细说明:
导入项目到idea。E:\DOCUMENT\尚硅谷电商分析项目\初始化项目\commerce
运行 MockDataGenerate 类的 main 方法,生成数据到 Spark 自带的 Hive 中。
创建一个Session子模块,导入依赖:
<dependencies>
    <!-- Shared project module (configuration, constants, models, utilities) -->
    <dependency>
        <groupId>com.atguigu</groupId>
        <artifactId>commons</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>

    <!-- Spark dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
    </dependency>

    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <!-- scala-maven-plugin compiles / tests / runs / documents Scala code in any Maven project -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <!-- Builds a runnable jar-with-dependencies with the main class below in its manifest -->
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.session.UserVisitSessionAnalyze</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
基本概念:
访问时长:session的最早时间与最晚时间之差。
访问步长:session中的action个数。
数据流程:
一个 session_id 对应动作表(user_visit_action)中的多条数据,即同一个 session 内的多个 action。
思路分析:先按照session_id进行分组,在每一个分组中,进行遍历:在for循环外面,定义一个startTime和endTime,每次循环进行比较。
/**
 * Entry point (first step of the walkthrough): loads the action records for the
 * configured date range, groups them by session id and prints every group.
 */
def main(args: Array[String]): Unit = {
  // The task's filter parameters are stored in the configuration as a JSON string.
  val taskParam = JSONObject.fromObject(
    ConfigurationManager.config.getString(Constants.TASK_PARAMS))

  // Unique id for this run; not consumed yet at this stage of the walkthrough.
  val taskUUID = UUID.randomUUID().toString

  // Local-mode SparkSession (it wraps the SparkContext) with Hive support enabled.
  val sparkSession = SparkSession.builder()
    .config(new SparkConf().setAppName("session").setMaster("local[*]"))
    .enableHiveSupport()
    .getOrCreate()

  // RDD[(sessionId, Iterable[UserVisitAction])]: key each action by its session id, then group.
  val session2GroupActionRDD =
    getOriActionRDD(sparkSession, taskParam)
      .map(action => (action.session_id, action))
      .groupByKey()
  session2GroupActionRDD.cache()

  session2GroupActionRDD.foreach(group => println(group))
}

/**
 * Reads the rows of `user_visit_action` whose `date` lies between the task's
 * start and end date (both inclusive) and returns them as an RDD of UserVisitAction.
 *
 * @param sparkSession session used to run the query
 * @param taskParam    task parameters holding the start and end date
 */
def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
  import sparkSession.implicits._

  val fromDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
  val toDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
  val query = s"select * from user_visit_action where date>='${fromDate}' and date<='${toDate}'"

  sparkSession.sql(query).as[UserVisitAction].rdd
}
结果文件:
(4b37a0ea33324b3aa78d790453b88e18,
CompactBuffer(
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,4,2020-05-10 14:39:13,null,-1,-1,null,null,15,51,3),
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,0,2020-05-10 14:58:22,卫生纸,-1,-1,null,null,null,null,9),
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,8,2020-05-10 14:08:55,华为手机,-1,-1,null,null,null,null,3),
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,4,2020-05-10 14:40:22,null,31,76,null,null,null,null,9),
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,4,2020-05-10 14:40:48,null,-1,-1,82,81,null,null,9),
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,0,2020-05-10 14:32:18,null,-1,-1,25,84,null,null,8),
UserVisitAction(2020-05-10,36,4b37a0ea33324b3aa78d790453b88e18,0,2020-05-10 14:50:22,null,-1,-1,null,null,62,26,5)
)
)
/**
 * Entry point (second step of the walkthrough): groups the action records by
 * session id, aggregates every session into one info string and prints the result.
 */
def main(args: Array[String]): Unit = {
  // The task's filter parameters are stored in the configuration as a JSON string.
  val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
  val taskParam = JSONObject.fromObject(jsonStr)

  // Unique id for this run.
  val taskUUID = UUID.randomUUID().toString

  // Local-mode SparkSession (it wraps the SparkContext) with Hive support enabled.
  val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
  val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

  // actionRDD: RDD[UserVisitAction]
  val actionRDD = getOriActionRDD(sparkSession, taskParam)

  // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
  val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

  // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
  val session2GroupActionRDD = sessionId2ActionRDD.groupByKey()
  session2GroupActionRDD.cache()

  val userId2AggrInfoRDD = getSessionFullInfo(sparkSession, session2GroupActionRDD)
  userId2AggrInfoRDD.foreach(println(_))
}

/**
 * Aggregates every session into a single "|"-separated `key=value` string.
 *
 * For each session the string carries: session id, distinct search keywords,
 * distinct clicked category ids, visit length (latest minus earliest action time,
 * in seconds), step length (number of actions) and the formatted start time.
 *
 * Keywords and category ids are de-duplicated by exact value, in first-seen order.
 * The previous `toString.contains(...)` checks did not do that: for keywords it was
 * a substring test (a keyword contained in an earlier one was dropped), and for the
 * numeric category id it was not a substring test at all, so repeated ids were kept.
 *
 * @param sparkSession           not used by this version; kept for the caller's signature
 * @param session2GroupActionRDD RDD[(sessionId, actions of that session)]
 * @return RDD[(userId, aggrInfo)]
 */
def getSessionFullInfo(sparkSession: SparkSession,
                       session2GroupActionRDD: RDD[(String, Iterable[UserVisitAction])]) = {
  import scala.collection.mutable

  // userId2AggrInfoRDD: RDD[(userId, aggrInfo)]
  val userId2AggrInfoRDD = session2GroupActionRDD.map {
    case (sessionId, iterableAction) =>
      var userId = -1L
      var startTime: Date = null
      var endTime: Date = null
      var stepLength = 0

      // Insertion-ordered sets: exact-match de-duplication, stable output order.
      val searchKeywords = mutable.LinkedHashSet.empty[String]
      val clickCategories = mutable.LinkedHashSet.empty[String]

      for (action <- iterableAction) {
        // The user id is taken from the first action of the session.
        if (userId == -1L) {
          userId = action.user_id
        }

        // Track the earliest and the latest action time of the session.
        val actionTime = DateUtils.parseTime(action.action_time)
        if (startTime == null || startTime.after(actionTime)) {
          startTime = actionTime
        }
        if (endTime == null || endTime.before(actionTime)) {
          endTime = actionTime
        }

        val searchKeyword = action.search_keyword
        if (StringUtils.isNotEmpty(searchKeyword)) {
          searchKeywords += searchKeyword
        }

        // -1 marks "no click" in the action records.
        val clickCategoryId = action.click_category_id
        if (clickCategoryId != -1) {
          clickCategories += clickCategoryId.toString
        }

        stepLength += 1
      }

      val searchKw = searchKeywords.mkString(",")
      val clickCg = clickCategories.mkString(",")

      // Date.getTime is in milliseconds; the visit length is stored in seconds.
      val visitLength = (endTime.getTime - startTime.getTime) / 1000

      val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
        Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
        Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
        Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
        Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
        Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)

      (userId, aggrInfo)
  }

  userId2AggrInfoRDD
}
结果:
(16,sessionid=242c9a3e86b64652b28be009e4e77364|searchKeywords=苹果,机器学习,洗面奶,华为手机,保温杯,小龙虾,吸尘器,联想笔记本,卫生纸|clickCategoryIds=58,46,15,33,79,34,77,1|visitLength=3370|stepLength=44|startTime=2020-05-10 18:01:44)
(78,sessionid=984a108a374143fcb416c57ed91d22a4|searchKeywords=卫生纸,吸尘器,机器学习,洗面奶,小龙虾,联想笔记本,华为手机,Lamer,保温杯,苹果|clickCategoryIds=86,33,84,49,91,11,1,9,84|visitLength=3138|stepLength=54|startTime=2020-05-10 12:01:30)
(91,sessionid=970f3c77a5bc4967983b736f945822ae|searchKeywords=小龙虾,华为手机,Lamer,联想笔记本,苹果,卫生纸,洗面奶,机器学习|clickCategoryIds=73,20,71,35,61,18,60,19,2,64,34,98,10,83,17,13,59,22,73,78,87,68|visitLength=3446|stepLength=99|startTime=2020-05-10 03:00:48)
在 getSessionFullInfo 方法中添加与用户表(user_info)按 user_id 进行联立(join)的操作:
/**
 * Aggregates every session into one "|"-separated `key=value` string and enriches
 * it with the owning user's attributes from the `user_info` table.
 *
 * The aggregated part carries: session id, distinct search keywords, distinct
 * clicked category ids, visit length (latest minus earliest action time, in
 * seconds), step length (number of actions) and the formatted start time.
 * The user part appends age, professional, sex and city.
 *
 * Keywords and category ids are de-duplicated by exact value, in first-seen order.
 * The previous `toString.contains(...)` checks did not do that: for keywords it was
 * a substring test (a keyword contained in an earlier one was dropped), and for the
 * numeric category id it was not a substring test at all, so repeated ids were kept.
 *
 * The join is an inner join on user id, so a session whose user has no row in
 * `user_info` does not appear in the result.
 *
 * @param sparkSession           session used to query `user_info`
 * @param session2GroupActionRDD RDD[(sessionId, actions of that session)]
 * @return RDD[(sessionId, fullInfo)]
 */
def getSessionFullInfo(sparkSession: SparkSession,
                       session2GroupActionRDD: RDD[(String, Iterable[UserVisitAction])]) = {
  import scala.collection.mutable

  // userId2AggrInfoRDD: RDD[(userId, aggrInfo)]
  val userId2AggrInfoRDD = session2GroupActionRDD.map {
    case (sessionId, iterableAction) =>
      var userId = -1L
      var startTime: Date = null
      var endTime: Date = null
      var stepLength = 0

      // Insertion-ordered sets: exact-match de-duplication, stable output order.
      val searchKeywords = mutable.LinkedHashSet.empty[String]
      val clickCategories = mutable.LinkedHashSet.empty[String]

      for (action <- iterableAction) {
        // The user id is taken from the first action of the session.
        if (userId == -1L) {
          userId = action.user_id
        }

        // Track the earliest and the latest action time of the session.
        val actionTime = DateUtils.parseTime(action.action_time)
        if (startTime == null || startTime.after(actionTime)) {
          startTime = actionTime
        }
        if (endTime == null || endTime.before(actionTime)) {
          endTime = actionTime
        }

        val searchKeyword = action.search_keyword
        if (StringUtils.isNotEmpty(searchKeyword)) {
          searchKeywords += searchKeyword
        }

        // -1 marks "no click" in the action records.
        val clickCategoryId = action.click_category_id
        if (clickCategoryId != -1) {
          clickCategories += clickCategoryId.toString
        }

        stepLength += 1
      }

      val searchKw = searchKeywords.mkString(",")
      val clickCg = clickCategories.mkString(",")

      // Date.getTime is in milliseconds; the visit length is stored in seconds.
      val visitLength = (endTime.getTime - startTime.getTime) / 1000

      val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
        Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
        Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
        Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
        Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
        Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)

      (userId, aggrInfo)
  }

  val sql = "select * from user_info"

  import sparkSession.implicits._

  // userId2InfoRDD: RDD[(userId, UserInfo)]
  val userId2InfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))

  // Join on user id, then re-key the enriched string by session id.
  val sessionId2FullInfoRDD = userId2AggrInfoRDD.join(userId2InfoRDD).map {
    case (userId, (aggrInfo, userInfo)) =>
      val fullInfo = aggrInfo + "|" +
        Constants.FIELD_AGE + "=" + userInfo.age + "|" +
        Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +
        Constants.FIELD_SEX + "=" + userInfo.sex + "|" +
        Constants.FIELD_CITY + "=" + userInfo.city

      // The session id is read back from the aggregated string built above.
      val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)

      (sessionId, fullInfo)
  }

  sessionId2FullInfoRDD
}
将 main 方法中对应的调用改为:
// Aggregate each session and join it with the user table: RDD[(sessionId, fullInfo)].
val sessionId2FullInfoRDD = getSessionFullInfo(sparkSession, session2GroupActionRDD)
// Print every (sessionId, fullInfo) record to inspect the result.
sessionId2FullInfoRDD.foreach(println(_))
结果:
(f16e7282452440538799f9a61455d5d6,sessionid=f16e7282452440538799f9a61455d5d6|searchKeywords=苹果,保温杯,Lamer,吸尘器,联想笔记本,机器学习,小龙虾,华为手机|clickCategoryIds=50,1,35,47,54,72,54,36,0,67,9,47,21,93,2,36,62,37,94|visitLength=3172|stepLength=59|startTime=2020-05-10 22:05:16|age=29|professional=professional28|sex=male|city=city16)
(871c5ffdeabc43939bfd7f292d6f94bb,sessionid=871c5ffdeabc43939bfd7f292d6f94bb|searchKeywords=Lamer,洗面奶,机器学习,吸尘器,苹果,小龙虾,卫生纸,华为手机,联想笔记本|clickCategoryIds=42,14,18,5,43,91,65,76,14,87|visitLength=3504|stepLength=64|startTime=2020-05-10 22:00:34|age=36|professional=professional30|sex=female|city=city32)
(01ec51b1e98d4616a3bfd9c24df235a5,sessionid=01ec51b1e98d4616a3bfd9c24df235a5|searchKeywords=卫生纸,保温杯,Lamer,华为手机,小龙虾|clickCategoryIds=58,70,94,4,72,77,95,31|visitLength=3314|stepLength=35|startTime=2020-05-10 19:00:36|age=29|professional=professional28|sex=male|city=city16)
/**
 * Keeps only the sessions whose full-info string satisfies the task's filter
 * parameters (age range, professionals, cities, sex, search keywords, category ids).
 *
 * The filter parameters that are present (non-null) are first packed into one
 * "|"-separated `name=value` string, which is then handed to the `ValidUtils`
 * checks together with each session's full-info string.
 *
 * Fix: the previous version tried to strip a trailing "|" with
 * `endsWith("\\|")`. `endsWith` compares literally (it is not a regex), so that
 * test looked for backslash + pipe and never matched. The string is now built by
 * joining only the present parts, so no trailing separator is produced at all.
 *
 * @param taskParam             task parameters holding the filter values
 * @param sessionId2FullInfoRDD RDD[(sessionId, fullInfo)]
 * @return the records of `sessionId2FullInfoRDD` that pass every check
 */
def getSessionFilteredRDD(taskParam: JSONObject,
                          sessionId2FullInfoRDD: RDD[(String, String)]) = {
  // Some("name=value") when the parameter is present, None when it is null.
  def filterPart(paramName: String): Option[String] =
    Option(ParamUtils.getParam(taskParam, paramName)).map(v => paramName + "=" + v)

  val filterInfo = Seq(
    filterPart(Constants.PARAM_START_AGE),
    filterPart(Constants.PARAM_END_AGE),
    filterPart(Constants.PARAM_PROFESSIONALS),
    filterPart(Constants.PARAM_CITIES),
    filterPart(Constants.PARAM_SEX),
    filterPart(Constants.PARAM_KEYWORDS),
    filterPart(Constants.PARAM_CATEGORY_IDS)
  ).flatten.mkString("|")

  sessionId2FullInfoRDD.filter {
    case (_, fullInfo) =>
      // Short-circuits on the first failing check, in the same order as before.
      val success =
        ValidUtils.between(fullInfo, Constants.FIELD_AGE, filterInfo,
          Constants.PARAM_START_AGE, Constants.PARAM_END_AGE) &&
          ValidUtils.in(fullInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS) &&
          ValidUtils.in(fullInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES) &&
          ValidUtils.equal(fullInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX) &&
          ValidUtils.in(fullInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS) &&
          ValidUtils.in(fullInfo, Constants.FIELD_CLICK_CATEGORY_IDS, filterInfo, Constants.PARAM_CATEGORY_IDS)

      if (success) {
        // Placeholder: accumulator updates for accepted sessions go here.
        // acc.add()
      }

      success
  }
}
修改main方法:
// Keep only the sessions that satisfy the task's filter parameters.
val sessionId2FilterRDD = getSessionFilteredRDD(taskParam, sessionId2FullInfoRDD)
// Print every surviving (sessionId, fullInfo) record.
sessionId2FilterRDD.foreach(println(_))
累加器:
维护了一个map结构,如果累加器中已经有key,就进行加1,如果没有key,就添加一个新的key。
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

/**
 * Accumulator that counts how many times each string key has been added.
 *
 * Input type is the key (`String`); the accumulated value is a mutable map
 * from key to count.
 */
class SessionAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {

  /** Per-key counters backing this accumulator. */
  val countMap = new mutable.HashMap[String, Int]()

  /** The accumulator is "zero" while no key has been counted. */
  override def isZero: Boolean = countMap.isEmpty

  /** Returns a new accumulator holding a copy of the current counters. */
  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val duplicate = new SessionAccumulator
    duplicate.countMap ++= countMap
    duplicate
  }

  /** Drops all counters. */
  override def reset(): Unit = countMap.clear()

  /** Increments the counter of `v`, starting from 0 when the key is new. */
  override def add(v: String): Unit =
    countMap(v) = countMap.getOrElse(v, 0) + 1

  /**
   * Adds every counter of `other` onto the matching counter of this accumulator.
   * Only another SessionAccumulator is handled by the match.
   */
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit =
    other match {
      case acc: SessionAccumulator =>
        for ((key, count) <- acc.countMap) {
          countMap(key) = countMap.getOrElse(key, 0) + count
        }
    }

  /** The live counter map (not a copy). */
  override def value: mutable.HashMap[String, Int] = countMap
}
acc.countMap.foldLeft(this.countMap)
上面代码解释:以 this.countMap 作为初始值,依次遍历 acc.countMap 中的每个 (k, v),把 v 累加到 this.countMap 中 k 对应的计数上(k 不存在时从 0 开始)。
在getSessionFilteredRDD添加:
// Fragment for the filter closure of getSessionFilteredRDD: update the accumulator
// only for sessions that passed every check.
// NOTE(review): sessionAccumulator is not defined in this fragment; main passes it to
// getSessionFilteredRDD as a third argument, so the method needs a matching parameter.
// NOTE(review): Spark only guarantees exactly-once accumulator updates inside actions;
// updates made inside a transformation such as filter can be applied again if the RDD
// is recomputed — run a single action on the filtered RDD or cache it.
if(success){
// One more session in the overall total.
sessionAccumulator.add(Constants.SESSION_COUNT)
// Read visit length and step length back out of the "|"-separated fullInfo string.
val visitLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
val stepLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
// Increment the counter of the bucket each value falls into.
calculateVisitLength(visitLength, sessionAccumulator)
calculateStepLength(stepLength, sessionAccumulator)
}
// Value of the filter predicate for this record.
success
/**
 * Increments the accumulator counter of the visit-length bucket that
 * `visitLength` falls into. Values below 1 belong to no bucket and leave the
 * accumulator untouched.
 *
 * @param visitLength                 visit length of one session
 * @param sessionStatisticAccumulator accumulator receiving the bucket key
 */
def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionAccumulator) = {
  // Cases are tried top-down, so each upper bound implicitly excludes the buckets above it.
  val bucket: Option[String] = visitLength match {
    case len if len < 1     => None
    case len if len <= 3    => Some(Constants.TIME_PERIOD_1s_3s)
    case len if len <= 6    => Some(Constants.TIME_PERIOD_4s_6s)
    case len if len <= 9    => Some(Constants.TIME_PERIOD_7s_9s)
    case len if len <= 30   => Some(Constants.TIME_PERIOD_10s_30s)
    case len if len <= 60   => Some(Constants.TIME_PERIOD_30s_60s)
    case len if len <= 180  => Some(Constants.TIME_PERIOD_1m_3m)
    case len if len <= 600  => Some(Constants.TIME_PERIOD_3m_10m)
    case len if len <= 1800 => Some(Constants.TIME_PERIOD_10m_30m)
    case _                  => Some(Constants.TIME_PERIOD_30m)
  }
  bucket.foreach(key => sessionStatisticAccumulator.add(key))
}

/**
 * Increments the accumulator counter of the step-length bucket that
 * `stepLength` falls into. Values below 1 belong to no bucket and leave the
 * accumulator untouched.
 *
 * @param stepLength                  step length of one session
 * @param sessionStatisticAccumulator accumulator receiving the bucket key
 */
def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionAccumulator) = {
  // Cases are tried top-down, so each upper bound implicitly excludes the buckets above it.
  val bucket: Option[String] = stepLength match {
    case steps if steps < 1   => None
    case steps if steps <= 3  => Some(Constants.STEP_PERIOD_1_3)
    case steps if steps <= 6  => Some(Constants.STEP_PERIOD_4_6)
    case steps if steps <= 9  => Some(Constants.STEP_PERIOD_7_9)
    case steps if steps <= 30 => Some(Constants.STEP_PERIOD_10_30)
    case steps if steps <= 60 => Some(Constants.STEP_PERIOD_30_60)
    case _                    => Some(Constants.STEP_PERIOD_60)
  }
  bucket.foreach(key => sessionStatisticAccumulator.add(key))
}
在main方法中添加:
// Create the custom accumulator and register it with the SparkContext.
val sessionAccumulator = new SessionAccumulator
sparkSession.sparkContext.register(sessionAccumulator)
// sessionId2FilterRDD: RDD[(sessionId, fullInfo)] — all records that satisfy the filter conditions
// getSessionFilteredRDD: filters the session data by the task's constraints and updates the accumulator
val sessionId2FilterRDD = getSessionFilteredRDD(taskParam, sessionId2FullInfoRDD, sessionAccumulator)
// Printing is an action, so it triggers the filtering.
sessionId2FilterRDD.foreach(println(_))
/**
 * Aggregated session statistics table.
 *
 * @param taskid                     ID of the current computation batch
 * @param session_count              total number of sessions
 * @param visit_length_1s_3s_ratio   share of sessions with a visit length of 1-3 s
 * @param visit_length_4s_6s_ratio   share of sessions with a visit length of 4-6 s
 * @param visit_length_7s_9s_ratio   share of sessions with a visit length of 7-9 s
 * @param visit_length_10s_30s_ratio share of sessions with a visit length of 10-30 s
 * @param visit_length_30s_60s_ratio share of sessions with a visit length of 30-60 s
 * @param visit_length_1m_3m_ratio   share of sessions with a visit length of 1-3 min
 * @param visit_length_3m_10m_ratio  share of sessions with a visit length of 3-10 min
 * @param visit_length_10m_30m_ratio share of sessions with a visit length of 10-30 min
 * @param visit_length_30m_ratio     share of sessions in the 30 min visit-length bucket
 * @param step_length_1_3_ratio      share of sessions with a step length of 1-3
 * @param step_length_4_6_ratio      share of sessions with a step length of 4-6
 * @param step_length_7_9_ratio      share of sessions with a step length of 7-9
 * @param step_length_10_30_ratio    share of sessions with a step length of 10-30
 * @param step_length_30_60_ratio    share of sessions with a step length of 30-60
 * @param step_length_60_ratio       share of sessions with a step length greater than 60
 */
case class SessionAggrStat(
  taskid: String,
  session_count: Long,
  visit_length_1s_3s_ratio: Double,
  visit_length_4s_6s_ratio: Double,
  visit_length_7s_9s_ratio: Double,
  visit_length_10s_30s_ratio: Double,
  visit_length_30s_60s_ratio: Double,
  visit_length_1m_3m_ratio: Double,
  visit_length_3m_10m_ratio: Double,
  visit_length_10m_30m_ratio: Double,
  visit_length_30m_ratio: Double,
  step_length_1_3_ratio: Double,
  step_length_4_6_ratio: Double,
  step_length_7_9_ratio: Double,
  step_length_10_30_ratio: Double,
  step_length_30_60_ratio: Double,
  step_length_60_ratio: Double
)

/**
 * Randomly extracted sessions table.
 *
 * @param taskid           ID of the current computation batch
 * @param sessionid        ID of the extracted session
 * @param startTime        start time of the session
 * @param searchKeywords   search keywords of the session
 * @param clickCategoryIds category ids clicked in the session
 */
case class SessionRandomExtract(
  taskid: String,
  sessionid: String,
  startTime: String,
  searchKeywords: String,
  clickCategoryIds: String
)

/**
 * Detail table for the randomly extracted sessions.
 *
 * @param taskid           ID of the current computation batch
 * @param userid           ID of the user
 * @param sessionid        ID of the session
 * @param pageid           ID of a page
 * @param actionTime       time of the click action
 * @param searchKeyword    keyword the user searched for
 * @param clickCategoryId  ID of a product category
 * @param clickProductId   ID of a product
 * @param orderCategoryIds all category ids in one order
 * @param orderProductIds  all product ids in one order
 * @param payCategoryIds   all category ids in one payment
 * @param payProductIds    all product ids in one payment
 */
case class SessionDetail(
  taskid: String,
  userid: Long,
  sessionid: String,
  pageid: Long,
  actionTime: String,
  searchKeyword: String,
  clickCategoryId: Long,
  clickProductId: Long,
  orderCategoryIds: String,
  orderProductIds: String,
  payCategoryIds: String,
  payProductIds: String
)

/**
 * Top-10 categories table.
 *
 * @param taskid     ID of the current computation batch
 * @param categoryid category ID
 * @param clickCount click count
 * @param orderCount order count
 * @param payCount   payment count
 */
case class Top10Category(
  taskid: String,
  categoryid: Long,
  clickCount: Long,
  orderCount: Long,
  payCount: Long
)

/**
 * Top-10 sessions table.
 *
 * @param taskid     ID of the current computation batch
 * @param categoryid category ID
 * @param sessionid  session ID
 * @param clickCount click count
 */
case class Top10Session(
  taskid: String,
  categoryid: Long,
  sessionid: String,
  clickCount: Long
)
/**
 * Turns the accumulated bucket counters into ratios of the session total,
 * prints the resulting row and appends it to the JDBC table
 * `session_stat_ratio_0416`.
 *
 * Fix: when no session was counted, the previous version substituted 1 for the
 * missing total to avoid dividing by zero, and then also stored that 1 as
 * `session_count`. The stored count is now the real one (0); only the divisor
 * falls back to 1, which leaves every ratio at 0 as before.
 *
 * @param sparkSession session used to build and write the one-row dataset
 * @param taskUUID     id of the current run, stored as `taskid`
 * @param value        accumulator contents: counter key -> count
 */
def getSessionRatio(sparkSession: SparkSession,
                    taskUUID: String,
                    value: mutable.HashMap[String, Int]): Unit = {
  val sessionCount = value.getOrElse(Constants.SESSION_COUNT, 0)

  // Guard against division by zero without altering the reported count.
  val divisor = if (sessionCount == 0) 1.0 else sessionCount.toDouble

  // Share of sessions counted under `key`, rounded to 2 decimal places.
  def ratio(key: String): Double =
    NumberUtils.formatDouble(value.getOrElse(key, 0) / divisor, 2)

  val stat = SessionAggrStat(
    taskid = taskUUID,
    session_count = sessionCount.toLong,
    visit_length_1s_3s_ratio = ratio(Constants.TIME_PERIOD_1s_3s),
    visit_length_4s_6s_ratio = ratio(Constants.TIME_PERIOD_4s_6s),
    visit_length_7s_9s_ratio = ratio(Constants.TIME_PERIOD_7s_9s),
    visit_length_10s_30s_ratio = ratio(Constants.TIME_PERIOD_10s_30s),
    visit_length_30s_60s_ratio = ratio(Constants.TIME_PERIOD_30s_60s),
    visit_length_1m_3m_ratio = ratio(Constants.TIME_PERIOD_1m_3m),
    visit_length_3m_10m_ratio = ratio(Constants.TIME_PERIOD_3m_10m),
    visit_length_10m_30m_ratio = ratio(Constants.TIME_PERIOD_10m_30m),
    visit_length_30m_ratio = ratio(Constants.TIME_PERIOD_30m),
    step_length_1_3_ratio = ratio(Constants.STEP_PERIOD_1_3),
    step_length_4_6_ratio = ratio(Constants.STEP_PERIOD_4_6),
    step_length_7_9_ratio = ratio(Constants.STEP_PERIOD_7_9),
    step_length_10_30_ratio = ratio(Constants.STEP_PERIOD_10_30),
    step_length_30_60_ratio = ratio(Constants.STEP_PERIOD_30_60),
    step_length_60_ratio = ratio(Constants.STEP_PERIOD_60)
  )

  // A one-element RDD so the row can be written through the DataFrame JDBC writer.
  val sessionRatioRDD = sparkSession.sparkContext.makeRDD(Array(stat))
  sessionRatioRDD.foreach(println(_))

  import sparkSession.implicits._

  sessionRatioRDD.toDF().write
    .format("jdbc")
    .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
    .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
    .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
    .option("dbtable", "session_stat_ratio_0416")
    .mode(SaveMode.Append)
    .save()
}
在main方法中:
// Convert the accumulated counters into ratios and append them to the JDBC table.
// NOTE(review): the accumulator is only populated once an action on the filtered RDD
// has run, so this call has to come after that action.
getSessionRatio(sparkSession, taskUUID, sessionAccumulator.value)
执行完成后mysql数据库中的结果:
package com.atguigu.session.test

import java.util.{Date, UUID}

import SessionStat.{calculateStepLength, calculateVisitLength, getSessionFilteredRDD, getSessionRatio}
import com.atguigu.commons.conf.ConfigurationManager
import com.atguigu.commons.constant.Constants
import com.atguigu.commons.model.{UserInfo, UserVisitAction}
import com.atguigu.commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable

/**
 * Session statistics job.
 *
 * Reads user actions for a configured date range, groups them by session,
 * enriches each session with the owning user's profile, filters sessions by
 * the task parameters and writes the visit-length / step-length bucket ratios
 * of the surviving sessions to a JDBC table.
 *
 * @date 2020/5/10 20:07
 * @author 17611219021@sina.cn
 */
object SessionStat {

  /**
   * Job entry point: wires the pipeline stages together and triggers them.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Filter conditions arrive as a JSON string in the configuration.
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    val taskParam = JSONObject.fromObject(jsonStr)

    // Unique key identifying this run in the result table.
    val taskUUID = UUID.randomUUID().toString

    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
    // The SparkSession wraps the SparkContext; Hive support is needed for the source tables.
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession, taskParam)

    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
    val session2GroupActionRDD = sessionId2ActionRDD.groupByKey()
    session2GroupActionRDD.cache()

    val sessionId2FullInfoRDD = getSessionFullInfo(sparkSession, session2GroupActionRDD)

    val sessionAccumulator = new SessionAccumulator
    sparkSession.sparkContext.register(sessionAccumulator)

    // sessionId2FilterRDD: RDD[(sessionId, fullInfo)] holding only sessions that
    // satisfy the task parameters; evaluating it also updates the accumulator.
    val sessionId2FilterRDD = getSessionFilteredRDD(taskParam, sessionId2FullInfoRDD, sessionAccumulator)

    // The action below is what actually runs the filter and fills the accumulator.
    sessionId2FilterRDD.foreach(println(_))

    getSessionRatio(sparkSession, taskUUID, sessionAccumulator.value)
  }

  /**
   * Converts the accumulated session counters into per-bucket ratios and
   * appends them as a single row to the `session_stat_ratio_0416` table via JDBC.
   *
   * Every ratio is `bucketCount / sessionCount`, passed through
   * `NumberUtils.formatDouble(_, 2)`. A missing bucket counts as 0; a missing
   * session count falls back to 1 so the division is always defined.
   *
   * @param sparkSession session used to build the one-row RDD/DataFrame
   * @param taskUUID     identifier of the current task, stored with the row
   * @param value        accumulator snapshot: counter name -> number of sessions
   */
  def getSessionRatio(sparkSession: SparkSession, taskUUID: String, value: mutable.HashMap[String, Int]): Unit = {
    val session_count = value.getOrElse(Constants.SESSION_COUNT, 1).toDouble

    // Share of all counted sessions that landed in the given bucket.
    def ratioOf(bucketKey: String) =
      NumberUtils.formatDouble(value.getOrElse(bucketKey, 0) / session_count, 2)

    // Argument order follows the SessionAggrStat constructor: first the nine
    // visit-length buckets, then the six step-length buckets.
    val stat = SessionAggrStat(
      taskUUID,
      session_count.toInt,
      ratioOf(Constants.TIME_PERIOD_1s_3s),
      ratioOf(Constants.TIME_PERIOD_4s_6s),
      ratioOf(Constants.TIME_PERIOD_7s_9s),
      ratioOf(Constants.TIME_PERIOD_10s_30s),
      ratioOf(Constants.TIME_PERIOD_30s_60s),
      ratioOf(Constants.TIME_PERIOD_1m_3m),
      ratioOf(Constants.TIME_PERIOD_3m_10m),
      ratioOf(Constants.TIME_PERIOD_10m_30m),
      ratioOf(Constants.TIME_PERIOD_30m),
      ratioOf(Constants.STEP_PERIOD_1_3),
      ratioOf(Constants.STEP_PERIOD_4_6),
      ratioOf(Constants.STEP_PERIOD_7_9),
      ratioOf(Constants.STEP_PERIOD_10_30),
      ratioOf(Constants.STEP_PERIOD_30_60),
      ratioOf(Constants.STEP_PERIOD_60))

    val sessionRatioRDD = sparkSession.sparkContext.makeRDD(Array(stat))
    // Echo the row to stdout before persisting it.
    sessionRatioRDD.foreach(println(_))

    import sparkSession.implicits._
    val jdbcConf = ConfigurationManager.config
    sessionRatioRDD.toDF().write
      .format("jdbc")
      .option("url", jdbcConf.getString(Constants.JDBC_URL))
      .option("user", jdbcConf.getString(Constants.JDBC_USER))
      .option("password", jdbcConf.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_stat_ratio_0416")
      .mode(SaveMode.Append)
      .save()
  }

  /**
   * Increments the visit-length bucket counter that `visitLength` falls into.
   *
   * NOTE(review): a visit length of 0 (or a negative one) matches no bucket and
   * is not counted, so the visit-length ratios need not sum to 1 — confirm this
   * is intended for sessions whose first and last action share a timestamp.
   *
   * @param visitLength                 session duration; compared against second-based bounds
   * @param sessionStatisticAccumulator accumulator receiving the bucket key
   */
  def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionAccumulator): Unit = {
    if (visitLength >= 1 && visitLength <= 3) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
    } else if (visitLength >= 4 && visitLength <= 6) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
    } else if (visitLength >= 7 && visitLength <= 9) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
    } else if (visitLength >= 10 && visitLength <= 30) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
    } else if (visitLength > 30 && visitLength <= 60) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
    } else if (visitLength > 60 && visitLength <= 180) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
    } else if (visitLength > 180 && visitLength <= 600) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
    } else if (visitLength > 600 && visitLength <= 1800) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
    } else if (visitLength > 1800) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
    }
  }

  /**
   * Increments the step-length bucket counter that `stepLength` falls into.
   * Values below 1 match no bucket.
   *
   * @param stepLength                  number of actions in the session
   * @param sessionStatisticAccumulator accumulator receiving the bucket key
   */
  def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionAccumulator): Unit = {
    if (stepLength >= 1 && stepLength <= 3) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
    } else if (stepLength >= 4 && stepLength <= 6) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
    } else if (stepLength >= 7 && stepLength <= 9) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
    } else if (stepLength >= 10 && stepLength <= 30) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
    } else if (stepLength > 30 && stepLength <= 60) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
    } else if (stepLength > 60) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
    }
  }

  /**
   * Keeps only the sessions that satisfy the task's filter parameters and, for
   * every session kept, bumps the session count and its visit-length and
   * step-length buckets in `sessionAccumulator`.
   *
   * The accumulator is updated inside a `filter` transformation, so counts are
   * produced only when an action runs on the returned RDD, and running more
   * than one action on it (without caching) would count sessions again.
   *
   * @param taskParam             task parameters holding the optional filter values
   * @param sessionId2FullInfoRDD (sessionId, "k=v|k=v|..." full-info string) pairs
   * @param sessionAccumulator    registered accumulator collecting the counters
   * @return the pairs whose full-info string passes every filter
   */
  def getSessionFilteredRDD(taskParam: JSONObject,
                            sessionId2FullInfoRDD: RDD[(String, String)],
                            sessionAccumulator: SessionAccumulator): RDD[(String, String)] = {
    // Parameters in the order they appear in the filter string.
    val paramKeys = Seq(
      Constants.PARAM_START_AGE,
      Constants.PARAM_END_AGE,
      Constants.PARAM_PROFESSIONALS,
      Constants.PARAM_CITIES,
      Constants.PARAM_SEX,
      Constants.PARAM_KEYWORDS,
      Constants.PARAM_CATEGORY_IDS)

    // Build "key=value|key=value" from the parameters that are present (non-null).
    // Joining with mkString leaves no trailing '|'. The previous code tried to
    // strip one with endsWith("\\|"), which looks for a literal backslash-pipe
    // and therefore never matched.
    val filterInfo = paramKeys
      .flatMap(key => Option(ParamUtils.getParam(taskParam, key)).map(v => key + "=" + v))
      .mkString("|")

    sessionId2FullInfoRDD.filter { case (_, fullInfo) =>
      // && short-circuits, so checks stop at the first failing condition.
      val success =
        ValidUtils.between(fullInfo, Constants.FIELD_AGE, filterInfo, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE) &&
          ValidUtils.in(fullInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS) &&
          ValidUtils.in(fullInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES) &&
          ValidUtils.equal(fullInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX) &&
          ValidUtils.in(fullInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS) &&
          ValidUtils.in(fullInfo, Constants.FIELD_CLICK_CATEGORY_IDS, filterInfo, Constants.PARAM_CATEGORY_IDS)

      if (success) {
        sessionAccumulator.add(Constants.SESSION_COUNT)
        val visitLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
        val stepLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
        calculateVisitLength(visitLength, sessionAccumulator)
        calculateStepLength(stepLength, sessionAccumulator)
      }
      success
    }
  }

  /**
   * Aggregates every session's actions into one "k=v|k=v|..." string and joins
   * it with the profile of the user who owns the session.
   *
   * Per session it records the distinct search keywords and clicked category
   * ids (first-seen order), the visit length in seconds (latest minus earliest
   * action time), the step length (number of actions) and the start time.
   * Sessions whose user has no row in `user_info` are dropped by the inner join.
   *
   * @param sparkSession           session used to query `user_info`
   * @param session2GroupActionRDD (sessionId, actions of that session) pairs
   * @return (sessionId, full-info string) pairs
   */
  def getSessionFullInfo(sparkSession: SparkSession,
                         session2GroupActionRDD: RDD[(String, Iterable[UserVisitAction])]): RDD[(String, String)] = {
    // userId2AggrInfoRDD: RDD[(userId, aggrInfo)]
    val userId2AggrInfoRDD = session2GroupActionRDD.map { case (sessionId, iterableAction) =>
      var userId = -1L
      var startTime: Date = null
      var endTime: Date = null
      var stepLength = 0

      // Insertion-ordered sets de-duplicate whole values. The previous
      // String.contains checks matched substrings for keywords (dropping "ab"
      // once "abc" was seen) and compared character codes for category ids.
      val searchKeywords = mutable.LinkedHashSet.empty[String]
      val clickCategories = mutable.LinkedHashSet.empty[String]

      for (action <- iterableAction) {
        // All actions of a session are taken to share one user; use the first.
        if (userId == -1L) {
          userId = action.user_id
        }

        val actionTime = DateUtils.parseTime(action.action_time)
        if (startTime == null || startTime.after(actionTime)) {
          startTime = actionTime
        }
        if (endTime == null || endTime.before(actionTime)) {
          endTime = actionTime
        }

        val searchKeyword = action.search_keyword
        if (StringUtils.isNotEmpty(searchKeyword)) {
          searchKeywords += searchKeyword
        }

        // -1 marks an action that is not a category click.
        val clickCategoryId = action.click_category_id
        if (clickCategoryId != -1) {
          clickCategories += s"$clickCategoryId"
        }

        stepLength += 1
      }

      val searchKw = StringUtils.trimComma(searchKeywords.mkString(","))
      val clickCg = StringUtils.trimComma(clickCategories.mkString(","))

      // Date.getTime is in milliseconds; integer division yields whole seconds.
      val visitLength = (endTime.getTime - startTime.getTime) / 1000

      val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
        Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
        Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
        Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
        Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
        Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)

      (userId, aggrInfo)
    }

    val sql = "select * from user_info"

    import sparkSession.implicits._
    // userId2InfoRDD: RDD[(userId, UserInfo)]
    val userId2InfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))

    userId2AggrInfoRDD.join(userId2InfoRDD).map { case (_, (aggrInfo, userInfo)) =>
      val fullInfo = aggrInfo + "|" +
        Constants.FIELD_AGE + "=" + userInfo.age + "|" +
        Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +
        Constants.FIELD_SEX + "=" + userInfo.sex + "|" +
        Constants.FIELD_CITY + "=" + userInfo.city

      // Re-key by session id, recovered from the aggregated string.
      val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
      (sessionId, fullInfo)
    }
  }

  /**
   * Loads the rows of `user_visit_action` whose `date` lies within the
   * inclusive start/end dates given in the task parameters.
   *
   * The dates are spliced into the SQL text as-is, so they must come from
   * trusted configuration.
   *
   * @param sparkSession session used to run the query
   * @param taskParam    task parameters holding the start and end date
   * @return the matching actions as an RDD
   */
  def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject): RDD[UserVisitAction] = {
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"

    import sparkSession.implicits._
    sparkSession.sql(sql).as[UserVisitAction].rdd
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。