赞
踩
转载:https://blog.csdn.net/liubenlong007/article/details/54584303
本文内容来自于官网文档示例: AKKA Persistence
Akka持久化可以使有状态的actor持久化其内部状态,以便在actor启动、JVM崩溃后重新启动、或在集群中迁移时,恢复这些内部状态。 Akka持久化的关键点在于,只有对actor内部状态的更改才会被持久化,而不会直接持久化其当前状态(可选的快照除外)。 这些更改只会以追加的方式写入存储,已写入的内容不会被修改,因此可以获得非常高的事务速率和高效的复制。 有状态的actor(stateful actor)通过重放已持久化的更改来重建内部状态。
重放的可以是所有更改的完整历史记录,也可以从某个快照开始,后者可以显著减少恢复时间。 Akka持久化还提供具有至少一次(at-least-once)消息传递语义的点对点通信。【翻译自官方文档】
接下来将官方文档的示例运行一遍
<!-- Akka Persistence extension, version 2.4.16 (the "_2.11" suffix of the artifact id is the Scala binary version). -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.11</artifactId>
<version>2.4.16</version>
</dependency>
Akka持久化扩展自带了一些内置的持久化插件,包括基于内存堆的日志(journal)、基于本地文件系统的快照存储和基于LevelDB的日志。
基于LevelDB的插件将需要以下附加的依赖声明:
<!-- Additional dependencies needed only when the LevelDB-based journal plugin is used. -->
<!-- LevelDB library (org.iq80.leveldb). -->
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.7</version>
</dependency>
<!-- LevelDB JNI bindings (org.fusesource.leveldbjni). -->
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
不过本文没有使用leveldb。
这里需要在 reference.conf 中配置持久化用到的 journal(日志)插件和 snapshot-store(快照存储)插件:
# General Akka settings: log at INFO level.
akka {
loglevel = "INFO"
}
# Absolute path to the default journal plugin configuration entry.
# Selects the built-in in-memory journal, so journaled events do not survive a JVM restart.
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
# Absolute path to the default snapshot store plugin configuration entry.
# Selects the built-in snapshot store backed by the local file system.
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
package akka.serializable;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedPersistentActor;
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.UUID;
import static java.util.Arrays.asList;
/**
 * Command message sent to the persistent actor.
 *
 * <p>Immutable carrier for a single text payload; the actor derives the
 * events it persists from this payload.
 */
class Cmd implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Payload text supplied by the sender; stored as given. */
    private final String data;

    /**
     * Creates a command wrapping the given payload.
     *
     * @param data payload text, kept without validation or copying
     */
    public Cmd(String data) {
        this.data = data;
    }

    /**
     * Returns the payload.
     *
     * @return the value passed to the constructor
     */
    public String getData() {
        return this.data;
    }
}
/**
 * Event produced from a command and written to the journal.
 *
 * <p>Immutable pair of the event payload and a unique identifier string.
 * The class does not override {@code equals}, so two events are equal only
 * when they are the same instance.
 */
class Evt implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Event payload text. */
    private final String data;

    /** Identifier attached to the event by its creator. */
    private final String uuid;

    /**
     * Creates an event.
     *
     * @param data event payload text
     * @param uuid identifier for this event
     */
    public Evt(String data, String uuid) {
        this.data = data;
        this.uuid = uuid;
    }

    /**
     * Returns the event payload.
     *
     * @return the payload passed to the constructor
     */
    public String getData() {
        return this.data;
    }

    /**
     * Returns the event identifier.
     *
     * @return the identifier passed to the constructor
     */
    public String getUuid() {
        return this.uuid;
    }
}
/**
 * Mutable actor state: the ordered list of payloads of every event applied
 * so far.
 *
 * <p>Because the state is mutable, callers that hand it to something else
 * (for example when saving a snapshot) should pass a {@link #copy()}.
 */
class ExampleState implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Payloads of the applied events, in application order. */
    private final ArrayList<String> events;

    /** Creates an empty state. */
    public ExampleState() {
        this(new ArrayList<String>());
    }

    /**
     * Creates a state backed by the given list.
     *
     * @param events list used directly as the backing store (not copied)
     */
    public ExampleState(ArrayList<String> events) {
        this.events = events;
    }

    /**
     * Creates an independent copy of this state.
     *
     * @return a new state whose backing list holds the same payloads
     */
    public ExampleState copy() {
        final ArrayList<String> duplicate = new ArrayList<String>(events.size());
        duplicate.addAll(events);
        return new ExampleState(duplicate);
    }

    /**
     * Applies an event by appending its payload.
     *
     * @param evt event whose data is recorded
     */
    public void update(Evt evt) {
        final String payload = evt.getData();
        events.add(payload);
    }

    /**
     * Returns how many events have been applied.
     *
     * @return number of recorded payloads
     */
    public int size() {
        return events.size();
    }

    /**
     * Renders the recorded payloads.
     *
     * @return the backing list's string form
     */
    @Override
    public String toString() {
        return events.toString();
    }
}
/**
 * Event-sourced example actor.
 *
 * <p>Each {@link Cmd} is turned into two {@link Evt} events that are persisted
 * before being applied to the in-memory {@link ExampleState}. The string
 * messages {@code "snap"} and {@code "print"} save a snapshot of the state and
 * print it, respectively.
 */
