赞
踩
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame。
通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
package com.fgm.sparksql import java.util.Properties import org.apache.spark.sql.SparkSession /** *通过sparksql读取mysql表中的数据 * * @Auther: fgm */ object DataFromMysql { def main(args: Array[String]): Unit = { //创建对象 val spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate() //通过sparkSession对象加载mysql中的数据 val url="jdbc:mysql://localhost:3306/spark" //定义表名 val table="test" //properties val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123") val jdbc = spark.read.jdbc(url,table,properties) jdbc.printSchema() jdbc.show() jdbc.createTempView("test") spark.sql("select * from test").show() spark.stop() } }
package com.fgm.sparksql import java.util.Properties import org.apache.spark.sql.SparkSession /** *通过sparksql把结果数据写入到mysql表 * @Auther: fgm */ case class User(val id:Int,val name:String,val age:Int) object DataToMysql { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate() val sc = spark.sparkContext sc.setLogLevel("WARN") //读取数据文件 val RDD1 = sc.textFile("D:\\tmp\\user.txt").map(_.split(" ")) //将RDD与样例类关联 val userRDD = RDD1.map(x=>User(x(0).toInt,x(1),x(2).toInt)) //构建DataFrame import spark.implicits._ val df = userRDD.toDF() df.printSchema() df.show() df.createTempView("user") val result = spark.sql("select * from user where age >30") //定义表名 val table="user" //将结果写入到mysql //定义数据库url val url="jdbc:mysql://localhost:3306/spark" //properties val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123") result.write.mode("append").jdbc(url,table,properties) //再将数据库中的数据读取出来,检查是否写入成功,也可以进行其他相关操作 //val jdbc=spark.read.jdbc(url,table,properties) //jdbc.show() spark.stop() } }
D:\tmp\user.txt
1 zhangsan 20
2 lisi 29
3 wangwu 33
4 zhaoliu 30
5 hahaha 44
未注释读取的代码时,数据如下:
并且查看数据库发现,新建的user表中已经有了数据。
注意:以上代码,都可以打成jar包之后在集群中运行。参数(如:文件url,以及table等,)可以通过args(0)等方式传入,不要写死在代码里。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。