赞
踩
1.untypedpersistentactor:
是一个持久的状态的actor。它能够坚持事件的记录,并可以以一种线程安全的方式对他们作出反应。它可以用来实现两个命令以及事件源的actor。当一个actor是启动或重新启动,日志消息回放,actor可以从这些消息中恢复内部状态。
2.快照的存储:
快照存储持久性actor或视图的内部状态的快照。快照用于优化恢复时间。一个快照存储后端存储是可插拔。持久性扩展带有一个“本地”快照存储插件,它将写入到本地文件系统中。复制的快照存储是可用的社区插件。
3.事件溯源(Event sourcing)
Event sourcing背后的基本理念是很简单的。一个持久的actor接收一个(非持久性)命令,它首先被验证,如果它可以应用到当前状态。这里的验证可以意味着任何东西,从简单的检查一个命令消息的字段到一个会话与几个外部服务,例如。如果验证成功,事件是由命令产生的,代表该命令的效果。然后,这些事件是持续的,成功的持久性后,用来改变actor的状态。当持续的actor需要恢复,只有坚持事件重演,我们知道他们可以成功地应用。换句话说,事件不能被持续的actor时,在相反的命令。Event sourcing的参与者当然也可以处理不改变应用程序状态的命令,例如查询命令。
AKKA坚持支持Event sourcing与untypedpersistentactor抽象类。一个扩展这个类的一个actor使用的是坚持和处理事件的坚持方法。一个untypedpersistentactor行为是通过实施receiverecover和receivecommand定义。这是证明在下面的例子。
- import akka.actor.ActorRef;
- import akka.actor.ActorSystem;
- import akka.actor.Props;
- import akka.japi.Procedure;
- import akka.persistence.SnapshotOffer;
- import akka.persistence.UntypedPersistentActor;
-
- import java.io.Serializable;
- import java.util.ArrayList;
- import static java.util.Arrays.asList;
-
- class Cmd implements Serializable {
- private static final long serialVersionUID = 1L;
- private final String data;
-
- public Cmd(String data) {
- this.data = data;
- }
-
- public String getData() {
- return data;
- }
- }
-
- class Evt implements Serializable {
- private static final long serialVersionUID = 1L;
- private final String data;
-
- public Evt(String data) {
- this.data = data;
- }
-
- public String getData() {
- return data;
- }
- }
-
- class ExampleState implements Serializable {
- private static final long serialVersionUID = 1L;
- private final ArrayList<String> events;
-
- public ExampleState() {
- this(new ArrayList<String>());
- }
-
- public ExampleState(ArrayList<String> events) {
- this.events = events;
- }
-
- public ExampleState copy() {
- return new ExampleState(new ArrayList<String>(events));
- }
-
- public void update(Evt evt) {
- events.add(evt.getData());
- }
-
- public int size() {
- return events.size();
- }
-
- @Override
- public String toString() {
- return events.toString();
- }
- }
-
- class ExamplePersistentActor extends UntypedPersistentActor {
- @Override
- public String persistenceId() { return "sample-id-1"; }
-
- private ExampleState state = new ExampleState();
-
- public int getNumEvents() {
- return state.size();
- }
-
- @Override
- public void onReceiveRecover(Object msg) {
- if (msg instanceof Evt) {
- state.update((Evt) msg);
- } else if (msg instanceof SnapshotOffer) {
- state = (ExampleState)((SnapshotOffer)msg).snapshot();
- } else {
- unhandled(msg);
- }
- }
-
- @Override
- public void onReceiveCommand(Object msg) {
- if (msg instanceof Cmd) {
- final String data = ((Cmd)msg).getData();
- final Evt evt1 = new Evt(data + "-" + getNumEvents());
- final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
- persistAll(asList(evt1, evt2), new Procedure<Evt>() {
- public void apply(Evt evt) throws Exception {
- state.update(evt);
- if (evt.equals(evt2)) {
- getContext().system().eventStream().publish(evt);
- }
- }
- });
- } else if (msg.equals("snap")) {
- // IMPORTANT: create a copy of snapshot
- // because ExampleState is mutable !!!
- saveSnapshot(state.copy());
- } else if (msg.equals("print")) {
- System.out.println(state);
- } else {
- unhandled(msg);
- }
- }
- }
这个例子中定义了两个数据类型,CMD和EVT代表命令和事件,分别。examplepersistentactor的状态是一个持续的事件数据包含在examplestate。注
切换不同的命令处理程序之间的正常处理和恢复过程中getcontext()也是有可能的。become()和getcontext()。unbecome()。让actor在同一状态恢复后,你需要特别注意执行相同的状态转换成为unbecome在receiverecover方法你会在命令处理程序完成。注意,当使用成为receiverecover仍只使用receiverecover行为回放时的事件。当重播完成后,它将使用新的行为。
应用程序还可以自定义如何恢复是在一个untypedpersistentactor返回一个定制的恢复恢复对象上执行,例如设置一个上限重播使actor要重播到一定点“过去”的最新状态:
- @Override
- public Recovery recovery() {
- return Recovery.create(457L);
- }
Recovery can be disabled by returning Recovery.none() in the recovery method of a PersistentActor:
- @Override
- public Recovery recovery() {
- return Recovery.none();
- }
Recovery status
1. public boolean recoveryRunning();
2. public boolean recoveryFinished();
有时有需要进行额外的初始化时,恢复完成前处理任何其他消息发送到持久的actor。持续的actor会收到后恢复和建造其他接收消息的特殊recoverycompleted消息
1. @Override
2. public void onReceiveRecover(Object message) {
3. if (message instanceof RecoveryCompleted) {
4. // perform init after recovery, beforeany other messages
5. }
6. }
7.
8. @Override
9. public void onReceiveCommand(Object message)throwsException {
10. if (message instanceof String) {
11. // ...
12. }else {
13. unhandled(message);
14. }
15.}
如果有从日志恢复actor的状态问题,onrecoveryfailure叫做(默认记录错误)和actorwill停止。
Internal stash
持续的actor有一个私有的stash内部缓存的传入消息中恢复或thepersist \ persistall方法坚持事件。你仍然可以使用/继承stash接口。internal stash与正常的连接到unstashall方法并确保信息正确unstashed内部储备来维持有序的保证。1. akka.actor.default-mailbox.stash-capacity=10000
请注意,藏量每actor。如果你有很多持久的actor,如采用聚类分片的时候,你可能需要定义一个小的储藏能力,保证系统中的总含量的消息数不消耗太多的内存。此外,持续的actor定义了三个策略来处理故障时,内藏容量超过。默认溢出策略是throwoverflowexceptionstrategy,丢弃当前收到的消息并抛出一个stashoverflowexception,导致actor重启如果默认的监管策略应用。您可以重写internalstashoverflowstrategy方法返回discardtodeadletterstrategy orreplytostrategy任何“个人”执着的actor,或定义的“默认”通过提供fqcn所有执着的 actor,这是必须的stashoverflowstrategyconfigurator的子类,在持久的配置:1. akka.persistence.internal-stash-overflow-strategy=
2. "akka.persistence.ThrowExceptionConfigurator"
discardtodeadletterstrategy策略也有预包装的同伴configuratorakka.persistence.discardconfigurator。
你也可以查询默认策略通过akka持久化扩展的单例:
1. Persistence.get(context().system()).defaultInternalStashOverflowStrategy();
Relaxed local consistency requirementsand high throughput use-cases
如果面对轻松的局部一致性要求和高吞吐量的需求,有时persistentactor及其坚持不在以很高的速度消耗输入命令的话,因为它要等到一个给定的命令相关的所有事件的处理,开始处理下一个命令。而这种抽象为大多数情况下是非常有用的,有时你可能会面临放松要求一致性–例如你可能要处理命令尽可能快的速度,假设事件最终会持续在后台处理得当,反应持续的失败如果需要追溯。1. class MyPersistentActor extends UntypedPersistentActor {
2. @Override
3. public String persistenceId(){return"some-persistence-id"; }
4.
5. @Override
6. public void onReceiveRecover(Object msg) {
7. // handlerecovery here
8. }
9.
10. @Override
11. public void onReceiveCommand(Object msg) {
12. sender().tell(msg,self());
13.
14. persist Async(String.format("evt-%s-1", msg),new Procedure<String>(){
15. @Override
16. public void apply(Stringevent)throws Exception {
17. sender().tell(event,self());
18. }
19. });
20. persist Async(String.format("evt-%s-2", msg),new Procedure<String>(){
21. @Override
22. public void apply(Stringevent)throws Exception {
23. sender().tell(event,self());
24. }
25. });
26. }
27.}
注Warning
回调不会被调用,如果actor是重新启动(或停止)在persistasync调用和日志之间已经证实写。
Deferring actions until precedingpersist handlers have executed
一些actor以“'happens-after前persistasync句柄被调用”。persistentactor提供了一种实用的方法称为deferasync,其工作方式类似于persistasync却不坚持过去的事件。建议使用它来读取操作,以及在域模型中没有相应的事件的操作。1. classMyPersistentActorextendsUntypedPersistentActor {
2. @Override
3. publicString persistenceId(){return"some-persistence-id"; }
4.
5. @Override
6. publicvoidonReceiveRecover(Object msg) {
7. // handlerecovery here
8. }
9.
10. @Override
11. public void onReceiveCommand(Object msg) {
12. final Procedure<String>replyToSender =new Procedure<String>() {
13. @Override
14. public void apply(Stringevent)throws Exception {
15. sender().tell(event,self());
16. }
17. };
18.
19. persistAsync(String.format("evt-%s-1", msg),replyToSender);
20. persistAsync(String.format("evt-%s-2", msg),replyToSender);
21. deferAsync(String.format("evt-%s-3", msg),replyToSender);
22. }
23.}
请注意,sender()安全访问处理回调,并将指向原始发件人的命令,这deferasync处理器被称为。
Nested persist calls
可以叫坚持和persistasync各自回调块内和他们能够保留其线程安全(包括sender()权价值)以及将保证。1. @Override
2. publicvoid onReceiveCommand(Object msg) {
3. finalProcedure<String>replyToSender =newProcedure<String>() {
4. @Override
5. publicvoid apply(Stringevent)throwsException {
6. sender().tell(event,self());
7. }
8. };
9.
10. finalProcedure<String>outer1Callback =newProcedure<String>() {
11. @Override
12. publicvoid apply(Stringevent)throwsException {
13. sender().tell(event,self());
14. persist(String.format("%s-inner-1", msg),replyToSender);
15. }
16. };
17. finalProcedure<String>outer2Callback =newProcedure<String>() {
18. @Override
19. publicvoid apply(Stringevent)throwsException {
20. sender().tell(event,self());
21. persist(String.format("%s-inner-2", msg),replyToSender);
22. }
23. };
24.
25. persist(String.format("%s-outer-1", msg),outer1Callback);
26. persist(String.format("%s-outer-2", msg),outer2Callback);
27.}
在这种情况下没有把发生的事件,但仍坚持回调在预期的顺序执行:
1. persistentActor.tell("a",ActorRef.noSender());
2. persistentActor.tell("b",ActorRef.noSender());
3.
4. // order ofreceived messages:
5. // a
6. // b
7. // a-outer-1
8. // a-outer-2
9. // b-outer-1
10.// b-outer-2
11.// a-inner-1
12.// a-inner-2
13.// b-inner-1
14.// b-inner-2
15.
16.// which can beseen as the following causal relationship:
17.// a ->a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
18.// b ->b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
Failures
如果一个事件持续失败,onpersistfailure将被调用(默认记录错误),和actor将无条件停止。1. @Override
2. publicvoid preStart() throws Exception {
3. finalProps childProps =Props.create(MyPersistentActor1.class);
4. finalProps props =BackoffSupervisor.props(
5. childProps,
6. "myActor",
7. Duration.create(3,TimeUnit.SECONDS),
8. Duration.create(30,TimeUnit.SECONDS),
9. 0.2);
10. getContext().actorOf(props,"mySupervisor");
11. super.preStart();
12.}
如果一个事件持续被拒绝在储存之前,由于序列化错误,onpersistrejected将被调用(默认情况下记录一个警告),和actor继续下一个消息。Atomic writes
每个事件当然是存储的原子,但它也可以存储几个事件原子利用thepersistall或persistallasync方法。这意味着所有传递给该方法的事件都被存储或没有存储,如果有一个错误。Batch writes
为了使用persistasync当优化吞吐量,持续的 actor内部批事件被存储在高负荷下之前写他们的日记(作为一个批处理)。批量大小是动态确定的,有多少事件是在日记往返的时间:发送一个批处理后,没有进一步的批处理可以发送之前确认已收到,以前的批处理已被写入。批量写入是从来没有计时器的基础上,保持在最低限度的延迟。Message deletion
可以删除所有邮件(日志由一个持久的 actor)到一个指定的序列号;持久的 actor可以叫deletemessages方法为此。Persistence status handling
Persisting,deleting, and replaying messages can either succeed or fail.
Method | Success | Failure / Rejection | After failure handler invoked |
persist / persistAsync | persist handler invoked | onPersistFailure | Actor is stopped. |
onPersistRejected | No automatic actions. | ||
recovery | RecoverySuccess | onRecoveryFailure | Actor is stopped. |
deleteMessages | DeleteMessagesSuccess | DeleteMessagesFailure | No automatic actions. |
最重要的行动(坚持和恢复)故障处理程序为明确的回调,用户可以覆盖在persistentactor。这些处理程序的默认实现发出登录消息(误差持续/恢复失败,并警告他人),记录关于这消息引起故障的故障原因和信息。
对于关键的失败,如恢复或持久性事件失败,将在调用失败处理程序后停止持久性actor的失败。这是因为,如果底层实现持久性故障信号日志最有可能的是它未能完全超载和重启马上试图坚持事件再次将最有可能帮助恢复–杂志因为它可能会导致一大群的问题,很多actir都会重启和持续努力坚持他们的事件再次。相反,使用一个backoffsupervisor(如故障),实现了指数退避策略,允许更多的呼吸空间日志之间的持久的actor重新恢复。
注
日志实现可以选择实施一个重试机制,例如,这样,只有在一个写失败N多次的持久性失败是向用户发出信号。换句话说,一旦日志返回一个失败,它是由akka坚持认为是致命的,造成失败的执着的actor将停止。
检查日记实现的文档,您正在使用该技术的详细信息,如果/它是如何使用这种技术。
Safely shutting down persistent actors
当关闭来自外部的 persistentactor时,应该给予特别的照顾。与正常的actor通常是使用特殊poisonpill消息信号的一个 actor,应该停止本身一旦事实上这个消息是由akka自动处理接收到该消息–接受,离开目标的 actor没有办法拒绝停止自己当了毒丸。
1. final class Shutdown {}
2.
3. class MyPersistentActor extend sUntypedPersistentActor {
4. @Override
5. public String persistenceId(){
6. return"some-persistence-id";
7. }
8.
9. @Override
10. public void onReceiveCommand(Object msg)throwsException {
11. if (msginstanceof Shutdown) {
12. context().stop(self());
13. }elseif (msginstanceof String) {
14. System.out.println(msg);
15. persist("handle-" + msg,new Procedure<String>() {
16. @Override
17. publicvoid apply(String param)throwsException {
18. System.out.println(param);
19. }
20. });
21. }else unhandled(msg);
22. }
23.
24. @Override
25. public void onReceiveRecover(Object msg)throwsException {
26. // handle recovery...
27. }
28.}
1. // UN-SAFE, dueto PersistentActor's command stashing:
2. persistentActor.tell("a",ActorRef.noSender());
3. persistentActor.tell("b",ActorRef.noSender());
4. persistentActor.tell(PoisonPill.getInstance(),ActorRef.noSender());
5. // order ofreceived messages:
6. // a
7. // # b arrives at mailbox, stashing; internal-stash = [b]
8. // # PoisonPill arrives at mailbox, stashing;internal-stash = [b, Shutdown]
9. // PoisonPill isan AutoReceivedMessage, is handled automatically
10.// !! stop !!
11.// Actor is stoppedwithout handling `b` nor the `a` handler!
1. // SAFE:
2. persistentActor.tell("a",ActorRef.noSender());
3. persistentActor.tell("b",ActorRef.noSender());
4. persistentActor.tell(newShutdown(),ActorRef.noSender());
5. // order ofreceived messages:
6. // a
7. // # b arrives at mailbox, stashing; internal-stash = [b]
8. // # Shutdown arrives at mailbox, stashing;internal-stash = [b, Shutdown]
9. // handle-a
10.// # unstashing; internal-stash =[Shutdown]
11.// b
12.// handle-b
13.// # unstashing; internal-stash = []
14.// Shutdown
15.// -- stop --
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。