当前位置:   article > 正文

Apache Iceberg 对推荐应用架构的优化及读写流程解析本文_iceberg读取流程

iceberg读取流程

Apache Iceberg应用场景示例:

在之前的文章中相信大家已经对Iceberg有了很多了解,Iceberg具有 ACID,隐式分区,partition evolution等功能。那么这些功能在实际的应用场景中会有什么收益呢?

我们来看一个大数据应用中比较典型的业务 “广告/信息流推荐服务"。在推荐服务中,业务往往通过join用户的点击、曝光数据来生成正负样本,然后用增量的样本数据比较实时的更新在线模型。在增量样本更新过程中总有延时到达的点击数据因为无法及时join而被当做负样本处理了,所以需要每天利用一天的全量数据进行一次迭代来修正在线模型,这是一个标准的lambda架构,通常的实现模式如 [图1]:

图1

当前数据处理流程的描述:

1、按照小时或者N分钟创建分区目录,按照时间把数据落地到相应的目录中,在完成一个分区的数据落地之后,创建一个success的标记文件,表示这个目录的文件已经可读了。

2、实时join任务以曝光数据为视角,向后推迟N分钟和点击数据进行join,把样本数据输出到增量样本表中,增量模型训练任务定时(1小时 或 10分钟)读取增量样本表中的数据来比较实时的修正业务模型。

3、全量join任务以曝光数据为视角,向后推迟24小时和点击数据进行join,把样本数据输出到全量样本表中,全量模型训练任务每天读取全量样本表的数据来修正增量模型的偏差。

当前流程中存在的一些问题:

1、落地管理复杂:为了保障落地数据读取的稳定性,一般不会再对含有success标记的目录做更改操作,那么延时到达的数据要么直接丢弃,要么落在临时数据目录中,等待合适的时机再补充回正确的时间分区目录,这不仅增加了落地任务的复杂性,也加剧了延时数据的读取时效。

2、分区固化:数据落地任务的分区策略通常是固定的,想根据数据量的大小来灵活修改落地分区的成本非常高,每次SQL读取都需要明确指定确切的分区参数,数据读取不够灵活。

3、数据不可变:正负样本数据表中的数据一旦生成了就不可更改,因此必须依赖天级的任务来修正跑偏的增量模型,这样明显的增加了资源的使用量和任务维护的复杂度。

引入Iceberg后数据处理流程的变化:

1、数据落地任务直接把数据落入Iceberg表中,不再需要完整性写入检测、延时数据处理、目录管理等逻辑,只要保障每次写入成功调用了commit操作就行了。[图2]

2、实时(小时/10分钟)join任务可以按照时间片或者快照对点击数据和曝光数据增量读取,然后把计算的正负样本数据写入样本表中;

3、样本表可支持update操作,因此模型训练任务可以随意设定时间窗口来读取样本表中的数据做模型训练,而不需要再单独维护一套更大窗口的数据。

图2

Iceberg对当前流程的优化:

1、自带ACID能力:能够保障每一次写入的数据都是一个完整的快照,落地任务把数据直接写入Iceberg表中,不需要任务再做额外的success状态维护。Iceberg会根据分区字段自动处理延时到来的数据,把延时的数据及时的写入到正确的分区,因为有ACID的保障,延时数据写入过程中Iceberg表依然提供可靠的读取能力。[图3]

2、分区管理灵活:Iceberg 表提供partition的evolution功能,可以根据数据量的变化灵活调整分区策略而不需要修改落地任务。分区在Iceberg中是自动转换的,读取数据的时候不需要在SQL中指定分区,数据的存储和读取的灵活性都很高。

3、结果数据可更改:目前社区版本的Ieberg正在支持row level的update功能(腾讯内部已经支持),这使得上面案例中的样本表可以及时的修正样本数据,在只有一份数据的情况下可以保障模型训练任务不把模型跑偏。

4、支持增量读取:Iceberg还支持增量读取功能,可以根据snapshot 来增量读取每一次修改的数据,这大大增加了数据读取的灵活性。

5、数据质量更直观:之前的数据写入完整性检测依赖落地任务的健康度,可能会出现数据落地成功而没有生成success文件或者生成了success文件而数据不完整的情况,而使用iceberg之后可以根据snapshot的生成时间和数据状态来更加合理直观的判断数据是否正常。

图3

Iceberg 数据读写流程解析:

不仅仅在ACID和partition维护等功能上给业务减轻了负担,而且在数据读取性能方面也做了非常多的优化,我们来看一下Iceberg内部的文件组织模式以及它在数据的读取流程中作了哪些优化:

