赞
踩
SparkSQL提供了通用的保存数据和数据加载的方式。根据不同的参数读取,并保存不同格式的数据。SparkSQL默认读取和保存的文件格式为Parquet。
spark.read.load 是加载数据的通用方式。
如果读取不同格式的数据,可以对不同的数据格式进行设定,如:
- spark.read.format("json").load("data/user.json")
-
- // 可以简化为如下:
- spark.read.json("data/user.json")
spark.write.save 是保存数据的通用方式。
如果保存不同格式的数据,可以对不同的数据格式进行设定,如:
- df.write.format("json").save("data/user.json")
- df.write.format("orc").saveAsTable("dws_events.DF_user_friend_count")
-
- // 可以简化为如下:
- spark.write.json("data/user.json")
保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置。SaveMode是一个枚举类,其中常量包括:
Scala/ Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists(default) | "error"(default) | 如果文件已经存在则抛出异常 |
SaveMode.Append | "append" | 如果文件已经存在则追加 |
SaveMode.Overwrite | "overwrite" | 如果文件已经存在则覆盖 |
SaveMode.Ignore | "ignore" | 如果文件已经存在则忽略 |
如:
df.write.mode("append").json("data/user.json")
SparkSQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。
数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可以修改默认数据源格式。
SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]。可以通过SparkSession.read.json()去加载JSON文件。
import spark.implicits._
- val path = "/opt/module/spark-local/people.json"
- val peopleDF = spark.read.json(path)
peopleDF.createOrReplaceTempView("people")
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 and 19")
SparkSQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列。
- spark.read.format("csv")
- .option("sep",";")
- .option("inferSchema", "true")
- .option("header", "true")
- .load("data/user.csv")
SparkSQL可以通过JDBC从关系型数据库中读取数据的方式来创建DataFrame。通过对DataFrame进行一系列的计算后,再将数据写回到关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径,或将相关的数据库驱动放到spark的类路径下。
- bin/spark-shell
- --jars mysql-connector-java-8.0.30.jar
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.30</version>
- </dependency>
- val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
-
- // 创建 SparkSession 对象
- val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
- import spark.implicits._
-
- // 方式1:通过load方式读取
- val df = spark.read.format("jdbc")
- .option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql") // 数据库名字
- .option("driver", "com.mysql.jdbc.Driver")
- .option("user", "xsqone")
- .option("pasword", "root")
- .option("dbtable", "user") // 表名
- .load()
-
- df.show
-
- // 方式2:通用的load方法读取(参数另一种形式)
- spark.read.format("jdbc")
- .options(
- Map(
- "url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
- "dbtable"->"user",
- "driver"->"com.mysql.jdbc.Driver"))
- .load().show
-
-
- // 方式3:使用JDBC读取
- val props: Properties = new Properties()
- props.setProperty("user", "root")
- props.setProperty("password", "123123")
-
- val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
-
- df.show
-
-
- // 释放资源
- spark.stop()
- //方式1:通用的方式 format 指定写出类型
- val df = spark.write.format("jdbc")
- .option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql") // 数据库名字
- .option("driver", "com.mysql.jdbc.Driver")
- .option("user", "xsqone")
- .option("pasword", "root")
- .option("dbtable", "user") // 表名
- .mode(SaveMode.Append)
- .save()
-
- // 方式2:通过JDBC方法
- val props: Properties = new Properties()
-
- props.setProperty("user", "root")
- props.setProperty("password", "123123")
-
- ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
Hive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。
如果使用spark内嵌的hive,那么可以直接使用。hive的元数据存储在derby中,默认仓库地址:$SPARK_HOME/spark-warehouse
- scala> spark.sql("show tables").show
- +--------+---------+-----------+
- |database|tableName|isTemporary|
- +--------+---------+-----------+
- +--------+---------+-----------+
-
- scala> spark.sql("create table aa(id int)")
-
- scala> spark.sql("show tables").show
- +--------+---------+-----------+
- |database|tableName|isTemporary|
- +--------+---------+-----------+
- | default| aa| false|
- +--------+---------+-----------+
向表中加载本地数据
- scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
-
- scala> spark.sql("select * from aa").show
- +---+
- | id|
- +---+
- | 1|
- | 2|
- | 3|
- | 4|
- +---+
想要连接外部已经部署好的HIVE,需要下面几个步骤:
1、spark 要接管hive需要把 hive-site.xml 拷贝到 conf/ 目录下
2、把 MySQL 的驱动拷贝到 jars/ 目录下
3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下
4、重启 spark-shell
- scala> spark.sql("show tables").show
- 20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
- +--------+--------------------+-----------+
- |database| tableName|isTemporary|
- +--------+--------------------+-----------+
- | default| emp| false|
- | default|hive_hbase_emp_table| false|
- | default| relevance_hbase_emp| false|
- | default| staff_hive| false|
- | default| ttt| false|
- | default| user_visit_action| false|
- +--------+--------------------+-----------+
SparkSQL CLI可以很方便的在本地运行HIVE元数据服务以及从命令行执行查询任务。在spark目录下执行如下命令启动SparkSQL CLI,直接执行SQL语句。类似hive窗口。
bin/spark-sql
Spark Thrift Server 是 spark 基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容 HiveServer2 。由于 Spark Thrift Server 的接口和协议都与 HiveServer2 完全一致,因此部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 spark Thrift Serve r执行相关语句。
如果想连接 Thrift Server ,需要通过以下几个步骤:
1、spark 要接管 hive 需要把 hive-site 拷贝到 conf/ 目录下
2、把 MySQL 的驱动拷贝到 jars/ 目录下
3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下
4、启动Thrift Server
sbin/start-thriftserver.sh
5、使用 beeline 连接 Thrift Server
bin/beeline -u jdbc:hive2://linux1:10000 -n root
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
- <version>3.1.2</version>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>3.1.2</version>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.12</artifactId>
- <version>3.1.2</version>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.29</version>
- </dependency>
- <?xml version="1.0"?>
- <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- <configuration>
-
- <!--配置数据库连接-->
- <property>
- <name>javax.jdo.option.ConnectionURL</name>
- <value>jdbc:mysql://localhost:3306/hive2?createDatabaseIfNotExist=true</value>
- <description>JDBC connect string for a JDBC metastore</description>
- </property>
-
- <!--配置数据库连接驱动-->
- <property>
- <name>javax.jdo.option.ConnectionDriverName</name>
- <value>com.mysql.jdbc.Driver</value>
- <description>Driver class name for a JDBC metastore</description>
- </property>
-
- <!--配置数据库连接用户名-->
- <property>
- <name>javax.jdo.option.ConnectionUserName</name>
- <value>admin</value>
- <description>username to use against metastore database</description>
- </property>
-
- <!--配置数据库连接密码-->
- <property>
- <name>javax.jdo.option.ConnectionPassword</name>
- <value>admin</value>
- <description>password to use against metastore database</description>
- </property>
-
- <!--配置使用hive查询数据时,显示所查询字段的头信息-->
- <property>
- <name>hive.cli.print.header</name>
- <value>true</value>
- <description>Whether to print the names of the columns in query output.</description>
- </property>
- <property>
- <name>hive.cli.print.current.db</name>
- <value>true</value>
- <description>Whether to include the current database in the Hive prompt.</description>
- </property>
- </configuration>
- //创建 SparkSession
- val spark: SparkSession = SparkSession
- .builder()
- // 添加对应主机IP
- .config("hive.metastore.uris", "thrift://192.168.153.139:9083")
- .enableHiveSupport()
-
- .master("local[*]")
- .appName("sql")
- .getOrCreate()
spark.sql("show tables").show
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。