当前位置:   article > 正文

Flink SQL在B站的实践

flinksql关闭slotshare

本期作者

FlinkSql团队

B站实时平台flinksql团队,负责flink引擎sql功能的研发,支持的业务包括实时计算,流批一体以及数据湖

01 FlinkSql在B站

目前在B站,线上大概有4000+的flink实时任务,主要支撑数据集成,实时数仓,模型训练,特征指标计算,以及增量化等业务。其中90%以上都是sql任务,存量的jar包任务,也在逐步推进尽可能往sql改写,sql在计算描述上的优点比较明显,既降低了用户的使用运维门槛,也降低了平台日常的答疑成本。

大量的sql作业,对flinksql本身可用性提出了比较高的要求,我们的主版本是基于flink1.11.0演化而来,围绕稳定性、功能和性能三个方面进行开发优化。同时,在探索流批一体的新场景上,引入了社区最新的1.15版本,为了拥有更完善的批处理能力。

下面一张图是展示了B站内部的flink生态:

9bf8370ea361b1b97495ce6180f1b546.png

在2022的过去时间里,我们围绕sql做了三个事情。

1. sql本身的可用性扩展和优化。

2. 基于流批一体的实时project探索。

3. sql的性能基础flink state优化。

02 FlinkSql的扩展和优化

flink1.11.0作为社区流计算sql相对完善的第一个大版本,本身的基础功能基本能满足大部分的场景,但是在一些特殊场景下,需要进行一些扩展优化才能满足业务需求。

 2.1 实现Delay-Join

在flink业务上,常常有这样的场景,一条流去join一张维表做字段补全。其中维表本身也是一个flink任务是实时生成的,维表任务一旦发生延迟或者抖动,join任务的join成功率就会显著下降,对业务来说会造成比较大的影响。

在和业务方沟通并且参考业界,我们在异步维表join上提供了Delay-Join这样一种能力,流式数据在进行join维表的时候,可以配置没有join上后的数据行为:

1. 和以前一样继续输出没有join上的流数据;

2. 数据暂时不输出,延迟一定的时间,再去进行一次join,并且这个backoff+retry流程,可以配置循环的次数,超过次数后,无论是否join上,也会进行输出。

d646ccf8f1707051ba9d93699922f2b3.png

Delay-Join在正常情况下不会对join任务引入额外的延迟,但是在实时维表出现延迟的情况,能有效的解决join成功率下降的问题。下面我们简单讲下实现方式。

flink里面维表join从框架层看就是Operator+ProcessFunction+TableFunction嵌套实现,无状态,用户末端实现TableFunction即可,实现Delay-Join能力最直接的思路就是在TableFunction里面加内存队列和定时器,但是这种方式存在比较大的问题就是难以把控内存,在流量比较大的场景容易oom,且无法对接到flink的checkpoint机制,无法保障一致性。考虑到内存限制,以及与checkpoint的兼容性,数据存放在KeyedStateBackend里面才是最优解。

对于KeyedState顾名思义,在flink的机制里面必须与keyBy逻辑绑定,当前算子与上游的连接方式必须是KeyBy,才能使用KeyedState,代码侧分析也能看出。默认的的维表look-join框架,是没有按照join的condition进行KeyBy操作的,与上游的连接是非KeyBy模式,function侧无法初始化KeyedState,因此我们对look-join的生成规则进行的改变,按照join的condition的字段进行了KeyBy操作,保证了我们KeyedState的获取。同时,在定时器的实现上,我们也直接使用了flink的timer机制,也是与KeyedState绑定的。

由于 KeyBy逻辑会导致上下游算子无法chain在一起,在一些原本能chain的场景会带来性能的退化,因此该功能我们默认关闭,需要有需求的用户手动开启。KeyBy除了在Delay-Join的场景需求,在优化维表cache命中率方面也能发挥比较好的效果,保证维表join里相同condition的数据一直发到一个并发度上,目前connector团队的同事也在积极对接中。

