当前位置:   article > 正文

akka学习之untypedpersistentactor_untypedactor

untypedactor

1.untypedpersistentactor:

          是一个持久的状态的actor。它能够坚持事件的记录,并可以以一种线程安全的方式对他们作出反应。它可以用来实现两个命令以及事件源的actor。当一个actor是启动或重新启动,日志消息回放,actor可以从这些消息中恢复内部状态。

2.快照的存储:

                快照存储持久性actor或视图的内部状态的快照。快照用于优化恢复时间。一个快照存储后端存储是可插拔。持久性扩展带有一个“本地”快照存储插件,它将写入到本地文件系统中。复制的快照存储是可用的社区插件。  

3.事件溯源Event sourcing

Event sourcing背后的基本理念是很简单的。一个持久的actor接收一个(非持久性)命令,它首先被验证,如果它可以应用到当前状态。这里的验证可以意味着任何东西,从简单的检查一个命令消息的字段到一个会话与几个外部服务,例如。如果验证成功,事件是由命令产生的,代表该命令的效果。然后,这些事件是持续的,成功的持久性后,用来改变actor的状态。当持续的actor需要恢复,只有坚持事件重演,我们知道他们可以成功地应用。换句话说,事件不能被持续的actor时,在相反的命令。Event sourcing的参与者当然也可以处理不改变应用程序状态的命令,例如查询命令。
AKKA坚持支持Event sourcing与untypedpersistentactor抽象类。一个扩展这个类的一个actor使用的是坚持和处理事件的坚持方法。一个untypedpersistentactor行为是通过实施receiverecover和receivecommand定义。这是证明在下面的例子。

  1. import akka.actor.ActorRef;
  2. import akka.actor.ActorSystem;
  3. import akka.actor.Props;
  4. import akka.japi.Procedure;
  5. import akka.persistence.SnapshotOffer;
  6. import akka.persistence.UntypedPersistentActor;
  7. import java.io.Serializable;
  8. import java.util.ArrayList;
  9. import static java.util.Arrays.asList;
  10. class Cmd implements Serializable {
  11. private static final long serialVersionUID = 1L;
  12. private final String data;
  13. public Cmd(String data) {
  14. this.data = data;
  15. }
  16. public String getData() {
  17. return data;
  18. }
  19. }
  20. class Evt implements Serializable {
  21. private static final long serialVersionUID = 1L;
  22. private final String data;
  23. public Evt(String data) {
  24. this.data = data;
  25. }
  26. public String getData() {
  27. return data;
  28. }
  29. }
  30. class ExampleState implements Serializable {
  31. private static final long serialVersionUID = 1L;
  32. private final ArrayList<String> events;
  33. public ExampleState() {
  34. this(new ArrayList<String>());
  35. }
  36. public ExampleState(ArrayList<String> events) {
  37. this.events = events;
  38. }
  39. public ExampleState copy() {
  40. return new ExampleState(new ArrayList<String>(events));
  41. }
  42. public void update(Evt evt) {
  43. events.add(evt.getData());
  44. }
  45. public int size() {
  46. return events.size();
  47. }
  48. @Override
  49. public String toString() {
  50. return events.toString();
  51. }
  52. }
  53. class ExamplePersistentActor extends UntypedPersistentActor {
  54. @Override
  55. public String persistenceId() { return "sample-id-1"; }
  56. private ExampleState state = new ExampleState();
  57. public int getNumEvents() {
  58. return state.size();
  59. }
  60. @Override
  61. public void onReceiveRecover(Object msg) {
  62. if (msg instanceof Evt) {
  63. state.update((Evt) msg);
  64. } else if (msg instanceof SnapshotOffer) {
  65. state = (ExampleState)((SnapshotOffer)msg).snapshot();
  66. } else {
  67. unhandled(msg);
  68. }
  69. }
  70. @Override
  71. public void onReceiveCommand(Object msg) {
  72. if (msg instanceof Cmd) {
  73. final String data = ((Cmd)msg).getData();
  74. final Evt evt1 = new Evt(data + "-" + getNumEvents());
  75. final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
  76. persistAll(asList(evt1, evt2), new Procedure<Evt>() {
  77. public void apply(Evt evt) throws Exception {
  78. state.update(evt);
  79. if (evt.equals(evt2)) {
  80. getContext().system().eventStream().publish(evt);
  81. }
  82. }
  83. });
  84. } else if (msg.equals("snap")) {
  85. // IMPORTANT: create a copy of snapshot
  86. // because ExampleState is mutable !!!
  87. saveSnapshot(state.copy());
  88. } else if (msg.equals("print")) {
  89. System.out.println(state);
  90. } else {
  91. unhandled(msg);
  92. }
  93. }
  94. }
