当前位置:   article > 正文

Spark 8:Spark SQL 执行流程、执行引擎_sparksql执行引擎是工具吗

sparksql执行引擎是工具吗

RDD的执行流程回顾

SparkSQL的自动优化
RDD的运行会完全按照开发者的代码执行, 如果开发者水平有限,RDD的执行效率也会受到影响。
而SparkSQL会对写完的代码,执行“自动优化”, 以提升代码运行效率,避免开发者水平影响到代码执行效率。

为什么SparkSQL可以自动优化而RDD不可以?

RDD:内含数据类型不限格式和结构
DataFrame:100% 是二维表结构,可以被针对SparkSQL的自动优化,依赖于:Catalyst优化器

Catalyst优化器

总结 

 SparkSQL的执行流程

DataFrame因为存储的是二维表数据结构,可以被针对,所以可以自动优化执行流程。
自动优化依赖Catalyst优化器
自动优化2个大的优化项是:1. 断言(谓词)下推(行过滤) 2. 列值裁剪(列过滤)
DataFrame代码在被优化有,最终还是被转换成RDD去执行 

Spark On Hive

原理

配置

在代码中集成Spark On Hive

  1. # coding:utf8
  2. import string
  3. import time
  4. from pyspark.sql import SparkSession
  5. from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
  6. import pandas as pd
  7. from pyspark.sql import functions as F
  8. if __name__ == '__main__':
  9. # 0. 构建执行环境入口对象SparkSession
  10. spark = SparkSession.builder.\
  11. appName("test").\
  12. master("local[*]").\
  13. config("spark.sql.shuffle.partitions", 2).\
  14. config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse").\
  15. config("hive.metastore.uris", "thrift://node3:9083").\
  16. enableHiveSupport().\
  17. getOrCreate()
  18. sc = spark.sparkContext
  19. spark.sql("SELECT * FROM student").show()

Spark On Hive 就是因为Spark自身没有元数据管理功能, 所以使用Hive的Metastore服务作为元数据管理服务。计算由Spark执行。

分布式SQL执行引擎

概念

测试
客户端工具测试

代码测试

  1. # coding:utf8
  2. from pyhive import hive
  3. if __name__ == '__main__':
  4. # 获取到Hive(Spark ThriftServer的链接)
  5. conn = hive.Connection(host="node1", port=10000, username="hadoop")
  6. # 获取一个游标对象
  7. cursor = conn.cursor()
  8. # 执行SQL
  9. cursor.execute("SELECT * FROM student")
  10. # 通过fetchall API 获得返回值
  11. result = cursor.fetchall()
  12. print(result)

分布式SQL执行引擎就是使用Spark提供的ThriftServer服务,以“后台进程”的模式持续运行,对外提供端口。
可以通过客户端工具或者代码,以JDBC协议连接使用。
SQL提交后,底层运行的就是Spark任务。
相当于构建了一个以MetaStore服务为元数据,Spark为执行引擎的数据库服务,像操作数据库那样方便的操作SparkSQL进行分布式的SQL计算。

Spark Shuffle回顾
Map 和 Reduce
在Shuffle过程中. 提供数据的称之为Map端(Shuffle Write) 接收数据的 称之为 Reduce端(Shuffle Read)
在Spark的两个阶段中, 总是前一个阶段产生 一批Map提供数据, 下一阶段产生一批Reduce接收数据 

HashShuffleManager
Spark 提供2种Shuffle管理器:
HashShuffleManager
SortShuffleManager

未经优化的HashShuffleManager

优化后 HashShuffleManager 基本和未优化的一致,不同点在于
1. 在一个Executor内, 不同Task是共享Buffer缓冲区
2. 这样减少了缓冲区乃至写入磁盘文件的数量, 提高性能

SortShuffleManager
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。

普通机制的SortShuffleManager

ByPass机制的SortShuffleManager 

bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
2)不是聚合类的shuffle算子(比如reduceByKey)。
同普通机制基本类同, 区别在于, 写入磁盘临时文件的时候不会在内存中进行排序而是直接写, 最终合并为一个task一个最终文件。
所以和普通模式IDE区别在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

1. SortShuffle对比HashShuffle可以减少很多的磁盘文件,以节省网络IO的开销。
2. SortShuffle主要是对磁盘文件进行合并来进行文件数量的减少, 同时两类Shuffle都需要经过内存缓冲区溢写磁盘的场景. 所以可以得知, 尽管Spark是内存迭代计算框架, 但是内存迭代主要在窄依赖中. 在宽依赖(Shuffle)中磁盘交互还是一个无可避免的情况. 所以, 我们要尽量减少Shuffle的出现, 不要进行无意义的Shuffle计算。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/727431
推荐阅读
相关标签
  

闽ICP备14008679号