当前位置:   article > 正文

akka 持久化_使用Akka Persistence使React性队列持久

akka 持久化 批处理消息

akka 持久化

前段时间,我写了如何使用Akka Streams实现React消息队列 。 队列支持带有反向压力的流发送和接收操作,但有一个缺点:所有消息都存储在内存中,因此在重新启动的情况下会丢失。

但这可以通过实验性的akka-persistence模块轻松解决​​,该模块在Akka 2.3.4中进行了更新

队列参与者刷新

为了使队列持久,我们只需要更改队列参与者即可。 React/流部件保持完整。 提醒一下,React式队列包括:

  • 单个队列参与者 ,其中包含要传递的消息的内部优先级队列。 队列参与者接受参与者消息以发送,接收和删除队列消息
  • 一个Broker ,它创建队列参与者,侦听来自发送者和接收者的连接,并在建立连接时创建React式流
  • 一个Sender ,它将消息发送到队列(用于测试,每秒发送一条消息)。 可以启动多个发件人。 仅在可以接受邮件时发送邮件(来自代理的背压)
  • 一个Receiver ,它从队列中接收消息,这些消息变得可用并且可以处理(来自接收器的反压)。

reactmq-actors1

坚持不懈(保持被动)

所需的更改很小。

首先, QueueActor需要扩展PersistentActor ,并定义两个方法:

  • receiveCommand ,它定义actor消息(命令)到达时的“正常”行为
  • receiveRecover ,仅在恢复期间使用,并将重播事件发送receiveRecover位置

但是为了恢复,我们首先需要坚持一些事件! 在处理消息队列操作时,当然应该这样做。

例如,发送消息时,使用persistAsync持久保存MessageAdded事件:

  1. def handleQueueMsg: Receive = {
  2. case SendMessage(content) =>
  3. val msg = sendMessage(content)
  4. persistAsync(msg.toMessageAdded) { msgAdded =>
  5. sender() ! SentMessage(msgAdded.id)
  6. tryReply()
  7. }
  8.  
  9. // ...
  10. }

persistAsync是使用akka-persistence持久化事件的一种方法。 另一个, persist (也是默认值),缓冲后续命令(actor-messages),直到事件被持久为止。 这有点慢,但也更容易推理并保持一致。 但是,在消息队列中,这种行为不是必需的。 我们唯一需要保证的是,仅在事件持续后才确认消息发送。 这就是为什么在持久事件处理程序中发送回复的原因。 您可以在docs中阅读有关persistAsync更多信息。

类似地,事件对于其他命令(actor-messages,请参阅QueueActorReceiveQueueActorReceive 。 对于删除和接收,我们都使用persistAsync ,因为队列旨在提供至少一次的交付保证。

最后一个组件是恢复处理程序,该处理程序在QueueActorRecover定义(然后在QueueActor )。 恢复非常简单:事件对应于添加新消息,更新“下一次传递”时间戳或删除。

内部表示同时使用优先级队列和by-id映射以提高效率,因此,在RecoveryCompleted过程中处理事件时,我们仅构建映射,并使用RecoveryCompleted特殊事件构建队列。 特殊事件由akka-persistence自动发送。

就这样! 如果现在运行代理,发送一些消息,停止代理,然后再次启动,您将看到消息已恢复,并且实际上,如果运行接收器,则消息会被接收。

该代码当然不是生产准备就绪的。 事件日志将不断增长,因此使用快照 ,删除旧事件/快照以减小存储大小并快速恢复无疑是有意义的。

复写

既然队列是持久的,我们几乎还可以免费获得一个复制的持久队列 :我们只需要使用其他日志插件即可 ! 默认值依赖LevelDB并将数据写入本地磁盘。 其他实现可用: CassandraHBaseMongo

对持久性后端进行简单的切换,我们就可以在集群中复制消息。

摘要

借助两个实验性的Akka模块, 响应流持久性 ,我们已经能够以最少的代码实现持久的响应队列。 这仅仅是开始,因为这两种技术才刚刚开始成熟!

翻译自: https://www.javacodegeeks.com/2014/07/making-the-reactive-queue-durable-with-akka-persistence.html

akka 持久化

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/179724
推荐阅读
  

闽ICP备14008679号