Requirements analysis
Sample data
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

/*
 * Data cleaning (ETL)
 */
object ETLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ETL")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // 1. Load the data
    val rdd = sc.textFile("in/test.log")
    // println(rdd.count())

    // 2. Keep only rows with exactly 8 fields and wrap them as Row:
    //    event_time, url, method, status, sip, user_uip, action_prepend, action_client
    val rowRDD = rdd.map(_.split("\t"))
      .filter(x => x.length == 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

    // Define the schema
    val logs_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )

    // Create the DataFrame
    val logDF = spark.createDataFrame(rowRDD, logs_schema)

    // (main continues below with further cleaning)
    // Continue cleaning
    val filterLogs = logDF.dropDuplicates("event_time", "url")        // deduplicate on the first two columns
      .filter(x => x(3) == "200")                                     // keep only rows whose status is 200
      .filter(x => StringUtils.isNotEmpty(x(0).toString))             // drop rows with an empty event_time

    // Split the url on "?", "&" and "="
    val full_logs_RDD: RDD[Row] = filterLogs.map(line => {
      val str = line.getAs[String]("url")                             // get the url column first
      val paramArray = str.split("\\?")                               // split on ?
      var paramMap: Map[String, String] = Map()                       // holds the key=value pairs; empty by default so urls without a query string don't cause a NullPointerException
      if (paramArray.length == 2) {
        val strings = paramArray(1).split("&")                        // split the query string on &
        paramMap = strings.map(x => x.split("=")).filter(x => x.length == 2)
          .map(x => (x(0), x(1))).toMap                               // put the key=value pairs into a Map
      }
      ( // a tuple is limited to 22 elements; if you need more, nest tuples inside the tuple
        line.getAs[String]("event_time"),                             // read the original columns again
        paramMap.getOrElse[String]("userUID", ""),
        paramMap.getOrElse[String]("userSID", ""),
        paramMap.getOrElse[String]("actionBegin", ""),
        paramMap.getOrElse[String]("actionEnd", ""),
        paramMap.getOrElse[String]("actionType", ""),
        paramMap.getOrElse[String]("actionName", ""),
        paramMap.getOrElse[String]("actionValue", ""),
        paramMap.getOrElse[String]("actionTest", ""),
        paramMap.getOrElse[String]("ifEquipment", ""),
        line.getAs[String]("method"),
        line.getAs[String]("status"),
        line.getAs[String]("sip"),
        line.getAs[String]("user_uip"),
        line.getAs[String]("action_prepend"),
        line.getAs[String]("action_client")
      )
    }).toDF().rdd

    // Redefine the schema for the widened table
    val full_logs_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("userUID", StringType),
        StructField("userSID", StringType),
        StructField("actionBegin", StringType),
        StructField("actionEnd", StringType),
        StructField("actionType", StringType),
        StructField("actionName", StringType),
        StructField("actionValue", StringType),
        StructField("actionTest", StringType),
        StructField("ifEquipment", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val full_logDF = spark.createDataFrame(full_logs_RDD, full_logs_schema)  // the final cleaned data
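The "?"/"&"/"=" splitting above is the core of the widening step. Here is a minimal, self-contained sketch of the same logic on a single made-up URL (the URL and parameter values are invented purely for illustration; real values come from the log file):

object UrlSplitDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical URL for illustration only
    val url = "https://www.example.com/course?userUID=1001&userSID=abc123&actionName=Signin"

    val paramArray = url.split("\\?")                       // split off the query string at '?'
    val paramMap: Map[String, String] =
      if (paramArray.length == 2)
        paramArray(1).split("&")                            // split the query string on '&'
          .map(_.split("="))                                // split each pair on '='
          .filter(_.length == 2)                            // keep only well-formed key=value pairs
          .map(kv => (kv(0), kv(1)))
          .toMap
      else Map.empty

    println(paramMap.getOrElse("userUID", ""))              // prints 1001
    println(paramMap.getOrElse("actionType", ""))           // a missing key falls back to ""
  }
}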
Before running the code that writes the data, create the corresponding database.
Log in to MySQL and run:
create database if not exists etl;
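Optionally, you can confirm the JDBC settings work before running the Spark job by opening a plain JDBC connection. This is a sketch, not part of the original walkthrough; it assumes the same url/user/password/driver values defined in JdbcUtil further below (replace XXX with your own host):

import java.sql.DriverManager

object MysqlPing {
  def main(args: Array[String]): Unit = {
    // Same connection settings as JdbcUtil below
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.XXX.100:3306/etl", "root", "root")
    println(conn.getCatalog)   // prints "etl" if the database was created successfully
    conn.close()
  }
}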
    // Connect to MySQL and store the data there
    val properties = new Properties()
    properties.setProperty("url", JdbcUtil.url)            // you could hard-code your own settings here, but wrapping them in a helper is cleaner
    properties.setProperty("user", JdbcUtil.user)
    properties.setProperty("password", JdbcUtil.password)
    properties.setProperty("driver", JdbcUtil.driver)

    // Write to MySQL
    println("Writing filterLogs to MySQL")
    filterLogs.write.mode(SaveMode.Overwrite).jdbc(JdbcUtil.url, "access_logs", properties)
    println("Write finished")

    println("Writing full_logDF to MySQL")
    full_logDF.write.mode(SaveMode.Overwrite).jdbc(JdbcUtil.url, "full_access_logs", properties)
    println("Write finished")

    spark.stop()
  }
}
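As an optional sanity check (a sketch, not part of the original walkthrough), the written table can be read back through the same JDBC settings just before spark.stop():

    val check = spark.read.jdbc(JdbcUtil.url, "full_access_logs", properties)
    println(s"full_access_logs rows written: ${check.count()}")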
If the MySQL write and read logic is wrapped into a helper object, each job can simply call the helper methods instead of repeating the JDBC boilerplate. For example, the helper below (remember to import the package):
package etl.util

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties

object JdbcUtil {
  val url = "jdbc:mysql://192.168.XXX.100:3306/etl"
  val user = "root"
  val password = "root"
  val driver = "com.mysql.jdbc.Driver"

  val properties = new Properties()
  properties.setProperty("url", url)
  properties.setProperty("user", user)
  properties.setProperty("password", password)
  properties.setProperty("driver", driver)

  // Read a table from MySQL into a DataFrame
  def getDataFrame(spark: SparkSession, tableName: String): DataFrame = {
    val frame = spark.read.jdbc(url, tableName, properties)
    frame
  }

  // Overwrite the given MySQL table with the DataFrame's contents
  def dataFrameToMysql(df: DataFrame, tableName: String) = {
    df.write.mode(SaveMode.Overwrite).jdbc(url, tableName, properties)
    println("Write finished")
  }
}
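With this helper in place, the two writes in the ETL job above and the reads in the analysis jobs shrink to one call each, for example:

// Writing (replaces the explicit .write.mode(...).jdbc(...) calls above)
JdbcUtil.dataFrameToMysql(filterLogs, "access_logs")
JdbcUtil.dataFrameToMysql(full_logDF, "full_access_logs")

// Reading a table back
val logs = JdbcUtil.getDataFrame(spark, "full_access_logs")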
import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SaveMode, SparkSession}
import etl.util.JdbcUtil
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

/*
 * Retention rate
 */
object Retention {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ETL")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Connect to MySQL the plain way; this can later be replaced by the JdbcUtil helper
    val properties = new Properties()
    properties.setProperty("url", JdbcUtil.url)
    properties.setProperty("user", JdbcUtil.user)
    properties.setProperty("password", JdbcUtil.password)
    properties.setProperty("driver", JdbcUtil.driver)

    val logs = spark.read.jdbc(JdbcUtil.url, "full_access_logs", properties)
    logs.cache()

    // Rows whose actionName is "Registered"
    val registered = logs.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")

    // Rows whose actionName is "Signin"
    val signin = logs.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")

    val joined = registered.join(signin, Seq("userUID"), "left")

    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    // Register a UDF: input is a timestamp such as 2018-09-04T20:34:45+08:00,
    // output is the day as a Long (milliseconds since the epoch)
    val datetrans = spark.udf.register("datetrans", (event_time: String) => {
      if (StringUtils.isEmpty(event_time)) 0
      else sdf.parse(event_time.substring(0, 10)).getTime
    })

    val joined2 = joined.withColumn("register_date", datetrans($"register_time"))
      .withColumn("signin_date", datetrans($"signin_time"))
    // joined2.show(false)

    // Milliseconds in one day: 86,400,000
    val daysc = 86400000

    // Total new users per registration day
    val result = joined2.groupBy("register_date")
      .agg(countDistinct("userUID").cast(DoubleType).as("num"))
    // result.show()

    // 1-day retention; for 7-day retention adjust the day offset and use a range comparison
    // (see the sketch after this block)
    val result1 = joined2.filter($"register_date" + daysc === $"signin_date")
      .groupBy($"register_date").agg(countDistinct("userUID").as("num1"))   // users retained on day 1

    val result_one_day = result.join(result1, "register_date")
      .select($"register_date", round($"num1" / $"num", 2).as("一日留存率"))

    // Write out with the JDBC sink once a target table name is chosen
    // result_one_day.write.mode(SaveMode.Overwrite).jdbc(JdbcUtil.url,"",properties)
  }
}
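The comment above hints that the 7-day retention rate only needs a different day offset and a range comparison. Below is a sketch of that variant, meant to sit inside main right after result1; the "signed in within 7 days of registration" semantics (rather than "exactly on day 7") is an assumption:

    // Users whose sign-in falls within 7 days after their registration day
    val result7 = joined2
      .filter($"signin_date" > $"register_date" && $"signin_date" <= $"register_date" + 7 * daysc)
      .groupBy($"register_date").agg(countDistinct("userUID").as("num7"))

    val result_seven_day = result.join(result7, "register_date")
      .select($"register_date", round($"num7" / $"num", 2).as("七日留存率"))
    // result_seven_day can then be written out the same way as result_one_day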
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import etl.util.JdbcUtil
import org.apache.spark.sql.functions._

/*
 * Active users: users who bought a course or started learning a course
 */
object Active {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("etl0")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = SparkContext.getOrCreate(conf)
    import spark.implicits._

    val logs = JdbcUtil.getDataFrame(spark, "full_access_logs")
    // logs.printSchema()

    // Keep only course purchases and course starts
    val ds = logs.filter($"actionName" === "BuyCourse" || $"actionName" === "StartLearn")
    // ds.show(false)

    // (userUID, date) pairs; the date is the first 10 characters of event_time
    val ds2 = ds.map(x => (x.getAs[String]("userUID"), x.getAs[String]("event_time").substring(0, 10)))
    // ds2.show(false)

    // Count distinct active users per day ("日期" = date, "活跃人数" = number of active users)
    ds2.withColumnRenamed("_2", "日期")
      .groupBy($"日期")
      .agg(countDistinct($"_1").as("活跃人数"))
      .show()
  }
}
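If the daily active-user counts should be stored rather than printed, the JdbcUtil helper can be reused here as well. A sketch; the table name active_users is an assumption, not one used earlier in this post:

    val active = ds2.withColumnRenamed("_2", "日期")
      .groupBy($"日期")
      .agg(countDistinct($"_1").as("活跃人数"))

    // "active_users" is a hypothetical target table name
    JdbcUtil.dataFrameToMysql(active, "active_users")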