赞
踩
首先我们需要搭建大数据环境,详情可参考下文
https://blog.csdn.net/kilig_CSM/article/details/131275449?spm=1001.2014.3001.5502
根据提供的五个数据集,可到此处查看:
链接:https://pan.baidu.com/s/1WCtHzlyyWqoGdoTn0DhV_A?pwd=bokt
提取码:bokt
我们可以对每个数据集进行分析如下.
terminal_no:客户地址编号
phone_no:客户编号
sm_name:品牌名称
run_name:状态名称
sm_code:品牌编号
owner_name:客户等级名称
owner_code:客户等级编号
run_time:状态变更时间
addressoj:完整地址
estate_name:街道或小区地址
force:宽带是否生效
open_time:开户时间
该表包含用户的基本信息,如客户地址编号、客户编号、品牌名称、状态名称、品牌编号、客户等级名称、客户等级编号、状态变更时间、完整地址、街道或小区地址、宽带是否生效以及开户时间。
run_name:状态名称
run_time:更改状态时间
owner_code:客户等级编号
owner_name:客户等级名称
sm_name:品牌名称
open_time:开户时间
phone_no:客户编号
owner_name:客户等级名称
这个表记录了用户的状态变更信息,包括状态名称、更改状态时间、客户等级编号、客户等级名称、品牌名称、开户时间和客户编号。
fee_code:费用类型
phone_no:客户编号
owner_code:客户等级编号
owner_name:客户等级名称
sm_name:品牌名
year_month:账单时间
terminal_no:用户地址编号
favour_fee:优惠金额(+ 代表优惠,- 代表额外费用)
should_pay:应收金额,单位:元
该表存储了用户的账单信息,包括费用类型、客户编号、客户等级编号、客户等级名称、品牌名称、账单时间、用户地址编号、优惠金额、应收金额。
phone_no:用户编号
owner_name:客户等级名称
optdate:产品订购状态更新时间
Prodname:订购产品名称
sm_name:用户品牌名称
offerid:订购套餐编号
offername:订购套餐名称
business_name:订购业务状态
owner_code:客户等级
prodprcid:订购产品名称(带价格)的编号
prodprcname:订购产品名称(带价格)
effdate:产品生效时间
expdate:产品失效时间
orderdate:产品订购时间
cost:订购产品价格
mode_time:产品标识,辅助标识电视主、附销售品
prodstatus:订购产品状态
run_name:状态名
orderno:订单编号
这个表记录了用户的订单信息,包括用户编号、客户等级名称、产品订购状态更新时间、订购产品名称、用户品牌名称、订购套餐编号、订购套餐名称、订购业务状态、客户等级、订购产品名称(带价格)的编号、订购产品名称(带价格)、产品生效时间、产品失效时间、产品订购时间、订购产品价格、产品标识、订购产品状态、状态名和订单编号。
terminal_no:用户地址编号
phone_no:用户编号
duration:观看时长,单位:毫秒
station_name:直播频道名称
origin_time:观看行为开始时间
end_time:观看行为结束时间
owner_code:客户等级
owner_name:客户等级名称
vod_cat_tags:VOD节目包相关信息(嵌套对象),按不同的节目包目录组织
resolution:点播节目的清晰度
audio_lang:点播节目的语言类别
region:节目地区信息
res_name:设备名称
res_type:媒体节目类型,0 是直播,1 是点播或回看
vod_title:VOD节目名称
category_name:节目所属分类
program_title:直播节目名称
sm_name:用户品牌名称
该表记录了用户的收视行为信息,包括用户地址编号、用户编号、观看时长、直播频道名称、观看行为开始时间、观看行为结束时间、客户等级、客户等级名称、VOD节目包相关信息、点播节目的清晰度、点播节目的语言类别、节目地区信息、设备名称、媒体节目类型、VOD节目名称、节目所属分类和用户品牌名称。
使用Hive支持来访问Hive中的表数据。通过循环遍历表名,并调用Analyse()函数对每个表进行分析,统计记录数和空值记录数,并打印输出。
package code.anaylse import org.apache.spark.sql.SparkSession object BasicAnaylse { val spark=SparkSession.builder().appName("BasicAnalyse") .master("local[*]") .enableHiveSupport() .getOrCreate() // spark.sparkContext.setLogLevel("WARN") def main(args: Array[String]): Unit = { //探索每个表中的重复记录表和空值记录数 val tableName = Array("media_index","mediamatch_userevent","mediamatch_usermsg","mmconsume_billevents","order_index") var i = ""; for(i<-tableName){ Analyse(i) } // val mediamatch_userevent = spark.table("user_project.mediamatch_userevent") // mediamatch_userevent.show(false) } def Analyse(tableName:String): Unit ={ val data = spark.table("user_project."+tableName) print(tableName+"表数据:"+data.count()) print(tableName+"表phone_no字段为空数:"+(data.count()-data.select("phone_no").na.drop().count)) } }
导入所需的库和模块:导入org.apache.spark.sql.SparkSession库,这是使用Spark进行数据分析的核心库。
创建SparkSession:使用SparkSession.builder()创建一个SparkSession实例。在builder()方法中,设置应用程序的名称为"BasicAnalyse",设置Spark的master节点为本地模式(.master(“local[*]”)),使用所有可用的本地线程进行运算。并启用Hive支持(.enableHiveSupport()),以便通过Hive访问数据。最后,通过.getOrCreate()获取或创建SparkSession实例。
设置日志级别:使用spark.sparkContext.setLogLevel(“WARN”)将Spark的日志级别设置为"WARN",以减少日志输出。
创建main()函数:主要的代码逻辑将放置在main()函数中。
定义一个字符串数组tableName,包含要分析的表的名称,例如"media_index"、"mediamatch_userevent"等。
使用for循环遍历tableName数组中的每个表名。
在循环内部,调用Analyse()函数,并将当前表名作为参数传递给它。
可以选择性地注释掉一行代码,使用spark.table()和show()方法显示指定表的内容。
创建Analyse()函数:该函数用于对指定表进行分析。
在函数参数中接收表名tableName。
用spark.table(“user_project.”+tableName)获取指定表的数据,前缀"user_project."用于指定数据库名称。
使用count()方法获取表中的记录数,并通过print()函数打印出来。
使用data.select(“phone_no”).na.drop().count选择"phone_no"字段,并使用na.drop()方法去除该字段中的空值,再使用count()方法获取非空值记录的数量。计算空值记录数,并通过print()函数打印出来。
在main()函数的最后,执行代码逻辑并运行应用程序。
println(tableName + "表数据:" + data.count())
println(tableName + "表phone_no字段为空数:" + (data.count() - data.select("phone_no").na.drop().count))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。