目前维表join仅支持任务全局配置,在多维表场景下对单维表做支持的功能暂时还未具备,任务配置参数如下:

f302dde1cf383c607f03ae7d9c5754b5.png

后期我们将会通过 sqlhint 的方式将配置细化到每个维表 join,达到多维表场景下的单维表具有独立配置的能力。

 2.2 支持TVF

开窗作为flink里面一种非常典型的计算方法,在指标以及特征的场景被大量使用,但是传统的group window有几个问题,业务用起来体验并不太好。

1. 不支持mini-batch,每条数据过来,都会触发state更新,window本身是天然的mini-batch语义,如果能支持mini-batch,减少state读写,性能要好很多;

2. 不支持两阶段,倾斜问题无法解决,线上不少任务计算维度只有时间窗口,在任意窗口期间,所有的数据都会集中到这个窗口,很容易成为瓶颈,无法scale解决;

3. 不支持CUMULATE窗口,无法定期输出HOP窗口中间结果数据,难以描述类似Dau场景的计算;

4. 语法的可读性不太好,写起来非常啰嗦。

上面的三个问题,在社区的1.13支持的TVF新语法上都得到了比较好的解决,TVF作为sql的新的标准语法,规范性也比较好。同时我们测试下来,性能大概能提升30%,因此我们在1.11的版本基础上,对高版本的tvf进行了backport操作。

整个开发过程中,比较值得关注的是除了tvf语法涉及到flink与calcite的改动,有另外一个花费了比较多精力的点是tvf算子是依赖于对一部分ManageredManager内存的使用,在1.11的内存模型里面,ManageredManager基本完全是RocksDB StateBackend使用,算子侧是无法分配的。但是从1.12开始,ManageredManager进行了分区操作,除了State外,其他的算子也能使用一部分了,比例可以通过在算子里面声明权重进行自动化调整,算法比较复杂,有兴趣可以去看下代码。因此这个内存模型变化我们也进行了backport。

目前整个tvf我们已经在线上逐步替换掉了传统的window使用,除了session window不支持外,上面提到的四个问题都比较好的解决了。

一个 HOP tvf 的window 处理过程如下:

935eeaba64c7efffe4627a1111ee3faa.png

 2.3 支持Projection-PushDown

我们发现在大量的指标计算场景,业务的输入是一张大宽表,几十上百个字段,但是只用了其中几个字段进行聚合计算。在离线计算里面,引擎Projection-PushDown配合列式存储存能很好的解决这个问题。flinksql优化器也支持了Projection-PushDown的操作,1.11只支持一些可以按列查询数据的数据源,比如hive/hbase/jdbc,流计算里面最常用的数据源kafka并不支持。kafka是行存储,读取没有太多的下推优化空间,但是在字段序列化上存在改进空间。

B站大数据场景的kafka数据格式,目前主要还是分隔符这种弱schema格式为主,行数据读出来后,通过指定的分隔符分割出字段,再按照ddl里面指定的字段格式进行反序列化到具体类型。

