Spark SQL can interact with a variety of data sources, such as plain text, JSON, Parquet, CSV, and MySQL.
1. Writing to different data sources
package cn.itcast.sql

import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object WriterDataSourceDemo {
  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read the source file
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line => Person(line(0).toInt, line(1), line(2).toInt))

    // 3. Convert the RDD to a DataFrame
    // Note: RDD itself has no toDF method; it is added through an implicit conversion
    import spark.implicits._
    // Note: rowRDD's element type is Person, which carries the schema information,
    // so Spark SQL can pick it up by reflection and attach it to the DataFrame
    val personDF: DataFrame = rowRDD.toDF

    // ================== Write the DataFrame to different data sources ==================
    // Text data source supports only a single column, and you have 3 columns.;
    //personDF.write.text("D:\\data\\output\\text")
    personDF.write.json("D:\\data\\output\\json")
    personDF.write.csv("D:\\data\\output\\csv")
    personDF.write.parquet("D:\\data\\output\\parquet")

    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    personDF.write.mode(SaveMode.Overwrite).jdbc(
      "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "person", prop)
    println("Write finished")

    sc.stop()
    spark.stop()
  }
}
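The commented-out text write above fails because the text data source accepts only a single string column. If plain-text output is needed, the columns can be concatenated into one string first. A minimal sketch, assuming the personDF from the demo above is in scope (the output path is illustrative):

import org.apache.spark.sql.functions.{col, concat_ws}

// Collapse the three columns into one space-separated string column,
// which satisfies the single-column requirement of the text data source
personDF
  .select(concat_ws(" ", col("id"), col("name"), col("age")).as("value"))
  .write
  .text("D:\\data\\output\\text")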
2. Reading from different data sources

package cn.itcast.sql

import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object ReadDataSourceDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read back the files written above
    spark.read.json("D:\\data\\output\\json").show()
    // CSV carries no schema, so assign the column names explicitly
    spark.read.csv("D:\\data\\output\\csv").toDF("id", "name", "age").show()
    spark.read.parquet("D:\\data\\output\\parquet").show()

    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    spark.read.jdbc(
      "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "person", prop).show()

    sc.stop()
    spark.stop()
  }
}
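Since a CSV file written without a header carries no column names, the demo renames the columns with toDF after reading. Supplying an explicit schema at read time is an alternative that also fixes the column types. A minimal sketch, assuming the same output directory:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declare names and types up front instead of renaming afterwards;
// this also avoids every column being read as a string
val personSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("age", IntegerType)
))

spark.read.schema(personSchema).csv("D:\\data\\output\\csv").show()

If the file did have a header row, spark.read.option("header", "true") would recover the names instead.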
1. SparkSQL writes data:
DataFrame/DataSet.write.json/csv/parquet/jdbc
2. SparkSQL reads data:
SparkSession.read.json/csv/text/parquet/jdbc/format
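The format entry in the summary refers to the generic data source API, where the source name is passed as a string and load/save take the place of the shorthand methods. A minimal sketch, reusing the paths from the demos above (the second output directory is hypothetical):

// format(...) + load/save is equivalent to the shorthand
// read.json / write.parquet calls used earlier
val df = spark.read.format("json").load("D:\\data\\output\\json")

df.write
  .format("parquet")
  .mode("overwrite")
  .save("D:\\data\\output\\parquet2") // hypothetical output path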