当前位置:   article > 正文

SparkSQL数据源操作_savemode errorifexists

savemode errorifexists

SparkSQL数据源操作

版本说明: spark-2.3.0

SparkSQL支持很多数据源,我们可以使用Spark内置的数据源,目前Spark支持的数据源有:json,parquet,jdbc,orc,libsvm,csv,text。也可以指定自定义的数据源,只需要在读取数据源的时候,指定数据源的全名。在https://spark-packages.org/这个网站,我们可以获取到更多的第三方的数据源。

官网文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources

收藏笔记:https://blog.csdn.net/wsdc0521/article/details/50011349

1 JSON数据源

1.1 以json格式写入

先手动生成一个DataFrame,然后以json格式写入文件

import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以json格式写入文件
df.write.format("json").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
  • 1
  • 2
  • 3
  • 4
  • 5

保存数据时,可以指定SaveMode,如:

df.write.format("json").mode("errorifexists").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
  • 1

这里指定了SaveMode为errorifexists也就是如果文件已经存在,就报错,这种保存模式也是系统默认的。常见的SaveMode有:

SaveMode描述
errorifexists默认,如果文件存在报错
append将DataFrame保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。
overwrite覆盖模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则预期现有数据将被DataFrame的内容覆盖。
ignore忽略模式意味着在将DataFrame保存到数据源时,如果数据已存在,则预期保存操作不会保存DataFrame的内容并且不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL中的类似。

在写json时,我们也可以通过option传入特定的参数,支持参数如下所示:

名称默认值描述
compressionnull保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4,*snappydeflate),不区分大小写。
dateFormatyyyy-MM-dd设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormatyyyy-MM-dd’T’HH:mm:ss.SSSXXX设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型
encoding指定已保存的json 文件的编码(charset)。如果未设置,将使用UTF-8字符集。
lineSep\n指定行分隔符

1.2 以json格式读取

读取上面我们写入的json文件

  //读取json数据源
    val jsonDF = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
    jsonDF.show()
    /**
      * +---+----+-----------+
      * |age|name|      phone|
      * +---+----+-----------+
      * | 20|Ming|15552211521|
      * | 19|hong|13287994007|
      * | 21| zhi|15552211523|
      * +---+----+-----------+
      */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

当然读取json文件时,我们也可以通过option传入特定的参数,读取json支持的参数如下:

