赞
踩
/** Spark SQL源码分析系列文章*/
Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率。
这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage、Column Based Storage、 PAX Storage。
Spark SQL 的内存数据是如何组织的?
Spark SQL 将数据加载到内存是以列的存储结构。称为In-Memory Columnar Storage。
若直接存储Java Object 会产生很大的内存开销,并且这样是基于Row的存储结构。查询某些列速度略慢,虽然数据以及载入内存,查询效率还是低于面向列的存储结构。
基于Row的Java Object存储:
内存开销大,且容易FULL GC,按列查询比较慢。
基于Column的ByteBuffer存储(Spark SQL):
内存开销小,按列查询速度较快。
Spark SQL的In-Memory Columnar Storage是位于spark列下面org.apache.spark.sql.columnar包内:
核心的类有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType.
如果列有压缩的情况:compression包下面有具体的build列和access列的类。
当我们调用spark sql 里的cache table command时,会生成一CacheCommand,这个Command是一个物理计划。
scala> val cached = sql("cache table src")
- cached: org.apache.spark.sql.SchemaRDD =
- SchemaRDD[0] at RDD at SchemaRDD.scala:103
- == Query Plan ==
- == Physical Plan ==
- CacheCommand src, true
这里打印出来tableName是src, 和一个是否要cache的boolean flag.
我们看下CacheCommand的构造:
CacheCommand支持2种操作,一种是把数据源加载带内存中,一种是将数据源从内存中卸载。
对应于SQLContext下的cacheTable和uncacheTabele。
- case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
- extends LeafNode with Command {
-
- override protected[sql] lazy val sideEffectResult = {
- if (doCache) {
- context.cacheTable(tableName) //缓存表到内存
- } else {
- context.uncacheTable(tableName)//从内存中移除该表的数据
- }
- Seq.empty[Any]
- }
- override def execute(): RDD[Row] = {
- sideEffectResult
- context.emptyResult
- }
- override def output: Seq[Attribute] = Seq.empty
- }
如果调用cached.collect(),则会根据Command命令来执行cache或者uncache操作,这里我们执行cache操作。
cached.collect()将会调用SQLContext下的cacheTable函数:
首先通过catalog查询关系,构造一个SchemaRDD。
- /** Returns the specified table as a SchemaRDD */
- def table(tableName: String): SchemaRDD =
- new SchemaRDD(this, catalog.lookupRelation(None, tableName))
- /** Caches the specified table in-memory. */
- def cacheTable(table
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。