当前位置:   article > 正文

Apache Beam适时有状态计算_apache beam 做数据缓冲区

apache beam 做数据缓冲区

      在先前的Apache Beam中的有状态计算中,介绍了Apache Beam中有状态计算的基础知识,重点介绍了对每个元素的处理中添加的状态特性。所谓的适时处理,是有状态计算的补充,是通过设置定时器来,在将来某个时间点上的(有状态的)进行回调。

      在Beam中,计时器能做些什么?下面是一些例子:

  • • 可以输出数据缓冲,在一定数量的处理时间之后。当WaterMark触发时(估计已在在当前时间内收到所有数据时),可以根据业务需求进行进行一次集中处理,而不是只能对某一个元素进行处理,在之前的Beam中是没有这样的机制的。

  • • 可以使用超时的工作流来改变状态,并在一定时间内不需要额外的输入而释放输出。

      以上只是一些举例。实际上状态和计时器一起组成了一个强大的编程范式,可以用于细粒度的控制,表达丰富的工作流语义(数据处理的过程,在绝大部分情况下可以看做是一种数据处理的工作流)。有状态和适时计算对于使用者来说,是大数据处理引擎的,并且可以与Beam的基于事件时间的窗口模型使用在流式和批处理中。

什么是有状态和适时计算?

      在之前的文章中,对有状态的计算有了很大的了解,主要是与关联、交换的组合形成了对比。在这篇文章中,我将强调一种我曾经简单提到过的观点:对每个键和窗口状态和定时器的访问的elementwise处理是“令人尴尬的并行”计算的基本模式,与Beam中的其他方法截然不同。

      事实上,有状态和适时计算是跟底层相关的的计算模式。正因为它的层地较低,所以能够让我们对此层进行细粒度的控制,这可能够让我们应对更多的应用场景,也能带来性能效率的提升,但是同时也增加了复杂性,所以有得必有失!

      接下来在回顾一下ApacheBeam的两个主要计算模式:

基于每一个元素的处理 (ParDo, Map等)

      最常见的并行计算模式是,组建一个集群,对海量的数据的每一条应用相同的操作。

      在Beam中这样的操作使用ParDo进行抽象,类似于MapReduce中的Map,但是比MapReduce的Map更加强大,可以视为函数式编程中的map、flatmap等。

      下图演示了每个元素的处理。正方形代表输入,三角形代表输出。颜色代表输入/输出元素的Key,这在以后会很重要。每个输入元素都完全独立地映射到相应的输出元素。数据的处理可以在任何的分布式集群上执行,本质上可以具备无限的并行计算能力。
这里写图片描述

      这种模式最常见,也最简单,几乎在所有的并行计算中中都有这种无状态的处理。每一个输入元素相互独立的被处理,即便是元素是乱序也无关紧要,因为在这种处理模型下,不需要考虑元素之间的顺序关系。

      高效的在集群中调度分布式的计算任务相当的困难,但是可以有一些方法去解决,例如Spliting、进度估计、work_stealing等。

基于Key和时间窗口的聚合处理(Combine, Reduce, GroupByKey等.)

      Beam的另一个核心计算模式,具有相同Key的元素被调度到同一个机器上,然后将数据关联在一起。

      在Beam中,这被表示为一个GroupByKey或Combine.perKey,对应于MapReuce中的shuffle和Reduce。Combine在Beam中是一个聚合的抽象类,是聚合中最基本的操作,类似于最原始的GroupBy,对输入元素进行相同的分组处理。

      在下图中颜色、方块和三角形的含义跟上边是一样的。区别在于具有相同key的输入元素(即相同的颜色的方块)被路由到相同的位置,进行聚合运算。图中虽然只有几条线但不代表此种计算模式下,并行度会有多少的降低,在现实的应用中,key的数量可能有几十万上百万个, 并行度依然很高。


