赞
踩
分享嘉宾:苏舒 腾讯 高级工程师
编辑整理:刘鹏鹏 滴滴出行
出品平台:DataFunTalk
导读:本文主要介绍腾讯是如何基于Apache Iceberg进行数据的入湖、治理以及后面的一些优化。将从数据入湖、数据治理服务、数据查询优化以及未来展望四个方面展开介绍。
01
数据入湖
本部分主要介绍Apache Iceberg基本概念以及结合Flink构建实时数据入湖链路。
1. Apache Iceberg是什么?
iceberg其实就是在存储和计算层之间的一个表格式,表格式的作用主要是对计算引擎提供一个访问存储层的接口,能够提供一些ACID语义和MVCC的能力,以及像历史回溯之类的功能。像传统的Hive表都会带一些partition或者是数据的格式、压缩格式、目录信息等,但这些信息都存储在Hive Metastore里,这里也可以将Metastore理解为一种文件的组织格式。从图中可以看到,下层存储层这块是一个比较开放的存储层,可以支持传统的HDFS、对象存储,文件格式也是支持行和列。
2. Apache Iceberg的特性
基于快照的读写分离和回溯
流批统一的写入和读取
不强绑定计算存储引擎
ACID语义以及数据多版本
表,模式及分区的变更
Iceberg很重要的一个特性是支持快照的读写分离回溯以及不绑定任何计算存储引擎,可以方便用户快速接入自己的存储引擎,比如Spark、Flink或者Presto,在Iceberg上用的时候可以基于Iceberg的API做一些connector。还有一个很重要的功能就是支持ACID语义及数据多版本控制,并且也支持表、模式及分区的一个变更。这些功能对于后面我们来构建准实时的数据入湖是一些非常重要的特性。
3. Apache Iceberg文件组织
图中上层可以看到Commit的一个Timeline,Iceberg每次写Commit操作都会生成一个新的Snapshot,比如snapshot-1其实是包含snapshot-0的所有数据,这里可以想象我们平时用git的时候,git的每一次新的提交都包含之前提交的所有信息,通过git log就可以看到,这里我们也可以这么理解snapshot的Timeline。下层是Iceberg基本的文件组织格式,每一个manifest管理n个DataFiles,DataFiles在manifest记录的是DataFiles的路径信息。通过这样的一个文件组织格式,在读取的时候就可以很方便地做Commit TimeLine,比如说现在是11点,Commit的是Snapshot-1,如果想读snapshot-0的话其实只需要指定Snapshot-id,就可以很方便的实现数据的回溯。
4. Apache Iceberg读写流程
接下来这个是在写Iceberg的时候的一个简单读写过程。从这个链路上我们可以看到每次Iceberg的写操作,比如说正在写第一个S1的时候是没有办法读的,只有发生了Commit C之后S1才是可读的,这个时候如果有n个线程同时在读但有一些线程在写的时候,可以做到只有commit完整的数据之后,对用户的读操作才能被用户的读线程所看到。这也是Iceberg里面非常重要的特性,即读写分离。在对S4进行写操作的时候,S3、S2、S1的读操作是不受影响的,同时这个时候S4是没有办法读得到的,只有Commit之后S4才能够读得到。Current Snapshot这时候就会指向S4,默认Iceberg读的时候都是从最新的Current Snapshot开始读Iceberg的数据。那么如果要读前面的数据其实就可以指定Snapshot的id去进行数据回溯的读。
5. 增量读取
我们前面知道Iceberg每个不同的Snapshot都包含了之前的所有数据,比如说像图中S2是包含了S1的数据,在每次读取的时候就可以指定S1之间的增量即紫色这部分的数据,不需要每次重复地读全量的数据。增量数据在后面构建准实时的入湖链路是非常重要的,因为从构建一个Flink的job比如从Kafka写入数据到Iceberg,写入之后下游的任务可以继续读Iceberg,读的时候就可以选择增量的读取,在整个链路上就可以实现实时的入湖链路。
6. Apache Iceberg Flink Sink
在Iceberg实时入湖的链路上我们用的是现在比较流行的实时计算Flink,我们知道上游InputStream是会源源不断地往下游去写的,如果在写的时候不做多个并发写的话对整个性能会有非常大的影响,因此把Iceberg Flink Sink拆成Writer和Commiter两个部分。那么为什么只有一个Commiter呢?我们知道Iceberg的commit操作其实要到Hive MetaStore去获得一个锁,如果进行多个commit的话,每个commit都会到MetaStore获取那个锁,对MetaStore来说不管有多少commit操作都会进行排队。所以这里只有一个并发commit是为了让Iceberg前面n个Writer所写的数据一次性从不可见的状态变成可见的状态。其实到commit状态的数据已经都到了存储上了,只是现在的状态是不可见,这对准实时的数据接入有非常大的帮助,比如说HDFS写数据的时候需要在一个temp里面把整个temp目录move到可见的目录上,这里其实数据已经全部都写到存储上了,所做的操作仅仅是把它的状态从原来的不可见状态变成了可见的状态,也就是前面我们所说的每个snapshot commit操作。
7. 近实时数据入湖
上图是我们内部大量采用的Iceberg实时入湖的一个简单链路。上游消费Kafka或者Binlog的数据,中间采用Flink将数据写入到Iceberg,下游可以基于Iceberg基础上继续再接Flink去做一些ETL或者其他的操作,也可以直接在Iceberg的基础上跑Spark或者Presto的一些任务。
8. 实时入湖平台化建设
这块是我们内部在整个实时入湖链路的基础上为了方便用户而构建的一个任务管理平台,可以非常方便地帮助用户去新建一个端到端的入湖任务,也可以看到一些任务运行的状态。
02
数据治理服务
我们知道Flink任务跟传统的任务不同的是,Flink任务是一个实时任务,实时任务的特点是常驻性,起了一个Flink任务就长时间运行,理想状态下是不会中断的。这种情况下,上游数据是源源不断地进来的,Flink任务会源源不断地进行Commit操作,如果对数据的实效性要求比较高的话,比如说Flink任务运行的时间是一分钟、五分钟或者是十分钟级别。当运行了几天或者是一两个星期之后,在磁盘上发生Commit的次数就会非常地多,如果根据partition进行分区的话,磁盘上的文件数量会膨胀的非常大。如果是传统的批任务的话,跑完一批之后在后面再跑一次compaction任务进行compact。实时任务因为是不中断的,所以就会遇到小文件数量膨胀、元数据膨胀等的一些问题。
1. 实时数据入湖遇到的问题
我们知道实时任务为了保证实时性进行高频的commit操作引起的小文件数量以及元数据数目膨胀引起查询性能的降低,还有数据本身缺乏生命周期管理。有时候写很多的数据到HDFS上,一段时间就要根据实际业务场景的需求对过去的数据进行清理,比如清理掉两个星期前的数据,这时候就需要额外的服务化平台去帮助用户去做这个事情。还有一点就是数据的实时写入并不能够根据用户真实的查询条件进行分布,因为写入只能根据写入的条件去写入,但是查询条件比如说where或者过滤的条件可能是不一样的,这个时候如果某些查询经常频繁发生的话,就会导致访问这n个节点的查询性能不太高,后面的服务也需要对数据做一个合理的重分布。
2. 不合适的小文件合并方案
我们在FlinkIcebergSink这边尝试了很多小文件合并的方案。我们知道Flink Sink上游每次都会做commit操作告诉当前commit操作是commit到了哪个Snapshot,snapshot里面增加了哪些文件,这些文件其实都是当前Snapshot里面的。比如说真实的数据文件在下游再接上一个operator的话,就可以对每次commit操作的文件进行compaction的操作。这里的rewrite也是同样的道理,比如上游commit了90个文件,假设rewrite分到了30个文件,会对这30个文件进行rewrite操作把这30个文件rewrite成一个文件,把文件rewrite成一个文件的时候就会把rewrite的文件数量告诉下面的replace,replace知道当前的事物比如说新增30个rewrite,就会对30个rewrite文件再次进行commit操作,也就是说replace和sink其实做的都是commit操作,只是replace commit的是rewrite的结果,而sink commit的是上游写下来的数据。replace之后生成新的snapshot的文件数量就是3而不是之前的90。这个合并方案我们之前也做过非常多的尝试以及生产和测试环境的大量测试,实际证明这个其实是不合适的。因为下游所有的逻辑都是跟着Flink的任务走的,下游的不管是replace、rewrite或者是ScanTaskGen都要占用Flink TaskManager的计算资源。在计算资源有限的情况下,在后面再接上任务的compaction写一些任务的话都会大量的占用整个集群的计算资源。如果是同步的任务,下游的rewrite都会阻塞掉上游数据的输入,假设把它改造成异步的去跑后台的线程,后台的线程也要占用task的计算资源,所以整体在生产上面通过观察发现,每次如果rewrite操作的时候整个集群主链路上的数据处理都会受到大大的影响,我们为了保证用户对小文件合并的透明,就想到了要提出一个完整的数据治理的服务。
3. 架构总览
上图是我们数据治理的整体架构,中间蓝色的四个框是我们主要要做的业务逻辑,我们把数据治理服务主要分为四大块,第一块是Compaction Service,Compaction Service是为了解决小文件过多的问题,我们知道Iceberg是读写分离的,我们可以对Iceberg实时链路上写到磁盘上的一些小文件进行异步的compaction,这个compaction需要独立的一部分计算资源。这样的话,计算资源能够帮助解决一些小文件的问题,又不会影响到主链路上的数据。Compaction Service服务会定期的根据Iceberg上游的表决定进行多大力度的合并,比方说文件合并的target是128M,每个snapshot文件数量是n,那就会根据这样的数值去判断当前的Iceberg写入snapshot里面这些值的状态是多少,以此决定要不要去触发异步文件compaction的操作。Expiration Service是为了定期清理snapshot,比如说现在的snapshot里面的文件是1、2、3,合并之后的文件假设是1+2+3=6,我们知道6其实是包含1、2、3的一些数据,那么现在6的snapshot的数据就跟1、2、3的snapshot数据是重复的,在磁盘上是存在Double的数据,这个时候就需要定期的跑snapshot把合并之前的那些snapshot数据进行定期的清理操作,删除一些冗余的数据可以大大的减少存储压力。第三个就是Clustering Service,对某些查询比较频繁的操作可以通过Clustering Service进行数据的重分布,比如说根据某些查询的列进行数据的一些聚合,这样的话在某些查询经常发生时尽可能避免扫描过多的文件,对查询的性能会有极大的提升。最后是Cleaning Service,针对某些用户会有一些对过去数据的清理操作,后台会有一个Service根据用户会去配置表里,比如说配置表里面的TTL是3天或者30天,Cleaning Service就会定期地根据用户配置的去清理这些过期的数据,这点比较类似kafka,kafka也有一些类似数据超时的机制。
4. 总体流程
上图是整体数据的流程,可以看到Compaction整个服务中的数据流。我们先从Compaction服务来简单的介绍,比如说用户对Iceberg进行操作,我们在Iceberg接口这边已经为Iceberg实现了Iceberg的Metrics汇报到外接系统的功能,首先Metrics的Reporter会将Iceberg的一些建表、删除、更新或者任何Commit操作所产生的snapshot创建snapshot的summery汇报到iceberg的Metrics Event Handler那边,Metrics Event Handler接收到不同的事件之后会根据不同事件的类型将这些事件存储到MySQL。这里我们做了一个改造采用一个消息队列来保证事件的时效性,并且对消息队列里面的数据定期的保存在CheckPoint中。我们知道表其实是有两种状态,DDL状态或者DML状态,表的一些基础的记录信息比如表在compact之前/后的文件数量、以及表的文件数、操作的类型比如新建、commit、delete这些表所能提供的一些metrics信息,当数据通过消息队列发送到中间阶段的时候,中间阶段内部有个规则管理器会去配置大量的规则,比如一些用户希望表在每产生100个10M文件的时候就进行一次合并。这些规则的接口其实是开放给用户去配置的,这些接口配置之后会将配置传给下游的任务调度器,任务读取器会读取上游发送过来的一些规则,以决定现在要根据这些规则去起一个什么样的任务。图中我们可以看到下游会有很多不同的任务,比如JOB1、JOB2、JOB3这些任务目前是采用离线的Spark任务去跑上游发送过来的信息。执行的频率有5分钟、10分钟和60分钟,主要就是根据用户所配置的表。用户的表里面如果要保留过去100个文件,这个时候在监控里面看到用户会一直频繁地在提交,那么在单位时间内所产生的文件数量会非常非常的多,这个时候就需要更低的频率比如5分钟去对用户的表执行一次compact操作。有些用户比是10分钟或者20分钟才commit一次,这个时候可能只需要跑小时级别或者跑5个小时调度一次去做这个文件的处理,这边是针对不同用户的表的一些metrics的情况来决定应该将用户的表放给哪一个粒度的调度任务去执行。那么我们知道每个job可以看到很多用户的表,一个job可能会处理三四个表,每次处理完之后会将这三四个表的处理逻辑通过Metrics System消息队列再反馈给刚刚我们记录的MySQL,然后再通过Grafana或者一些监控的工具就可以看到整个任务的compaction的运行情况,包括compact之后表是什么状态这里都可以看得到。还有一个记录重要的点是每次compact的表任务执行的过程中花了多少时间,这样就可以通过Job Handler动态地调整每个Job所负责的表的数量,比方说一个job执行的表是1、2、3三个表,发现1表跟2表执行compact任务花了10秒钟就执行完了,3表执行了5分钟,因为整个任务是并发提交的,所以需要等到第三个表执行完之后这个任务才能够继续调度下一次。这个时候就可以在下次调度的时候把3表调度到其他的任务区,1表就可以在一分钟之内进行不断地做文件数量的处理。
5. 实践效果
对于用户来说要使用compaction服务其实是非常简单的,只需要创建一个表然后在表里面配置文件处理的参数,图中表示的是snapshot保留过去10分钟的snapshot,或者是过去10个snapshot的数量,metadata的文件保存过去10个metadata文件,每新产生5个snapshot就触发一次rewrite操作。这个时候用户只需要去配置后端的 metrics的汇报和文件的compaction以及文件的一些expiration,这些所有的动作在这个时候全部对用户是透明的,用户只需要去配置这个表,后面我们的服务都会自动地帮用户去做好。
接下来可以看到整体的数据文件和meta文件数量,根据刚刚我们配置的值,如果在长时间运行compaction之后是能够控制在一个比较合理的范围。我们可以看到下面meta文件夹里面放的其实是iceberg的meta信息,包括像m0、avro这些都是snapshot的信息。上层的parquet是真实的数据文件,我们可以看到第二位有140、269、286这些的文件其实都是执行的compact之后的rewrite之后的文件。
03
数据查询优化
从刚刚这个合并里面我们可以知道,在做rewrite的时候只是把这些文件进行简单的重写,比方说将三个文件写成一个文件,对整个的查询性能其实是已经能够得到一定的提升,因为相当于扫描的文件数量得到大大的降低,但是如果说真的要对某些频繁发生的查询性能进一步优化的话,这样是远远不够的。所以我们接下去会介绍我们在数据查询优化方面所做的一些工作,首先介绍基于空间曲线算法优化iceberg的数据查询效率。
1. 空间填充曲线简介
首先介绍一下什么是空间查询曲线?
在数学分析里面,空间填充曲线是一个参数化的组合函数,会将单位区间内的区间映射到单位的正方形或者立方体中。比如说在一个二维的空间里,可以通过一维的一条曲线穿过这个空间里面的每一个点,直到填充满整个二维的空间平面,如果曲线所填充的粒度越来越密的话,其实整个二维平面会被填充满,这个是数学的一个重要特性。查询优化这块其实就是基于这样的特性,我们可以看到空间填充曲线的话,因为在二维平面里面它经过了二维平面的每一个点所以我们就可以将整个二维平面的空间降成一维,将多维的一个空间点转化成一维对于后面的数据查询优化算法是非常大的帮助,今天我们讲的一个重点其实是利用了图中第四个Z-Order的算法。
2. GeoHash算法介绍
我们知道GeoHash算法就是基于Geo的特性来做的,我们可以看到图中GeoHash算法其实是一个地理位置编码将空间分成一个网格,在网格中可以定位某一些点以及哪些点离这个点最近。这个算法常用的一个场景是点评、外卖查看附近有多少外卖商家。从图中我们可以看到,对于生成一个z order地址来说,比如说黑色的虚线所画的这块,四个地址我们就可以认为是靠的比较近的。一个点附近hash字符串如果前缀是一样的我们就认为它的点是靠的比较近的,我们可以看到每个虚线框里面的前缀都是100,通过这样的一个z地址可以把二维平面里面的数据进行降维,可以让降维之后的距离变得比较近,通过这种算法我们就可以很方便的进行多维数据的聚合操作。
3. 为什么需要多维数据聚合
我们知道在N列数排序的时候比如order by FirstName,第一列的效果往往是比较好的,越往后可能效果会越差,到n列之后整个数据可能就是离散的。如果查询条件比较多的情况下,文件过滤效果是比较差的,因为可能需要扫描表的所有数据才能去读。数据如果呈现自然聚集的话会有几个特点,比如单调递增的id或者其他根据数据写入的时间或是写入前对数据进行的排序,这在这个例子中可以看到越往后几列数据越乱。
4. Iceberg表多维聚合
同理,我们在iceberg表中间做多维聚合时,首先将不同的snapshot的文件进行合并写入小文件,然后进行optimize优化数据的分布,其实就是刚刚我们说的基于z order算法。我们将原先分散在集群中不同地方的文件进行重分布,这样在查询的时候只需要根据查询optimize之后的结果文件就可以了。在这个例子中绿色的点可以理解为是符合过滤条件的,红色是不符合过滤条件的,过滤条件指的是在做where的时候的过滤条件。可以看到在Snapshot N的时候数据是处于上游写入的状态,在第二个阶段的时候进行optimize的时候可能是第一次进行optimize操作它的strategy是all需要扫描所有的文件,这个时候不符合过滤条件的都被聚集到m1和m4,m1、m4里面都是红点不符合聚合条件。当然它也不能保证所有的红点都scan到某些文件,因为数据要保证相同的过滤条件尽可能的聚集在一个文件里。在第二个阶段比如说这个时候会在第一次进行optimize之后还会进行一次写,因为上游只要是实时的就会不断的往Snapshot里面写入数据,比如f2001到f3000这一段写了n个数据文件,在Snapshot N+3阶段执行的是incremental的optimize就只去优化新写入的这些文件,经过这样的操作之后,文件的数量会大大的减少,在查询的时候就可以避免非常多的没有用的文件扫描操作。
我们在图中可以看到,在Spark sql这边其实是支持这样的一个语法,比如说在optimize table employee zorder by first_name, last_name,我们假设first_name和last_name是二维的,因为是两个字段其实就是二维的。首先根据first_name和last_name会先计算它的分区id,计算的规则目前我们实现的是一个固定的partition值,然后将这个分区id转换成一个二进制,然后基于GEOHash算法。这里可以看到是交错位去生成z地址的,Thomas 0放奇数位,More 0放偶数位,以奇偶交错的方式生成z地址。这样生成z地址可以看到Thomas More和Thomas Alva Edison以及Melisa Kort在第二列的排序都不是有序的。经过ZOrder之后前缀都是16个0,这时候就可以将这几个聚合在一个文件里。当根据FirstName或者LastName进行查找的时候就可以很方便的根据ZOrder的地址进行查询操作避免其他文件的扫描。可以看到它其实是根据多维Column生成Z地址,从f2到f1000的数据先根据里面的数据进行线性扫描,扫描生成一次地址,生成地址之后,接下来要做的一个很重要的事情就是Repartition。
根据Z地址进行Range重分区,因为只有根据Z地址进行Range重分区之后我们才能够将原先分布在不同的点的数据的文件聚合到同一个点上。比如根据Z地址重分区之后可能生成了两个partition,在查询的时候就必须对数据进行写回存储。repartition之后要进行一个重写操作,重写之后生成一个新的snapshot N+1,这个过程也就是刚刚S N到S N+1的一个中间发生的详细的过程。
经过事物回写存储之后,在查询的时候就根据where条件智慧扫描m1和m3的数据,因为m2里面都是红点不符合查询的条件。
5. 查询性能优化评测
经过我们的优化之后,可以看到图底部有条select语句计算一个简单的count,根据first_name和last_name进行过滤。上部分是在没有优化之前在HDFS上和优化之后的性能对比,可以看到性能差距是非常大的,性能优化的一个主要的点就是把大量的小文件扫描的时间优化了。
04
未来展望
Iceberg内核及数据湖平台化的工作规划
1. Iceberg内核能力
进一步优化索引系统,提升查询性能。前面说到我们对查询性能进行了zorder索引系统的构建以提升查询的性能。但是zorder是有一定的局限性的,它需要根据查询条件去进行re-clusting,如果查询条件发生变化的话需要重新计算。另一个是如果查询条件特别多达到几十个或者上百个的话,zorder会面临维度膨胀的问题,计算出来的z地址会非常的长。这一点我们后面会根据不同的场景需求进行不同的索引来尽量避免过多的文件扫描以及使用zorder没有解决的一些问题。
增量读取能力的增强,MOR方式入湖。我们知道现在的增量读取是每次读取的是incremental的snapshot,但这个时候如果发生replace操作的话,产生的rewrite之后的snapshot在这里的增量读取整个语义目前是没有很好的定义的,我们是希望在引擎层通过skip以及通过记录rewrite之前的一些meta信息来解决这个问题,这块的话也是我们下一步的一个任务的规划。
SQL能力的增强。用户可能很希望只通过一些SQL就能够执行一些任务,比如用户通过我们的平台化建表,我们可以将这个表很方便地纳入平台的管理上,用户自己建的表如果我们没有办法check到的话,也是需要提供一些SQL增强的能力,方便用户更好的去执行使用iceberg过程中遇到的一些数据管理问题。
2. 数据湖平台化建设
持续迭代数据治理服务平台。我们会持续迭代数据治理服务化平台,包括如何更好地去执行小文件合并的策略,包括怎么样尽量避免重复的小文件的rewrite操作,已经重写了的小文件什么时候将它合并到我应该要被写的文件列表里,这些都是需要在后面不断地迭代中去不断地优化。
统一的元数据管理,元数据发现。对于用户来说,因为数据湖本身是写后schema的模式,所以用户其实并不希望数据只是上传了CSV或者JSON这样的原始数据对于里面的schema其实可能并不知道,希望平台能够帮助他发现这些schema,这一块也是平台化建设中后面不断地去优化的内容。
与更多数据系统打通,构建入湖+分析平台。比如说数据已经写入iceberg里面去了,可能会在上面继续构建一些分析型的任务,比如更好地去优化presto的查询性能,或者去进行平台化构建更多分析型的任务,比如spark或者flink的批任务,这块会跟更多的计算引擎去打通,更方便用户使用从端到端,从入湖到分析的整个的一个链路平台。
05
问答环节
Q:iceberg的zorder有计划提交到社区吗?
A:我们现在是把剩下的一些测试和优化进一步做完善之后,后续有计划反馈给社区。
Q:zorder优化是针对大量小文件场景下的优化吗?
A:zorder优化的场景是某些经常发生的查询,查询条件相对固定这种情况下为了提升查询的性能会用zorder的算法根据查询的条件将数据进行重新的分布。小文件就是刚刚讲到的上面会有一个compaction先将文件进行一个合并,然后zorder对数据进行重分布,以此来提升整个的查询性能。
Q:也就是说小文件不多的时候效果也是很好的吗?
A:是的。
Q:hive全量历史数据的迁移入湖有相关的支持方案吗?
A:这块我们内部构建的一个平台上其实是已经有相应的任务,比如执行了一个hive的一个入库,从hive表导入到iceberg表的链路上,内部其实是会有平台化上面会取一些类似于像spark导数任务去做这样的事情。
Q:需要把hive的全量导数这部分全量读一遍,然后才能入湖吗?
A:现在我们是这么做的,就是spark导数。
Q:Clustering service是通过数据的冗余存储,把数据以其他的column做partition或bucket或sort来提高file proning的效果吗?
A:clustering就刚刚我们说的,它其实根据的是Column,比如说查询的话是根据Column就是我会将这个column的数据在某些经常发生的一些查询的条件的column数据会把它聚合在同一个文件里面去,其实在iceberg上这个阶段如果说针对clustering数据生成新的snapshot,针对去读snapshot的时候就会读到下面clustering之后的一些文件。如果说是担心数据冗余的话,因为bean重新生成后的数据文件肯定是需要通过snapshot对外暴露的,可以去跑一些数据定期清理的一些动作去完成这个事情。
今天的分享就到这里,谢谢大家。
分享嘉宾:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。