1、Iceberg写入流程及文件结构:Iceberg在数据写入的时候,  先把数据写入到data file文件中;  当一组data file文件写完之后,会根据这个data file文件中column的一些统计信息(如:每个column的min/max值),生成一个对应的manifest文件;  然后Iceberg把一次写入后涉及到的manifest文件组成一个 manifest list, manifest list文件中也会存入一些相关manifest的统计信息(如:分区信息,manifest有效性)等;  然后按照整个manifest list 生成一个对应的snapshot文件;  生成完snapshot文件之后,Iceberg会把当前snapshot的ID及存储路径等信息写入到metadata文件中;  当一切准备完毕之后,会以原子操作的方式commit这个metadata文件,这样一次iceberg的数据写入就完成了。随着每次的写入iceberg就生成了如[图4]这样的一个文件组织模式。

图4

2、Iceberg的分区查找优化:Iceberg数据表每一次的修改后的状态都会生成一个snapshot(s0,s1)文件,snapshot文件中包含了一个manifest文件的list,list中存储了当前的snapshot状态是由哪些manifest文件组成的。每一个manifest的文件中会指向到真实数据的存储文件 data file(一般是parquet格式)。在这种结构中,每一个快照读取所需要的数据文件都已经清晰的定义在了manifest list 和 manifest的文件中,并且manifest文件中还存储了相关的partition信息,那么在读取数据的时候如果需要删选partition,通过manifest的中存储的信息以K&V映射方式在O1复杂度的计算中就能定位到需要读取的partition目录。当前常用的数据读取引擎,例如hive需要遍历整个数据目录下的文件索引来寻找必要的partition,是一个O(n)的复杂度查找过程。在大数据常见的海量分区下,采用partition映射的模式来选取目录的优化效果是非常明显的,可以在Ryan Blue的讲座中看到在NetFlix的应用场景中2600个分区只需要10S就列出了,而使用hive大概10分钟还没有完成 。

3、Iceberg谓词下推的三层过滤:  分区过滤:Iceberg支持查询中的谓词下推,前面已经说了Iceberg是支持隐式分区的,就是说在读取数据的时候不需要在SQL中指定分区。Iceberg会接收上层计算引擎下推过来的谓词表达式,根据谓词表达式中column分区列的信息进行分区转换的计算。例如 一个Iceberg表有一列 time ,用户设定了在 time 列上按照小时分区,当查询条件为   time >= 2020-01-01 10:00 AND < 2020-01-01 13:00 的时候Iceberg会根据下推过来的谓词表达式和Schema中定义的分区转换表达式进行计算。直接算出数据分区是在 10点11点12点三个分区中,然后依据manifest中的分区字段直接定位到分区目录。  文件过滤:Iceberg会把谓词继续下推到更细的筛选粒度,根据谓词的表达式和manifest中column的min/max值Iceberg可以有效的过滤查询数据所覆盖的具体data file,对扫描集做进一步的筛选,如果筛选column是有序的那么下推效果将更加明显。  RowGroup过滤:经过分区过滤和文件过滤之后Iceberg还会继续把谓词表达式下推到data file文件内部的RowGroup级别,根据parquet文件的metadata信息对RowGroup做进一步的筛选。经过以上三层的筛选,Iceberg最终把数据的扫描集缩小到必须读取的RowGroup级别,然后把需要读取的RowGroup数据读入到内存之中。(同样在Ryan Blue的讲座中我们可以看到,通过层层筛选(命中 min/max)之后,iceberg使得数据计算任务从61小时降低到了22分钟)。

4、Iceberg的向量化读取和数据的zero copy:在低版本的spark中,由于spark DataSourceV2的API不支持批量读取,因此Iceberg通过for循环把筛选后的数据一行一行的返回给spark去处理这个过程中既需要数据不断的在内存中互相拷贝,也无法发挥列式数据在现代CPU架构中的向量化处理能力。为了进一步提升读取速度,Iceberg在spark2.4.4版本之后,利用spark BatchColumn的读取特性引入了向量化读取的能力。  经过谓词下推后,Iceberg把需要的RowGroup数据读入到了内存中。RowGroup是列式组织的,具有可向量化处理的优势;  Iceberg会根据SQL语句的project来删减需要读取的 column trunk;  然后Iceberg借助Arrow插件作为共享内存,以page + Batch size 为单位一次性的把一个批次大小的数据存入到共享内存中;  当数据存储完之后把共享内存地址返回给spark,spark拿到共享内存地址之后,可以不再进行数据拷贝直接通过偏移量来访问Arrow获取数据。

结语:Iceberg   还很年轻,但是它目前已经展现出了相当优秀的潜质,相信在将来云环境中,在存算分离和对象存储更加普及的情况下Iceberg会有更大的发挥空间。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1001502
推荐阅读
相关标签
  

闽ICP备14008679号