Dataset
1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
Requirement: for each user, compute the total flow used in each time segment, where two adjacent sessions less than 10 minutes apart are merged into one segment.
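As a quick sanity check, applying this rule by hand to user 1's five records gives two segments: 14:20:30 to 15:20:30 with a total flow of 50, and 15:37:23 to 18:03:27 with a total flow of 150, since only the gap between 15:20:30 and 15:37:23 (about 17 minutes) exceeds 10 minutes.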
import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable

/**
 * @Author Zhang
 * @Date 2020/8/14
 */
object FloatCount2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("data/data.csv")

    // Parse the raw lines
    val splited: RDD[((String, Long, Long, Long), Null)] = rdd.mapPartitions(it => {
      // SimpleDateFormat is needed to parse the date strings; one instance per partition
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      it.map(line => {
        val sp = line.split(",")
        val uid = sp(0)
        val startTime = sp(1)
        val endTime = sp(2)
        val flow = sp(3).toLong
        // Convert the string dates to epoch milliseconds
        val startTimeStamp = sdf.parse(startTime).getTime
        val endTimeStamp = sdf.parse(endTime).getTime
        ((uid, startTimeStamp, endTimeStamp, flow), null)
      })
    })

    // Collect the distinct user ids to the driver
    val uids: Array[String] = splited.map(_._1._1).distinct().collect()

    // Sort the records inside each partition by start time
    implicit val sorter = Ordering[Long].on[(String, Long, Long, Long)](t => t._2)

    // Use a custom partitioner that puts each uid into its own partition.
    // repartitionAndSortWithinPartitions both repartitions and sorts within each partition.
    val partitioned: RDD[((String, Long, Long, Long), Null)] =
      splited.repartitionAndSortWithinPartitions(new UidAndPartitioner(uids))

    val grouped: RDD[((String, Int), Iterable[(Long, Long, Long)])] = partitioned.mapPartitions(it => {
      // To decide whether two adjacent records are less than 10 minutes apart, subtract the
      // previous record's endTime from the current record's startTime.
      // temp holds the previous endTime (0 before the first record of the partition);
      // flag is incremented whenever the gap exceeds 10 minutes, so the running value of
      // flag is the same for every record that belongs to one merged time segment.
      var temp = 0L
      var flag = 0
      it.map(x => {
        val startTimeStamp = x._1._2
        val endTimeStamp = x._1._3
        if (temp != 0) {
          if ((startTimeStamp - temp) / 1000 / 60 > 10) {
            flag += 1
          }
        }
        // For the first record temp is 0, so the check is skipped and temp simply becomes its
        // endTime; from the second record on, startTime - temp is the gap to the previous
        // record, and temp is then updated to the current endTime, and so on.
        temp = endTimeStamp
        ((x._1._1, flag), (x._1._2, x._1._3, x._1._4))
      })
    }).groupByKey()

    // Produce the final result, here with mapValues
    val result: RDD[(String, Long, Long, Long)] = grouped.mapValues(it => {
      // Materialize the iterator into a List and sort it by start time
      val tuples: List[(Long, Long, Long)] = it.toList.sortBy(_._1)
      // Earliest start time of the segment
      val startTimeStamp = tuples.head._1
      // Latest end time of the segment
      val endTimeStamp = tuples.last._2
      // Total flow used in the segment
      val totalflow: Long = tuples.map(x => x._3).reduce(_ + _)
      (startTimeStamp, endTimeStamp, totalflow)
    }).map(x => {
      // Re-map to drop the flag marker, keeping only uid, startTimeStamp, endTimeStamp, totalFlow
      (x._1._1, x._2._1, x._2._2, x._2._3)
    })

    val buffer = result.collect().toBuffer
    println(buffer)
  }
}

class UidAndPartitioner(uids: Array[String]) extends Partitioner {
  // Build the partition rule in the primary constructor: one partition per user id
  private val uidAndNum = new mutable.HashMap[String, Int]()
  var index = 0
  for (elem <- uids) {
    uidAndNum(elem) = index
    index += 1
  }

  // Number of partitions
  override def numPartitions: Int = uids.length

  // Map a key to its partition id
  override def getPartition(key: Any): Int = {
    val uid = key.asInstanceOf[(String, Long, Long, Long)]._1
    uidAndNum(uid)
  }
}
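The gap-detection bookkeeping above can be tried out without Spark. The sketch below is a minimal, standalone illustration (the helper name segment and the in-memory input are hypothetical, not part of the original job) of the same temp/flag logic applied to one user's time-sorted records:

// Hypothetical standalone helper: groups one user's time-sorted (start, end, flow) records,
// starting a new group whenever the gap to the previous end exceeds 10 minutes.
def segment(sessions: List[(Long, Long, Long)]): Map[Int, List[(Long, Long, Long)]] = {
  var temp = 0L // previous record's end time, 0 before the first record
  var flag = 0  // incremented once for every gap larger than 10 minutes
  sessions.map { case (start, end, flow) =>
    if (temp != 0 && (start - temp) / 1000 / 60 > 10) flag += 1
    temp = end
    (flag, (start, end, flow))
  }.groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }
}

Applied to user 1's five records (as epoch milliseconds), this puts the first two sessions in group 0 and the last three in group 1, matching the hand-computed segments above.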
The code above can be further optimized. When producing the final result, you can do the following instead:
// uidAndFlag refers to the ((uid, flag), (startTimeStamp, endTimeStamp, flow)) RDD produced by
// the mapPartitions step above, i.e. the RDD that was previously fed into groupByKey.
// java.util.Date must also be imported for the formatting step.
val res = uidAndFlag.reduceByKey((t1, t2) => {
  // merge two records with the same (uid, flag): earliest start, latest end, summed flow
  (Math.min(t1._1, t2._1), Math.max(t1._2, t2._2), t1._3 + t2._3)
}).mapPartitions(it => {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  it.map(t => {
    // format the epoch milliseconds back into readable date strings
    (t._1._1, dateFormat.format(new Date(t._2._1)), dateFormat.format(new Date(t._2._2)), t._2._3)
  })
}).collect()

println(res.toBuffer)
Using the reduceByKey operator improves efficiency: reduceByKey applies the supplied function locally within each partition first, and only then globally across partitions. Here the records sharing a (uid, flag) key are compared pairwise: Math.min(t1._1, t2._1) keeps the earliest startTime, Math.max(t1._2, t2._2) keeps the latest endTime, and t1._3 + t2._3 accumulates the flow locally before the partial results are combined globally.
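For reduceByKey to be safe, the combine function has to be commutative and associative. A tiny local illustration (plain Scala, not from the original post) shows that merging (start, end, flow) triples pairwise yields the same earliest start, latest end and summed flow regardless of order:

// Toy (start, end, flow) triples; the values are arbitrary epoch-millisecond-style numbers
val parts = List((100L, 200L, 20L), (250L, 300L, 30L), (50L, 80L, 10L))
val merged = parts.reduce((a, b) => (math.min(a._1, b._1), math.max(a._2, b._2), a._3 + b._3))
// merged == (50, 300, 60), no matter in which order the triples are combined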
Summary:
1. The case involves time-formatted data, so the SimpleDateFormat class is needed, for example to convert between string dates and Date objects so that the timestamps can be compared.
2. The hardest part of the case is deciding whether two adjacent records are more than 10 minutes apart; this logic is the trickiest to write. It is close in spirit to the classic "doing xxx for n consecutive days" problem, where you group by user, sort by time, assign a row number, and subtract that number from the date so that rows with the same result form one consecutive run (a sketch of that trick follows this summary). Here a flag is used as the marker: add 1 whenever the gap exceeds 10 minutes, otherwise add 0; the running sum from the first row up to the current row is then identical for all rows that belong to the same merged time segment.
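Point 2 mentions the "consecutive n days" technique; here is a hedged sketch of it in the same Spark SQL style used below. The view name v_user_login and its columns uid and login_date are assumptions made purely for illustration:

// Assumed view v_user_login(uid, login_date DATE): rows whose login_date minus their
// row number fall on the same date belong to one consecutive run.
spark.sql(
  """
    |SELECT
    |    uid,
    |    MIN(login_date) start_date,
    |    MAX(login_date) end_date,
    |    COUNT(*) days
    |FROM
    |    (SELECT
    |        uid,
    |        login_date,
    |        DATE_SUB(login_date, ROW_NUMBER() OVER(PARTITION BY uid ORDER BY login_date)) grp
    |    FROM v_user_login) t
    |GROUP BY uid, grp
    |""".stripMargin).show()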
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @Author Zhang
 * @Date 2020/8/19
 */
object SQLflow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()

    // Read the csv data with Spark
    val df: DataFrame = spark
      .read
      .option("inferSchema", true)
      .csv("data/data.csv")
      .toDF("uid", "start_time", "end_time", "flow")

    df.createTempView("v_uid_flow")

    spark.sql(
      """
        |SELECT
        |    uid,
        |    MIN(start_time) start_time,
        |    MAX(end_time) end_time,
        |    SUM(flow) total_flow
        |FROM
        |    (SELECT
        |        uid,
        |        start_time,
        |        end_time,
        |        flow,
        |        SUM(flag) OVER(PARTITION BY uid ORDER BY start_time) cn
        |    FROM
        |        (SELECT
        |            uid,
        |            start_time,
        |            end_time,
        |            flow,
        |            IF((UNIX_TIMESTAMP(start_time) - UNIX_TIMESTAMP(lag_time)) / 60 > 10, 1, 0) flag
        |        FROM
        |            (SELECT
        |                uid,
        |                start_time,
        |                end_time,
        |                flow,
        |                LAG(end_time, 1, start_time) OVER(PARTITION BY uid ORDER BY start_time) lag_time
        |            FROM
        |                v_uid_flow) v1) v2) v3
        |GROUP BY uid, cn
        |""".stripMargin).show()
  }
}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @Author Zhang
 * @Date 2020/8/19
 */
object DSLFlow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()

    // Read the csv data with Spark
    val df: DataFrame = spark
      .read
      .option("inferSchema", true)
      .csv("data/data.csv")
      .toDF("uid", "start_time", "end_time", "flow")

    import spark.implicits._
    import org.apache.spark.sql.functions._

    df.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("lag(end_time,1,start_time)") over(Window.partitionBy($"uid").orderBy($"start_time")) as "lag_time"
    ).select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("IF((UNIX_TIMESTAMP(start_time) - UNIX_TIMESTAMP(lag_time)) / 60 > 10, 1, 0) flag")
    ).select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      sum($"flag") over(Window.partitionBy($"uid").orderBy($"start_time")) as "fid"
    ).groupBy(
      $"uid",
      $"fid"
    ).agg(
      min($"start_time") as "start_time",
      max($"end_time") as "end_time",
      sum($"flow") as "total_flow"
    ).drop($"fid").show()
  }
}
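As a possible variation on the first two select steps in DSLFlow, the same lag_time and flag columns can be built with the typed column functions lag, coalesce, unix_timestamp and when instead of expr(...) strings. This is only a sketch and assumes the same spark session and df as above; coalesce stands in for the column default of LAG(end_time, 1, start_time), since the typed lag function takes a literal default:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val w = Window.partitionBy($"uid").orderBy($"start_time")

// coalesce falls back to the row's own start_time when there is no previous row,
// so the first row of each uid gets a gap of 0 and therefore flag = 0
val withFlag = df
  .withColumn("lag_time", coalesce(lag($"end_time", 1).over(w), $"start_time"))
  .withColumn("flag",
    when((unix_timestamp($"start_time") - unix_timestamp($"lag_time")) / 60 > 10, 1).otherwise(0))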