这里写图片描述

      上图可以视为一个简单的抽象模型,在实际的计算引擎中,会对针对每一个Key进行聚合运算,这可以视为是有状态的计算,不同于第1个计算模型。

      需要特别说明的是,在流式计算中,因为网络延迟等各种原因,我们无法准确的判断,什么时间点一个时间窗口内的数据全部到达了,所以一般用如下方式处理:

  1. 待某一个特定时间长度
          例如时间窗口结束30秒之后,视为所有的数据都到达了。例如时间窗口结束30秒之后,视为所有的数据都到达了。
  2. WaterMark机制
          例如时间窗口结束30秒之后,视为所有的数据都到达了。WaterMark是一种通过估算的方式判断所有的数据是否到达了,出自于Google的MinnWheel的论文,有兴趣的可以看一下。

      这种情况下,需要对中间计算结果进行保存,可以是内存、Redis、数据库等等,视需求决定,当到达触发条件的时候进行回调(回调函数一般是我们自己编写的函数),本次窗口的计算结果发送到下游。

      所以对于流式计算来说,有状态的计算和定时器是必需的,但此处的定时器可以是周期性的也可以是根据某个条件触发的。

      特别强调一下,Apache Beam只是一种大数据计算模型的抽象,实际的执行依赖于底层Spark Flink Apex等计算引擎的处理,所以对于Apache Beam的开发者来说,不需要关注如何处理乱序问题,也不能直接操作State如何存储、合适触发回调。

      关于基于State的计算,有专门的文章进行解释,请参考如下链接中的文章。
http://blog.csdn.net/ffjl1985/article/details/78062296

基于Key和时间窗口的有状态适时计算

      ParDo和Combine.perKey是常见的标准并行计算模式,所有的大数据分布式引擎都提供了支持,然而在实际的场景中,这还不够,还需要一些其他的重要特性。

      首先分析一下ParDoCombine.perKey的特点。

      ParDo
  • • 单线程模型处理每一个输入元素
  • • 支持数据的乱序处理,元素的处理相互独立,元素之间没有交集,计算结果不会产生问题
      Combine.perKey
  • • 具有相同key和时间窗口的数据元素的集中处理,例如求和、均值等。
  • • 用户可以自定义聚合算法(Combine是一个抽象的聚合模型,我们可以自己实现自己的聚合类)

      将ParDo和Combine.perKey的特性结合在一起,可以总结出有状态适时计算的基本特性:

  1. 具有相同key和时间窗口的数据元素的集中处理,例如求和、均值等。

  2. 支持乱序的数据处理

  3. 单线程每次处理1个元素或者根据Timer定时器集中处理一批元素

      如下图所示,方块表示的数据元素依次被有状态适时计算DoFn(DoFn是ApacheBeam编写具体的业务逻辑的抽象类)处理。在DoFn中,可以访问State(内存、数据库等等),可以设置回调函数(此时,DoFn依然可以不使用回调函数,沿用旧的处理模式)。


这里写图片描述

      这就是Apache Beam中基于key和时间窗口的有状态适时计算的抽象模型。接下来我们通过代码示例来看,在DoFn中是如何使用State,如何设置Timer定时器以及如何编写回调函数。

示例 批量RPC(Batched RPC)

      假如我们要实现一个事件分析系统,事件量非常大,在处理中,需要调用外部的第三方系统为每一个事件补充信息(增加字段),如果每一个事件都产生一次调用,对外部系统的冲击会非常大,性能会随着事件数量的增加剧下降,最严重的情况下,可以将第三方系统拖垮,此时就需要批量处理来提升性能。

State代码

new DoFn<Event, EnrichedEvent>() {
  @StateId("buffer")
  private final StateSpec<BagState<Event>> bufferedEvents = StateSpecs.bag();
  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
  … TBD … 
}   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

      代码中我们声明了:

  • • buffer是事件的缓冲区
  • • count是事件的计数器

DoFn代码

      接下来我们就可以正式开始编写DoFn部分的@ProcessElement方法,方法中会使用声明的State。当事件buffer的事件个数到达MAX_BUFFER_SIZE时,就触发一个批量调用。