如果在优化器侧把字段下推到kafka source,那么在字段反序列化的时候,按需要的字段反序列化即可,在大流量的宽表指标计算场景下,能节省大量的序列化开销,我们在线上实现并默认开启了这个功能,表字段越多,使用字段越少,无用的字段越复杂,节省的开销越多。在开发过程中,我们还发现,1.11的Push-Down规则在遇到watermark算子会失效,无法把字段推到source,导致一旦遇到任务配置了watermark任务,就会全字段反序列化,并且全字段发送到下一个算子,除了序列化开销,也带来了大量的网络开销,这个问题高版本flink已经解决,我们参考高版本进行了修改。

  1. ## 简化sql如下
  2. CREATE TABLE source ( `a` VARCHAR, `b` BIGINT, `c` VARCHAR, `d` BIGINT, `e` VARCHAR, `f` VARCHAR, `ts` AS TO_TIMESTAMP(f) ) WITH ( 'connector'='bsql-kafka',.... )
  3. CREATE TABLE STD_LOG ( `a` VARCHAR, `b` BIGINT, `f` VARCHAR, `ts` Timestamp(3) )WITH('connector' = 'bsql-log','infinite'='true')
  4. INSERT INTO STD_LOG select a,b,f,ts from source
  5. ## 开启Projection-PushDown前,摘取执行计划结果如下
  6. == Optimized Logical Plan ==
  7. Sink(table=[default_catalog.default_database.STD_LOG], fields=[a, b, f, ts])
  8. +- Calc(select=[a, b, f, TO_TIMESTAMP(f) AS ts])
  9.   +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[a, b, c, d, e, f])             // source查询所有列
  10. ## 开启Projection-PushDown后,摘取执行计划结果如下
  11. == Optimized Logical Plan ==
  12. Sink(table=[default_catalog.default_database.STD_LOG], fields=[a, b, f, ts])
  13. +- Calc(select=[a, b, f, TO_TIMESTAMP(f) AS ts])
  14.   +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b, f]]], fields=[a, b, f])   // source查询三列

我们的部分数据在逐渐转向protobuff格式,针对这类格式的序列化优化,我们目前还在探索中。

 2.4 支持UDF-Reuse

在线上,我们常常发现这样的现象,一段相同的udf计算逻辑在where和查询字段里面都存在,按照直觉,应该计算一次就可以了,但是实际运行时是算了两次,where算一次,查询输出再算一次,对于大部分udf开销比较小还可以接受,但是对于一些比较消耗性能的udf,这个问题就会带来巨大的资源浪费。我们希望能做一些udf的复用,来减少相同计算的多次重复执行。

这里我们只关注单task内部的scalar udf重复计算,跨task的复用由于涉及到整个dag的变化,我们暂时没有实现,其他类型udf不存在task内重复计算的场景。

对于flink单task内部,整个task是通过codegen编译成一整段大代码执行,相同的udf,相同的输入,在codegen编译整段代码时候,完全可以在类里面保证只计算一次,下次使用,直接引用结果即可,无需重复计算。我们在codegen编译阶段,进行了修改,对udf和参数进行了记录,并且放在了类的成员变量里,在task里面生成下一次udf执行代码的时候,如果成员变量不为空,直接引用即可。这里需要注意的是udf存在确定与非确定两种,可以通过flink udf的isDeterministic()接口进行声明,非确定的udf由于每次执行得到的结果都无法保证相同,即使参数一样,如now()函数,对于非确定的udf不进行复用。

定义一个UDF的使用SQL,分别在where和query中,样例如下:

  1. CREATE view tmpView as select id, my_udf(id,name) as f3 from source; -- my_udf是自定义function(MyUdf)
  2. INSERT INTO printTable SELECT id, f3 FROM tmpView where f3 is not null;

通过参数(“table.optimizer.reuse-user-defined-function.enabled”)来控制是否开启reuse udf的功能。当未开启reuse udf时,udf无论如何都会被调用两次,其codegen的部分代码如下:

04d2caf57cee090053635087bee0129d.png

当开启udf的复用后,udf会在第一次计算结果为非null后不再做后续计算,另外此处做null判断的原因是避免sql中存在or表达式,引发第一次udf未被计算到而做的兼容。开启reuse udf的codegen代码细节如下:

8e23a59f225a5ff774f16d837c775e75.png

目前我们已经在线上默认开启了此功能。

 2.5 默认开启Object-Reuse

在大部分流量型任务上,我们分析发现,数据的开销主要是来源于算子间的序列化与反序列。

