1.几种缓存数据的方法
例如有一张hive表叫做activity
1.CACHE TABLE
- //缓存全表
- sqlContext.sql("CACHE TABLE activity")
-
- //缓存过滤结果
- sqlContext.sql("CACHE TABLE activity_cached as select * from activity where ...")
- 1
- 2
- 3
- 4
- 5
- 6
CACHE TABLE是即时生效(eager)的,如果你想等到一个action操作再缓存数据可以使用CACHE LAZY TABLE,这样操作会直到一个action操作才被触发,例如count(*)
sqlContext.sql("CACHE LAZY TABLE ...")
- 1
- 2
取消hive表缓存数据
sqlContext.sql("UNCACHE TABLE activity")
- 1
- 2
2.将dataFrame注册成表并缓存
- val df = sqlContext.sql("select * from activity")
- df.registerTempTable("activity_cached")
- sqlContext.cacheTable("activity_cached")
-
- Tip:cacheTable操作是lazy的,需要一个action操作来触发缓存操作。
- 1
- 2
- 3
- 4
- 5
- 6
对应的uncacheTable可以取消缓存
sqlContext.uncacheTable("activity_cached")
- 1
- 2
3.缓存dataFrame
- val df = sqlContext.sql("select * from tableName")
- df.cache()
- 1
- 2
- 3
2.缓存结果
缓存时看到如下提示:
Added rdd_xx_x in memory on ...
- 1
- 2
如果内存不足,则会存入磁盘中,提示如下:
Added rdd_xx_x on disk on ...
- 1
- 2
缓存数据后可以在Storage上看到缓存的数据
3.一些参数
spark.sql.autoBroadcastJoinThreshold
- 1
- 2
该参数默认为10M,在进行join等聚合操作时,将小于该值的表broadcast到每台worker,消除了大量的shuffle操作。
spark.rdd.compress true
- 1
- 2
将rdd存入mem或disk前再进行一次压缩,效果显著,我使用cacheTable了一张表,没有开启该参数前总共cache了54G数据,开启这个参数后只34G,可是执行速度并没有收到太大的影响。
spark.sql.shuffle.partitions
- 1
- 2
这个参数默认为200,是join等聚合操作的并行度,如果有大量的数据进行操作,造成单个任务比较重,运行时间过长的时候,会报如下的错误:
org.apache.spark.shuffle.FetchFailedException: Connection from /192.168.xx.xxx:53450 closed
- 1
- 2
这个时候需要提高该值。