赞
踩
谨以此博客作为记录
小编这里用的版本是:
<hadoop.version>2.7.7</hadoop.version>
<spark.version>2.4.5</spark.version>
<scala.version>2.12.10</scala.version>
如果没用到Hadoop可以忽略
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
object readData {
def main(args: Array[String]): Unit = {
//SparkConf是用来配置Spark应用程序的参数的类。在这个代码中,
//setAppName("ReadData")设置了应用程序的名称为"ReadData",
//而setMaster("local[*]")指定了Spark应用程序的运行模式为本地模式,使用所有可用的CPU核心
val conf = new SparkConf().setAppName("ReadData").setMaster("local[*]")
//初始化一个SparkContext对象。SparkContext是与Spark集群通信的主要入口点
val sc = new SparkContext(conf)
//创建了一个SparkSession对象,它是Spark 2.0引入的API,提供了一种统一的入口点来访问Spark功能
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
//创建数据库连接
var prop=new java.util.Properties()
prop.put("user","root") //MySQL用户名
prop.put("password","123456") //MySQL密码
prop.put("driver","com.mysql.jdbc.Driver") //MySQL驱动
val url="jdbc:mysql://localhost:3306/login" MySQL URL,端口号后面是数据库名(login)
//如果数据在HDFS集群上,就可以将路径替换为HDFS集群上的路径
//val hdfsPath = "hdfs://IP:9000/data/0007/CharacterLogin";
//路径
val path="\\2015-6-12\\0001\\0007\\CharacterLogin"
//wholeTextFiles: 是SparkContext对象的一个方法,用于读取整个文本文件
//返回的RDD: 其中键是文件路径,值是文件内容。
val rdd1 = sc.wholeTextFiles(path)
// 打印文件路径进行调试
// rdd1.foreach { t =>
// val filePath = t._1
// println("File Path: " + filePath)
// }
//从文件名中截取日期并写入文件
//rdd2返回的是alllines拼接之后的数据
val rdd2 = rdd1.flatMap(t=>{
val filePath = t._1
val alllines = t._2
//使用了 substring 方法来截取文件路径中从最后一个 / 符号(包括)之后到 .txt 扩展名(不包括)之前的部分
val dateStr=filePath.substring(filePath.lastIndexOf("/")+1, filePath.lastIndexOf(".txt"))
alllines.split("\r\n").map(line=>dateStr+"|"+line)
})
//对数据进行拆分,提取相关信息
// 这里主要统计人均登陆次数(DEC)==总的登录次数除以用户数
// LoginStatus==0表示登录
val res = rdd2.map(line => line.split('|')).map(arr => (arr(6), arr(11),arr(0))).toDF("userID", "LoginStatus","loginDate").filter("LoginStatus==0")
//临时表创建
res.createTempView("LoginCount")
//查询
val res1=spark.sql("select loginDate,count(LoginStatus)/count(distinct userID) as loginNum from LoginCount where LoginStatus=0 group by loginDate").sort("loginDate")
//写入mysql数据库
res1.write.mode("append").jdbc(url,"logincount",prop)
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。