对于流量型任务,由于没有聚合逻辑,dag上看大部分的算子都是chain在一起,数据在同一个线程内直接进行传递,没有跨进程,分析代码,发现这种线程内部的数据传递,默认情况下也会进行一次序列化反序列化的操作,由于单条记录大,总流量大,带来的性能开销也占了任务非常大的资源比例。同时,也注意到,社区本身也提供了一个object-reuse的参数,允许同一个线程的算子之间直接通过引用传递数据,避免掉序列化操作。不过这个开关非常危险,需要确保当前线程处理下一条数据之前,一定要把数据深拷贝出去才行,否则无法保证了准确性了。因为flink的每个task的serializer是线程不安全的,同时引用的底层segment存储是复用的,一旦当前数据只是通过引用传递到了某个组件,下一条数据在处理的时候就会被当前线程的serializer填充到segment里面,冲掉老的数据,如果老的数据在别的组件还没有被处理,就会出现问题。可以发现mini-batch的实现里面,收到数据后先做了一次数据的深拷贝,然后再放到到mini-batch的内存map里。

这是个非常棒的功能,但是线上到处充斥着算子chain在一起,但是没有针对object-reuse处理好的操作,我们梳理了sql用到所有算子,并且和社区最新版本进行了比对,最终划定了风险范围,主要是在维表的异步join,和sink的异步batch写上。这两个地方我们进行了特殊处理后,线上针对sql作业目前已经完全打开了object-reuse功能,典型的benchmark场景能达到成倍的性能提升,单条数据越大,流量越大,效果越好。

a0913fe073a1379bc214f62005cf033c.png

8591b841a5fca0ebd086f3060c97d565.png

 2.6 自动开启Mini-Batch/两阶段聚合等

在上面也提到过,支持tvf的其中之一的原因是t可以彻底解决window的数据倾斜问题。在一般的group by场景,线上也存在大量倾斜性能问题,我们深入的了解了flink的agg/rank/topn机制,发现社区提供的mini-batch和两阶段聚合这两个特性,在开启的情况下能解决,基本能彻底解决group by的性能问题。不过mini-batch本身是针对算子实现的,默认全部开启,我们测试发现在一些非聚合场景会存在问题,会影响watermark的传递,同时也会插入一些不必要的算子,带来性能消耗。因此我们在编译阶段,对dag进行了遍历分析,再决定是否自动开启mini-batch/两阶段聚合。

7f976036dcea5c70cddc9c2eaf46f792.png

我们在使用过程中,也发现了mini-batch低版本的一些问题,比如使用了MapView/ListView的情况下,会造成state无法过期,我们也参考更高的版本(FLINK-17096)进行了修复。

目前我们在线上进行进行了自动化开启功能的默认打开,线上的倾斜问题基本已经逐渐收敛掉了。

 2.7 Key-Group机制倾斜优化

在sql本身没有明显会造成倾斜逻辑的情况下,我们发现线上任务的算子发生了明显的数据倾斜。部分SubTask处理的数据居然是其他SubTask两倍,但是业务逻辑本身是非常均匀了,数据分布没有倾斜。我们分析后发现,造成的原因是flink底层的KeyGroup机制引起的。flink state本身的scale能力并不是无限的,为了平衡scale的性能,state本身其实是进行了分片处理,任务state第一次创建的时候,会根据最大并发度的值创建出state的分片数,也就是KeyGroup数,这个数在整个state的生命周期内都不许发生变化,除非任务重启放弃state,重新创建新的state。KeyGroup会尽可能均匀的分配到所有的并发度也就是SubTask上,但是无法完全保证均匀,在最大并发度稍微大于任务并发度的情况下,很容易产生部分SubTask的KeyGroup数是其他SubTask两倍的情况,任务的scale本质也就是KeyGroup在SubTask间的移动。业务数据的分配是通过hash均匀的分配到KeyGroup的,但是KeyGroup到SubTask的不均匀分配导致了最终表现出的倾斜现象。

借用一张网图:

38b584679763a45636f1751ff3db0918.jpeg