这个例子中定义了两个数据类型,CMD和EVT代表命令和事件,分别。examplepersistentactor的状态是一个持续的事件数据包含在examplestate。
持续的actor的onreceiverecover方法定义了状态更新恢复期间处理EVT和SnapshotOffer的消息。持续的actor的onreceivecommand方法是命令处理程序。在这个示例中,一个命令是通过生成两个事件来处理的,然后持续和处理。事件是持续的,调用坚持以一个事件(或一系列事件)作为第一个参数和一个事件处理程序作为第二个参数。
持久化方法持续存在事件,并执行事件处理程序以成功执行持续的事件。成功地持续的事件在内部发送回持续的actor作为单独的消息,触发事件处理程序执行。事件处理程序可以在持续的actor状态和变异。持续事件的发送者是相应命令的发送者。这允许事件处理程序回复到命令的发送端(未显示)。
事件处理程序的主要职责是使用事件数据改变持续状态的状态,并通过发布事件通知其他人关于成功状态更改的通知。
当坚持持续的事件时,保证持久的actor不会在关联的事件处理程序的持续调用和执行之间接收更多的命令。这也适用于单个命令的上下文中的多个持久性调用。传入的消息都藏到坚持完成。
如果一个事件持续失败,onpersistfailure将被调用(默认记录错误),和actor将无条件停止。如果一个事件持续被拒绝在储存之前,由于序列化错误,onpersistrejected将被调用(默认情况下记录一个警告),和actor继续下一个消息。
最简单的方式来运行这个例子自己下载lightbend激活和打开教程为java akka坚持样品。它包含有关如何运行persistentactorexample。


切换不同的命令处理程序之间的正常处理和恢复过程中getcontext()也是有可能的。become()和getcontext()。unbecome()。让actor在同一状态恢复后,你需要特别注意执行相同的状态转换成为unbecome在receiverecover方法你会在命令处理程序完成。注意,当使用成为receiverecover仍只使用receiverecover行为回放时的事件。当重播完成后,它将使用新的行为。


Identifiers


一个持久的actor必须有一个标识符,并没有改变在不同的actor的化身。标识符必须与persistenceid方法定义。
  1. @Override
  2. publicStringpersistenceId(){
  3. return"my-stable-persistence-id";
  4. }

Recovery

默认情况下,一个持久的actor自动恢复启动,通过重放日志消息重启。新的消息发送到一个持久的actor在恢复期间不影响重播信息。他们被缓存和接收由一个持久的 actor恢复阶段完成后。

对于重播信息访问sender()总是会导致一个deadletters参考,作为原始发件人可能是长了。如果你真的要通知actor在未来恢复期间,储存actorpath明确你坚持事件。
Recovery customization

应用程序还可以自定义如何恢复是在一个untypedpersistentactor返回一个定制的恢复恢复对象上执行,例如设置一个上限重播使actor要重播到一定点“过去”的最新状态:

    
    
  1. @Override
  2. public Recovery recovery() {
  3. return Recovery.create(457L);
  4. }