new DoFn<Event, EnrichedEvent>() {

  private static final int MAX_BUFFER_SIZE = 500;

  @StateId("buffer")
  private final StateSpec<BagState<Event>> bufferedEvents = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<Event> bufferState,
      @StateId("count") ValueState<Integer> countState) {

    int count = firstNonNull(countState.read(), 0);
    count = count + 1;
    countState.write(count);
    bufferState.add(context.element());

    if (count > MAX_BUFFER_SIZE) {
      for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
        context.output(enrichedEvent);
      }
      bufferState.clear();
      countState.clear();
    }
  }

  … TBD … 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

      下图是整个过程的示意图


这里写图片描述

  • • 浅蓝色的框表示 DoFn

  • • 黄色的框表示 @ProcessElement方法

  • • 输入事件是红色的小方块—为了简化表示,图中值画了一个key的情况,实际情况下几十万个可以的情况下,DoFn的处理与此类似。

  • • 每一个输入事件被写入到buffer缓冲区中,用红色的三角形表示,之所以不使用红色的方块表示,是因为在写入buffer缓冲区的时候,可能是原始的事件,可能能是根据业务需要处理过的事件,虽然此段代码中确实是将原始数据写入了State buffer缓冲区,但此处特意区分一下,更清晰的表达数据的处理过程。

  • • 每一个事件在调用第三方外部服务补充完信息后,用红色的圆圈表示,然后被依次发送给下游。

      到此为止,我们使用了State,但是还没有使用Timer定时器,这里有一个潜在的问题,如果没有新的事件进来,缓冲区未满,那么缓冲区中已经缓冲的事件,永远没有机会得到处理,所以,此时需要一个超时机制,当超过一定的时间,认为时间窗口超时,缓冲区虽然没有满,但仍然要触发一次回调函数,即便是没有新的数据来,所有的事件也会得到处理。

Event Time Timers

      增加一个时间时间定时器,当PCollection的Watermark触发的时候,调用回调函数。也就是说触发回调的时候会有两种情况:

  1. 缓冲区满

  2. 窗口超时

      如下代码所示,当窗口超时的时候,State事件缓冲区 buffer中的的事件被读取处理,进行一次RPC调用。

