当前位置:   article > 正文

使用Akka持久化——持久化与快照_system.eventstream().publish

system.eventstream().publish

前言

对于Java web而言,一个用户的HTTP请求最终会转换为一条Java线程处理。HTTP本身是无状态的,具体的请求逻辑一般也是无状态的。如果进程奔溃或者系统宕机,用户会发觉当前网页不可用之类的错误。虽然会影响一些用户体验,但是只要服务重启了,用户依然可以完成他的请求并满足其需要。但是有些情况下则势必会造成混乱甚至恐慌,例如跨行转账。用户从自己A银行的账户转账1万元至自己在B银行的账户,如果转出的动作成功了,但是转入却失败了,用户的心情是可想而知的,自己的财产不翼而飞了!一种解决的方式是引入事务,在此场景下还必须是分布式事务。如果只是银行内部实现分布式事务多少还是可行的,但是不同银行之间要实现的成本是可想而知的,甚至不可行的。如果A银行转出时对用户的状态作持久化,B银行对收到的转入请求也进行持久化,那么恢复用户的损失才有可能。

以上啰里啰嗦说了这么多,无非就是抛出个引子,进而介绍Akka提供的持久化功能。

Akka的持久化架构

  • UntypedPersistentActor: 是一个持久化、有状态的Actor。它能够将将事件持久化到日志并且以线程安全的方式重新执行这些事件。它能够被用于实现命令和事件源的Actor。当一个集成了UntypedPersistentActor的Actor启动或者重启时,日志化的消息被重新发送给此Actor,以便于通过这些消息回复内部状态。
  • UntypedPersistentView: 视图是一个持久化、有状态的Actor,它接收由另一个持久化Actor写入日志的消息。视图本身不会日志化新的消息,它仅仅从一个持久化Actor的复制消息流来更新内部状态。
  • UntypedPersistentActorAtLeastOnceDelivery: 用于实现向目标至少发送一次消息的语义,也适用于发送者和接收者的JVM进程奔溃。
  • AsyncWriteJournal: 异步存储发送给持久化Actor的消息序列的日志。应用程序能够通过持久化Actor控制哪个消息被日志化,哪个消息不用被日志化。日志为每条消息维护一个不断增加的序列号。日志存储的底层实现是可插拔的。Akka的持久化扩展自带一个叫做"leveldb",向本地文件系统写入的日志插件。Akka社区里还有更多日志存储插件提供。
  • Snapshot store: 快照存储对持久化Actor或持久化视图的内部状态的快照进行持久化。快照用于优化回复的时间。快照存储的底层是可插拔的。Akka持久化扩展自带一个向本地文件系统写入的“本地”快照存储插件。Akka社区里还有更多快照存储插件提供。
本文基于Akka官网提供的持久化例子,并对其进行一些适应性改造,将着重介绍UntypedPersistentActor、AsyncWriteJournal及Snapshot store的应用。

配置

有关Akka的日志持久化和快照持久化的配置如下:

  1. persistence {  
  2.   journal {  
  3.     plugin = "akka.persistence.journal.leveldb"  
  4.     leveldb.dir = "target/example/journal"  
  5.     leveldb.native = false  
  6.   }  
  7.   snapshot-store {  
  8.     plugin = "akka.persistence.snapshot-store.local"  
  9.     local.dir = "target/example/snapshots"  
  10.   }  
  11. }  
  persistence {
    journal {
      plugin = "akka.persistence.journal.leveldb"
      leveldb.dir = "target/example/journal"
      leveldb.native = false
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.local"
      local.dir = "target/example/snapshots"
    }
  }
根据配置,我们知道日志插件使用了leveldb,leveldb的存储目录为当前项目编译路径下的example/journal路径下。快照插件使用了local,存储路径与前者相同。

持久化Actor的例子

消息与状态

本例子中需要用到Cmd和Evt两种消息,Cmd代表命令,Evt代表事件。ExampleState代表我们例子中的状态。以上三个类的定义如下:

  1. public interface Persistence {  
  2.   
  3.     public static class Cmd implements Serializable {  
  4.         private static final long serialVersionUID = 1L;  
  5.         private final String data;  
  6.   
  7.         public Cmd(String data) {  
  8.             this.data = data;  
  9.         }  
  10.   
  11.         public String getData() {  
  12.             return data;  
  13.         }  
  14.     }  
  15.   
  16.     public static class Evt implements Serializable {  
  17.         private static final long serialVersionUID = 1L;  
  18.         private final String data;  
  19.   
  20.         public Evt(String data) {  
  21.             this.data = data;  
  22.         }  
  23.   
  24.         public String getData() {  
  25.             return data;  
  26.         }  
  27.     }  
  28.   
  29.     public static class ExampleState implements Serializable {  
  30.         private static final long serialVersionUID = 1L;  
  31.         private final ArrayList<String> events;  
  32.   
  33.         public ExampleState() {  
  34.             this(new ArrayList<String>());  
  35.         }  
  36.   
  37.         public ExampleState(ArrayList<String> events) {  
  38.             this.events = events;  
  39.         }  
  40.   
  41.         public ExampleState copy() {  
  42.             return new ExampleState(new ArrayList<String>(events));  
  43.         }  
  44.   
  45.         public void update(Evt evt) {  
  46.             events.add(evt.getData());  
  47.         }  
  48.   
  49.         public int size() {  
  50.             return events.size();  
  51.         }  
  52.   
  53.         @Override  
  54.         public String toString() {  
  55.             return events.toString();  
  56.         }  
  57.     }  
  58. }  
  1. public interface Persistence {
  2. public static class Cmd implements Serializable {
  3. private static final long serialVersionUID = 1L;
  4. private final String data;
  5. public Cmd(String data) {
  6. this.data = data;
  7. }
  8. public String getData() {
  9. return data;
  10. }
  11. }
  12. public static class Evt implements Serializable {
  13. private static final long serialVersionUID = 1L;
  14. private final String data;
  15. public Evt(String data) {
  16. this.data = data;
  17. }
  18. public String getData() {
  19. return data;
  20. }
  21. }
  22. public static class ExampleState implements Serializable {
  23. private static final long serialVersionUID = 1L;
  24. private final ArrayList<String> events;
  25. public ExampleState() {
  26. this(new ArrayList<String>());
  27. }
  28. public ExampleState(ArrayList<String> events) {
  29. this.events = events;
  30. }
  31. public ExampleState copy() {
  32. return new ExampleState(new ArrayList<String>(events));
  33. }
  34. public void update(Evt evt) {
  35. events.add(evt.getData());
  36. }
  37. public int size() {
  38. return events.size();
  39. }
  40. @Override
  41. public String toString() {
  42. return events.toString();
  43. }
  44. }
  45. }
上面代码展示的Cmd和Evt都很简单,它们有一样的data字段作为内容。ExampleState中维护了一个列表,次列表用于缓存所有的事件内容。

持久化Actor的实现

在具体介绍本例中持久化Actor之前,先看看其实现,其代码清单如下:

  1. @Named("ExamplePersistentActor")  
  2. @Scope("prototype")  
  3. public class ExamplePersistentActor extends UntypedPersistentActor {  
  4.     LoggingAdapter log = Logging.getLogger(getContext().system(), this);  
  5.     @Override  
  6.     public String persistenceId() {  
  7.         return "sample-id-1";  
  8.     }  
  9.   
  10.     private ExampleState state = new ExampleState();  
  11.   
  12.     public int getNumEvents() {  
  13.         return state.size();  
  14.     }  
  15.   
  16.     @Override  
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/179753
推荐阅读
相关标签
  

闽ICP备14008679号