Recovery can be disabled by returning Recovery.none() in the recovery method of a PersistentActor:

    
    
  1. @Override
  2. public Recovery recovery() {
  3. return Recovery.none();
  4. }

Recovery status


一个持久的actor可以通过方法查询自己的恢复状态

1. public boolean recoveryRunning();

2. public boolean recoveryFinished();


有时有需要进行额外的初始化时,恢复完成前处理任何其他消息发送到持久的actor。持续的actor会收到后恢复和建造其他接收消息的特殊recoverycompleted消息

1. @Override

2. public void onReceiveRecover(Object message) {

3.   if (message instanceof RecoveryCompleted) {

4.     // perform init after recovery, beforeany other messages

5.   }

6. }

7.     

8. @Override

9. public void onReceiveCommand(Object message)throwsException {

10.  if (message instanceof String) {

11.    // ...

12.  }else {

13.    unhandled(message);

14.  }

15.}

如果有从日志恢复actor的状态问题,onrecoveryfailure叫做(默认记录错误)和actorwill停止。

Internal stash

持续的actor有一个私有的stash内部缓存的传入消息中恢复或thepersist \ persistall方法坚持事件。你仍然可以使用/继承stash接口。internal stash与正常的连接到unstashall方法并确保信息正确unstashed内部储备来维持有序的保证。
你应该小心不要发送更多的消息的一个持久的actor比能跟上,否则将消息数量将增长没有界限。它可以防止OutOfMemoryError定义一个最大的藏在邮箱配置能力智慧:

1. akka.actor.default-mailbox.stash-capacity=10000

请注意,藏量每actor。如果你有很多持久的actor,如采用聚类分片的时候,你可能需要定义一个小的储藏能力,保证系统中的总含量的消息数不消耗太多的内存。此外,持续的actor定义了三个策略来处理故障时,内藏容量超过。默认溢出策略是throwoverflowexceptionstrategy,丢弃当前收到的消息并抛出一个stashoverflowexception,导致actor重启如果默认的监管策略应用。您可以重写internalstashoverflowstrategy方法返回discardtodeadletterstrategy orreplytostrategy任何“个人”执着的actor,或定义的“默认”通过提供fqcn所有执着的 actor,这是必须的stashoverflowstrategyconfigurator的子类,在持久的配置:

1. akka.persistence.internal-stash-overflow-strategy=

2.   "akka.persistence.ThrowExceptionConfigurator"

discardtodeadletterstrategy策略也有预包装的同伴configuratorakka.persistence.discardconfigurator。
你也可以查询默认策略通过akka持久化扩展的单例:

1. Persistence.get(context().system()).defaultInternalStashOverflowStrategy();

Relaxed local consistency requirementsand high throughput use-cases

如果面对轻松的局部一致性要求和高吞吐量的需求,有时persistentactor及其坚持不在以很高的速度消耗输入命令的话,因为它要等到一个给定的命令相关的所有事件的处理,开始处理下一个命令。而这种抽象为大多数情况下是非常有用的,有时你可能会面临放松要求一致性–例如你可能要处理命令尽可能快的速度,假设事件最终会持续在后台处理得当,反应持续的失败如果需要追溯。
的persistasync方法实现高通量的持续的actor提供了一个工具。它不会把输入的命令,日志仍然在坚持和/或用户代码执行事件回调。
在下面的例子中,事件回调可能被称为“在任何时间,甚至在接下来的命令已被处理。事件之间的顺序仍然是保证(“evt-b-1”将被发送后的“evt-a-2”,这将给在“evt-a-1”等)。