class ExamplePersistentActor extends UntypedPersistentActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /** State rebuilt during recovery and updated after each persisted event. */
    private ExampleState state = new ExampleState();

    /**
     * Identifier under which this actor's events and snapshots are stored.
     *
     * @return the fixed id {@code "sample-id-1"}
     */
    @Override
    public String persistenceId() {
        return "sample-id-1";
    }

    /**
     * Returns the number of events applied to the current state.
     *
     * @return size of the current state
     */
    public int getNumEvents() {
        return state.size();
    }

    /**
     * Called on restart. A snapshot offer replaces the whole state; a replayed
     * journal event is applied on top of it. Anything else is reported as
     * unhandled.
     *
     * @param msg recovery message
     */
    @Override
    public void onReceiveRecover(Object msg) {
        log.info("onReceiveRecover: " + JSON.toJSONString(msg));

        if (msg instanceof Evt) {
            final Evt replayed = (Evt) msg;
            log.info("onReceiveRecover -- msg instanceof Event");
            log.info("event --- " + replayed.getData());
            state.update(replayed);
            return;
        }

        if (msg instanceof SnapshotOffer) {
            log.info("onReceiveRecover -- msg instanceof SnapshotOffer");
            final SnapshotOffer offer = (SnapshotOffer) msg;
            state = (ExampleState) offer.snapshot();
            return;
        }

        unhandled(msg);
    }

    /**
     * Called on command dispatch.
     *
     * @param msg a {@link Cmd}, the string {@code "snap"} or {@code "print"};
     *            anything else is reported as unhandled
     */
    @Override
    public void onReceiveCommand(Object msg) {
        log.info("onReceiveCommand: " + JSON.toJSONString(msg));

        if (msg instanceof Cmd) {
            persistEventsFor((Cmd) msg);
        } else if (msg.equals("snap")) {
            // The state is mutable, so the snapshot must be taken from a copy.
            saveSnapshot(state.copy());
        } else if (msg.equals("print")) {
            System.out.println("state: " + state);
        } else {
            unhandled(msg);
        }
    }

    /**
     * Builds the two events for a command, persists them, and applies each one
     * to the state from the persist handler.
     */
    private void persistEventsFor(Cmd cmd) {
        final String payload = cmd.getData();
        final int applied = getNumEvents();

        // Each event carries the payload, a running index and a fresh uuid.
        final Evt evt1 = new Evt(payload + "-" + applied, UUID.randomUUID().toString());
        final Evt evt2 = new Evt(payload + "-" + (applied + 1), UUID.randomUUID().toString());

        // Persist first; the handler then updates the state for each event.
        persistAll(asList(evt1, evt2), persisted -> {
            state.update(persisted);
            if (persisted.equals(evt2)) {
                // Publish only the second event of the pair on the event stream.
                getContext().system().eventStream().publish(persisted);
            }
        });
    }
}
/**
 * Actor that logs every message it receives as JSON.
 */
class EventHandler extends UntypedActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /**
     * Logs the received message.
     *
     * @param msg message delivered to this actor
     */
    @Override
    public void onReceive(Object msg) throws Exception {
        final String json = JSON.toJSONString(msg);
        log.info("Handled Event: " + json);
    }
}
main方法
package akka.serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
/**
 * Entry point of the example: starts an actor system, subscribes an
 * {@code EventHandler} to {@code Evt} messages on the event stream, sends a
 * fixed sequence of commands to an {@code ExamplePersistentActor}, and then
 * shuts the system down.
 */
public class MainTest {

    // Named after this class so log lines are attributed to MainTest
    // (the logger was previously created for java.lang.System by mistake).
    public static final Logger log = LoggerFactory.getLogger(MainTest.class);

