当前位置:   article > 正文

Spark案例之流量统计(三种方法)_spark rdd 实战 分区并排序 流量

spark rdd 实战 分区并排序 流量

数据集

1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

需求:统计每个用户各时间段(相隔小于10分钟的两端上网时间需要合并在一起)所用的总流量

方法1 使用RDD算子


import java.text.SimpleDateFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SPARK_BRANCH, SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @Auther Zhang
 * @Date 2020/8/14
 */
object FloatCount2 {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("data/data.csv")

    //处理数据
    val splited: RDD[((String, Long, Long, Long), Null)] = rdd.mapPartitions(it => {
      //因为要处理日期格式的数据,所以需要new SimpleDateFormat
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      it.map(line => {
        val sp = line.split(",")
        val uid = sp(0)
        val startTime = sp(1)
        val endTime = sp(2)
        val flow = sp(3).toLong
        //将字符串格式的日期转换为Date格式的日期
        val startTimeStamp = sdf.parse(startTime).getTime
        val endTimeStamp = sdf.parse(endTime).getTime
        ((uid, startTimeStamp, endTimeStamp, flow), null)
      })
    })

    //计算用户id,返回一个数组
    val uids: Array[String] = splited.map(_._1._1).distinct().collect()

    implicit val sorter = Ordering[Long].on[(String, Long, Long, Long)](t => t._2)

    //搞一个自定义分区器,按uid分区
    //repartitionAndSortWithPartitions 不仅可以自定义分区,还能排序
    val partitioned: RDD[((String, Long, Long, Long), Null)] = splited.repartitionAndSortWithinPartitions(new UidAndPartitioner(uids))

    val grouped: RDD[((String, Int), Iterable[(Long, Long, Long)])] = partitioned.mapPartitions(it => {
      //这里要计算两端时间是否是在相隔没有超过10分钟,
      // 其实就是要用下一条数据的startTime 减去上一条的endTime,经过转换后看是否大于10分钟
      // 要处理第一条时间,同时也需要个中间值(用来存放每次上一条的endTime),所以需要定义一个temp(long类型)
      //flag是用来标记是否大于10分钟的,若大于10分钟,标记1,没有则是0
      //最后将flag累计,前面所有例至当前列这样的加,会得到不同的结果,结果相同的就是可以划定为一个时间段(没超过10分钟)的多条数据
      var temp = 0L
      var flag = 0
      it.map(x => {
        //获取起始时间
        val startTimeStamp = x._1._2
        //获取结束时间
        val endTimeStamp = x._1._3

        if (temp != 0) {

          if ((startTimeStamp - temp) / 1000 / 60 > 10) {
            flag += 1
          } else {
            flag += 0
          }

        }
        //当temp == 0时,上面的if语句没有被执行,也就是处理第一条数据时,直接将第一条数据的endtime赋值给了temp
        //然后处理第二条数据时,判断temp!=0的条件成立,所以用startTime-temp,也就是用第二条数据的startTime-第一条的endTime,完美
        //处理完第二条后,temp又重新赋值,将第二条的endTime赋值给自己,不停的循环往复
        temp = endTimeStamp
        ((x._1._1, flag), (x._1._2, x._1._3, x._1._4))
      })
    }).groupByKey()


    //处理最后结果,这里用了mapValues
    val result: RDD[(String, Long, Long, Long)] = grouped.mapValues(it => {
      //先将迭代器转换为List,然后按照时间排序
      val tuples: List[(Long, Long, Long)] = it.toList.sortBy(_._1)
      //取出开始的时间
      val startTimeStamp = tuples.head._1
      //取出结束的时间
      val endTimeStamp = tuples.last._2
      //求出这段时间的所耗费的总流量
      val totalflow: Long = tuples.map(x => {
        x._3
      }).reduce(_ + _)
      (startTimeStamp, endTimeStamp, totalflow)
    }).map(x => {
      //重新map,不要flag那个标记,只保留uid,startTimeStamp,endTimeStamp,totalFlow
      (x._1._1, x._2._1, x._2._2, x._2._3)
    })

    val buffer = result.collect().toBuffer
    println(buffer)



  }

}

class UidAndPartitioner(uids: Array[String]) extends Partitioner {
  //在主构造器中定义分区规则(一个用户id一个分区)
  private val uidAndNum = new mutable.HashMap[String, Int]()
  var index = 0
  for (elem <- uids) {
    uidAndNum(elem) = index
    index += 1
  }

