赞
踩
Spark 2.x的第二代tungsten引擎原理之前,先看一下当前的Spark的工作原理。我们可以通过一个SQL来举例,这个SQL扫描了单个表,然后对属性等于指定值的记录进行汇总计数。SQL语句如下:select count(*) from store_sales where ss_item_sk=1000。
要执行这个查询,Spark 1.x会使用一种最流行、最经典的查询求值策略,该策略主要基于Volcano Iterator Model。这个策略如下图
spark1.0 查询求值策略.png
在这种模型中,一个查询会包含多个operator,每个operator都会实现一个接口,提供一个next()方法,该方法返回operator tree中的下一个operator。
举例来说,上面那个查询中的filter operator的代码大致如下所示
- class Filter(child:Operator, predicate:(Row => Boolean))extends Operator{
- def next():Row ={
- var current = child.next()
- while(current == null || predicate(current)) {
- current = child.next()
- }
- return current
- }
- }
让每一个operator都实现一个iterator接口,可以让查询引擎优雅的组装任意operator在一起。而不需要查询引擎去考虑每个operator具体的一些处理逻辑,比如数据类型等。
Vocano Iterator Model也因此成为了数据库SQL执行引擎领域内内的20年中最流行的一种标准。而且Spark SQL最初的SQL执行引擎也是基于这个思想来实现的。
对于上面的那个查询,如果我们通过代码来手工编写一段代码实现那个功能,代码大致如下所示
- def function() {
- var count = 0
- for(ss_item_sk in store_sales) {
- if(ss_item_sk == 1000) {
- count += 1
- }
- }
- }
手写代码的性能比Volcano Iterator Model高了一整个数量级,而这其中的原因包含以下几点
手写代码的好处就在于,它是专门为实现这个功能而编写的,代码简单,因此可以吸收上述所有优点,包括避免虚函数调用,将中间数据保存在CPU寄存器中,而且还可以被底层硬件进行for循环的自动优化。
之前描述了了手工编写的代码的性能,为什么比Volcano Iterator Model要好。所以如果要对Spark进行性能优化,一个思路就是在运行时动态生成代码,以避免使用Volcano模型,转而使用性能更高的代码方式。要实现上述目的,就引出了Spark第二代Tungsten引擎的新技术,whole-stage code generation。通过该技术,SQL语句编译后的operator-treee中,每个operator执行时就不是自己来执行逻辑了,而是通过whole-stage code generation技术,动态生成代码,生成的代码中会尽量将所有的操作打包到一个函数中,然后再执行动态生成的代码。
就以上面的SQL语句来作为示例,Spark会自动生成以下代码。如果只是一个简单的查询,那么Spark会尽可能就生成一个stage,并且将所有操作打包到一起。但是如果是复杂的操作,就可能会生成多个stage。
Whole-stage code generation优化.png
Spark提供了explain()方法来查看一个SQL的执行计划,而且这里面是可以看到通过whole-stage code generation生成的代码的执行计划的。如果看到一个步骤前面有个*符号,那么就代表这个步骤是通过该技术自动生成的。在这个例子中,Range、Filter和Aggregation都是自动生成的,Exchange不是自动生成的,因为这是一个网络传输数据的过程。
Whole-stage code generation explain()方法.png
从Spark 1.1版本开始,就一直听说有code generation类的feature引入,这跟spark 2.0中的这个技术有什么不同呢。实际上在spark 1.x版本中,code generation技术仅仅被使用在了expression evoluation方面(比如a + 1),即表达式求值,还有极其少数几个算子上(比如filter等)。而spark 2.0中的whole-stage code generation技术是应用在整个spark运行流程上的。
对于很多查询操作,whole-stage code generation技术都可以很好地优化其性能。但是有一些特殊的操作,却无法很好的使用该技术,比如说比较复杂一些操作,如parquet文件扫描、csv文件解析等,或者是跟其他第三方技术进行整合。
如果要在上述场景提升性能,spark引入了另外一种技术,称作“vectorization”,即向量化。向量化的意思就是避免每次仅仅处理一条数据,相反,将多条数据通过面向列的方式来组织成一个一个的batch,然后对一个batch中的数据来迭代处理。每次next()函数调用都返回一个batch的数据,这样可以减少virtual function dispatch的开销。同时通过循环的方式来处理,也可以使用编译器和CPU的loop unrolling等优化特性。
行存储和列存储.png
这种向量化的技术,可以使用到之前说的3个点中的2个点。即,减少virtual function dispatch,以及进行loop unrolling优化。但是还是需要通过内存缓冲来读写中间数据的。所以,仅仅当实在无法使用whole-stage code generation时,才会使用vectorization技术。
目前的spark架构已经搭载了目前世界上最先进的性能优化技术,但是并不是所有的操作都可以大幅度提升性能的。简单来说,CPU密集型的操作,可以通过这些新技术得到性能的大幅度提升,但是很多IO密集型的操作,比如shuffle过程的读写磁盘,是无法通过该技术提升性能的。在未来,spark会花费更多的精力在优化IO密集型的操作的性能上。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。