这个机制本身比较难以改进,我们和runtime团队进行了一些沟通,他们目前通过对最大并发度算法进行了一些改动,默认计算的最大并发度计算的比较保守,在流量上涨的过程中,很容易触达最大并发度稍大于任务并发度的危险情况,扩大了最大并发度的计算结果,使得并发数正常情况下都远远低于最大并发度,SubTask的KeyGroup分配倾斜在10%以内。当然,最大并发度的增大会带来state元数据的增多,进行了测试后,checkpoint/restore没有明显的性能下降,目前线上我们已经全部开启了优化,KeyGroup倾斜基本得到了彻底的解决。

 2.8 支持Connector设置Slot-Group

我们在线上遇到了这样的问题,同一个任务的部分TaskManager的资源使用率明显高于其他TaskManager,导致任务堆积,同时扩容增加并发度也无法解决资源不平衡问题,通过分析后我们确认了是任务的Kafka Connector部分序列化消耗的资源过多引起的,导致计算部分获取不到足够的CPU资源,但是Connctor的SubTask数量受到kafka分区数的限制,分区数已经不建议继续增加。

这个本质是个Task的隔离问题,目前线上默认的Task到slot分配测试是share模式,Slot-Group一致的情况下,Task就会共享Slot,争抢同一个CPU资源,大部分场景都没有问题,资源不够了,横向扩容就行。但是对于部分Connector来说,横向扩容受到外部存储分区数的限制,导致无法用上额外的资源,持续影响别的Task。关闭share模式不是一个好的选择,会导致所有Task都会使用独立的Slot,引起过多的资源浪费,因此我们支持的Connector的Slot-Group配置,支持对connector进行slot隔离,分配独立的计算资源,计算部分由于一般scale能力都比较强,无需进行隔离。

03 基于流批一体的实时projection探索

 3.1 背景

当前基于hudi/iceberg的湖仓一体概念非常火热,数据湖基本统一了流批两种场景下的数据存储(秒级延迟暂时还取代不了消息中间件),解决了lamba架构下存储一致性问题,B站目前也在积极推进。同时,在计算引擎的流批统一上,我们也在使用flinksql进行一些场景的探索和落地。

在和业务和反复沟通中我们发现,业务希望离线sql能够达到实时的数据产出效率,秒级出结果,资源也能可控,也希望实时flinksql能像离线sql一样灵活,无需关注存储,免运维,随意修改sql逻辑,自由回溯数据。

我们对实时dqc的业务场景进行了重点的跟踪,实时dqc场景是对数据做到分钟级别的监控,做同环比指标比较。同时为了避免短区间数据的不平滑,采用滑动窗口的计算方式,如每隔5分钟计算过去1小时的数据。原始数据我们落入了hudi,如果使用离线dqc的计算方式,每隔5min要去拉过去1h的数据进行计算,非常消耗性能,读放大非常严重,实际每次查询只有5min的数据增量,但是却要读1h的数据。如果使用流计算,借助于state的状态,每5min只要摄入增量数据增量计算即可,结果写出到用户存储,计算的资源消耗很低。但是用户需要对流计算任务持续运维,自行解决结果存储,计算延迟等问题。

流处理滑动窗口示意图如下:

1c6019120f6a710769ef0b243e1b04c0.png

能否结合流批一体的技术,把流批计算各自的优势集中到一起开放给用户呢。我们借鉴了clickhouse的projection思想,依托于flink+hudi的流批一体架构,实现了基于flink的实时projection方案,在一些偏固定场景的batch查询上,使用流式物化的能力来加速批查询。

clickhouse的projection可以参考[1],本文不详细展开projection概念。

 3.2 技术架构

7474caa60042a77980d8fea059f3cb75.png

上面就是整套projection的架构,整个架构分为3个核心模块,kyuubi,flink-local-executor,以及projection-manager。