new DoFn<Event, EnrichedEvent>() {
  …

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext context,
      BoundedWindow window,
      @StateId("buffer") BagState<Event> bufferState,
      @StateId("count") ValueState<Integer> countState,
      @TimerId("expiry") Timer expiryTimer) {

    expiryTimer.set(window.maxTimestamp().plus(allowedLateness));

    … same logic as above …
  }

  @OnTimer("expiry")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<Event> bufferState) {
    if (!bufferState.isEmpty().read()) {
      for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
        context.output(enrichedEvent);
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

代码片段解读:

  • • 首先使用@TimerId(“expiry”)声明了一个定时器,定时器的Id是expiry.接下来我们就可以使用这个定时器设定回调函数。

  • • 使用@@TimerId 声明的定时器变量expiryTimer,

  • • @ProcessElement 方法中,我们声明了@TimerId(“expiry”) Timer。Beam的执行引擎会自动的提供Timer的参数,我们可以来设定或者重新设定。但是重新设定Timer是性能杀手,所以简单的对每一个输入元素设定。

  • • 我们用 @OnTimer(“expiry”)声明了onExpiry方法,这个方法里会执行对第三方系统的RPC调用,并将计算结果发送给下游。


这里写图片描述

      @ProcessElement 和 @OnTimer(“expiry”) 方法都会访问State事件缓冲,执行相同的RPC调用,然后将数据发送到下游。

      现在如果实时计算的模式处理数据,对缓冲数据而言,会存在不可预知的延迟,如果Watermark进度太慢,或者事件时间窗口长度太长,在计算窗口结果之前会等待很长时间。此时可以使用计时器来限制等待的时钟时间,又叫做处理时间,超过一定的等待时间即便是缓冲区没有满,也会触发一次RPC调用,选择等待时间的时候需要考虑RPC调用对外部服务的冲击。

处理时间计时器Processing Time Timers

      处理时间计时器(与事件时间不同,处理时间一般晚于事件时间),相对来说简单,等待一个固定的时间段,然后执行一次回调。

      作为本例的最后一个部分,当事件写入State事件缓冲区 buffer的时候,我们设定一个处理时间定时器。我们跟踪定时器是否设定了,这样就不用每次都重新设定。当事件来的时候,如果定时器没有设定,设定定时器为(当前时间+MAX_BUFFER_DURATION),当设定的处理时间定时器超时的时候,触发回调函数,调用RPC,然后将数据发送给下游。

new DoFn<Event, EnrichedEvent>() {
  …
  private static final Duration MAX_BUFFER_DURATION = Duration.standardSeconds(1);

  @TimerId("stale")
  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext context,
      BoundedWindow window,
      @StateId("count") ValueState<Integer> countState,
      @StateId("buffer") BagState<Event> bufferState,
      @TimerId("stale") Timer staleTimer,
      @TimerId("expiry") Timer expiryTimer) {

    boolean staleTimerSet = firstNonNull(staleSetState.read(), false);
    if (firstNonNull(countState.read(), 0) == 0) {
      staleTimer.offset(MAX_BUFFER_DURATION).setRelative());
    }

    … same processing logic as above …
  }

  @OnTimer("stale")
  public void onStale(
      OnTimerContext context,
      @StateId("buffer") BagState<Event> bufferState,
      @StateId("count") ValueState<Integer> countState) {
    if (!bufferState.isEmpty().read()) {
      for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
        context.output(enrichedEvent);
      }
      bufferState.clear();
      countState.clear();
    }
  }

  … same expiry as above …
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

      下图是完整的代码逻辑:


这里写图片描述

  • • 当事件被 @ProcessElement 处理的时候,首先在State中缓存事件。

  • • 如果事件产生的非常少,导致长时间没有填满缓冲区,那么会由时间定时器触发一次回调,从缓冲区取出所有缓存的事件,执行一次RPC条用,并将事件发送到下游.

  • • 最后,当窗口失效,并且在这个窗口中缓存的事件都被处理,并且发送到下游之后,Window就会被销毁。

Beam统一模型中的State和定时器

      使用Beam统一的流式和批处理,你不需要关注State和定时器的细节就,由底层的大数据处理引擎Spark flink等来处理细节问题。但是以下需要考虑的点:

基于事件窗口机制

      Apache Beam的一大特性是对乱序数据的良好应对,通过基于事件时间窗口机制,不管什么类型的时间窗口、事件的顺序混乱程度,都能得到正确的结果。

      如果编写有状态的适时的转换逻辑,也是一样通用的。如果选择了固定时间窗口,时间长度为1小时,或者时间长度为30分钟、每10分钟滑动一次的滑动窗口,这应该对于编写有状态的适时计算是透明的。


这里写图片描述

统一的实时和历史数据处理

      Beam第二大特性是统一的实时(流式)和历史(批量)数据处理的语义,事实上,Beam只有一套API,我们可以用相同的方式编写实时数据和历史数据(如每天存储在磁盘上经过压缩后的历史数据)处理的代码,不需要考虑两者的区别。

      历史数据可是能完全的乱序的。对历史数据分片进行处理时候的顺序,与实时处理时,事件的顺序完全不同。此时数据已经全部就位,数据延迟的问题也不存在了,对于执行引擎来说,不需要像实时处理一样需要等待时间窗口结束,这种等待往往是不精确的。不论是处理历史数据和实时数据几乎是一模一样的方式。


这里写图片描述

      有时,我们需要关注顺序或者数据处理的时序,这种情况下需要开发人员来进行处理。

全文完!
转载需标明文章来源!
http://blog.csdn.net/ffjl1985/article/details/78063757

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/969866
推荐阅读
相关标签
  

闽ICP备14008679号