当前位置:   article > 正文

spark数据源_spark支持创建的数据源格式

spark支持创建的数据源格式

数据源分类

spark中支持多种数据源(jdbc、parquet、csv、json等),所以在可以读取多种类型的数据源。
csv格式的数据源,他的默认分隔符是",",可以使用Excel来打开,但是会出现数据乱码(因为CSV中不同操作系统的字符编码不一致);可以使用一下方式解决:https://jingyan.baidu.com/article/4dc408484776fbc8d846f168.html
parquet格式的数据源是以二进制的方式来保存数据,在保存数据时,会将数据按照字段来保存,同时还会保存各个字段的偏移量;在读取数据时,他会根据代码的内容(如果只读取部分字段),从对应的位置读取数据,这极大的加快了数据读取速度。

spark读取各种数据源

jdbc

//导入隐式转换
import spark.implicits._
    /**
      * load方法不会获取数据库中的数据,
      * 但是他会获取数据库中表的schema信息。
      */
//"read.format"表示指定读取的数据源种类
    val logs: DataFrame = spark.read.format("jdbc").options(
      Map(
  //数据库连接,"bigdata"表示指定的数据库名
  "url" -> "jdbc:mysql://localhost:3306/bigdata",
  //数据库的驱动
        "driver" -> "com.mysql.jdbc.Driver",
  //指定读取数据库中的数据表
        "dbtable" -> "logs",
 //指定读取数据库的用户
        "user" -> "root",
 //指定读取数据库的密码
        "password" -> "123568")
    ).load()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

csv

//指定以后读取csv类型的数据
    val csv: DataFrame = spark.read.csv("D:\\data\\out1\\csv")
    csv.printSchema()
    val pdf: DataFrame = csv.toDF("id", "name", "age")
    pdf.show()
    spark.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

json

  //指定以后读取json类型的数据(有表头)
    val jsons: DataFrame = spark.read.json("D:\\data\\out1\\json")
    jsons.printSchema()
    jsons.show()
    spark.stop()
  • 1
  • 2
  • 3
  • 4
  • 5

parquet

 //指定以后读取json类型的数据
    val parquetLine: DataFrame = spark.read.parquet("D:\\data\\out1\\parquet")
    //val parquetLine: DataFrame = spark.read.format("parquet").load("/Users/zx/Desktop/pq")
    parquetLine.printSchema()
    parquetLine.show()
    spark.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

写入数据

   /**
      * 将结果写入到指定的数据源中
      */
    val props = new Properties()
    props.put("user","root")
    props.put("password","root")
    /**
      * mode有四种方式,"Append","Overwrite",
      * "ErrorIfExists", "Ignore";
      * "Overwrite"表示先将已有的表及其数据全都删除,
      *   再重新创建该表,最后插入新的数据;
      * "Append"表示如果表已经存在,则追加在该表中
      * (此时,插入的字段名必须与数据库中的表的字段名相同,否则追加失败),
      *   若该表不存在,则会先创建表,再插入数据;
      * "Ignore"表示如果表不存在,则创建表,并存入数据;
      *   如果表存在的情况下,直接跳过数据的存储,不会报错,即不做任何操作
      *"ErrorIfExists"为默认模式,表示如果数据库中已经存在该表,
      *   则会直接报异常,导致数据不能存入数据库
      */
    reslut.write.mode(SaveMode.ErrorIfExists).jdbc("jdbc:mysql://localhost:3306/test", "student3", props)
    /**
      * 将数据写入到json中
      */
    reslut.write.json("D:\\data\\out1\\json")
    /**
      * 将数据写入到csv中
      */
    reslut.write.csv("D:\\data\\out1\\csv")
    /**
      * 将数据写入到parquet中
      */
    reslut.write.parquet("D:\\data\\out1\\parquet")
    reslut.show()
    spark.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/870827
推荐阅读
相关标签
  

闽ICP备14008679号