kyuubi是sql层统一入口。kyuubi-server负责接收用户的sql请求,路由到对应的flink-sql-engine,进行sql解析和提交操作。

flink-local-executor是sql的客户端,完成具体的解析和提交工作。解析过程中会执行projection的创建,物化规则的加载以及projection元数据的加载。

projection-manager是projection的管理服务,负责存取projection的元数据,并执行projection任务的创建。

技术侧我们主要解决了这几个问题。

1. 支持projection的创建,用户提交批查询,可以通过在select语句上增加hint,提示查询引擎,该查询会进行复用,引擎会针对该查询进行projection的创建。我们支持了如下语法:

  1. select /*+ OPTIONS('table.optimizer.materialization-enabled'='true') */
  2.  window_end,
  3.  f1,
  4.  sum_f2
  5. from
  6.  (
  7.    select /*+ OPTIONS('table.optimizer.materialization.sub-query'='true') */
  8.      cast(window_end as varchar) as window_end,
  9.      f1,
  10.      SUM(f2) AS sum_f2
  11.    from
  12.      TABLE(
  13.        HOP(
  14.          TABLE bili.dwd_data_example,
  15.          DESCRIPTOR(ts),
  16.          INTERVAL '5' MINUTES,
  17.          INTERVAL '60' MINUTES
  18.        )
  19.      )
  20.    group by
  21.      window_end,
  22.      f1
  23.  ) c
  24. where
  25.  f1 = 1
  26.  and window_end = '2022-08-01 12:00:00.000';

/*+ OPTIONS('table.optimizer.materialization-enabled'='true') */这个hint表示该查询尝试使用projection进行查询改写,但是不保证能改写成功。

/*+ OPTIONS('table.optimizer.materialization.sub-query'='true') */这个hint表示需要对这个hint所在的子查询进行projection,加速后续相同逻辑的查询。

2. 支持flink sql的projection ddl语法以及sql查询改写规则。用户提交批查询,如果有对应的projection,能够改写查询到projection,加速查询。我们主要参考了calcite的物化规则,并且增加了对tvf语法的支持。

下面看下projection改写前后的查询执行计划。

  1. projection命中前:
  2. Calc(select=[record_num, 49126 AS ruleId])
  3. +- HashAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end, Final_COUNT(count1$0) AS record_num])
  4.   +- Exchange(distribution=[hash[window_start, window_end]])
  5.      +- LocalHashAggregate(groupBy=[window_start, window_end], select=[window_start, window_end, Partial_COUNT(*) AS count1$0])
  6.         +- Calc(select=[window_start, window_end], where=[=(CAST(window_end AS VARCHAR(2147483647)), '2022-09-26 16:51:00.000')])
  7.            +- WindowTableFunction(window=[CUMULATE(time_col=[time_iso], max_size=[86400000 ms], step=[3 min])])
  8.               +- TableSourceScan(table=[[hive, bili, dwd_data_example, project=[time_iso], partitions=[{log_date=20220926}]]], fields=[time_iso], hints=[[[OPTIONS inheritPath:[0, 0, 0, 0, 0] options:{table.optimizer.materialization.sub-query=true}][OPTIONS inheritPath:[0, 0, 0, 0, 0, 0, 0] options:{table.optimizer.materialization-enabled=false}]]])
  9. projection命中后:
  10. Calc(select=[CAST(record_num AS BIGINT) AS record_num, 49126 AS $f1], where=[=(CAST(window_end AS VARCHAR(2147483647)), '2022-09-26 16:51:00.000')])
  11. +- TableSourceScan(table=[[hive, flink_mv, dwd_data_example_mv_yhqk95lf, project=[window_end, record_num]]], fields=[window_end, record_num])

改写后,直接使用了projection的结果,直接从projection表里面读取数据即可,能大大的加速查询,做到秒级甚至毫秒级的响应。

3. projection的改写降级,根据watermark等指标屏蔽掉projection的实时任务延迟失败等问题,保障查询结果的可靠性。