    /**
     * Runs the example.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String... args) throws Exception {
        final ActorSystem actorSystem = ActorSystem.create("actor-server");
        try {
            final ActorRef handler = actorSystem.actorOf(Props.create(EventHandler.class));
            // Subscribe the handler to every Evt published on the event stream.
            actorSystem.eventStream().subscribe(handler, Evt.class);
            // NOTE(review): pause kept from the original example; presumably lets the
            // subscriber start before events flow - confirm whether it is needed.
            Thread.sleep(5000);

            final ActorRef actorRef = actorSystem.actorOf(
                    Props.create(ExamplePersistentActor.class), "eventsourcing-processor");

            // There is no sending actor here, so use the explicit "no sender" reference.
            actorRef.tell(new Cmd("CMD 1"), ActorRef.noSender());
            actorRef.tell(new Cmd("CMD 2"), ActorRef.noSender());
            actorRef.tell(new Cmd("CMD 3"), ActorRef.noSender());
            // Ask the actor to save a snapshot of its current state.
            actorRef.tell("snap", ActorRef.noSender());
            actorRef.tell(new Cmd("CMD 4"), ActorRef.noSender());
            actorRef.tell(new Cmd("CMD 5"), ActorRef.noSender());
            actorRef.tell("print", ActorRef.noSender());

            // Messages are handled asynchronously; wait before shutting down.
            Thread.sleep(5000);
        } finally {
            // Always stop the actor system, even on failure, so the JVM can exit.
            log.info("Actor System Shutdown Starting...");
            // terminate() replaces shutdown(), which is deprecated in Akka 2.4.
            actorSystem.terminate();
        }
    }
}
运行结果:
[INFO] [01/17/2017 16:16:02.526] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover: {}
[INFO] [01/17/2017 16:16:02.526] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover -- msg instanceof SnapshotOffer
[INFO] [01/17/2017 16:16:02.560] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover: {"instance":{"$ref":"@"}}
[INFO] [01/17/2017 16:16:02.565] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 1"}
[INFO] [01/17/2017 16:16:02.591] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 2"}
[INFO] [01/17/2017 16:16:02.592] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 3"}
[INFO] [01/17/2017 16:16:02.593] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 1-37","uuid":"fe0fca08-a415-425d-b618-c730b117ca7d"}
[INFO] [01/17/2017 16:16:02.594] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: "snap"
[INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 2-39","uuid":"76fbf9e8-befd-40f4-a4fe-49b5698c7c7d"}
[INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 3-41","uuid":"3b9f70c2-83c5-47d3-9f0a-4710905c3cfe"}
[INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 4"}
[INFO] [01/17/2017 16:16:02.596] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 4-43","uuid":"f8bb1e7e-716e-4f3c-9d81-e4570e0501c0"}
[INFO] [01/17/2017 16:16:02.596] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 5"}
[INFO] [01/17/2017 16:16:02.597] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 5-45","uuid":"574f9cdf-59f3-421c-8337-df8a50c95ee6"}
state: [CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 1-12, CMD 1-13, CMD 2-14, CMD 2-15, CMD 3-16, CMD 3-17, CMD 1-18, CMD 1-19, CMD 2-20, CMD 2-21, CMD 3-22, CMD 3-23, CMD 1-24, CMD 1-25, CMD 2-26, CMD 2-27, CMD 3-28, CMD 3-29, CMD 1-30, CMD 1-31, CMD 2-32, CMD 2-33, CMD 3-34, CMD 3-35, CMD 1-36, CMD 1-37, CMD 2-38, CMD 2-39, CMD 3-40, CMD 3-41, CMD 4-42, CMD 4-43, CMD 5-44, CMD 5-45]
[INFO] [01/17/2017 16:16:02.598] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: "print"
[INFO] [01/17/2017 16:16:02.613] [actor-server-akka.actor.default-dispatcher-15] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {}
2017-01-17 16:16:06.835 [main ] INFO System - Actor System Shutdown Starting...
Process finished with exit code 0
这里注意:如果再次执行该程序,系统会首先恢复上次处理器的状态。恢复时会先加载快照中的状态,然后重放快照之后的日志事件。不过本文配置的日志插件是内存日志(inmem),进程退出后日志中的事件不会保留,所以再次运行时实际只能恢复到保存快照那一刻的状态(快照之后的 CMD 4、CMD 5 不会被恢复)。
快照使得处理器的状态持久化更加高效。
假如现在运行结果是酱紫的:
state: [
CMD 1-0, CMD 1-1,
CMD 2-2, CMD 2-3,
CMD 3-4, CMD 3-5,
CMD 1-6, CMD 1-7,
CMD 2-8, CMD 2-9,
CMD 3-10, CMD 3-11,
CMD 4-12, CMD 4-13,
CMD 5-14, CMD 5-15]
再运行一次,结果就是酱紫的:
state: [
CMD 1-0, CMD 1-1,
CMD 2-2, CMD 2-3,
CMD 3-4, CMD 3-5,
CMD 1-6, CMD 1-7,
CMD 2-8, CMD 2-9,
CMD 3-10, CMD 3-11,
CMD 1-12, CMD 1-13,
CMD 2-14, CMD 2-15,
CMD 3-16, CMD 3-17,
CMD 4-18, CMD 4-19,
CMD 5-20, CMD 5-21]
由于快照存储采用的是默认的本地文件系统插件,快照会持久化到本地磁盘,进入项目根目录(即程序运行时的工作目录)下,会看到文件夹 snapshots:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。