当前位置:   article > 正文

Spark之SparkSQL数据源

sparksql可以处理的数据源
SparkSQL数据源:parquet Json Mysql Hive:

SparkSQL数据源

手动指定选项

Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDD的方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。

Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。

1
2
3
val df = spark.read.load("/input/sparksql/users.parquet") 
df.show()
df.select("name","favorite_color").write.save("/output/sparksql_out/namesAndFavColors.parquet")

当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称定json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式。可以通过SparkSession提供的read.load方法用于通用加载数据,使用write和save保存数据。

1
2
3
val peopleDF = spark.read.format("json").load("/input/sparksql/people.json")
peopleDF.show()
peopleDF.write.format("parquet").save("/output/sparksql_out/namesAndAges.parquet")

同时也可以直接运行SQL在文件上:

1
2
3
4
val sqlDF = spark.sql("SELECT * FROM parquet.`/output/sparksql_out/namesAndAges.parquet`")
sqlDF.show()
val sqlDF = spark.sql("SELECT * FROM parquet.`/output/sparksql_out/namesAndAges.parquet`")
sqlDF.show()

文件保存选项

SaveMode定义了对数据的处理模式,这些保存模式不使用任何锁定,f非原子操作。当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。

Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)“error”(default)如果文件存在,则报错
SaveMode.Append“append”追加
SaveMode.Overwrite“overwrite”覆写
SaveMode.Ignore“ignore”数据存在,则忽略

Parquet格式

Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目。

Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。

通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。

k9A97d.png

Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页。

1
2
3
4
import spark.implicits._

val peopleDF = spark.read.json("/input/sparksqls/people.json")
peopleDF.write.parquet("hdfs://datanode1:9000/people.parquet")
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/579690
推荐阅读
相关标签
  

闽ICP备14008679号