当前位置:   article > 正文

大数据技术之SparkSQL——数据的读取和保存_sparksql save

sparksql save

一、通用的加载和保存方式

        SparkSQL提供了通用的保存数据和数据加载的方式。根据不同的参数读取,并保存不同格式的数据。SparkSQL默认读取和保存的文件格式为Parquet。

1.1 加载数据

spark.read.load 是加载数据的通用方式。

 如果读取不同格式的数据,可以对不同的数据格式进行设定,如:

  1. spark.read.format("json").load("data/user.json")
  2. // 可以简化为如下:
  3. spark.read.json("data/user.json")

 

 1.2 保存数据

spark.write.save 是保存数据的通用方式。

          如果保存不同格式的数据,可以对不同的数据格式进行设定,如:

  1. df.write.format("json").save("data/user.json")
  2. df.write.format("orc").saveAsTable("dws_events.DF_user_friend_count")
  3. // 可以简化为如下:
  4. spark.write.json("data/user.json")

        保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置。SaveMode是一个枚举类,其中常量包括:

Scala/ JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)如果文件已经存在则抛出异常
SaveMode.Append"append"如果文件已经存在则追加
SaveMode.Overwrite"overwrite"如果文件已经存在则覆盖
SaveMode.Ignore"ignore"如果文件已经存在则忽略

如:

df.write.mode("append").json("data/user.json")

二、Parquet

        SparkSQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。

        数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可以修改默认数据源格式。

三、JSON

        SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]。可以通过SparkSession.read.json()去加载JSON文件。

1)导入隐式转换

import spark.implicits._

2)加载JSON文件

  1. val path = "/opt/module/spark-local/people.json"
  2. val peopleDF = spark.read.json(path)

3)创建临时表

peopleDF.createOrReplaceTempView("people")

4)数据查询

val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 and 19")

四、CSV

        SparkSQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列。

  1. spark.read.format("csv")
  2. .option("sep",";")
  3. .option("inferSchema", "true")
  4. .option("header", "true")
  5. .load("data/user.csv")

五、MySQL

        SparkSQL可以通过JDBC从关系型数据库中读取数据的方式来创建DataFrame。通过对DataFrame进行一系列的计算后,再将数据写回到关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径,或将相关的数据库驱动放到spark的类路径下。

spark-shell

  1. bin/spark-shell
  2. --jars mysql-connector-java-8.0.30.jar

IDEA中通过JDBC对MySQL进行操作

1)导入依赖

  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>8.0.30</version>
  5. </dependency>

2)读取数据

  1. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
  2. // 创建 SparkSession 对象
  3. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  4. import spark.implicits._
  5. // 方式1:通过load方式读取
  6. val df = spark.read.format("jdbc")
  7. .option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql") // 数据库名字
  8. .option("driver", "com.mysql.jdbc.Driver")
  9. .option("user", "xsqone")
  10. .option("pasword", "root")
  11. .option("dbtable", "user") // 表名
  12. .load()
  13. df.show
  14. // 方式2:通用的load方法读取(参数另一种形式)
  15. spark.read.format("jdbc")
  16. .options(
  17. Map(
  18. "url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
  19. "dbtable"->"user",
  20. "driver"->"com.mysql.jdbc.Driver"))
  21. .load().show
  22. // 方式3:使用JDBC读取
  23. val props: Properties = new Properties()
  24. props.setProperty("user", "root")
  25. props.setProperty("password", "123123")
  26. val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
  27. df.show
  28. // 释放资源
  29. spark.stop()

3)写入数据

  1. //方式1:通用的方式 format 指定写出类型
  2. val df = spark.write.format("jdbc")
  3. .option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql") // 数据库名字
  4. .option("driver", "com.mysql.jdbc.Driver")
  5. .option("user", "xsqone")
  6. .option("pasword", "root")
  7. .option("dbtable", "user") // 表名
  8. .mode(SaveMode.Append)
  9. .save()
  10. // 方式2:通过JDBC方法
  11. val props: Properties = new Properties()
  12. props.setProperty("user", "root")
  13. props.setProperty("password", "123123")
  14. ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)

六、Hive

        Hive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。

1)内嵌的HIVE(使用较少)

        如果使用spark内嵌的hive,那么可以直接使用。hive的元数据存储在derby中,默认仓库地址:$SPARK_HOME/spark-warehouse

  1. scala> spark.sql("show tables").show
  2. +--------+---------+-----------+
  3. |database|tableName|isTemporary|
  4. +--------+---------+-----------+
  5. +--------+---------+-----------+
  6. scala> spark.sql("create table aa(id int)")
  7. scala> spark.sql("show tables").show
  8. +--------+---------+-----------+
  9. |database|tableName|isTemporary|
  10. +--------+---------+-----------+
  11. | default| aa| false|
  12. +--------+---------+-----------+

