
Chapter 4: Spark SQL Interaction with Multiple Data Sources

Spark SQL can interact with many kinds of data sources, such as plain text, JSON, Parquet, CSV, MySQL, and so on.

1. Writing to different data sources

2. Reading from different data sources

Writing Data

package cn.itcast.sql

import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object WriterDataSourceDemo {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read the source file
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line => Person(line(0).toInt, line(1), line(2).toInt))

    // 3. Convert the RDD to a DataFrame
    // Note: RDD has no toDF method of its own; it is added through an implicit conversion
    import spark.implicits._
    // Note: rowRDD is typed as Person, which carries the schema,
    // so Spark SQL can pick it up via reflection and attach it to the DataFrame
    val personDF: DataFrame = rowRDD.toDF

    // ================== Write the DataFrame to different data sources ==================
    // Text data source supports only a single column, and you have 3 columns.
    // personDF.write.text("D:\\data\\output\\text")
    personDF.write.json("D:\\data\\output\\json")
    personDF.write.csv("D:\\data\\output\\csv")
    personDF.write.parquet("D:\\data\\output\\parquet")
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    personDF.write.mode(SaveMode.Overwrite).jdbc(
      "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "person", prop)
    println("Write finished")
    sc.stop()
    spark.stop()
  }
}
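The commented-out text write above fails with "Text data source supports only a single column, and you have 3 columns." because the text format only accepts a DataFrame with exactly one string column. A minimal workaround sketch (assuming the same personDF and the spark.implicits._ import from the example above; the separator and the column name "value" are arbitrary choices for illustration):

import org.apache.spark.sql.functions.concat_ws

// Collapse the three columns into one string column, then write it as plain text.
personDF
  .select(concat_ws(" ", $"id", $"name", $"age").as("value"))
  .write
  .text("D:\\data\\output\\text")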

 

Reading Data

package cn.itcast.sql

import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object ReadDataSourceDemo {

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Read back the files written by WriterDataSourceDemo
    spark.read.json("D:\\data\\output\\json").show()
    spark.read.csv("D:\\data\\output\\csv").toDF("id", "name", "age").show()
    spark.read.parquet("D:\\data\\output\\parquet").show()
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    spark.read.jdbc(
      "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "person", prop).show()
    sc.stop()
    spark.stop()
  }
}
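The read example above does not cover plain text. As a hedged sketch (reusing the original input file path from the write example), spark.read.text loads each line into a single string column named value, which mirrors the single-column restriction noted for the text writer:

// Each line of the file becomes one row in a single column called "value".
val textDF = spark.read.text("D:\\data\\person.txt")
textDF.printSchema() // root |-- value: string (nullable = true)
textDF.show(truncate = false)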

 

Summary

1. Writing data with Spark SQL:

DataFrame/Dataset.write.json/csv/parquet/jdbc

2. Reading data with Spark SQL:

SparkSession.read.json/csv/text/jdbc/format
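The json/csv/parquet/jdbc methods used above are shortcuts for the generic format API mentioned in point 2. A minimal sketch, assuming the personDF, spark, and SaveMode import from the write example:

// Generic writer: equivalent to personDF.write.parquet(...)
personDF.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .save("D:\\data\\output\\parquet")

// Generic reader: equivalent to spark.read.parquet(...)
val parquetDF = spark.read
  .format("parquet")
  .load("D:\\data\\output\\parquet")
parquetDF.show()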