我们目前的存储主要使用的hudi,在数据写入存储过程中,对应的commit里面,会记录下当前commit完成时,projection任务对应的watermark信息,在执行物化规则匹配时,会参考对应的信息进行规则匹配,如果落后当前太多时间,则会对拒绝当前的projection改写,直接降级到原表进行查询。

 3.3 一些后续的优化

后续我们会主要围绕proection的效率来进行。

1. 长期无法命中的projection会进行回收。

2. 多个维度相同的projection会进行定期的合并,降低projection计算成本。

04 sql的性能基座flink state优化

flink state作为flinksql的性能基座,在优化flinksql的性能过程中,我们发现难以绕开,因此也对flink state做了一些开发优化。内部目前使用的默认state是rocksdb statebackend,我们重点优化了该state的性能。

 4.1 State Metrics完善

社区1.11版本自带的性能监控比较偏向RocksDB本身,监控了大量内存层面的使用,但是没有业务层最需要的读写性能数据,每每出现问题,监控上基本难以看出认为问题。因此我们对StateBackend的监控进行了完善,由于State的操作非常频繁,监控本身对性能也有影响,参考了最新1.15的做法,也对监控进行了采样。同时我们对接口操作以及返回结果进行了不同的分类,在RocksDB里面,读写是完全不同的路径,读可以分为点查get,前缀查找seek,前缀迭代next,写可以分为kv写write,list追加merge,以及删除操作delete。从返回结果看,读到数据还是读到null代价也是完全不同的。我们希望metrics能针对这些进行分类,细化监控,为我们后续的优化指导方向。

我们上线后,发现了一些有价值的信息,线上大量的读取比例,都是读取null的结果,同时读取null的代价大大高于读取数据,我们后续在RocksDB和Flink框架侧进行了bloom优化,避免这种读取null穿透到底层,过长的搜索路径带来过大的查询代价。

c2665ba706db60978fcdcf1da4ce1ec3.png

 4.2 RocksDB State压缩机制调整

线上的模型特征任务,有相当一部分存在非常大的State,超过500GB,最大的接近10TB,占用了大量CPU,同时在进行CheckPoint时也会带来比较大的抖动,性能优化一直是一个非常重要的问题。

我们对一些大任务进程进行了火焰图分析,发现大量的CPU消耗是在压缩与解压缩上,既有compact过程的压缩解压缩,也存在于block加载到缓存的解压缩,其中Compaction有明显的读写放大效益,导致压缩解压缩也会放大。一个直观的解决办法就是直接关闭RocksDB的压缩,但是这有个明显的问题,就是state大小会膨胀很多倍。另一个想法关闭RocksDB压缩后能不能直接在flink写入RocksDB之前压缩,读取出来后解压缩,平衡底层的压缩开销与关闭压缩导致的state放大问题,在和公司内部的RocksDB团队沟通后,这个想法得到了支持。

我们在双流JOIN场景对几种方案进行了性能对比,默认配置下500GB的state,最终在flink读写层的压缩效果最好,无论是均值还是峰值CPU都有更好的表现,均值CPU大概下降了20%。

由于改变了checkpoint存储的数据内容结构,无法做到透明升级,我们增加了一个配置开关,放弃之前的savepoint/checkpoint后可以一键切换到这个压缩方案,目前我们线上的大State场景都在逐步切换这种方案,小State场景由于性能瓶颈不在RocksDB,优化效果不明显,没必要开启。

4f7e6f0a7c93d99c7c7eb212f34dd9a6.png

7bc1628d38dde5b494f61c893f04cfb9.png

 4.3 RocksDB State磁盘分配优化

目前内部的主要部署模式是yarn,开启了Cgroup进行cpu隔离,但是磁盘实际上还是混用的。实际运行中,我们确实观察到节点之间以及机器内部磁盘之间存在比较大的io不均衡情况,而且带来了任务的稳定性问题。

