赞
踩
Spark SQL 提供了丰富的API来与各种数据源进行交互,包括Parquet、JSON、CSV、JDBC等。以下是一些使用Spark SQL与数据源进行基本操作的基本步骤和示例代码。
首先,你需要初始化一个SparkSession
对象,这是Spark SQL的入口点。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Basic Operations")
.master("local[*]") // 在本地运行,使用所有可用的核心
.getOrCreate()
// 导入隐式转换,以便可以直接使用DataFrame的API
import spark.implicits._
val parquetDF = spark.read.parquet("path/to/your/people.parquet")
parquetDF.show()
parquetDF.printSchema()
val jsonDF = spark.read.json("path/to/your/people.json")
jsonDF.show()
jsonDF.printSchema()
val csvDF = spark.read
.option("header", "true") // 如果CSV文件包含标题行
.option("inferSchema", "true") // 自动推断列的数据类型
.csv("path/to/your/people.csv")
csvDF.show()
csvDF.printSchema()
val selectedDF = parquetDF.select("name", "age")
selectedDF.show()
val filteredDF = parquetDF.filter($"age" > 20)
filteredDF.show()
val groupedDF = parquetDF.groupBy("age").count()
groupedDF.show()
val sortedDF = parquetDF.orderBy($"age".asc)
sortedDF.show()
假设你有两个DataFrame,df1
和df2
,它们都有一个共同的列id
,你可以使用join
函数将它们连接起来。
val joinedDF = df1.join(df2, df1("id") === df2("id"))
joinedDF.show()
parquetDF.write.parquet("path/to/output/people.parquet")
parquetDF.write
.option("header", "true")
.csv("path/to/output/people.csv")
虽然Spark不直接支持将DataFrame写入单个JSON文件,但你可以将数据写入到一个JSON文件夹中,每个分区的数据会写入到一个单独的JSON文件中。
parquetDF.write.json("path/to/output/people.json")
完成所有操作后,确保停止SparkSession以释放资源。
spark.stop()
这些示例展示了Spark SQL与数据源进行交互的基本操作。你可以根据自己的需求进一步扩展这些示例,使用更复杂的查询和转换来处理数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。