赞
踩
大数据平台中Hadoop的分布式文件系统(HDFS)之上形成了一种极具特色的技术群体,那就是SQL查询引擎。这就包括了Hive、Impala、Presto、Spark SQL等;在分布式数据库HBase也具有Impala、phoenix这样的SQL外观,可以通过SQL与HBase交互;另外分布式关系模型数据库(NewSQL),例如:cockroachdb的sql layer、TiDB的tidb模块等在自身架构体系中不仅是查询引擎,也是SQL插入、更新和删除的SQL执行引擎。
作为海量数据计算存储平台,特别容易提出的一个概念就是NoSQL,可是从上面的发展看,不像No SQL么,感觉像是:这个世界没法NoSQL,以前我有篇文章,深入分析了关系模型的真正价值。
我把这篇文章中的一小段摘录出来,回顾一下关系模型是什么:
关系型模型之父Edgar F. Codd,在1970年发表了《大型共享数据库数据的关系模型》 原始的关系模型:
(1)结构(structure)
(2)完整性(integrity)
(3)操作(manipulation)
原始理论具体到实现再翻译成我们好理解的描述:
结构:就是我们经常要先对数据库预先定义的表名和字段(名称、类型)
完整性:就是表的主键不能为空,表与表之间的主/外键关联必须保证是完整的,外键一定是能找到主键的。
操作:那就是SQL表达式啦,SQL的子查询就是典型的闭包(Closure),可以形成嵌套表达式。
因此SQL只是关系模型的一种具体实现。我们只是在大数据领域需要替换关系型数据库的存储逻辑,使得数据库更分布式化,更容易实现存储架构的扩展。因为这是单机存储与计算到了天花板后,必须横向扩展的硬需求,但这也并不是说关系模型过时了!
实际上Hadoo的HDFS、HBase提供的Java接口也好,Spark提供的Java、Scala、Python接口也好,这都需要使用者至少懂一门编程语言才行,Python虽然易学易懂,但也只限于Spark的原生支持,而且Python好歹也是个面向对象编程语言正规军,至少也得让使用者明白什么是面向对象吧?那对于科学家、科研人员、数据分析师、数据运维工程师,学生,基本都是大数据的主要使用者,编程接口的方式对于他们就极为不友好了。这时候再看SQL,老少皆宜,不分阶层,大家好,才是真的好!对吧。
在SQL-ON-Hadoop架构中,Hive依然是成熟度最高SQL引擎,有些文章也把它叫做数仓工具,总之Hive已经从SQL解释器发展成为独立性很高的SQL引擎层,仍依赖Hadoop HDFS存储层,但摆脱了执行层Mapreduce计算引擎的束缚,形成可替换的计算引擎层,可以将MapReduce引擎替换成Tez、Spark。
为什么要替换MapReduce呢?就是因为MapReduce对数据写进写出的方式太过生硬,直接用磁盘作为计算中间过程的数据存储,导致分布式并行处理的性能很差。并行计算在磁盘上进行,其实也是一把双刃剑,不能一味地认为Mapreduce性能差,对于超大规模数据量的批处理,内存是放不下的,所有基于内存模型的并行处理引擎,都会因为内存不够,将需要处理的数据吐到磁盘上,这反而加重了I/O的吞吐,导致不佳的性能。但是在足够的内存条件下,中小数据量的批处理上,基于内存的计算引擎会非常快地给出结果,所以这是必然趋势。
Tez的性能比起MR很快,我在实际环境中也测试过,不仅是内存计算带来的性能提升,它在执行数据处理前比MR引擎有自动的优化策略,以后会成为Hive的默认执行引擎,Hive 执行引擎也可以替换成Spark,即Hive-on-Spark。但是我认为在新项目中直接就是Spark-SQL就够了,没必要再搞个Hive,因为底层都是Spark。Hive-on-Spark的主要目标是那些已经部署实施的Hive系统,通过Spark对Mr的替换,极大提升查询分析速度。
如下图所示:Hive依赖Hadoop,左边是Hive SQL查询过程,右边是Hadooop协作过程。
第一阶段:Hive接收UI的SQL查询,进入Driver和Compiler进行SQL解析并生成执行计划。并发送执行计划给Hive执行引擎(Execution engine)。
第二阶段:主要是在Hive执行引擎将执行计划形成阶段DAG,调度进Hadoop。
第三阶段:Hadoop的MR执行引擎中实现Map和Reduce的任务执行,并且将过程产生的临时数据保留在HDFS。
第四阶段:最终计算结果由Hive执行引擎从HDFS中取出,并发送结果到Driver,再由Driver返回结果给UI。
曾经Google公开了两篇论文,一篇涉及F1支持容错的分布式SQL数据库,另一篇涉及名为Dremel的可扩展的交互式即时查询引擎,在前者的指引下,出现了像PingCAP公司的开源分布式关系数据库TiDB,可以看这篇文章:每日一答:CAP 理论常被解释为“三选二”定律,是否是一种误解?对Spanner/F1容错设计的简描述。在后者的启发下,2012年发布了Hadoop之上的开源、低延迟SQL查询引擎Impala。
Impala有几个特点:
支持HDFS、HBase作为数据源。
复用HiveSQL方言,使用Hive元数据服务,因此Hive表一次定义,Hive、Impala皆可使用。
Impala的查询延迟比Hive+mapreduce要低得多。
Impala架构是无共享模式,因此支持系统级容错性和强大的可扩张性。
Impala也是基于内存的计算引擎,多个处理阶段的中间过程,数据会保存在内存中,2.0版本之后支持大数据量内存与磁盘I/O数据交换。
Impala是以后台服务的形成长期运行,一般推荐Impala服务在HDFS的DataNode节点上运行,保证计算尽量在本地读取数据,减少网络传输。
Impala是C++语言实现,可以充分利用机器内存,单个Impala进程不用像Java程序那样,受限于JVM垃圾回收机制的延迟影响。
Impala具有一项高性能的查询黑科技:底层虚拟机(Low Level Virtual Machine, LLVM)编译查询,可以将查询方法编译成优化的机器码,提高CPU对代码的执行性能。
Impala架构如下图所示:impala后台服务(Impalad)尽量在数据节点上运行(HDFS的Datanode,DD、HBase);每个Impalad包含了查询规划器(Query Planner)、查询规划器(Query Coordinator)和查询执行器(Query Exec Engine)
第一步查询规划器用于解析由JDBC、ODBC、impala-shell或者Thrift等客户端请求的SQL语句,产生执行计划规划器。
第二步连接到当前客户端的查询规划器将执行计划将不同执行部分分配到各个Impalad中。
第三部各个Impalad将获取到的部分执行计划,并交给查询执行引擎,访问本地HDFS DD或HBase数据,实现大规模并行处理(MPP),完成查询处理后返回结果,由原计划分发点实现汇聚与回应客户端结果。
2013年Facebook开源了Presto,支持标准ANSI SQL。Presto的特点特别适合与Impala做一个比较。
首先如果把Impala比喻成一位用情专一的君子,那么Presto就是处处留情的公子。Impala用于查询的主要基础存储系统就是Hadoop HDFS与HBase,我们再看看Presto都设计了多少种存储连接:
Accumulo Connector |基于HDFS的K-V存储
BigQuery Connector |Google的大数据查询引擎云服务
Cassandra Connector
Druid Connector | 实时分析型数据库
Elasticsearch Connector
Hive Connector
Kafka Connector
Kudu Connector |基于HDFS的列式快速分析存储系统
Local File Connector
Memory Connector |直接在内存中建立数据表进行操作
MongoDB Connector
MySQL Connector
Oracle Connector
PostgreSQL Connector
Redis Connector
SQL Server Connector
Thrift Connector |远程过程调用RPC框架
......
但是有一点我感到很不解,Presto都支持那么多存储系统和框架了,而且最早的设计初衷就是为了Hadoop HDFS的高效查询,既然Impala都是直接支持HBase了,为什么Presto不支持HBase呢?有了解情况的朋友还希望给予指点。如下图所示:
其次,Impala更像是一个传统的大规模并行处理(MPP)数仓工具,每个进程都是独立接受客户端请求的,客户端连接到哪个服务,该服务就作为分布式调度者,大家可以看看上篇中的Impala架构图。
但是Presto就是一个标准的主从架构了,客户端要先与主节点打交道,主节点再将SQL解释后的执行计划分配到不同的work节点去执行,并由一个work节点作为结果汇聚返回节点,如下图所示:
最后,我们再看看Impala和Presto在SQL解释、优化和调度之间的对比。如下图所示:左边是Impala架构,右边是Presto架构。
上篇已经描述过Impala主要是任意连接客户端的Impalad后台服务,接收SQL请求,解析SQL并制定执行计划后,由当前节点在Query Coordinator阶段对执行计划各个部分进行调度分配,再由集群其他Impalad后台服务根据调度进入Query Exec Engine阶段实现并行处理。最终结果还是汇聚到调度节点反馈客户端。
Presto获取用户提交的SQL查询,首先进行SQL语句解析( Parser),接着形成逻辑计划相关对象(Planner)放入线程池,最后由调度器(Scheduler)对逻辑计划生成的SubPlan提交到多个Work节点上执行。
Presto也是完全基于内存的并行计算,注意内存保护,根据数据量情况,为每个节点设置合适的内存大小,否则大数据量情况下,内存溢出就是家常便饭;Impala2.0之后支持内存不够情况,数据吐给磁盘,虽然有了可靠性保护,但是内存与磁盘的I/O交换会带来更慢的吞吐。
Presto由调度节点(Coordinator)和注册与发现节点(Discovery Service)实现SQL执行的集群调度管理,由于Coordinator和Discovery Service都是单节点部署,可能会因为Coordinator故障,产生多个Coordinator服务,导致集群的脑裂问题,因此一般建议都是Coordinator和Discovery Service放在一个节点上部署,形成Discovery对Coordinator的唯一性指定,然后再做成一主一备两个节点形成HA,这样主节点服务挂掉,备集群Work都连接到备节点上,实现主备节点的热替换。
Impala的后台服务因为是无共享的架构模式,非常适合集群节点的无限扩张,但往往需要在Impala集群之上安装一个客户端请求负载均衡器,实现对Impalad服务的负载调度。
最后就是Impala的架构更适合于计算与存储放在一起的就近读取原则,有效防止网络传输数据的损耗;但是Presto的架构更倾向于计算与存储分离,存储系统多样化的支持,虽然使自身变得灵活,适配性高,但是默认读数据源并不是采用就近读取原则,导致计算访问数据都是远程操作,例如:在复杂的大数据集Join操作,会导致严重的网络传输的性能损耗。
作为大数据处理计算的大一统软件栈Spark,或将是大数据处理领域里面的Spring framework。我们从下图中可以看到Spark core之上具有了四种面向不同计算领域或方式的Spark模块,Spark streaming模块面向实时流计算,具体方式采用微批处理;MLlib模块面向Spark的机器学习库,尤其是Spark默认对Python的支持,成为Python开发者接入Hadoop生态平台的绝佳入口;GraphX面向图处理,有了GraphX,对于社交网络、知识库、超文本关联度分析、传染病传播预测等应用领域,都可以使用Spark来处理。
我们本次主要分析了解到是Spark SQL,一个将不同来源数据进行关系结构化再进行计算处理的模块。
示例
Spark SQL可以支持从很多种数据源的结构化抽象,Spark SQL从数据源中抽取数据集后抽象成一种具有schema结构的rdd对象:DataFrame,有点类似拿到了一个Hibernate的Session与实体对象的合体(类似,可能不太恰当),可以执行类似下面例子中DataFrame(df)的查询操作。
大家可以看到DataFrame实际上就是将数据源结构化为SQL表列。因为DataFreame也需要进行schema定义。类似下图:
支持的数据源
Spark SQL可以从哪些数据源建立DataFrame结构化模型呢?
json:Spark自动推断数据结构和类型
Parquet Spark的默认数据源,自动保存schema
Hive table Spark支持直接读取Hive数据
JDBC Spark通过Jdbc驱动拉取Rdbms数据表数据
CSV 可根据CSV行头定义列
等等......
架构
如下图所示:
1.客户端根据自己的目标语言,Java、Python、Scala进行Spark SQL操作。
2.Spark SQL访问上述的各种数据源,创建DataFrame对象。
3.通过对DataFrame API的调用,实现SQL方式操作数据(查询、聚合、分组等、连接等)。
4.Spark SQL将SQL操作语句调入Catalyst Optimizer引擎形成执行计划。
5.执行计划进入Spark处理引擎,由分布在不同节点的Spark集群任务并行处理SchemaRDD(DataFrame)。
Spark SQL执行的SQL语句在Catalyst优化器中经历了逻辑计划、物理计划两个过程,逻辑计划过程主要依赖Antlr。首先SQL语句在unresolved logical plan阶段由antlr转换成抽象语法树,这时候会根据Catalog中(存储了所有的表信息、DataFrame信息)的元数据,对unresolved logical plan进行表达式解析,确定表、列都存在后,才会形成真正的resolved logical plan,最后交付Catalyst优化器进行优化逻辑计划(Optimized logical plan)。如下图所示:
转换成功的逻辑计划将进入物理计划阶段,Optimized logical plan会分解为多个物理计划(Physical Plans),最终进入代价模型(Cost Model),根据资源开销成本,去选择最佳的物理计划(Best Physical Plan),最终进入到集群中运行。如下图所示:
在Dataframe之后Spark推出了一个新的数据抽象:DataSet,DataSet可以理解为DataFrame的扩展,对象类型更为显性,这种优势就是在开发起来具有更友好的API风格,更适合工程化管理。
例如:我们定义了一个叫Flight的Dataset实体类
我们可以将DataFrame转换成Flight class类型的Dataset,这时候的变量flights就是Dataset[Flight]强类型了,即具有类型安全检查,也具有Dataframe的查询优化特性。
Dataset在编译时就会检查类型是否符合规范。Dataset仅适合用于基于JVM的Scala、Java,通过case类或Javabeans指定类型。
当调用DataFrame的API时,返回的结果结构就是Row类型;当使用DatasetAPI时,就可以将将Row格式的每一行转换为指定的业务领域对象(case类或Java类)
我们可以将Hive、Impala、Presto理解为比较独立的数仓工具,在上一篇中Impala和Presto的对比,我们甚至可以看到它们俩具有独立的分布式架构。Hive则是Hadoop生态独立性很高SQL解析与执行工具,插接Mapreduce、Spark、Tez计算引擎,高度依赖HDFS存储系统。
反观Spark SQL,它并不独立,应是Spark平台上的一组模块,彻底与Spark糅合在一起,因此谈Spark SQL的分布式架构,其实就是在讲Spark架构。我们从下图可以看到Spark架构的特征,在集群计算资源调度方面与Spark无关,主要依赖Hadooop Yarn或者Mesos实现分布式集群计算资源的调度管理。
同理Spark SQL解析完成物理计划后就完成交由Spark集群进行并行任务处理,Spark集群中Driver提交作业、实现调度,Executor具体执行任务、返回结果。
Executor中通过多线程方式运行任务(Task),而且Executor通过堆内内存、堆外内存管理,实现高性能的内存计算,这点是Spark性能上优于Mapreduce将中间过程数据写入磁盘导致性能慢的关键原因之一。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。