名称默认值描述
primitivesAsStringfalse将所有原始值推断为String类型
prefersDecimalfalse将所有浮点值推断为十进制类型。如果值不适合十进制,那么它将它们推断为双精度数。
allowCommentsfalse忽略JSON记录中的Java / C ++样式注释
allowUnquotedFieldNamesfalse允许不带引号的JSON字段名称
allowSingleQuotestrue除了双引号外允许使用单引号
allowNumericLeadingZerosfalse允许数字之前有零,(e.g. 00012)
allowBackslashEscapingAnyCharacterfalse允许使用反斜杠引用机制接受所有字符的引用
allowUnquotedControlCharsfalse允许JSON字符串包含不带引号的控制字符(值小于32的ASCII字符,包括制表符和换行符)或不包含。
modePERMISSIVE允许在解析过程中处理损坏记录的模式。共有三种模式:
PERMISSIVE:当它遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中,并将其他字段设置为“null”。为了保持损坏的记录,用户可以在用户定义的模式中设置名为columnNameOfCorruptRecord的字符串类型字段。如果架构没有
字段,它在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加columnNameOfCorruptRecord字段。
DROPMALFORMED:忽略整个损坏的记录
FAILFAST:当遇到损坏记录时抛出异常
columnNameOfCorruptRecordspark.sql.columnNameOfCorruptRecord中指定的值允许重命名具有由’PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord`
dateFormatyyyy-MM-dd设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormatyyyy-MM-dd’T’HH:mm:ss.SSSXXX设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。
multiLinefalse解析一个记录,每个文件可能跨越多行
encoding允许强制为JSON文件设置标准基本或扩展编码之一。例如UTF-16BE,UTF-32LE。如果未指定encoding 并且multiLine设置为true,则会自动检测到它。
lineSep\r, \r\n and \n行分隔符
samplingRatio1.0定义用于模式推断的输入JSON对象的分数

2 CSV数据源

2.1 以csv格式写入

手动生成一个DataFrame然后以csv格式写入文件

import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以csv格式写入文件
df.write.format("csv").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
  • 1
  • 2
  • 3
  • 4
  • 5

支持option列表

名称默认值描述
sep,列分隔符
quote"设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了空字符串,则使用u0000 (空字符)
escape\设置一个用于在已引用的值内转义引号的单个字符。
charToEscapeQuoteEscapingescape or \0设置一个单独的字符,用于转义引号字符的转义。当转义和引号字符不同时,默认值为转义字符,否则为“\ 0”
escapeQuotestrue一个标志,指示包含引号的值是否应始终用引号括起来。默认是转义包含引号字符的所有值。
quoteAllfalse一个标志,指示是否所有值都应始终用引号括起来。默认是仅转义包含引号字符的值。
headerfalse将第一行写为列的名称
nullValue空字符串设置空值的字符串表示形式。
compressionnull保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4snappydeflate),不区分大小写。
dateFormatyyyy-MM-dd设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormatyyyy-MM-dd’T’HH:mm:ss.SSSXXX设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。
ignoreLeadingWhiteSpacetrue一个标志,指示是否应该跳过正在写入的值的头部空格。
ignoreTrailingWhiteSpacetrue一个标志,指示是否应该跳过正在写入的值的尾部空格。

2.2 以csv格式读取

读取我们上面写入的csv文件

 //读取csv数据源
    val csvDF = spark.read.format("csv").options(Map("sep"->",","inferSchema"->"true","header"->"true")).load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
    csvDF.show()
  • 1
  • 2
  • 3

在读取csv文件时我们可以指定option参数:

名称默认值描述
sep,列分隔符
encodingUTF-8编码格式
quote"设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了空字符串,则使用u0000 (空字符)
escape\设置一个用于在已引用的值内转义引号的单个字符。
commentempty string设置一个用于跳过以此字符开头的行的单个字符。默认情况下,它被禁用。
headerfalse将第一行写为列的名称
enforceSchematrue如果将其设置为“true”,则将强制将指定或推断的模式应用于数据源文件,并忽略CSV文件中的标头。 如果该选项设置为“false”,则在header选项设置为“true”的情况下,将针对CSV文件中的所有标头验证模式。模式中的字段名称和CSV标题中的列名称通过考虑spark.sql.caseSensitive的位置进行检查。虽然默认值为true,但建议禁用 enforceSchema`选项以避免不正确的结果。
inferSchematrue从数据中自动推断输入模式。 需要对数据进行一次额外的传递。
samplingRatio1.0定义用于模式推断的行的分数。
dateFormatyyyy-MM-dd设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormatyyyy-MM-dd’T’HH:mm:ss.SSSXXX设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。
ignoreLeadingWhiteSpacefalse忽略开头是空格的值
ignoreTrailingWhiteSpacefalse忽略结尾是空格的值
nullValueempty string设置空值的字符串表示形式。从*2.0.1开始,这适用于所有支持的类型,包括字符串类型。
nanValueNaN设置非数字值的字符串表示形式。
positiveInfInf设置正无穷大值的字符串表示形式
negativeInf-Inf设置负无穷大值的字符串表示形式
maxColumns20480定义记录可以有多少列的硬限制。
maxCharsPerColumn-1定义允许读取的任何给定值的最大字符数。默认情况下,它为-1表示无限长度
mode允许在解析过程中处理损坏记录的模式。共有三种模式:
PERMISSIVE:当它遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中,并将其他字段设置为“null”。为了保持损坏的记录,用户可以在用户定义的模式中设置名为columnNameOfCorruptRecord的字符串类型字段。如果架构没有
字段,它在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加columnNameOfCorruptRecord字段。
DROPMALFORMED:忽略整个损坏的记录
FAILFAST:当遇到损坏记录时抛出异常
columnNameOfCorruptRecordspark.sql.columnNameOfCorruptRecord中指定的值允许重命名具有由’PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord`
multiLinefalse解析一条记录,可能跨越多行。

3 Text数据源

3.1 以Text格式写入

将手动生成的DataFrame以text格式写入文件

 import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    val textDF = df.map(_.toSeq.foldLeft("")(_+","+_).substring(1))
    //以text格式写入文件
    textDF.write.format("text").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

写入text格式时,要求我们的DataFrame只有一列,否则会报如下错误:

支持的option参数

名称默认值描述
compressionnull保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4snappydeflate),不区分大小写。
lineSep\r, \r\n and \n行分隔符

3.2 以Text格式读取,并转为DataFrame

Spark SQL 读取text文件,只有一列,这是我们可以通过进一步的处理,转化为以“,”分割的多列DataFrame

//读取text文件
val text = spark.read.format("text").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
text.show()
lazy val first = textDF.first()
val numAttrs = first.split(",").length
import org.apache.spark.sql.functions._
var newDF = textDF.withColumn("splitCols", split($"value", ","))
0.until(numAttrs).foreach(x => {
  newDF = newDF.withColumn("col" + "_" + x, $"splitCols".getItem(x))
})
newDF.show()

/**
  * +-------------------+--------------------+-----+-----+-----------+
  * |              value|           splitCols|col_0|col_1|      col_2|
  * +-------------------+--------------------+-----+-----+-----------+
  * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
  * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
  * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
  * +-------------------+--------------------+-----+-----+-----------+
  */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

支持的option参数

名称默认值描述
wholetextfalse如果为true,则将文件作为单行读取,而不是按“\ n”拆分
lineSep\r, \r\n and \n行分隔符

4 Parquet数据源

Spark 默认的数据源操作为Parquet格式,也就是说,当我们read或者write的时候,不指定数据源类型,Spark默认会使用Parquet格式来处理。

4.1 以Parquet格式写入

import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以Parquet的格式写入
df.write.format("parquet").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

option支持参数

名称默认值描述
compressionnull保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4snappydeflate),不区分大小写。

4.2 以Parquet格式读取

//读取Parquet
val parquetDF = spark.read.format("parquet").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
parquetDF.printSchema()

/**
  * root
  * |-- name: string (nullable = true)
  * |-- age: integer (nullable = true)
  * |-- phone: long (nullable = true)
  */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

option参数

名称默认值描述
mergeSchemafalse如果为true,则将文件作为单行读取,而不是按“\ n”拆分

4.3 模式合并

官网的例子:

 // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=1")

    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=2")

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table")
    mergedDF.printSchema()

    /***
      * root
      * |-- value: integer (nullable = true)
      * |-- square: integer (nullable = true)
      * |-- cube: integer (nullable = true)
      * |-- key: integer (nullable = true)
      */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

5 JDBC数据源

5.1 以JDBC数据源写入

import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hollysys")
//写入mysql数据库
df.write.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8","spark_people",connectionProperties)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

执行后会在数据库中,自动创建表,并将数据写入到表中:

支持参数:

名称默认值描述
urljdbc:subprotocol:subname形式的JDBC数据库url
table外部数据库中表的名称。
connectionPropertiesJDBC数据库连接参数,任意字符串标记/值的列表。通常至少应包括“用户”和“密码”属性。 “batchsize”可用于控制每个插入的行数。 “isolationLevel”可以是“NONE”,“READ_COMMITTED”,“READ_UNCOMMITTED”,“REPEATABLE_READ”,或“SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务*隔离级别,默认值为“READ_UNCOMMITTED” 。

5.2 以JDBC数据源读取

使用Spark读取我们上面写入数据库表中的数据

//读取mysql数据库表数据
val mysqlDF = spark.read.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8","spark_people",connectionProperties)
mysqlDF.show()

/**
  * +----+---+-----------+
  * |name|age|      phone|
  * +----+---+-----------+
  * |Ming| 20|15552211521|
  * |hong| 19|13287994007|
  * | zhi| 21|15552211523|
  * +----+---+-----------+
  */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

参数

名称默认值描述
urljdbc:subprotocol:subname形式的JDBC数据库url
table表名
columnName将用于分区的整数类型列的名称。
lowerBoundcolumnName的最小值用于决定分区步幅。
upperBoundcolumnName的最大值用于决定分区步幅。
numPartitions分区数量。这与lowerBound(包含),upperBound(不包括)一起形成分区,用于生成WHERE 子句表达式,用于均匀地分割列columnName。当*输入小于1时,数字设置为1。
connectionPropertiesJDBC数据库连接参数,任意字符串标记/值的列表。通常至少应包括“用户”和“密码”属性。 “batchsize”可用于控制每个插入的行数。 “isolationLevel”可以是“NONE”,“READ_COMMITTED”,“READ_UNCOMMITTED”,“REPEATABLE_READ”,或“SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务*隔离级别,默认值为“READ_UNCOMMITTED” 。

完整代码:

package com.hollysys.spark.sql

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  * Created by shirukai on 2018/9/11
  * Spark SQL 对数据源操作
  */
object DataSourceApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
    jsonExample(spark)
    //csvExample(spark)
    //textExample(spark)
    //parquetExample(spark)
    //jdbcExample(spark)
  }

  def jsonExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    //以json格式写入文件
    df.write.format("json").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")

    //读取json数据源
    val jsonDF = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
    jsonDF.printSchema()
    jsonDF.show()

    /**
      * +---+----+-----------+
      * |age|name|      phone|
      * +---+----+-----------+
      * | 20|Ming|15552211521|
      * | 19|hong|13287994007|
      * | 21| zhi|15552211523|
      * +---+----+-----------+
      */
  }

  def csvExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    //以csv格式写入文件
    df.write.format("csv").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
    //读取csv数据源
    val csvDF = spark.read.format("csv").options(Map("sep" -> ";", "inferSchema" -> "true", "header" -> "true")).load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
    csvDF.show()
  }

  def textExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    val textDF = df.map(_.toSeq.foldLeft("")(_ + "," + _).substring(1))
    //以text格式写入文件
    textDF.write.format("text").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
    //读取text文件
    val text = spark.read.format("text").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
    text.show()
    lazy val first = textDF.first()
    val numAttrs = first.split(",").length
    import org.apache.spark.sql.functions._
    var newDF = textDF.withColumn("splitCols", split($"value", ","))
    0.until(numAttrs).foreach(x => {
      newDF = newDF.withColumn("col" + "_" + x, $"splitCols".getItem(x))
    })
    newDF.show()

    /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+
      */
  }

  def parquetExample(spark: SparkSession): Unit = {
    import spark.implicits._
    //从内存中创建一个DataFrame
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    //以Parquet的格式写入
    df.write.format("parquet").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")

    //读取Parquet
    val parquetDF = spark.read.format("parquet").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
    parquetDF.printSchema()

    /**
      * root
      * |-- name: string (nullable = true)
      * |-- age: integer (nullable = true)
      * |-- phone: long (nullable = true)
      */

    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=1")

    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=2")

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table")
    mergedDF.printSchema()

    /** *
      * root
      * |-- value: integer (nullable = true)
      * |-- square: integer (nullable = true)
      * |-- cube: integer (nullable = true)
      * |-- key: integer (nullable = true)
      */
  }

  def jdbcExample(spark: SparkSession): Unit = {
    import spark.implicits._
    //从内存中创建一个DataFrame
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "hollysys")
    //写入mysql数据库
    df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8", "spark_people", connectionProperties)

    //读取mysql数据库表数据
    val mysqlDF = spark.read.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8", "spark_people", connectionProperties)
    mysqlDF.show()

    /**
      * +----+---+-----------+
      * |name|age|      phone|
      * +----+---+-----------+
      * |Ming| 20|15552211521|
      * |hong| 19|13287994007|
      * | zhi| 21|15552211523|
      * +----+---+-----------+
      */

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/579674
推荐阅读
相关标签
  

闽ICP备14008679号