1. class MyPersistentActor extends UntypedPersistentActor {

2.     @Override

3.     public String persistenceId(){return"some-persistence-id"; }

4.  

5.     @Override

6.     public void onReceiveRecover(Object msg) {

7.         // handlerecovery here

8.     }

9.  

10.    @Override

11.    public void onReceiveCommand(Object msg) {

12.        sender().tell(msg,self());

13. 

14.        persist Async(String.format("evt-%s-1", msg),new Procedure<String>(){

15.            @Override

16.            public void apply(Stringevent)throws Exception {

17.                sender().tell(event,self());

18.            }

19.        });

20.        persist Async(String.format("evt-%s-2", msg),new Procedure<String>(){

21.            @Override

22.            public void apply(Stringevent)throws Exception {

23.                sender().tell(event,self());

24.            }

25.        });

26.    }

27.}


为了实现模式称为“命令采购”只是persistasync所有传入消息马上在回调处理

Warning

回调不会被调用,如果actor是重新启动(或停止)在persistasync调用和日志之间已经证实写。


Deferring actions until precedingpersist handlers have executed

一些actor以“'happens-after前persistasync句柄被调用”。persistentactor提供了一种实用的方法称为deferasync,其工作方式类似于persistasync却不坚持过去的事件。建议使用它来读取操作,以及在域模型中没有相应的事件的操作。
使用这种方法是非常相似的方法的坚持的家庭,但它不坚持在事件中通过。它将保存在内存中,并在调用处理程序时使用。

1. classMyPersistentActorextendsUntypedPersistentActor {

2.     @Override

3.     publicString persistenceId(){return"some-persistence-id"; }

4.  

5.     @Override

6.     publicvoidonReceiveRecover(Object msg) {

7.         // handlerecovery here

8.     }

9.  

10.    @Override

11.    public void onReceiveCommand(Object msg) {

12.        final Procedure<String>replyToSender =new Procedure<String>() {

13.            @Override

14.            public void apply(Stringevent)throws Exception {

15.                sender().tell(event,self());

16.            }

17.        };

18. 

19.        persistAsync(String.format("evt-%s-1", msg),replyToSender);

20.        persistAsync(String.format("evt-%s-2", msg),replyToSender);

21.        deferAsync(String.format("evt-%s-3", msg),replyToSender);

22.    }

23.}

请注意,sender()安全访问处理回调,并将指向原始发件人的命令,这deferasync处理器被称为。

Nested persist calls

可以叫坚持和persistasync各自回调块内和他们能够保留其线程安全(包括sender()权价值)以及将保证。
在一般情况下,鼓励创建命令处理程序,不需要诉诸嵌套的事件坚持,但是有情况下,它可能是有用的。这是了解这些情况的回调执行顺序的重要,以及他们对隐藏行为的含义(即persist()执行)。在下面的示例中,两个坚持调用是发出的,并且它们中的每一个都将发出另一个持续的回调函数:

1. @Override

2. publicvoid onReceiveCommand(Object msg) {

3.     finalProcedure<String>replyToSender =newProcedure<String>() {

4.         @Override

5.         publicvoid apply(Stringevent)throwsException {

6.             sender().tell(event,self());

7.         }

8.     };

9.  

10.    finalProcedure<String>outer1Callback =newProcedure<String>() {

11.        @Override

12.        publicvoid apply(Stringevent)throwsException {

13.            sender().tell(event,self());

14.            persist(String.format("%s-inner-1", msg),replyToSender);

15.        }

16.    };

17.    finalProcedure<String>outer2Callback =newProcedure<String>() {

18.        @Override

19.        publicvoid apply(Stringevent)throwsException {

20.            sender().tell(event,self());

21.            persist(String.format("%s-inner-2", msg),replyToSender);

22.        }

23.    };

24. 

25.    persist(String.format("%s-outer-1", msg),outer1Callback);

26.    persist(String.format("%s-outer-2", msg),outer2Callback);

27.}

在这种情况下没有把发生的事件,但仍坚持回调在预期的顺序执行:

 

1. persistentActor.tell("a",ActorRef.noSender());

2. persistentActor.tell("b",ActorRef.noSender());

3.  

4. // order ofreceived messages:

5. // a

6. // b

7. // a-outer-1

8. // a-outer-2

