赞
踩
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDD的方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。
Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。
1 | val df = spark.read.load("/input/sparksql/users.parquet") |
当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet
),如果数据源格式为内置格式,则只需要指定简称定json, parquet, jdbc, orc, libsvm, csv,
text来指定数据的格式。可以通过SparkSession提供的read.load方法用于通用加载数据,使用write和save保存数据。
1 | val peopleDF = spark.read.format("json").load("/input/sparksql/people.json") |
同时也可以直接运行SQL在文件上:
1 | val sqlDF = spark.sql("SELECT * FROM parquet.`/output/sparksql_out/namesAndAges.parquet`") |
SaveMode定义了对数据的处理模式,这些保存模式不使用任何锁定,f非原子操作。当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists(default) | “error”(default) | 如果文件存在,则报错 |
SaveMode.Append | “append” | 追加 |
SaveMode.Overwrite | “overwrite” | 覆写 |
SaveMode.Ignore | “ignore” | 数据存在,则忽略 |
Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目。
Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。
通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。
Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页。
1 | import spark.implicits._ |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。