  //分区的个数
  override def numPartitions: Int = uids.length

  //获取分区id的方法
  override def getPartition(key: Any): Int = {
    val uid = key.asInstanceOf[(String, Long, Long, Long)]._1
    uidAndNum(uid)
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124

上述代码可以优化,就是在处理最后结果时,可以这么操作

 val res = uidAndFlag.reduceByKey((t1, t2) => {
      (Math.min(t1._1, t2._1), Math.max(t1._2, t2._2), t1._3 + t2._3)
    }).mapPartitions(it => {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      it.map(t => {
        (t._1._1, dateFormat.format(new Date(t._2._1)), dateFormat.format(new Date(t._2._2)), t._2._3)
      })
    }).collect()

    println(res.toBuffer)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

使用reduceByKey算子提高效率,reduceByKey会 先局部使用传入的函数逻辑,再全局使用传入的逻辑,这里就是先局部先两两比较,判断哪个startTime最早Math.min(t1._1, t2._1,判断哪个endTime最晚Math.max(t1._2, t2._2),局部统计耗费的流量t1._3 + t2._3,然后再全局统计。

小结:
1.案例涉及到对时间格式数据的处理,需要SimpleDateFormat类帮助,比如字符串格式的时间和Date格式的时间相互转换以便计算
2.案例最难的部分就是判断两条数据是否间隔超过10分钟,这里的逻辑最难写(和判断连续n天做xxx有异曲同工之妙,连续n天的案例需要按照用户分组,并按照时间排序, 然后依次打上一个number做标记, 用时间减去这个number标记得到一个新的时间, 时间相同的就是连续的),这里的案例是运用了flag做标记,超过10分钟就标记1,没有就标记0,然后起始行到当前行的累加,得到结果相同的就是要划分到同一个时间段的数据。

方法2 使用SparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @Auther Zhang
 * @Date 2020/8/19
 */
object SQLflow {


  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()

    //spark读取csv数据
    val df: DataFrame = spark
      .read
      .option("inferSchema",true)
      .csv("data/data.csv")
      .toDF("uid","start_time","end_time","flow")


    df.createTempView("v_uid_flow")
    spark.sql(
      """
        SELECT
        | uid,
        | MIN(start_time) start_time,
        | MAX(end_time) end_time,
        | SUM(flow) total_flow
        |FROM
        | (SELECT
        |  uid,
        |  start_time,
        |  end_time,
        |  flow,
        |  SUM(flag) OVER(PARTITION BY uid ORDER BY start_time) cn
        | FROM
        |  (SELECT
        |   uid,
        |   start_time,
        |   end_time,
        |   flow,
        |   IF((UNIX_TIMESTAMP(start_time)- UNIX_TIMESTAMP(lag_time)) / 60 > 10 ,1,0) flag
        |  FROM
        |   (SELECT
        |    uid,
        |    start_time,
        |    end_time,
        |    flow,
        |    LAG(end_time,1,start_time) OVER(PARTITION BY uid ORDER BY start_time) lag_time
        |   FROM
        |    v_uid_flow) v1)v2)v3
        |GROUP BY uid,cn
        |""".stripMargin).show()

  }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

方法3 使用DSL


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @Auther Zhang
 * @Date 2020/8/19
 */
object DSLFlow {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()

    //spark读取csv数据
    val df: DataFrame = spark
      .read
      .option("inferSchema", true)
      .csv("data/data.csv")
      .toDF("uid", "start_time", "end_time", "flow")

    import spark.implicits._
    import org.apache.spark.sql.functions._
    df.select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("lag(end_time,1,start_time)") over(Window.partitionBy($"uid").orderBy($"start_time")) as "lag_time"
    ).select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      expr("IF((UNIX_TIMESTAMP(start_time)- UNIX_TIMESTAMP(lag_time)) / 60 > 10 ,1,0) flag")
    ).select(
      $"uid",
      $"start_time",
      $"end_time",
      $"flow",
      sum($"flag") over(Window.partitionBy($"uid").orderBy($"start_time")) as "fid"
    ).groupBy(
      $"uid",
      $"fid"
    ).agg(
      min($"start_time") as "start_time",
      max($"end_time") as "end_time",
      sum($"flow") as "total_flow"
    ).drop($"fid").show()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/609799
推荐阅读
相关标签
  

闽ICP备14008679号