联合了离线的yarn团队,从yarn和flink的tm自身两个层面对state的磁盘选择进行了优化。离线侧对yarn的调度进行优化,支持基于磁盘io负载的节点打分,flink新任务申请conatiner,优先分配io负载表现低的节点,让机器之间的io负载相对平均。

同时在机器内部的磁盘之间,task的state在初始化选择磁盘的时候,也会参考分配到的多块盘的,磁盘存储空间,以及最近几秒内的io负载,进行二次选择,尽可能的选择较低的负载的磁盘,保障机器内部多块磁盘间的均衡。这里说明一点,从内核团队得知,ssd磁盘的负载无法通过linux传统的ioutil工具来进行评估,因此此处的磁盘负载我们也是通过一些其他的指标来进行了估算,未必特别准确。

 4.4 RockDB参数优化

针对RocksDB本身,我们也做了一些基本参数的调整,我们发现比较有用的参数主要是两个。

1. 分区索引 partitioned index/filters

通过对sst索引分区,并且增加额外的一级分区索引,提升cache的命中率,减少磁盘负载,在join场景下,能发挥比较好的性能,彻底解决了磁盘性能突降的问题。这个功能我们发现社区在1.13已经增加了参数支持了。

可以参考rocksdb官方文档:https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters

2. 压缩算法compression

目前默认的是snappy算法,在压缩率和性能上都不算最好,rocksdb的官方文档也明确说明了,lz4会是更好的选择,确实有微小的提升。对于磁盘io压力比较大的业务场景,调整压缩到业务层是更好的。

05 未完的一些工作

1. Flink Remote StateBackend

B站目前整个大的背景是在做离在线计算资源的混部,flink由于state的存在,我们早先申请的机型都配置了大容量高性能的磁盘,但是在线业务的机器其实并没有很好的磁盘,这导致混部推进会比较艰难。还有一个问题,很多超大state任务,在进行checkpoint和restore过程中,由于state过大,会导致时间过长,影响体验和稳定性。基于上面两个问题,我们和内部的kv存储团队合作拉起了Flink Remote Statebackend的项目,把flink的KeyedStateBackend从本地移到外部服务,实现flink的存算分离,目前这个项目的基本功能已经开发完成,比较大的一个难点是如何解决高频网络io带来的开销,我们通过大cache,攒批写,bloom过滤读来降低读写的rpc,目前看效果还不错,在做进一步的性能测试和线上灰度。

2. 流批一体的进一步探索

借助于flink实时projection,我们在流批一体的场景上进行进一步探索演进,如报表,实时指标,特征等场景,为业务带来更好的体验。

以上是今天的分享内容,如果你有什么想法或疑问,欢迎大家在留言区与我们互动,如果喜欢本期内容的话,请给我们点个赞吧!

参考链接:

[1] https://zhuanlan.zhihu.com/p/387877931

欢迎加入 大数据 |数仓技术交流群

进群方式:请加微信(微信号:dataclub_bigdata),回复:加群,通过审核会拉你进群。

281ab6f66d29e220487ff6e631ec4c3c.png

(备注:行业-职位-城市)

福利时刻

01. 后台回复「数据」,即可领取大数据经典资料。

02. 后台回复「转型」,即可传统数据仓库转型大数据必学资料。

03. 后台回复「加群」,或添加一哥微信IDdataclub_bigdata  拉您入群(大数据|数仓|分析)或领取资料。

9badf0fabbcbb9854d386eb13a75539b.png  

关注不迷路~ 各种福利、资源定期分享

往期推荐

大数据应用型产品设计方法及行业案例介绍(附110页PPT)

如何成为一名合格的数据产品经理?

数据治理怪象

数据平台中的海量标签如何治理?

阿里大数据之路:数据模型篇大总结

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/480454
推荐阅读
相关标签
  

闽ICP备14008679号