当前位置:   article > 正文

scala实现通过Spark统计人均登录次数最终写入MySQL

scala实现通过Spark统计人均登录次数最终写入MySQL

谨以此博客作为记录
小编这里用的版本是:
<hadoop.version>2.7.7</hadoop.version>
<spark.version>2.4.5</spark.version>
<scala.version>2.12.10</scala.version>
如果没用到Hadoop可以忽略

步骤

  1. 准备数据,知道每个文件什么内容,每一列什么意思
    我这里对应文件夹下,是按日期作为文件名存储的txt文件数据
    在这里插入图片描述
  2. 数据可放在本地,也可放在HDFS上,数据在本地代码完全是可以跑通的;但是小编想模拟数据存在集群(HDFS)上的情况,条件有限,DataNode总是dead,搞得小编也很sad(押上了hhh)
  3. 编写代码
  • ①首先创建SparkContext和SparkSession对象;
  • ②引入implicit隐式转换包,方便后面将RDD转换为DataFrame;
  • ③创建数据库连接,方便后面将结果写入数据库;
  • ④从文件名中截取日期并写入文件,后面分析数据需要;
  • ⑤利用RDD的算子将数据进行拆分、切割、整理,并转换为DataFrame;
  • ⑥创建临时表,便于用SQL语句对表进行查询;
  • ⑦将查询结果写入数据库;

代码

import org.apache.spark.sql.{SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object readData {
  def main(args: Array[String]): Unit = {
  	//SparkConf是用来配置Spark应用程序的参数的类。在这个代码中,
  	//setAppName("ReadData")设置了应用程序的名称为"ReadData",
  	//而setMaster("local[*]")指定了Spark应用程序的运行模式为本地模式,使用所有可用的CPU核心
    val conf = new SparkConf().setAppName("ReadData").setMaster("local[*]")
    //初始化一个SparkContext对象。SparkContext是与Spark集群通信的主要入口点
    val sc = new SparkContext(conf)
    //创建了一个SparkSession对象,它是Spark 2.0引入的API,提供了一种统一的入口点来访问Spark功能
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    //创建数据库连接
    var prop=new java.util.Properties()
    prop.put("user","root") //MySQL用户名
    prop.put("password","123456")  //MySQL密码
    prop.put("driver","com.mysql.jdbc.Driver")  //MySQL驱动
    val url="jdbc:mysql://localhost:3306/login"  MySQL URL,端口号后面是数据库名(login)
    //如果数据在HDFS集群上,就可以将路径替换为HDFS集群上的路径
    //val hdfsPath = "hdfs://IP:9000/data/0007/CharacterLogin";
    //路径
    val path="\\2015-6-12\\0001\\0007\\CharacterLogin"
    
    //wholeTextFiles: 是SparkContext对象的一个方法,用于读取整个文本文件
    //返回的RDD: 其中键是文件路径,值是文件内容。
    val rdd1 = sc.wholeTextFiles(path)
//    打印文件路径进行调试
//    rdd1.foreach { t =>
//      val filePath = t._1
//      println("File Path: " + filePath)
//    }
	//从文件名中截取日期并写入文件
    //rdd2返回的是alllines拼接之后的数据
    val rdd2 = rdd1.flatMap(t=>{
      val filePath = t._1
      val alllines = t._2
      //使用了 substring 方法来截取文件路径中从最后一个 / 符号(包括)之后到 .txt 扩展名(不包括)之前的部分
      val dateStr=filePath.substring(filePath.lastIndexOf("/")+1, filePath.lastIndexOf(".txt"))
      alllines.split("\r\n").map(line=>dateStr+"|"+line)
    })

    //对数据进行拆分,提取相关信息
    // 这里主要统计人均登陆次数(DEC)==总的登录次数除以用户数
    // LoginStatus==0表示登录
    val res = rdd2.map(line => line.split('|')).map(arr => (arr(6), arr(11),arr(0))).toDF("userID", "LoginStatus","loginDate").filter("LoginStatus==0")
    //临时表创建
    res.createTempView("LoginCount")
    //查询
    val res1=spark.sql("select loginDate,count(LoginStatus)/count(distinct userID) as loginNum from LoginCount where LoginStatus=0 group by loginDate").sort("loginDate")
    //写入mysql数据库
    res1.write.mode("append").jdbc(url,"logincount",prop)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/389330
推荐阅读
相关标签
  

闽ICP备14008679号