向表中加载本地数据

  1. scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
  2. scala> spark.sql("select * from aa").show
  3. +---+
  4. | id|
  5. +---+
  6. | 1|
  7. | 2|
  8. | 3|
  9. | 4|
  10. +---+

 

2)外部的HIVE

        想要连接外部已经部署好的HIVE,需要下面几个步骤:

1、spark 要接管hive需要把 hive-site.xml 拷贝到 conf/ 目录下

2、把 MySQL 的驱动拷贝到 jars/ 目录下

3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下

4、重启 spark-shell

  1. scala> spark.sql("show tables").show
  2. 20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
  3. +--------+--------------------+-----------+
  4. |database| tableName|isTemporary|
  5. +--------+--------------------+-----------+
  6. | default| emp| false|
  7. | default|hive_hbase_emp_table| false|
  8. | default| relevance_hbase_emp| false|
  9. | default| staff_hive| false|
  10. | default| ttt| false|
  11. | default| user_visit_action| false|
  12. +--------+--------------------+-----------+

 

3)运行SparkSQL CLI

        SparkSQL CLI可以很方便的在本地运行HIVE元数据服务以及从命令行执行查询任务。在spark目录下执行如下命令启动SparkSQL CLI,直接执行SQL语句。类似hive窗口。

bin/spark-sql

4)运行Spark beeline

        Spark Thrift Server 是 spark 基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容 HiveServer2 。由于 Spark Thrift Server 的接口和协议都与 HiveServer2 完全一致,因此部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 spark Thrift Serve r执行相关语句。

        如果想连接 Thrift Server ,需要通过以下几个步骤:

1、spark 要接管 hive 需要把 hive-site 拷贝到 conf/ 目录下

2、把 MySQL 的驱动拷贝到 jars/ 目录下

3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下

4、启动Thrift Server

sbin/start-thriftserver.sh

5、使用 beeline 连接 Thrift Server

bin/beeline -u jdbc:hive2://linux1:10000 -n root

5)代码操作HIVE:enableHiveSupport()

1> 导入依赖

  1. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. <version>3.1.2</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
  8. <dependency>
  9. <groupId>org.apache.spark</groupId>
  10. <artifactId>spark-sql_2.12</artifactId>
  11. <version>3.1.2</version>
  12. </dependency>
  13. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
  14. <dependency>
  15. <groupId>org.apache.spark</groupId>
  16. <artifactId>spark-hive_2.12</artifactId>
  17. <version>3.1.2</version>
  18. </dependency>
  19. <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
  20. <dependency>
  21. <groupId>mysql</groupId>
  22. <artifactId>mysql-connector-java</artifactId>
  23. <version>8.0.29</version>
  24. </dependency>

2> 将hive-site.xml文件拷贝到项目的resource目录中

  1. <?xml version="1.0"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4. <!--配置数据库连接-->
  5. <property>
  6. <name>javax.jdo.option.ConnectionURL</name>
  7. <value>jdbc:mysql://localhost:3306/hive2?createDatabaseIfNotExist=true</value>
  8. <description>JDBC connect string for a JDBC metastore</description>
  9. </property>
  10. <!--配置数据库连接驱动-->
  11. <property>
  12. <name>javax.jdo.option.ConnectionDriverName</name>
  13. <value>com.mysql.jdbc.Driver</value>
  14. <description>Driver class name for a JDBC metastore</description>
  15. </property>
  16. <!--配置数据库连接用户名-->
  17. <property>
  18. <name>javax.jdo.option.ConnectionUserName</name>
  19. <value>admin</value>
  20. <description>username to use against metastore database</description>
  21. </property>
  22. <!--配置数据库连接密码-->
  23. <property>
  24. <name>javax.jdo.option.ConnectionPassword</name>
  25. <value>admin</value>
  26. <description>password to use against metastore database</description>
  27. </property>
  28. <!--配置使用hive查询数据时,显示所查询字段的头信息-->
  29. <property>
  30. <name>hive.cli.print.header</name>
  31. <value>true</value>
  32. <description>Whether to print the names of the columns in query output.</description>
  33. </property>
  34. <property>
  35. <name>hive.cli.print.current.db</name>
  36. <value>true</value>
  37. <description>Whether to include the current database in the Hive prompt.</description>
  38. </property>
  39. </configuration>

3> 启用hive支持

  1. //创建 SparkSession
  2. val spark: SparkSession = SparkSession
  3. .builder()
  4. // 添加对应主机IP
  5. .config("hive.metastore.uris", "thrift://192.168.153.139:9083")
  6. .enableHiveSupport()
  7. .master("local[*]")
  8. .appName("sql")
  9. .getOrCreate()

4> 增加对应的依赖关系(包含MySQL驱动)

spark.sql("show tables").show

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号