9. // b-outer-1

10.// b-outer-2

11.// a-inner-1

12.// a-inner-2

13.// b-inner-1

14.// b-inner-2

15. 

16.// which can beseen as the following causal relationship:

17.// a ->a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2

18.// b ->b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2



虽然它可能是巢混合坚持persistasync保持各自的语义是不推荐的做法,因为它可能会导致过于复杂的嵌套。

Failures

如果一个事件持续失败,onpersistfailure将被调用(默认记录错误),和actor将无条件停止。
当坚持失败时,它不能恢复的原因是,它是未知的,如果该事件实际上是持续或不,因此它是在一个不一致的状态。重新启动持久性故障将最有可能失败,因为该日志可能是不可用的。最好是停止actor和后一个后退超时重新开始它。theakka.pattern.backoffsupervisor  actor提供支持这样的重新启动。


1. @Override

2. publicvoid preStart() throws Exception {

3.   finalProps childProps =Props.create(MyPersistentActor1.class);

4.   finalProps props =BackoffSupervisor.props(

5.     childProps,

6.     "myActor",

7.     Duration.create(3,TimeUnit.SECONDS),

8.     Duration.create(30,TimeUnit.SECONDS),

9.     0.2);

10.  getContext().actorOf(props,"mySupervisor");

11.  super.preStart();

12.}

如果一个事件持续被拒绝在储存之前,由于序列化错误,onpersistrejected将被调用(默认情况下记录一个警告),和actor继续下一个消息。
如果有一个问题,从日志恢复的actor状态当actor开始,onrecoveryfailure叫做(默认记录错误),和actor将停止。


Atomic writes

每个事件当然是存储的原子,但它也可以存储几个事件原子利用thepersistall或persistallasync方法。这意味着所有传递给该方法的事件都被存储或没有存储,如果有一个错误。
一个持久的复苏将因此 actor不会做的部分只有事件持续bypersistall子集。
有些日志可能不支持原子写几个事件,他们就会拒绝persistall命令,i.e.onpersistrejected叫做例外(通常UnsupportedOperationException)。

Batch writes

为了使用persistasync当优化吞吐量,持续的 actor内部批事件被存储在高负荷下之前写他们的日记(作为一个批处理)。批量大小是动态确定的,有多少事件是在日记往返的时间:发送一个批处理后,没有进一步的批处理可以发送之前确认已收到,以前的批处理已被写入。批量写入是从来没有计时器的基础上,保持在最低限度的延迟。

Message deletion

可以删除所有邮件(日志由一个持久的 actor)到一个指定的序列号;持久的 actor可以叫deletemessages方法为此。
删除事件采购应用消息通常不使用,或配合使用withsnapshotting快照之后,即已成功保存,一deletemessages(tosequencenr)直到序列号的数据由快照可以发出安全地删除以前的事件仍有积累的状态在重赛通过加载快照。
deletemessages请求的结果暗示的持久的 actor有deletemessagessuccess信息如果删除成功或deletemessagesfailure消息如果失败。
信息缺失不影响日志最高的序列号,即使所有的信息被删除后,从这deletemessages调用。


Persistence status handling

Persisting,deleting, and replaying messages can either succeed or fail.

Method

Success

Failure / Rejection

After failure handler invoked

persist / persistAsync

persist handler invoked

onPersistFailure

Actor is stopped.

onPersistRejected

No automatic actions.

recovery

RecoverySuccess

onRecoveryFailure

Actor is stopped.

deleteMessages

DeleteMessagesSuccess

DeleteMessagesFailure

No automatic actions.

