First, add the Maven dependency to the pom:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>3.1.1</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
Only this one coordinate needs to be imported: spark-sql_${spark.scala.version}; it pulls in spark-core transitively.
Next, write the code:
package com.zxl.spark.sql

import org.apache.spark.sql.catalog.{Database, Table}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, catalog}

object lesson01_sql_basic {
  def main(args: Array[String]): Unit = {
    // A SQL string is turned into a Dataset, which is a wrapper over an RDD;
    // only RDDs ultimately trigger the DAGScheduler.
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("testsql")
    val session: SparkSession = SparkSession.builder()
      .config(conf)
      //.enableHiveSupport() // required for Spark SQL on Hive; enables DDL support
      .getOrCreate()
    // The SparkContext can be obtained from the session
    val sparkContext: SparkContext = session.sparkContext
    sparkContext.setLogLevel("ERROR")

    val dataFrame: DataFrame = session.read.json("input/json")
    // Show the data
    println(">" * 50)
    dataFrame.show()
    // Show the schema
    println(">" * 50)
    dataFrame.printSchema()
    //while (true){}

    println(">" * 50)
    val databases: Dataset[Database] = session.catalog.listDatabases()
    databases.show()
    println(">" * 50)
    val tables: Dataset[Table] = session.catalog.listTables()
    tables.show()
    println(">" * 50)
    val functions: Dataset[catalog.Function] = session.catalog.listFunctions()
    functions.show()

    println(">" * 50)
    dataFrame.createTempView("abc")
    val dataFrame1: DataFrame = session.sql("select * from abc")
    dataFrame1.show()
    println(">" * 50)
    session.catalog.listTables().show()

    import scala.io.StdIn.readLine
    while (true) {
      val sqlStr: String = readLine("input your sql:")
      session.sql(sqlStr).show()
    }
  }
}
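The lesson reads input/json, but the file itself is not shown above. For reference, the query issued through session.sql("select * from abc") can also be expressed with the DataFrame DSL; the sketch below is only an illustration and assumes a hypothetical JSON Lines file with name and age fields, which are not part of the original lesson:

// Minimal sketch, assuming input/json is a JSON Lines file such as:
//   {"name": "zhangsan", "age": 20}
//   {"name": "lisi", "age": 22}
// The field names (name, age) are illustrative, not from the original lesson.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object lesson01_sql_dsl_sketch {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .master("local")
      .appName("testsql-dsl")
      .getOrCreate()
    session.sparkContext.setLogLevel("ERROR")

    val df: DataFrame = session.read.json("input/json")

    // Equivalent of session.sql("select * from abc"), without registering a view
    df.select("*").show()

    // The same kind of query written with the DataFrame DSL instead of a SQL string;
    // both paths go through the same Catalyst optimizer.
    df.select(col("name"), col("age"))
      .where(col("age") > 18)
      .show()

    session.stop()
  }
}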
Execution result demo >>