赞
踩
在先前的Apache Beam中的有状态计算中,介绍了Apache Beam中有状态计算的基础知识,重点介绍了对每个元素的处理中添加的状态特性。所谓的适时处理,是有状态计算的补充,是通过设置定时器来,在将来某个时间点上的(有状态的)进行回调。
在Beam中,计时器能做些什么?下面是一些例子:
• 可以输出数据缓冲,在一定数量的处理时间之后。当WaterMark触发时(估计已在在当前时间内收到所有数据时),可以根据业务需求进行进行一次集中处理,而不是只能对某一个元素进行处理,在之前的Beam中是没有这样的机制的。
• 可以使用超时的工作流来改变状态,并在一定时间内不需要额外的输入而释放输出。
以上只是一些举例。实际上状态和计时器一起组成了一个强大的编程范式,可以用于细粒度的控制,表达丰富的工作流语义(数据处理的过程,在绝大部分情况下可以看做是一种数据处理的工作流)。有状态和适时计算对于使用者来说,是大数据处理引擎的,并且可以与Beam的基于事件时间的窗口模型使用在流式和批处理中。
在之前的文章中,对有状态的计算有了很大的了解,主要是与关联、交换的组合形成了对比。在这篇文章中,我将强调一种我曾经简单提到过的观点:对每个键和窗口状态和定时器的访问的elementwise处理是“令人尴尬的并行”计算的基本模式,与Beam中的其他方法截然不同。
事实上,有状态和适时计算是跟底层相关的的计算模式。正因为它的层地较低,所以能够让我们对此层进行细粒度的控制,这可能够让我们应对更多的应用场景,也能带来性能效率的提升,但是同时也增加了复杂性,所以有得必有失!
接下来在回顾一下ApacheBeam的两个主要计算模式:
最常见的并行计算模式是,组建一个集群,对海量的数据的每一条应用相同的操作。
在Beam中这样的操作使用ParDo进行抽象,类似于MapReduce中的Map,但是比MapReduce的Map更加强大,可以视为函数式编程中的map、flatmap等。
下图演示了每个元素的处理。正方形代表输入,三角形代表输出。颜色代表输入/输出元素的Key,这在以后会很重要。每个输入元素都完全独立地映射到相应的输出元素。数据的处理可以在任何的分布式集群上执行,本质上可以具备无限的并行计算能力。
这种模式最常见,也最简单,几乎在所有的并行计算中中都有这种无状态的处理。每一个输入元素相互独立的被处理,即便是元素是乱序也无关紧要,因为在这种处理模型下,不需要考虑元素之间的顺序关系。
高效的在集群中调度分布式的计算任务相当的困难,但是可以有一些方法去解决,例如Spliting、进度估计、work_stealing等。
Beam的另一个核心计算模式,具有相同Key的元素被调度到同一个机器上,然后将数据关联在一起。
在Beam中,这被表示为一个GroupByKey或Combine.perKey,对应于MapReuce中的shuffle和Reduce。Combine在Beam中是一个聚合的抽象类,是聚合中最基本的操作,类似于最原始的GroupBy,对输入元素进行相同的分组处理。
在下图中颜色、方块和三角形的含义跟上边是一样的。区别在于具有相同key的输入元素(即相同的颜色的方块)被路由到相同的位置,进行聚合运算。图中虽然只有几条线但不代表此种计算模式下,并行度会有多少的降低,在现实的应用中,key的数量可能有几十万上百万个, 并行度依然很高。
上图可以视为一个简单的抽象模型,在实际的计算引擎中,会对针对每一个Key进行聚合运算,这可以视为是有状态的计算,不同于第1个计算模型。
需要特别说明的是,在流式计算中,因为网络延迟等各种原因,我们无法准确的判断,什么时间点一个时间窗口内的数据全部到达了,所以一般用如下方式处理:
这种情况下,需要对中间计算结果进行保存,可以是内存、Redis、数据库等等,视需求决定,当到达触发条件的时候进行回调(回调函数一般是我们自己编写的函数),本次窗口的计算结果发送到下游。
所以对于流式计算来说,有状态的计算和定时器是必需的,但此处的定时器可以是周期性的也可以是根据某个条件触发的。
特别强调一下,Apache Beam只是一种大数据计算模型的抽象,实际的执行依赖于底层Spark Flink Apex等计算引擎的处理,所以对于Apache Beam的开发者来说,不需要关注如何处理乱序问题,也不能直接操作State如何存储、合适触发回调。
关于基于State的计算,有专门的文章进行解释,请参考如下链接中的文章。
http://blog.csdn.net/ffjl1985/article/details/78062296
ParDo和Combine.perKey是常见的标准并行计算模式,所有的大数据分布式引擎都提供了支持,然而在实际的场景中,这还不够,还需要一些其他的重要特性。
首先分析一下ParDo和Combine.perKey的特点。
将ParDo和Combine.perKey的特性结合在一起,可以总结出有状态适时计算的基本特性:
具有相同key和时间窗口的数据元素的集中处理,例如求和、均值等。
支持乱序的数据处理
单线程每次处理1个元素或者根据Timer定时器集中处理一批元素
如下图所示,方块表示的数据元素依次被有状态适时计算DoFn(DoFn是ApacheBeam编写具体的业务逻辑的抽象类)处理。在DoFn中,可以访问State(内存、数据库等等),可以设置回调函数(此时,DoFn依然可以不使用回调函数,沿用旧的处理模式)。
这就是Apache Beam中基于key和时间窗口的有状态适时计算的抽象模型。接下来我们通过代码示例来看,在DoFn中是如何使用State,如何设置Timer定时器以及如何编写回调函数。
假如我们要实现一个事件分析系统,事件量非常大,在处理中,需要调用外部的第三方系统为每一个事件补充信息(增加字段),如果每一个事件都产生一次调用,对外部系统的冲击会非常大,性能会随着事件数量的增加剧下降,最严重的情况下,可以将第三方系统拖垮,此时就需要批量处理来提升性能。
State代码
new DoFn<Event, EnrichedEvent>() {
@StateId("buffer")
private final StateSpec<BagState<Event>> bufferedEvents = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
… TBD …
}
代码中我们声明了:
DoFn代码
接下来我们就可以正式开始编写DoFn部分的@ProcessElement方法,方法中会使用声明的State。当事件buffer的事件个数到达MAX_BUFFER_SIZE时,就触发一个批量调用。
new DoFn<Event, EnrichedEvent>() {
private static final int MAX_BUFFER_SIZE = 500;
@StateId("buffer")
private final StateSpec<BagState<Event>> bufferedEvents = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<Event> bufferState,
@StateId("count") ValueState<Integer> countState) {
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element());
if (count > MAX_BUFFER_SIZE) {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
bufferState.clear();
countState.clear();
}
}
… TBD …
}
下图是整个过程的示意图
• 浅蓝色的框表示 DoFn
• 黄色的框表示 @ProcessElement方法
• 输入事件是红色的小方块—为了简化表示,图中值画了一个key的情况,实际情况下几十万个可以的情况下,DoFn的处理与此类似。
• 每一个输入事件被写入到buffer缓冲区中,用红色的三角形表示,之所以不使用红色的方块表示,是因为在写入buffer缓冲区的时候,可能是原始的事件,可能能是根据业务需要处理过的事件,虽然此段代码中确实是将原始数据写入了State buffer缓冲区,但此处特意区分一下,更清晰的表达数据的处理过程。
• 每一个事件在调用第三方外部服务补充完信息后,用红色的圆圈表示,然后被依次发送给下游。
到此为止,我们使用了State,但是还没有使用Timer定时器,这里有一个潜在的问题,如果没有新的事件进来,缓冲区未满,那么缓冲区中已经缓冲的事件,永远没有机会得到处理,所以,此时需要一个超时机制,当超过一定的时间,认为时间窗口超时,缓冲区虽然没有满,但仍然要触发一次回调函数,即便是没有新的数据来,所有的事件也会得到处理。
增加一个时间时间定时器,当PCollection的Watermark触发的时候,调用回调函数。也就是说触发回调的时候会有两种情况:
缓冲区满
窗口超时
如下代码所示,当窗口超时的时候,State事件缓冲区 buffer中的的事件被读取处理,进行一次RPC调用。
new DoFn<Event, EnrichedEvent>() {
…
@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext context,
BoundedWindow window,
@StateId("buffer") BagState<Event> bufferState,
@StateId("count") ValueState<Integer> countState,
@TimerId("expiry") Timer expiryTimer) {
expiryTimer.set(window.maxTimestamp().plus(allowedLateness));
… same logic as above …
}
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<Event> bufferState) {
if (!bufferState.isEmpty().read()) {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
}
}
}
代码片段解读:
• 首先使用@TimerId(“expiry”)声明了一个定时器,定时器的Id是expiry.接下来我们就可以使用这个定时器设定回调函数。
• 使用@@TimerId 声明的定时器变量expiryTimer,
• @ProcessElement 方法中,我们声明了@TimerId(“expiry”) Timer。Beam的执行引擎会自动的提供Timer的参数,我们可以来设定或者重新设定。但是重新设定Timer是性能杀手,所以简单的对每一个输入元素设定。
• 我们用 @OnTimer(“expiry”)声明了onExpiry方法,这个方法里会执行对第三方系统的RPC调用,并将计算结果发送给下游。
@ProcessElement 和 @OnTimer(“expiry”) 方法都会访问State事件缓冲,执行相同的RPC调用,然后将数据发送到下游。
现在如果实时计算的模式处理数据,对缓冲数据而言,会存在不可预知的延迟,如果Watermark进度太慢,或者事件时间窗口长度太长,在计算窗口结果之前会等待很长时间。此时可以使用计时器来限制等待的时钟时间,又叫做处理时间,超过一定的等待时间即便是缓冲区没有满,也会触发一次RPC调用,选择等待时间的时候需要考虑RPC调用对外部服务的冲击。
处理时间计时器(与事件时间不同,处理时间一般晚于事件时间),相对来说简单,等待一个固定的时间段,然后执行一次回调。
作为本例的最后一个部分,当事件写入State事件缓冲区 buffer的时候,我们设定一个处理时间定时器。我们跟踪定时器是否设定了,这样就不用每次都重新设定。当事件来的时候,如果定时器没有设定,设定定时器为(当前时间+MAX_BUFFER_DURATION),当设定的处理时间定时器超时的时候,触发回调函数,调用RPC,然后将数据发送给下游。
new DoFn<Event, EnrichedEvent>() {
…
private static final Duration MAX_BUFFER_DURATION = Duration.standardSeconds(1);
@TimerId("stale")
private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@ProcessElement
public void process(
ProcessContext context,
BoundedWindow window,
@StateId("count") ValueState<Integer> countState,
@StateId("buffer") BagState<Event> bufferState,
@TimerId("stale") Timer staleTimer,
@TimerId("expiry") Timer expiryTimer) {
boolean staleTimerSet = firstNonNull(staleSetState.read(), false);
if (firstNonNull(countState.read(), 0) == 0) {
staleTimer.offset(MAX_BUFFER_DURATION).setRelative());
}
… same processing logic as above …
}
@OnTimer("stale")
public void onStale(
OnTimerContext context,
@StateId("buffer") BagState<Event> bufferState,
@StateId("count") ValueState<Integer> countState) {
if (!bufferState.isEmpty().read()) {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
bufferState.clear();
countState.clear();
}
}
… same expiry as above …
}
下图是完整的代码逻辑:
• 当事件被 @ProcessElement 处理的时候,首先在State中缓存事件。
• 如果事件产生的非常少,导致长时间没有填满缓冲区,那么会由时间定时器触发一次回调,从缓冲区取出所有缓存的事件,执行一次RPC条用,并将事件发送到下游.
• 最后,当窗口失效,并且在这个窗口中缓存的事件都被处理,并且发送到下游之后,Window就会被销毁。
使用Beam统一的流式和批处理,你不需要关注State和定时器的细节就,由底层的大数据处理引擎Spark flink等来处理细节问题。但是以下需要考虑的点:
Apache Beam的一大特性是对乱序数据的良好应对,通过基于事件时间窗口机制,不管什么类型的时间窗口、事件的顺序混乱程度,都能得到正确的结果。
如果编写有状态的适时的转换逻辑,也是一样通用的。如果选择了固定时间窗口,时间长度为1小时,或者时间长度为30分钟、每10分钟滑动一次的滑动窗口,这应该对于编写有状态的适时计算是透明的。
Beam第二大特性是统一的实时(流式)和历史(批量)数据处理的语义,事实上,Beam只有一套API,我们可以用相同的方式编写实时数据和历史数据(如每天存储在磁盘上经过压缩后的历史数据)处理的代码,不需要考虑两者的区别。
历史数据可是能完全的乱序的。对历史数据分片进行处理时候的顺序,与实时处理时,事件的顺序完全不同。此时数据已经全部就位,数据延迟的问题也不存在了,对于执行引擎来说,不需要像实时处理一样需要等待时间窗口结束,这种等待往往是不精确的。不论是处理历史数据和实时数据几乎是一模一样的方式。
有时,我们需要关注顺序或者数据处理的时序,这种情况下需要开发人员来进行处理。
全文完!
转载需标明文章来源!
http://blog.csdn.net/ffjl1985/article/details/78063757
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。