最重要的行动(坚持和恢复)故障处理程序为明确的回调,用户可以覆盖在persistentactor。这些处理程序的默认实现发出登录消息(误差持续/恢复失败,并警告他人),记录关于这消息引起故障的故障原因和信息。
对于关键的失败,如恢复或持久性事件失败,将在调用失败处理程序后停止持久性actor的失败。这是因为,如果底层实现持久性故障信号日志最有可能的是它未能完全超载和重启马上试图坚持事件再次将最有可能帮助恢复–杂志因为它可能会导致一大群的问题,很多actir都会重启和持续努力坚持他们的事件再次。相反,使用一个backoffsupervisor(如故障),实现了指数退避策略,允许更多的呼吸空间日志之间的持久的actor重新恢复。

日志实现可以选择实施一个重试机制,例如,这样,只有在一个写失败N多次的持久性失败是向用户发出信号。换句话说,一旦日志返回一个失败,它是由akka坚持认为是致命的,造成失败的执着的actor将停止。
检查日记实现的文档,您正在使用该技术的详细信息,如果/它是如何使用这种技术。

Safely shutting down persistent actors

当关闭来自外部的 persistentactor时,应该给予特别的照顾。与正常的actor通常是使用特殊poisonpill消息信号的一个 actor,应该停止本身一旦事实上这个消息是由akka自动处理接收到该消息–接受,离开目标的 actor没有办法拒绝停止自己当了毒丸。
这是危险的当使用persistentactor由于传入的命令是藏而持久的 actor正在等待从日志确认事件已经写persist()时使用。因为输入的命令将被从 actor的邮箱并投入其内部藏匿在等待确认(因此,呼吁坚持程序之前)的 actor可以接收和(自动)处理过程poisonpill之前已投入其藏匿的其他消息,造成预成熟的 actor关机。
警告
考虑使用显式关闭的消息,而不是poisonpill工作持续的 actor时。
下面的例子强调了消息到达 actor的邮箱和他们如何与内部隐藏的作用机理persist()时使用。注意提前停止行为时所发生的poisonpill使用:

 

1. final class Shutdown {}

2.  

3. class MyPersistentActor extend sUntypedPersistentActor {

4.     @Override

5.     public String persistenceId(){

6.         return"some-persistence-id";

7.     }

8.  

9.     @Override

10.    public void onReceiveCommand(Object msg)throwsException {

11.        if (msginstanceof Shutdown) {

12.            context().stop(self());

13.        }elseif (msginstanceof String) {

14.            System.out.println(msg);

15.            persist("handle-" + msg,new Procedure<String>() {

16.                @Override

17.                publicvoid apply(String param)throwsException {

18.                    System.out.println(param);

19.                }

20.            });

21.        }else unhandled(msg);

22.    }

23. 

24.    @Override

25.    public void onReceiveRecover(Object msg)throwsException {

26.        // handle recovery...

27.    }

28.}

1. // UN-SAFE, dueto PersistentActor's command stashing:

2. persistentActor.tell("a",ActorRef.noSender());

3. persistentActor.tell("b",ActorRef.noSender());

4. persistentActor.tell(PoisonPill.getInstance(),ActorRef.noSender());

5. // order ofreceived messages:

6. // a

7. //   # b arrives at mailbox, stashing;        internal-stash = [b]

8. //   # PoisonPill arrives at mailbox, stashing;internal-stash = [b, Shutdown]

9. // PoisonPill isan AutoReceivedMessage, is handled automatically

10.// !! stop !!

11.// Actor is stoppedwithout handling `b` nor the `a` handler!

1. // SAFE:

2. persistentActor.tell("a",ActorRef.noSender());

3. persistentActor.tell("b",ActorRef.noSender());

4. persistentActor.tell(newShutdown(),ActorRef.noSender());

5. // order ofreceived messages:

6. // a

7. //   # b arrives at mailbox, stashing;        internal-stash = [b]

8. //   # Shutdown arrives at mailbox, stashing;internal-stash = [b, Shutdown]

9. // handle-a

10.//   # unstashing;                            internal-stash =[Shutdown]

11.// b

12.// handle-b

13.//   # unstashing;                            internal-stash = []

14.// Shutdown

15.// -- stop --





声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号