赞
踩
本文是EventSourced库使用指南的中文译稿,基于EventSourced 0.6.0。
Eventsourced 库为Actor 的持久化提供了可缩放的解决方案,同时为基于 Akka 的消息传递提供了“ 至少一次 ”的传输保证。使用 Eventsourced Actor 可以做到:
通过追加一条记录到日志来对消息进行持久化
应用收到消息并进行处理从而产生其当前的状态
通常当前状态的状态保存在内存中(内存镜像)
通过重播接收到的消息(在应用程序正常启动或崩溃后)恢复当前(或历史)的状态
从不直接保存当前的状态(除了主动保状态快照,有快照的话恢复时间更短)
换句话说,Eventsourced 实现了预写日志(write-ahead log ,WAL)用于跟踪一个Actor 所接收消息,并通过回放记录的消息来恢复其状态。附加信息记录到日志,而不是直接持久化Actor状态,可以达到非常高的事务处理速度,并支持高效的复制。相对于其他 WAL-based 系统, Eventsourced 通常在日志中保存整个消息历史记录,也可以选用状态快照。
每个记录的消息代表一个改变一个Actor 状态的意图。记录变更,而不是更新当前状态的是event sourcing的核心理念之一 。Eventsourced 库可以用来实现 event sourcing 的概念,但它不局限于此。关于Eventsourced 库和 event sourcing 理论的关系可以读这篇文章,如何使用Eventsourced持久化Actor的状态,以及它与 event sourcing 理论的关系。
Eventsourced 也可以用来让Actor相互进行可靠之间的消息交换,以便他们在崩溃后可以恢复。为此目的,引入"信道 (Channel)" ,它提供了最少一次的消息传输保证。在日志消息回放时, 信道也用来防止持久化Actor的输出消息。因为正常工作时,这些消息已经发送给了相关的其他对象或服务,回放时再发送就是多余的了。
由Eventsourced提供的核心构建模块是处理器(processors),信道(channel)和日志(journal)。它们由一个 Akka 的扩展 EventsourcingExtension 进行管理。
处理器是一个有状态的Actor,它将收到的消息记录在日志里(持久化)。在构造时给一个有状态的Actor混入一个Eventsourced 特质(trait),它就变成了一个处理器。当然,处理器本身就是Actor,仍然可以像使用Actor一样使用其原本的功能。
包装在Message对象中的消息会被处理器作为日志记录下来,没有包装的消息不回被记录。在本指南中被包装的消息通常称为事件(events) 。包装的消息也可以是命令(Commands) ,见下文 Application。
日志记录通过Eventsourced特质来实现,处理器的receive方法并不需要关心这一点。通过向发送者发送一个应答来确认消息成功收到。处理器也可以在动态改变行为 (Actor可以用become/unbecome变更其对收到消息的处理逻辑。--译注) 时仍保持其日志记录功能。
处理器向 EventsourcingExtension 注册。这个 Akka 扩展提供了一些方法通过重放日志记录的消息来恢复处理器状态。在应用程序运行过程中的任何时候处理器都可以进行注册和恢复动作。
Eventsourced特质不会对处理器如何维护自己的状态强加任何限制。处理器记录状态可以使用 var ,可变数据结构或STM引用(软件事务内存)等等。
处理器使用信道是将消息发送到其他 Actor(信道目的地),并从他们那里收到应答。信道具有以下特征:
要求目的地(消息接收者)确认收到的消息,信道基于此来提供“最少一次”的传递保证(显式 ACK 重试协议)。确认应答会写入日志。
处理器恢复时(通过消息重播)防止将消息重复传递给目的地。信道会将重播的消息和它之前收到的应答进行匹配, 匹配上的消息会被丢弃(说明已经被目的地收到,不必再发--译注)
信道本身也是一个Actor,它与一个目的地向关联,提供上述功能。处理器通常将信道创建为子Actor,同时给它一个指向目的地的 actor 引用。
处理器也可以不使用的信道,直接发送消息到另一个Actor。在这种情况下,在处理器恢复时目的地 Actor 会收到重复的消息。
Eventsourced提供了三种不同的信道类型(计划还有更多):
默认信道
不存储接收到的消息。
仅在发送处理器恢复时重新发送未确认的消息。
在发送失败的情况下,不保证消息的发送顺序。
可靠信道
存储接收到的消息
基于可配置重发策略重发未确认消息。
保证消息发送的顺序,即使中间是失败情况。
经常被用来对付不可靠的远程目标。
可以从 JVM 崩溃中恢复
可靠的“请求-应答”信道
除了具有上述可靠信道的能力外,但还保证应答“最少一次”传递。
不保证应答顺序和和消息的发送顺序一致。
Eventsourced信道并不意味着替换任何现有的消息系统,相反它可以利用现有的消息系统。例如,如果需要的好,可以可靠地将处理器连接到已有的系统中。更一般地讲,如果要把处理器与其他其它服务集成在一起,信道就非常有用,更多描述见 这篇文章 。
日志也是一个Actor,处理器和信道使用它来记录log.日志提供的服务质量(可用性,可伸缩性,...)取决于所用的存储技术。 下文多种日志一节会给出了现有的日志实现及其开发状况的概述。
对于Message所包装的实际数据,Eventsourced库并没有在数据结构和语义上强加任何限制。因此,持久性消息可以是事件以及命令。两者都可以看做应用程序与其环境交互的方式。这在Eventsourced 的 reference application中有演示,该示例持久化事件以及命令。对应需要长期运行,具有长周期业务处理(有时也被称为sagas )来说,这也简化了其实现。例如,可以把这些处理逻辑做成一些处理器,他们通过向外发出命令来对事件做出响应,命令的接收者可以是其他的处理器或外部服务。 Eventsourced 与实现CQRS模式,并与遵循领域驱动设计(DDD)(见参考应用程序 )的应用配合的非常好。在另一方面,它也不强制应用程序遵循CQRS和/或DDD,没有CQRS和/或DDD应用程序一样可以实现 event sourcing 机制。
对于消息的持久化,Eventsourced目前提供了以下日志机制的实现:
Table 1.
日志机制 | 开发状态 |
LevelDB 日志 。 它可以配置为使用本地 LevelDB (通过访问leveldbjni )或LevelDB 的 Java 接口 作为存储后端。从运行 SBT 本地 LevelDB 需要特殊设置 。本用户指南中的所有例子都使用了 LevelDB 的 Java 接口。 | 生产 |
HBase 的日志 。一个HBase 的 为后端存储的日志,具有高可用性,横向的读 / 写的缩放新,并发和非阻塞读取和写入。 详情在这里 。 | 试验 |
基于 MongoDB Cabash 的日志 。一个后端存储为MongoDB 的 备份日志。 详情在这里 。感谢Duncan DeVore 。 | 试验 |
基于 MongoDB Reactive 的日志 。 一个MongoDB 的 备份日志。详情在这里 。感谢DuncaDuncan DeVore。 | 试验 |
DynamoDB 日志 。一个DynamoDB 为后端存储的日志。 详情在这里 。感谢Scott Clasen 。 | 试验 |
基于 Journal.IO 的日志 。 仅用于测试。 消息持久化。 | 测试 |
内存日志 。 内存中的日志用于测试目的。消息不持久。 | 测试 |
本节将示范创建,使用和回收的event-sourced 的Actor 所需的最少步骤,并演示了信道的使用。从本节代码包含在FirstSteps.scala和FirstSteps.java 。它可以从 SBT 提示与执行。
Scala:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.FirstSteps
Java:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.FirstSteps
EventsourcingExtension是由 Eventsourced 库提供的 Akka 扩展。应用程序用它来
创建和注册 event-sourced Actor(称为处理器或事件处理器)
创建和注册信道
从日志事件消息恢复注册的处理器和信道
Scala:
import java.io.File import akka.actor._ import org.eligosource.eventsourced.core._ import org.eligosource.eventsourced.journal.leveldb._ val system: ActorSystem = ActorSystem("example") val journal: ActorRef = LeveldbJournalProps( new File("target/example-1"), native = false).createJournal val extension: EventsourcingExtension = EventsourcingExtension(system, journal)
Java:
import java.io.File; import akka.actor.*; import org.eligosource.eventsourced.core.*; import org.eligosource.eventsourced.journal.leveldb.*; final ActorSystem system = ActorSystem.create("guide"); final ActorRef journal = LeveldbJournalProps.create( new File("target/guide-1-java")).withNative(false).createJournal(system); final EventsourcingExtension extension = EventsourcingExtension.create(system, journal);
此示例使用一个LevelDB 日志 ,但其他任何日志实现也可用。
使用Scala 的API ,event-sourced 的参与者可以定义为标准的 Actor 。使用 Java 的 API ,event-sourced的Actor 需要扩展抽象UntypedEventsourcedActor类。 举个例子,
Scala:
class Processor extends Actor { var counter = 0 def receive = { case msg: Message => { counter = counter + 1 println("[processor] event = %s (%d)" format (msg.event, counter)) } } }
Java:
public class Processor extends UntypedEventsourcedActor { private int counter = 0; @Override public int id() { return 1; } @Override public void onReceive(Object message) throws Exception { if (message instanceof Message) { Message msg = (Message)message; counter = counter + 1; System.out.println(String.format("[processor] event = %s (%d)", msg.event(), counter)); } } }
这个Actor计算其收到的消息 (Message ) 数量。在 Eventsourced 应用,事件总是通过Message对象进行传送。
为了让Scala Processor成为一个event-sourced Actor ,它必须在实例化时混入Eventsourced 特质。Java 的 Processor已经扩展UntypedEventsourcedActor 类,所以没有进一步的修改是必要的 。
Scala:
// create and register event-sourced processor val processor: ActorRef = extension.processorOf(Props(new Processor with Eventsourced { val id = 1 } )) // recover registered processors by replaying journaled events extension.recover()
Java:
// create and register event-sourced processor final ActorRef processor = extension.processorOf(Props.create(Processor.class), system); // recover registered processors by replaying journaled events extension.recover();
一个Actor混入Eventsourced特质(或继承 UntypedEventsourcedActor )后,当它的receive方法(或onReceive方法)被调用之前,事件Message已经被计入日志。processorOf方法使用一个唯一id将Actor注册成为处理器 。该处理器id定义实现Eventsourced.id这抽象成员。id必须为正整数,在应用运行期间始终会用它来标志这个处理器。recover方法通过回放应用运行期间processor收到的所有消息来恢复其状态 。
作为Actor ,event-sourced处理器可以像任何其他的Actor一样使用。Message类型的消息会被被写入日志, 处理器直接收取其他任何类型的消息不会被记入日志 。
Scala:
//发送Meaage类型事件给处理器(将会记入日志) processor ! Message("foo")
Java:
//发送事件消息处理器(将会记入日志) processor.tell(Message.create("foo"), null);
应用程序的首次运行会创建一个空的日志。因此,没有事件消息会被重播, processor 输出 :
[processor] event = foo (1)
到stdout。当应用程序重新启动,但是,processor的状态将被重播先前日志化事件消息恢复。然后,应用程序发送另一个事件消息。因此,你会在标准输出看到:
[processor] event = foo (1) [processor] event = foo (2)
其中第一个println 是由一个重播事件消息触发。
在该步骤中,我们扩展 event-sourced 处理器 ,让他发送一个 Message 给 destination 。处理器创建另一个新的 Message (通过创建收到事件的一个副本) ,更新其 event 字段,再将新创建的消息发送给目标 。
Scala:
class Processor(destination: ActorRef) extends Actor { var counter = 0; def receive = { case msg: Message => { counter = counter + 1 // ... destination ! msg.copy(event = "processed %d event messages so far" format counter) } } } val destination: ActorRef = system.actorOf(Props[Destination]) //通过将目标作为构造函数的参数实例化处理器 val processor: ActorRef = extension.processorOf(Props(new Processor(destination) with Eventsourced { val id = 1 } )) extension.recover()
Java:
public class Processor extends UntypedEventsourcedActor { private ActorRef destination; private int counter = 0; public Processor(ActorRef destination) { this.destination = destination; } @Override public int id() { return 1; } @Override public void onReceive(Object message) throws Exception { if (message instanceof Message) { Message msg = (Message)message; counter = counter + 1; // ... destination.tell(msg.withEvent(String.format("processed %d event messages so far", counter)), getSelf()); } } } final ActorRef destination = system.actorOf(Props.create(Destination.class)); //通过将目标作为构造函数的参数实例化处理器 final ActorRef processor = extension.processorOf(Props.create(Processor.class, destination), system); extension.recover();
如果不采取进一步的行动,处理器在恢复过程中也会将Message再次发送给destination。每一次应用程序重新启动, destination将一次又一次的重复收到了整个事件消息历史记录。在大多数情况下,这是不能接受的。比如destination 是一个外部服务 。
为了防止重复消息传递到destination ,我们需要记住哪些消息已成功发送。这也正是信道 的用武之地 。信道会丢弃所有已经成功发送到目的地的消息。因此,我们将destination包装在一个信道内,让处理器通过该信道与目标进行通信。不必修改Processor的代码就能完成这些处理。
Scala:
val destination: ActorRef = system.actorOf(Props[Destination]) //将目的地包装在信道内 val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination)) //处理器初始化时将信道(已经内置了目的地)作为构造参数传进去。 val processor: ActorRef = extension.processorOf(Props(new Processor(channel) with Eventsourced { val id = 1 } ))
Java:
final ActorRef destination = system.actorOf(Props.create(Destination.class)); //将目的地包装在信道内 final ActorRef channel = extension.channelOf(DefaultChannelProps.create(1, destination), system); //处理器初始化时将信道(已经内置了目的地)作为构造参数传进去。 final ActorRef processor = extension.processorOf(Props.create(Processor.class, channel), system);
一个信道必须有一个唯一的id(在这个例子中是1),一个正整数,必须在应用程序运行期间始终定义一致。在这里,我们创建了一个默认的信道配置了一个DefaultChannelProps配置对象。如果应用程序需要可靠的事件信息传递到目的地,他们应该使用可靠的信道并配置了一个ReliableChannelProps配置对象 。
如下定义的 Destination Actor
Scala:
class Destination extends Actor { def receive = { case msg: Message => { println("[destination] event = '%s'" format msg.event) //确认收到来自信道事件消息 msg.confirm() } } }
Java:
public class Destination extends UntypedActor { @Override public void onReceive(Object message) throws Exception { if (message instanceof Message) { Message msg = (Message)message; System.out.println(String.format("[destination] event = %s", msg.event())); msg.confirm(true); } } }
那我们再从空日志开始,在第一个应用运行的标准输出中,你应该会看 到
[processor] event = foo (1) [destination] event = 'processed 1 event messages so far'
再次运行该应用程序,你会看到该 event-sourced 的 processor 接收到完整的事件消息的历史,但 destination 只收到处理器发出的最后一个事件消息 ( 这个消息对应于本次程序运行时发送给 processor 的那个消息) :
[processor] event = foo (1) [processor] event = foo (2) [destination] event = 'processed 2 event messages so far'
消息接收方得到来自信道的消息时,必须调用 Message.confirm() 来回复一个确认收到的应答消息,这个应答消息也会异步写入日志。 随后,您会看到如何通过添加Confirm 特征,让消息接收者具有消息确认的功能 。
此快速入门指南是对 Eventsourced 库一个非常简单的介绍。 更高级的库功能都包含在下面的章节 。
Eventsourced特质已经在快速入门中有所讨论。它可与可与特质Receiver、Emitter和/或Confirm堆叠组合,但Eventsourced特质必须始终是堆叠的最后一个,即:
Scala:
new MyActor with Receiver with Confirm with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedConfirmingReceiver
该 Eventsourced 的 Java API 提供的可堆叠的特征作为抽象基类的一些预定义的组合。例如,UntypedEventsourcedConfirmingReceiver 被定义为
abstract class UntypedEventsourcedReceiver extends UntypedActor with Receiver with Confirm with Eventsourced
在Java API中其他可堆叠的特质组合在下面的章节中描述。关于所有预定义的组合请参考Untyped*抽象类的API 文档 。
一个Actor收到Message事件,通常希望能通过模式匹配直接得到内置的事件内容,而不是需要处理整个事件消息(Message)。要达到这个效果可以在 Actor 初始化时混入Receiver特质( Scala API )或继承抽象类UntypedReceiver( Java API )。
Scala:
class MyActor extends Actor { def receive = { case event => println("received event %s" format event) } } val myActor = system.actorOf(Props(new MyActor with Receiver)) myActor ! Message("foo")
Java:
public class MyActor extends UntypedReceiver { @Override public void onReceive(Object event) throws Exception { System.out.println(String.format("received event = %s", event)); } } final ActorRef myActor = system.actorOf(Props.create(MyActor.class)); myActor.tell(Message.create("foo"), null);
在上面的例子中,给 myActor 发送的 Message("foo") 将被输出到标准输出:
received event foo
Receiver特质将接收到的事件Message作为当前(current)消息存储在一个内部字段中。提取Message包含的event 内容并以该event为参数调用MyActor的receive(或onReceive )的方法。如果MyActor想获得当前事 Message,它必须定义一个Receiver自身类型,并调用message的方法(Scala API)或调用message() 方法(Java API)。
Scala:
class MyActor extends Actor { this: Receiver => def receive = { case event => { //获得当前事件消息 val currentMessage = message // ... println("received event %s" format event) } } }
Java:
public class MyActor extends UntypedReceiver { @Override public void onReceive(Object event) throws Exception { //获得当前事件消息 Message currentMessage = message(); // ... System.out.println(String.format("received event = %s", event)); } }
Receiver的特征还可以与Eventsourced和/或Confirm特质堆叠组合,但Receiver必须是组合中的第一个。例如:
Scala:
new MyActor with Receiver with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedReceiver
请参考API 文档以了解详情。
Receiver特质简化了接收端,让接收Actor使用模式匹配直接活动event内容而不是包装event的Message。同理,我们引入Emitter特质来对发送端进行相应的简化。 它使Actor可以直接向信道发送event,而不必手工进行Message 的包装动作。emitter 也也可以按名称(或 ID ,见下文)查找信道。
Scala:
class MyActor extends Actor { this: Emitter => def receive = { case event => { //向信道“MyChannel”发出事件 emitter("myChannel") sendEvent ("received: %s" format event) } } } //使用名字“MyChannel”创建并注册信道 extension.channelOf(DefaultChannelProps(1, destination).withName("myChannel")) val myActor = system.actorOf(Props(new MyActor with Emitter))
Java:
public class MyActor extends UntypedEmitter { @Override public void onReceive(Object event) throws Exception { //向信道“MyChannel”发出事件 emitter("myChannel").sendEvent(String.format("received: %s", event), getSelf()); } } //使用名字“MyChannel”创建并注册信道 extension.channelOf(DefaultChannelProps.create(1, destination).withName("myChannel"), system); final ActorRef myActor = extension.processorOf(Props.create(MyActor.class), system);
MyActor发出的事件经Emitter包装成Message发送到信道,Message中包含的事件源于MyActor发出的事件(一个 Emitter 也是 Receiver ,并保持当前事件消息,另见Receiver)。以信道名称作为参数调用emitter方法会创建一个MessageEmitter对象,用于捕捉指定的信道和当前事件消息。调用该对象的 sendEvent 会使该对象使用特定的事件参数修改捕获的事件消息,并将新的事件消息发送给信道(参见信道的使用提示 。一个 MessageEmitter对象也可以被传递给另一个Actor(或线程)使用,MessageEmitter对象是线程安全的。 MessageEmitter对象也可以通过 id 引用信道,另外,为信道指定一个名称并不是必须的:
Scala:
class MyActor extends Actor { this: Emitter => def receive = { case event => { //发出事件信道id为1 emitter(1) sendEvent ("received: %s" format event) } } } //创建并注册信道 extension.channelOf(DefaultChannelProps(1, destination))
Java:
public class MyActor extends UntypedEmitter { @Override public void onReceive(Object event) throws Exception { //发出事件信道id为1 emitter(1).sendEvent(String.format("received: %s", event), getSelf()); } } //创建并注册信道 extension.channelOf(DefaultChannelProps.create(1, destination), system);
Emitter的特质也可以与其他特质Eventsourced和/或Confirm等堆叠组合,但Emitter必须始终是第一个。 例如:
Scala:
new MyActor with Emitter with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedEmitter
请参考API 文档 以了解详情。
对从信道收到消息进行确认应答必须调用Message的confirm() 或confirm(true) 。应用程序也可以做出一个否定的应答,通过调用confirm(false) 。如果是可靠信道,这会导致以重新传送该事件消息。
Actor也可以混入Confirm特质。而不用显式调用的confirm(true) 或confirm(false) 。 如果收到的事件经Actor 的 receive(或 onReceive )方法处理后正常返回, Confirm会自动调用confirm(true) 。如果receive (或 onReceive )抛出异常,Confirm 会自动调用confirm(false) 。
Confirm 特质可以独立使用:
Scala:
new MyActor with Confirm
Java:
public class MyActor extends UntypedConfirmingActor
也可以与其他特质 Receiver 、Emitter和/或Eventsourced堆叠组合。但 Confirm 修改必须在Receiver或Emitter之后, Eventsourced之前。 例如:
Scala:
new MyActor with Receiver with Confirm with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedConfirmingReceiver
请参考API 文档 以了解详情。
这个例子通过堆叠 Receiver、Emitter和Confirm等特质简化了快速入门的例子。使用Scala API :
Processor会混入Emitter特质( 当然 Eventsourced 是必须有的)
Destination混入Receiver和Confirm
对于Java API :
Processor 会继承 UntypedEventsourcedEmitter
Destination 会继承 UntypedConfirmingReceiver
从本节代码包含在StackableTraits.scala 和StackableTraits.java 。它可以从 SBT 提示与执行
Scala:
> project eventsourced-examples > run-main org.eligosource.eventsourced.guide.StackableTraits
Java:
> project eventsourced-examples > run-main org.eligosource.eventsourced.guide.japi.StackableTraits
新Scala版本的Processor定义有一个自身类型 Emitter,其模式匹匹配直接针对事件。在Java版Processor继承 UntypedEventsourcedEmitter,还可以直接接收事件(而不是 Message )。
Scala:
class Processor extends Actor { this: Emitter => var counter = 0 def receive = { case event => { counter = counter + 1 println("[processor] event = %s (%d)" format (event, counter)) emitter("destination") sendEvent ("processed %d events so far" format counter) } } }
Java:
public class Processor extends UntypedEventsourcedEmitter { private int counter = 0; @Override public int id() { return 1; } @Override public void onReceive(Object event) throws Exception { counter = counter + 1; System.out.println(String.format("[processor] event = %s (%d)", event, counter)); emitter("destination").sendEvent( String.format("processed %d event messages so far", counter), getSelf()); } }
现在我们通过名字查找信道,而不是通过构造函数传递的信道。该信道名称信道在信道创建时指定。
Scala:
extension.channelOf(DefaultChannelProps(1, destination).withName("destination"))
Java:
extension.channelOf(DefaultChannelProps.create(1, destination).withName("destination"), system);
Scala 的Processor实例化时必须混入Emitter特质,以符合Processor要求的自身类型定义。Java的处理器没有额外的修改。
Scala:
val processor: ActorRef = extension.processorOf(Props(new Processor with Emitter with Eventsourced { val id = 1 } ))
Java:
final ActorRef processor = extension.processorOf(Props.create(Processor.class), system);
新版Destination定义:
Scala:
class Destination extends Actor { def receive = { case event => { println("[destination] event = '%s'" format event) } } }
Java:
public class Destination extends UntypedConfirmingReceiver { @Override public void onReceive(Object event) throws Exception { System.out.println(String.format("[destination] event = %s", event)); } }
直接对事件内容进行模式匹配,并且消息确认应答交由 Confirm 特质完成。 Scala 版的Destination 实例化时要混入 Receiver和Confirm 特质,Java版Destination仍然不用做修改。
Scala:
val destination: ActorRef = system.actorOf(Props(new Destination with Receiver with Confirm))
Java:
final ActorRef destination = system.actorOf(Props.create(Destination.class));
Eventsourced 库保存所有发送者的引用
Actor之间交换的消息被EventsourcedReceiver、Emitter和/或Confirm等特质修改(对于Java API 是Untyped* 系列基类)。
消息通过 信道 传递到目标Actor
使用event-sourced Actor 应用程序可以和普通Actor应用程序一样使用对发送者的引用。 如果您知道Akka Actor的发送者引用是如何工作的,下面的内容应该比较熟悉。
例如,基于的快速入门的代码开始扩展,Processor可以增加对发送者的消息回复,如下:
Scala:
class Processor(destination: ActorRef) extends Actor { // … def receive = { case msg: Message => { // … //回复给发件人 sender ! ("done processing event = %s" format msg.event) } } }
Java:
public class Processor extends UntypedEventsourcedActor { // … @Override public void onReceive(Object message) throws Exception { if (message instanceof Message) { // … getSender().tell(String.format("done processing event = %s", msg.event()), getSelf()); } } }
主程序现在可以使用ask 方法给 processor 发消息 ,并会得到一个异步响应。
Scala:
processor ? Message("foo") onSuccess { case response => println(response) }
Java:
ask(processor, Message.create("foo"), 5000L).onSuccess(new OnSuccess<Object>() { @Override public void onSuccess(Object response) throws Throwable { System.out.println(response); } }, system.dispatcher());
这里没有什么特别的东东。在这个例子中,处理器中的sender代表的就是?(也就是ask)方法返回的Future 。但消息重放时期间会发生什么情况呢?消息重放时,sender将会是deadLetters因为Eventsourced处理器不在日志中存储 sender 的引用。主要的原因是,应用程序通常不希望在消息重播时发送冗余地回复给sender。
除了直接回复给sender之外,该处理器还可以将sender引用进一步转发给它的destination, 让destination回复sender。即使 sender被封装在一个信道中,这种方法也能正常工作。因为信道只是简单的转发sender引用。出于这个原因,一个ReliableChannel需要存储sender 的引用(相对于处理器),从而使得sender引用即使在可靠的信道被重新启动之后仍然可用。如果存储的sender的是一个远程引用,即使从 JVM 崩溃(例如可靠信道所在的 JVM 崩溃)中恢复,保存的sender依然有效。
Scala:
class Processor(destination: ActorRef) extends Actor { var counter = 0 def receive = { case msg: Message => { // … //转发修改后的事件消息到目的地(sender 引用也一起过去了) destination forward msg.copy( event = "processed %d event messages so far" format counter) } } } class Destination extends Actor { def receive = { case msg: Message => { // … // reply to sender sender ! ("done processing event = %s (%d)" format msg.event) } } } val destination: ActorRef = system.actorOf(Props[Destination]) val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination)) val processor: ActorRef = extension.processorOf( Props(new Processor(channel) with Eventsourced { val id = 1 } ))
Java:
public class Processor extends UntypedEventsourcedActor { private ActorRef destination; private int counter = 0; public Processor(ActorRef destination) { this.destination = destination; } @Override public int id() { return 1; } @Override public void onReceive(Object message) throws Exception { if (message instanceof Message) { Message msg = (Message)message; //转发修改后的事件消息到目的地(sender 引用也一起过去了) destination.forward(msg.withEvent( String.format("processed %d event messages so far", counter)), getContext()); } } } public class Destination extends UntypedActor { @Override public void onReceive(Object message) throws Exception { if (message instanceof Message) { Message msg = (Message)message; // … //回复给 sender getSender().tell( String.format("done processing event = %s", msg.event()), getSelf()); } } } final ActorRef destination = system.actorOf(Props.create(Destination.class)); final ActorRef channel = extension.channelOf(DefaultChannelProps.create(1, destination), system); final ActorRef processor = extension.processorOf(Props.create(Processor.class, channel), system);
当使用MessageEmitter(另请参见Emitter )应用程序可以选择使用方法sendEvent或forwardEvent。sendEvent需要一个(隐式)sender 引用作为参数,而forwardEvent使用当前消息的sender,并对消息进行转发。它们分别与 ActorRef 的tell(!)方法和forward方法工作方式相同。
从本节代码包含在SenderReferences.scala和SenderReferences.java 。它可以从 SBT 提示与执行
Scala:
> project eventsourced-examples > run-main org.eligosource.eventsourced.guide.SenderReferences
Java:
> project eventsourced-examples > run-main org.eligosource.eventsourced.guide.japi.SenderReferences
信道也是一个Actor ,用于跟踪成功的事件消息传递。event-sourced模式的处理器使用信道来避免在消息回放时重复给目的地发送消息。某些信道也可以独立 (standalong)使用 ,即不与event-sourced处理器绑定在一起。
Martin Fowler 的文章event sourcing中有一节External Updates描述了一个信道的使用案例 。本文快速入门示例中信道使用一节也给出了一个起步的例子。
目前,EventSourced 库提供了两个不同的信道实现,默认信道 (DefaultChannel)和可靠信道 (ReliableChannel) ,并基于可靠信道还实现了一个个可靠的“请求-应答”信道 (reliable request-reply channel) 。 这些将在下面的小节解释。
默认信道是传递事件消息到接收Actor一个临时信道。当接收 Actor 在收到的Message上调用confirm()或confirm(true) 进行确认时,一个确认信息会被异步写入日志。当消息回放时,消息会与日志中的确认信息进行匹配,能匹配上的就不会被再次发送给接收者。
如果接收者对事件消息进行了否定的应答(confirm(false),该消息就会在回放时重新发送。 如果消息没有被确认,回放时也会重新发送。因此,在存在消息丢失或否定应答的情况下,接收者从一个缺省信道收到的事件消息次序可能和event-sourced处理器生成事件消息的顺序不同。
DefaultChannel创建后要在EventsourcingExtension 上注册,如下:
val extension: EventsourcingExtension = … val destination: ActorRef = … val channelId: Int = … val channel: ActorRef = extension.channelOf(DefaultChannelProps(channelId, destination))
该channelId必须是一个正整数并在应用程序运行时保持不变。EventsourcingExtension的channels可以获得所有已注册的信道映射,Map[Int, ActorRef] , 主键是信道 ID 。信道还可以定义一个名称(另见Emitter)。
// … val channelId: Int = … val channelName: String = … val channel: ActorRef = extension.channelOf( DefaultChannelProps(channelId, destination).withName(channelName))
调用namedChannels方法,可以得到所有已注册的有名信道,Map[String, ActorRef],主键是信道名称。
默认信道保留sender引用 。因此,应用程序可以对信道Actor使用?和forward方法,消息会传递到信道的接收者Actor。不过,使用时?必须特别注意:消息回放时,已被信道接收者确认的消息会被信道忽略,从而接收者无法对回放的消息进行回复。因此,发送方将得到一个响应超时。要避免这种情况发生,应用可以预先判断信道是否会忽略消息。例如,应用可以维护一个Message.acks列表,如果信道id 在列表中,其消息就可能会被忽略,对这种信道就不应该使用ask方法。
val channelId: Int = … val channel: ActorRef = … if (!message.acks.contains(channelId)) channel ? message.copy(…) onComplete { case result => … }
关于 message.copy(…) 另请参阅使用提示。
可靠的信道是一个具有持久化能力的信道,它会在把消息传递给接收者Actor之前,就把消息写入日志。与默认信道相比,可靠信道保留event-sourced处理器产生消息的顺序并在接收者发生错误时会试图重发消息。因此,使用可靠的信道时,如果消息接收端临时发生了错误,即使不进行消息回放 , 应用程序能也能够恢复正常运行。此外,一个可靠的信道,也可以从 JVM 崩溃中恢复。这样就无论是接收者出故障,还是发送者出故障,应用程序都可以获得“ 至少一次 ”的消息传递保证。
当接收者确认收到事件消息时,所存储的消息从信道移除,下一个开始传送。如果接收者没有确认 ( 如 , 超时 ) 或给出一个否定确认,信道会在一定重发延迟内进行重试。如果达到最大重试次数还不成功,信道在一定的重启延迟后重新启动自己,再进行重发消息。如果达到最大的重启次数还不成功,信道会停止消息传递,并且发布一个DeliveryStopped事件给信道actor所属的Actor System。应用程序要重新激活信道,可以调用 EventsourcingExtension的deliver(Int) 的方法,将信道 ID 作为参数传给它。请参阅ReliableChannel API 文档了解详情。
ReliableChannel的创建和注册与默认信道类似,唯一不同的是需要使用配置对象ReliableChannelProps。
// … val channel: ActorRef = extension.channelOf(ReliableChannelProps(channelId, destination))
这个配置对象还允许应用程序配置信道的可靠策略 (RedeliveryPolicy)。
可靠信道会保存sender引用。 因此,应用程序可以对信道Actor使用?和forward 方法,消息会传递到信道的接收者Actor。详情已在默认信道部分描述过。可靠信道还会将sender引用和事件消息一起存储,这样即使信道重启,事件消息和sender仍能够原样传递到接收者。如果sender是远程引用,即使从JVM崩溃 ( 如信道所在的jvm 崩溃 ) 中恢复,引用仍然有效。
熟悉 Akka 的人会发现可靠信道与类似于Akka 可靠代理很相似,不同的是它还能使应用从发送方的 JVM 崩溃从恢复过来 ( 另见Remote destinations )。
可靠的“请求-应答”信道是在可靠信道基础上实现的。 它在发送者(通常是event-sourced处理器 )和接收者插入可靠的请求响应机制。 与一般可靠信道相比,它还有下列附加的属性:
在Message转交给接收者之前,提取其中的请求内容。
将接收者的回复包装进一个Message,再发送回请求的发送者。
如果在配置的超时时间内,接收者没有应答,它会发送一个特殊的DestinationNotResponding答复给请求的发送方。
发送一个特殊的DestinationFailure答复请求发送方,如果接收者响应Status.Failure。
对请求的发送方保证在“最少一次”的应答交付(当然,对接收方也有“最少一次”的请求交付)。
需要对接收者的回复再给一个确认应答,来标志一次请求-应答交互成功完成。
如果缺少应答或得到否定应答,会重发请求及后续的回复。
可靠的“请求-应答”信道的创建和注册与可靠信道类似,只是需要使用一个ReliableRequestReplyChannelProps配置对象。
// … import org.eligosource.eventsourced.patterns.reliable.requestreply._ val channel: ActorRef = extension.channelOf( ReliableRequestReplyChannelProps(channelId, destination))
这个配置对象还允许应用程序为接收者的回复配置一个replyTimeout。一个可靠的“请求-应答”信道的详细用法例子在这篇文章中 。
信道必须在使用前激活,请参阅 extension.deliver ()。
为了让信道正常工作,event-sourced处理器必须从收到( 或日志中 )的消息中复制processorId和sequenceNr到输出的事件消息中。这通常是通过调用输入Message的copy()方法,仅更新那些需要修改的字段即可。如:
class Processor(channel: ActorRef) extends Actor { def receive = { case msg: Message => { // … channel ! msg.copy(event = …, ack = …) } } }
如果使用Emitter,这个动作可以自动完成。
可靠信道和可靠的“请求-应答”信道也可以独立于Eventsourced 理器使用。独立使用时,sender 必须将待发送的Message的Message.processorId设置为0(这是默认值):
val channel = extension.channelOf(ReliableChannelProps(…)) channel ! Message("my event") // processorId == 0
这相当于直接发送 Message.event :
channel ! "my event"
可靠信道内会在内部将接收到的事件封装成为一个Message,并将Message的processorId设置为0 。将processorId 置为0会让可靠信道不对这个消息做写确认(acknowledgement)。event-sourced处理器收到的消息都会和一个确认(acknowledgement)相关联。但在这里因为信道没有绑定处理器,所以这个确认也是不必要的。另一个需要关闭写确认的场景请查看事件系列。
如果发送方处理器需要将消息传递到远程接收者,要求消息“ 最少一次 ”到达,并且要按照发送的顺序到达,那么应用程序应该考虑使用可靠的信道,这可以防止:
发送者和目的地之间的网络错误(远程Actor引用仍然有效,但远程Actor暂时不可用)
目标JVM的崩溃(远程Actor引用无效),并
发送方JVM崩溃(消息已经到达sender处理器,但尚未传递到远程的接收者,应该在sender从崩溃中恢复时自动发送)
如果将远程Actor的引用作为信道的接收方,能解决第1中情况的问题,但第2种情况不行。对第2种情况有一个可行的办法,使用本地Actor作为远程Actor的代理与远程Actor(通过ActorSelection得到)通信。ActorSelection可以从一个Actor路径来创建,它并不和远程Actor的生命周期绑定。
class DestinationProxy(destinationPath: String) extends Actor { val destinationSelection: ActorSelection = context.actorSelection(destinationPath) def receive = { case msg => destinationSelection tell (msg, sender) // forward } } val destinationPath: String = … val proxy = system.actorOf(Props(new DestinationProxy(destinationPath))) val channel = extension.channelOf(ReliableChannelProps(1, proxy))
当然,使用 ActorSelection 的办法也能解决第 1 种情况的问题。第 3 种情况反而最简单,只要 sender 处理器使用可靠信道就没问题。 如下例:
class Sender extends Actor { val id = 1 val ext = EventsourcingExtension(context.system) val proxy = context.actorOf(Props(new DestinationProxy( "akka.tcp://example@127.0.0.1:2852/user/destination"))) val channel = ext.channelOf(ReliableChannelProps(1, proxy)) def receive = { case msg: Message => channel forward msg } } //创建和恢复发送者和其信道 val sender = extension.processorOf(Props(new Sender with Eventsourced)) sender ! Message("hello")
如果远程的Actor是一个Eventsourced Actor,必须特别注意。 在这种情况下,应用程序必须确保远程的Actor在成功恢复之后,只能通过远程方式进行访问。这可以实现,例如,通过使用一个附加endpointActor,简单地转发消息给到目的地Actor。endpoint Actor使用目标路径进行注册,在目标Actor恢复之后与之关联起来。
class DestinationEndpoint(destination: ActorRef) extends Actor { def receive = { case msg => destination forward msg } } class Destination extends Actor { val id = 2 def receive = { case msg: Message => { println(s"received ${msg.event}") msg.confirm() } } } val destination = extension.processorOf(Props(new Destination with Eventsourced)) //等待目标完成恢复 extension.recover() //使目标恢复后可以远程访问 system.actorOf(Props(new DestinationEndpoint(destination)), "destination")
这确保了新的远程消息永远不会和恢复期间重播的消息交叉弄混。
从本节代码包含在ReliableChannelExample.scala。发送方应用程序可以从 SBT 开始与
> project eventsourced-examples > run-main org.eligosource.eventsourced.example.Sender
远程接收者启动:
> project eventsourced-examples > run-main org.eligosource.eventsourced.example.Destination
发送方应用程序提示用户在标准输入敲入消息,消息会可靠地传送到远程目标。
恢复是实现event-sourced模式的应用程序进行状态重建的过程。 恢复动作通常是在应用程序启动时完成,无论程序是在正常终止或崩溃后重启(但也可以做到任何时间执行,甚至是单个处理器和信道也可以做恢复动作)。
val system: ActorSystem = … val journal: ActorRef = … val extension = EventsourcingExtension(system, journal) //创建和注册event-sourced处理器 extension.processorOf(…) // … //创建和注册信道 extension.channelOf(…) // ... //恢复注册的处理器状态,并激活信道 extension.recover() //处理器和信道都已经可以使用 // …
recover()方法首先向所有已注册的处理器重播日志记录的事件消息。 通过重播事件消息历史,处理器可以恢复状态。消息回放期间,处理器也会向一个或多个信道发送消息。这些信道要么忽略在之前应用程序运行是已经成功交付(即收到过确认 )的消息,或缓冲消息,稍后传递。重播结束后,recover()方法触发信道传送被缓冲的消息。
如果信道立即传递,而不是先缓冲事件消息,传递的事件消息很可能与重播的事件消息错误地交织在一起。这可能会导致在事件消息顺序的不一致,从而引起应用程序状态的不一致。因此,恢复动作必须确保所有重放的事件消息已经放入处理器邮箱之后,才传递缓冲确保缓冲的事件消息。这对于连接成环状、有向图形式的的处理器和信道的恢复尤其重要。
可以给EventsourcingExtension.recover(Seq[ReplayParams]) 方法 ( 或其他的重载方法 ) 传递一个ReplayParams来对重播过程进行定制。ReplayParams允许对各个处理器的状态恢复做细粒度地控制。对于每一个需要恢复的处理器,应用程序都要为它创建一个ReplayParams实例。ReplayParams指定
是否应该重播从头开始,或是从一个快照,或从一个给定的顺序号(指定起始序号)。
一直恢复到当前状态,还是在某一个历史状态终止(指定终止序号)
下面两小节演示一些 ReplayParams 用法示例。 欲了解更多详情,请参阅的 API 文档ReplayParams及其伴生对象 。 有关创建快照的详细信息请参阅快照。
如前所述:
val extension: EventsourcingExtension = … import extension._ recover()
没有指定序号的上下限恢复所有的处理器,所有消息都会被重播。 这等效于
recover(replayParams.allFromScratch)
或
recover(processors.keys.map(pid => ReplayParams(pid)).toSeq)
如果应用程序只是想恢复特定的处理器,它应该只为这些处理器创建ReplayParams。例如
recover(Seq(ReplayParams(1), ReplayParams(2)))
只有恢复与IDS为1和2处理器 。也可以指定序号的上下范围。
recover(Seq(ReplayParams(1, toSequenceNr = 12651L), ReplayParams(2, fromSequenceNr = 10L)))
这里处理器1将会收到范围在0到12651(含)之间的消息,处理器2与接收到从10开始的所有消息。
在基于快照恢复时,处理器会先收到一个SnapshotOffer,再收到剩余的事件消息之前(如果有的话)。处理器使用SnapshotOffer消息以恢复其状态。
Scala:
class Processor extends Actor { var state = // … def receive = { case so: SnapshotOffer => state = so.snapshot.state // … }
Java:
public class Processor extends UntypedEventsourcedActor { private Object state = // … @Override public void onReceive(Object message) throws Exception { if (message instanceof SnapshotOffer) { SnapshotOffer so = (SnapshotOffer)message; state = so.snapshot().state(); } // … } }
基于快照的恢复只会给处理器发送一个 SnapshotOffer 消息,即使之前已经为处理器创建了多个快照,并且这些快照都匹配相应ReplayParams。 toSequenceNr和snapshotFilter可以用来限定快照选择的标准。如果处理器没有快照的或现有的快照不匹配ReplayParams条件,事件消息将从头开始播放,即从序号0开始 。
要从最新快照恢复所有的处理器可以调用:
recover(replayParams.allWithSnapshot)
这等效于
recover(processors.keys.map(pid => ReplayParams(pid, snapshot = true)).toSeq)
基于快照恢复也可以指定序号上限。
recover(Seq(ReplayParams(1, snapshot = true, toSequenceNr = 12651L)))
这将恢复处理器1先恢复到序号<= 12651的最新快照 。在重播其后序号小于12651(含)的事件消息(如果有的话)。应用程序也可以为快照定义进一步的约束。 例如:
import scala.concurrent.duration._ val limit = System.currentTimeMillis - 24.hours.toMillis recover(Seq(ReplayParams(1, snapshotFilter = snapshotMetadata => snapshotMetadata.timestamp < limit)))
对处理器 1 使用 24 小时内的最新快照。这是通过 snapshotFilter 来实现,它基于时间戳对快照进行过滤。快照过滤器在SnapshotMetadata上操作 。
recover方法等待回放的消息被发送到所有处理器(通过 "!" 方法)但不会等待这些处理器处理完回放的事件消息。然而,发送到任何注册的处理器的任何新的消息,会在 recover 成功返回后,由处理器在重播事件消息处理完之后处理。如果应用程序希望等待处理器处理完所有回播的事件消息,可以使用EventsourcingExtension的 awaitProcessing() 方法 。
val extension: EventsourcingExtension = …
extension.recover()
extension.awaitProcessing()
这特别适合于应用程序使用 STM 引用来保存状态的情景。这时应用程序会希望在收到客户端新的读请求之前,所有的(外部可见的)状态都已经完全恢复完成。默认情况下, awaitProcessing() 方法等待所有已注册的处理器完成处理,而应用程序也可以指定注册的处理器的子集。
在 recover 和 awaitProcessing 方法会阻塞调用线程。如果 event-sourced 应用程序希望在主线程中完成所有回复动作后再干新的事情,这可能比较方便。在其他情况下,例如,只是恢复 Actor 中的个别子处理器(和信道)(见OrderExampleReliable.scala ),应使用非阻塞恢复 API :
val extension: EventsourcingExtension = … val future = for { _ <- extension.replay(…) _ <- extension.deliver(…) // optional _ <- extension.completeProcessing(…) // optional } yield () future onSuccess { case _ => // event-sourced processors now ready to use … }
在这里, replay 、deliver和completeProcessing返回的future由for表达式组合在一起,保证这些异步操作能够顺序依次执行。当组合的future完成时,被恢复处理器和信道就可以使用了。 更多细节在API 文档 。 该replay方法也可以用ReplayParams定制 ( 参见Replay parameters).
信道甚至可以在创建之后激活之前,立即被应用程序使用。这一点非常重要,尤其是当event-sourced(父)处理器在处理事件的过程中创建新的event-sourced子处理器时:
class Parent extends Actor { this: Receiver with Eventsourced => import context.dispatcher var child: Option[ActorRef] = None def receive = { case event => { //创建用信道封装的子处理器 if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) } //信道可立即使用 child.foreach(_ ! message.copy(…)) } } def createChildProcessor(pid: Int, cid: Int): ActorRef = { implicit val recoveryTimeout = Timeout(10 seconds) val childProcessor = extension.processorOf(Props(new Child with Receiver with Eventsourced { val id = pid } )) val childChannel = extension.channelOf(DefaultChannelProps(cid, childProcessor)) for { // asynchronous, non-blocking recovery _ <- extension.replay(Seq(ReplayParams(pid))) _ <- extension.deliver(cid) } yield () childChannel } } class Child extends Actor { this: Receiver => def receive = { case event => { … confirm() } } }
在这里,Parent收到事件消息后,创建一个新的childProcessor(以默认信道包装)。该childChannel是由Parent创造立即被使用,于此同时,子处理器的消息回放和信道的激活也在进行。 这是可行的,因为信道在内部会缓冲的新的消息,在它完成激活后再发送给目的方。这确保了新的消息只会在 子信道 激活完成后才被传递到 子处理器 。在Parent的恢复期间,childChannel将忽略已成功交付给childActor的消息(即childActor已经确认的消息)。
一个Eventsourced处理器的行为可能依赖于其他 Eventsourced 处理器的状态。例如,处理器A发送一个消息给处理器B ,然后处理器B回复一个消息,消息中包含了处理器B的状态(或部分状态)。根据回复中状态的不同,处理器A可以采取不同的行动。为了确保在这种情况下能够正确恢复恢复,A与B之间的任何状态传输或状态相关的消息交换都必须是Message类型 (参见DependentStateRecoverySpec.scala )。通过非日志化的消息交换状态数据(如传输非 Message 类型的数据 )会导致无法保证恢复时的一致性。还有一种情况也有类似问题:如果一个Eventsourced处理器通过一个外部可见的STM引用保存状态,并且另外一个Eventsourced处理器直接读取该引用。Eventsourced处理器之间的通信与外部查询 (external queries )和外部更新 (external updates)密切相关 。
一个快照代表处理器在一个特定时间点的状态,可以用来大大减少恢复所需的时间。快照捕捉和保存由应用程序触发,快照并不会删除事件消息的历史记录,除非由应用程序明确要求。
应用程序可以通过给送Eventsourced处理器发 SnapshotRequest消息( Scala API )或SnapshotRequest.get() 消息( Java API )来创建快照。
Scala:
import org.eligosource.eventsourced.core._ // … val processor: ActorRef = … processor ! SnapshotRequest
Java:
import org.eligosource.eventsourced.core.*; // … final ActorRef processor = … processor.tell(SnapshotRequest.get(), …)
这将异步采集和保存processor的状态快照。快照成功保存后发送方将收到通知。
Scala:
(processor ? SnapshotRequest).mapTo[SnapshotSaved].onComplete { case Success(SnapshotSaved(processorId, sequenceNr, timestamp)) => … case Failure(e) => … }
Java:
ask(processor, SnapshotRequest.get(), 5000L).mapTo(Util.classTag(SnapshotSaved.class)).onComplete(new OnComplete<SnapshotSaved>() { public void onComplete(Throwable failure, SnapshotSaved result) { if (failure != null) { … } else { … } } }, system.dispatcher());
或者,应用程序也可以使用EventsourcingExtension.snapshot方法来触发快照的创建。举个例子,
Scala:
val extension: EventsourcingExtension = … extension.snapshot(Set(1, 2)) onComplete { case Success(snapshotSavedSet: Set[SnapshotSaved]) => … case Failure(_) => … }
Java:
Set<Integer> processorIds = new HashSet<Integer>(); processorIds.add(1); processorIds.add(2); extension.snapshot(processorIds, new Timeout(5000L)).onComplete(new OnComplete<Set<SnapshotSaved>>() { public void onComplete(Throwable failure, Set<SnapshotSaved> snapshotSavedSet) { if (failure != null) { … } else { … } } }, system.dispatcher());
为处理器1和2创建快照 ,返回的 Future (类型的 Future[scala.immutable.Set[SnapshotSaved]]--Scala API )或 Future<java.util.Set<SnapshotSaved>> --Java API))成功完成时,两个处理器的快照已保存成功。
为了能够保存快照,处理器必须处理SnapshotRequest的消息,以其当前 state 作为参数调用它自身的的 process 方法:
Scala:
class Processor extends Actor { var state = … def receive = { case sr: SnapshotRequest => sr.process(state) // … } }
Java:
public class Processor extends UntypedEventsourcedActor { private Object state = … @Override public void onReceive(Object message) throws Exception { if (message instanceof SnapshotRequest) { SnapshotRequest sr = (SnapshotRequest)message; sr.process(state, getContext()); } // … } }
调用process将异步保存state参数,快照产生的元数据也会一起保存。 创建一个新的快照不会删除旧的快照,除非由应用程序明确要求。 因此,每个处理器可以有 N 个快照。
SnapshotExample.scala和SnapshotExample.java 中的例子示范快照的创建和基于快照的恢复。它可以从 SBT 中执行:
Scala:
> project eventsourced-examples > run-main org.eligosource.eventsourced.example.SnapshotExample
Java:
> project eventsourced-examples > run-main org.eligosource.eventsourced.example.japi.SnapshotExample
所有日志都支持快照,它是通过抽象的Hadoop FileSystem来实现的。缺省的 FileSystem 实例就是本地文件系统,默认情况下快照会写在本地,除非应用程序指定了另外的配置。关于如何创建基于 HDFS , FTP , S3 的 FileSystem 实例请参考Hadoop的文档。应用程序定义的FileSystem为实例可以如下配置:
Scala:
// … import org.apache.hadoop.fs.FileSystem // … val hdfs: FileSystem = FileSystem.get(...) val journal: ActorRef = LeveldbJournalProps(..., snapshotFilesystem = hdfs).createJournal
Java:
// … import org.apache.hadoop.fs.FileSystem; // … final FileSystem hdfs = FileSystem.get(…); final ActorRef journal = LeveldbJournalProps.create(...).withSnapshotFilesystem(hdfs).createJournal(system);
请查看HadoopFilesystemSnapshottingProps API 文档了解更多信息。
Envent-sourced Actor 一般都会混入Receiver ,Emitter和/ 或Eventsourced 特质(Scala API)或继承自相应的 Untyped* 系列基类(Java API),这些Acotr可以用方法become()和unbecome()动态改变Actor的行为。become()和 unbecome()方法定义在Behavior特质中,Receiver、 Emitter和Eventsourced都继承自Behavior。
即使Actor使用become()和unbecome()改变了自己的行为,它从Receiver ,Emitter和/或Eventsource 特质引入的功能仍然有效。 例如,一个混入了Eventsourced特质的Actor(Scala API )使用become()改变其行为后仍会继续记录日志事件消息。
但是, Actor 如果使用 context.become()(Scala API )或getContext().become() 的 Java API ) 改变其行为,将会丧失由 Receiver、 Emitter和/或Eventsourced等特质引入的功能(即使可以使用context.unbecome() 或 getContext().unbecome()切换回来)
当一个处理器根据一个输入的事件消息产生出多个输出事件消息,并将这些输出信息发出到一个信道,我们称它产生了一个事件消息系列。对于事件消息系列,事件处理器应该将系列中除了最后一个消息外的所有消息的 ack 字段设置为 false
class Processor(channel: ActorRef) extends Actor { def receive = { case msg: Message => { // … channel ! msg.copy(event = "event 1", ack = false) // 1st message of series channel ! msg.copy(event = "event 2", ack = false) // 2nd message of series // … channel ! msg.copy(event = "event n") // last message of series } } }
如果处理器使用了发射器,则按如下方式:
class Processor extends Actor { this: Emitter => def receive = { case event => { // … emitter("channelName") send (msg => msg.copy(event = "event 1", ack = false)) // 1st message of series emitter("channelName") send (msg => msg.copy(event = "event 2", ack = false)) // 2nd message of series // … emitter("channelName") sendEvent "event n" } } }
这确保了当确认信息被写入到日志之前,系类的最后一条消息已经成功完成下述动作 ,
不过对于目标接收者,应该确认收到的每一个事件消息,而不管其是否属于一个系列。
在某些故障条件下,信道可能将同一个消息不止一次地发送到目的地。一个典型的例子是,目标接收者给出了肯定确认的消息,但确认消息记入日志前不久,应用程序崩溃。在这种情况下,恢复期间目标接收者将再次接收到同样的事件消息。
由于这些(还有其他)的原因,信道的目标接收者地必须是幂等事件消息的消费者,这要在应用级别考虑。假如,有一个事件消息的消费者,它将收到的订单存储在一个映射 (Map,key 是订单号 ) 中 , 由于无论它收到订单消息一次或多次,都是同样的结果,映射中包含的订单只有一份,所以这是一个幂等消费者。如果一个事件消息的消费者任务是对收到的订单进行计数,那么它就不是一个幂等消费者。因为从业务逻辑看,重复收到的消息会导致错误的计数。 在这种情况下,该事件消息的消费者必须采取一些额外的手段来检测并处理重复的事件的消息 。
为了检测重复,应用程序应该对他们的事件进行标识。event-sourced处理器应该在事件被发送到信道之前设置标识值。信道目的接收者(或其他下游消费者)应该保持成功处理事件标识,并将它与新接收到的事件的标识进行比较。如果新收到的事件标志在已记录的标识中已经存在,这被认为是一个重复消息(假设发射处理器生成的标识符是唯一的)。 为了生成唯一标识符,处理器可以使用接收到的事件消息的序列号:
case class MyEvent(details: Any, eventId: Long) class Processor extends Actor { this: Emitter with Eventsourced => def receive = { case event => { // get sequence number of current event message val snr: Long = sequenceNr val details: Any = … // … emitter("channelName") sendEvent MyEvent(details, snr) } } }
使用序列号的优势在于,消费者只需要记住上次成功收到事件的标识。如果新收到的事件的标识是否小于或等于原先记住的值,就认为是重复的,可以忽略。
class Consumer extends Actor { var lastEventId = 0L def receive = { case MyEvent(details, eventId) => if (eventId <= lastEventId) { // duplicate } else { // … lastEventId = eventId } } }
如果消费者是 event-sourced 处理器,则可以将事件标识存储在自己的状态数据中,消息回放时事件标志也能被恢复。其他类型的消费者必须有自己的方式保存标识。
发出系列事件的处理器除了使用唯一标识外还应该使用一个事件消息的索引来标定发出的事件:
case class MyEvent(details: Any, eventId: (Long, Int)) class Processor extends Actor { this: Emitter with Eventsourced => def receive = { case event => { // get sequence number of current event message val snr: Long = sequenceNr val details: Seq[Any] = … // … emitter("channelName") send (msg => msg.copy(event = MyEvent(details(0), (snr, 0)), ack = false)) emitter("channelName") send (msg => msg.copy(event = MyEvent(details(1), (snr, 1)), ack = false)) // … } } }
为了检测重复,消费者应该对 “ 序列号 - 索引”对进行比对。
应用程序可以为 Message 中的事件自定义序列化方式。自定义序列化可以用与事件的日志记录和远程通讯。它们可以和任何其他的Akka 序列化一样的方式进行配置 。 例如:
akka { actor { serializers { custom = "example.MyEventSerializer" } serialization-bindings { "example.MyEvent" = custom } } }
在这里, example.MyEvent是一个应用程序特定的事件类型,example.MyEventSerializer是应用程序特定的序列器,它继承自akka.serialization.Serializer。
import akka.serialization.Serializer class CustomEventSerializer extends Serializer { def identifier = … def includeManifest = true def toBinary(o: AnyRef) = … def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]) = … }
事件Message 自己使用Enven-Sourced 库预定义的序列化方式。只要 eventsourced-journal-common-*.jar 在 Akka 应用程序的类路径中,这个用于Message的序列化器会自动被应用上。
本节中的订单管理例子是取自Martin Fowler 的著名的文章The LMAX Architecture :
想象一下,你正在使用信用卡下单购买糖豆。一个简单的零售系统,将获取您的订单信息,使用信用卡验证服务来检查你的信用卡号码,然后确认您的订单,所有这些动作在一个单一操作中完成。处理您的订单的线程在等待信用卡检查结果时将会被阻塞,阻塞时间对用户来说不会太长,服务器在等待时也可以在处理器上运行的另一个线程。
在 LMAX 架构中,你会将这个操作一分为二。第一个操作获取信用卡信息,向信用卡公司发出一个事件 ( 信用卡校验请求 ) 就会完成。业务逻辑处理器会继续处理其他客户事件,直到在其输入事件流收到一个信用卡验证事件。处理这个事件时在继续进行前述订单的确认动作。
这可以通过Eventsourced库来实现,如图(见附录 A )。
我们将前文提到的 业务逻辑处理器 处理器作为event-sourced Actor来实现(OrderProcessor)。它处理OrderSubmitted事件时将一个id赋予已提交的订单,并将订单保存在一个Map映射(这就是OrderProcessor的状态数据)中。对于每一个提交的订单会他发出一个CreditCardValidationRequested事件。
CreditCardValidationRequested事件由CreditCardValidator Actor 处理。它联系外包的信用卡验证服务,验证完成后,对每一份信用卡有效的订单,他都将一个 CreditCardValidated事件发送回 OrderProcessor。在下面的例子中为了实现简单,我们没有真的使用外部的服务,,但对于真实的实现,akka-camel 将特别适合用到这里。
收到CreditCardValidated事件,event-sourced OrderProcessor 更新的相应订单状态,即设置validated = true,并发送一个OrderAccepted事件,包含更新的订单信息到它的Destination。这也会将更新后的订单回发给初始的发送者。
领域对象Order ,领域域事件和以及OrderProcessor定义如下:
//领域对象 case class Order(id: Int = -1, details: String, validated: Boolean = false, creditCardNumber: String) // 领域事件 case class OrderSubmitted(order: Order) case class OrderAccepted(order: Order) case class CreditCardValidationRequested(order: Order) case class CreditCardValidated(orderId: Int) //event-sourced订单处理器 class OrderProcessor extends Actor { this: Emitter => var orders = Map.empty[Int, Order] // processor state def receive = { case OrderSubmitted(order) => { val id = orders.size val upd = order.copy(id = id) orders = orders + (id -> upd) emitter("validation_requests") forwardEvent CreditCardValidationRequested(upd) } case CreditCardValidated(orderId) => { orders.get(orderId).foreach { order => val upd = order.copy(validated = true) orders = orders + (orderId -> upd) sender ! upd emitter("accepted_orders") sendEvent OrderAccepted(upd) } } } }
OrderProcessor处理器将一个CreditCardValidationRequested事件以Message的形式通过信道"validation_requests"发送给 CreditCardValidator。 该forwardEvent方法不仅发送事件,还会将事件的发送者设置为最初的sernder引用 。 CreditCardValidator 在接收到CreditCardValidationRequested事件后,在后台执行一个信用卡验证动作,验证完成后并回发一个CreditCardValidated事件给 OrderProcessor。
class CreditCardValidator(orderProcessor: ActorRef) extends Actor { this: Receiver => def receive = { case CreditCardValidationRequested(order) => { val sdr = sender // initial sender val msg = message // current event message Future { //做一些信用卡验证 // … // and send back a successful validation result (preserving the initial sender) orderProcessor tell (msg.copy(event = CreditCardValidated(order.id)), sdr) } } } }
CreditCardValidator 向 OrderProcessor 发送 CreditCardValidated 事件时,再次把之前保存的初始sender引用一起发给OrderProcessor,这样OrderProcessor就可以答复初始的sender引用。OrderProcessor还会使用名为"accepted_orders"信道的信道发送一个OrderAccepted 件给最终的Destination。
class Destination extends Actor { def receive = { case event => println("received event %s" format event) } }
下一步是将这些对象协同在一起,并且恢复他们:
val extension: EventsourcingExtension = … val processor = extension.processorOf(Props(new OrderProcessor with Emitter with Confirm with Eventsourced { val id = 1 })) val validator = system.actorOf(Props(new CreditCardValidator(processor) with Receiver)) val destination = system.actorOf(Props(new Destination with Receiver with Confirm)) extension.channelOf(ReliableChannelProps(1, validator).withName("validation_requests")) extension.channelOf(DefaultChannelProps(2, destination).withName("accepted_orders")) extension.recover()
名为 "validation requests" 的信道是一个可靠的信道,当 CreditCardValidator 故障(例如,外部信用卡验证服务暂时不可用)时会重发 CreditCardValidationRequested 。此外,应该指出的是, CreditCardValidator 没有对收到的事件消息做确认(它既没有显式的调用 confirm() 也没有在初始化时混入 Confirm 特质)。传输确认是在 OrderProcessor 成功处理了 CreditCardValidated 事件时做出的。
现在 Order processor ,已经做好了接收 OrderSubmitted 事件的准备。
processor ? Message(OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-5678"))) onSuccess { case order: Order => println("received response %s" format order) }
以一个空的日志运行这个例子会在标准输出打印:
received response Order(0,jelly beans,true,1234-5678-1234-5678) received event OrderAccepted(Order(0,jelly beans,true,1234-5678-1234-5678))
运行示例时,您看到的订单信息可能有所不同。 提交订单的id被设置为OrderProcessor内部映射orders的初始大小,第一次运行时值为0。第二次运行时会先恢复之前的应用程序状态,所以生成的id变成了1 。 程序第二次运行将首先恢复以前的应用程序的状态,所以其他订单提交会产生一个订单id为1 。
received response Order(1,jelly beans,true,1234-5678-1234-5678) received event OrderAccepted(Order(1,jelly beans,true,1234-5678-1234-5678))
该示例代码包含在OrderExample.scala ,可以从SBT提示与执行
> project eventsourced-examples > run-main org.eligosource.eventsourced.example.OrderExample
这个例子的高级版本,使用的是前面讲述过的可靠的“请求-应答”信道 。
自从 Akka 2.1 ,将 event-sourcing 模式应用于 Akka FSM s 非常方便。下面演示了一个 门 (Door) 的状态机例子, 门 (Door) 有两种状态, Open 和 Closed 。
sealed trait DoorState case object Open extends DoorState case object Closed extends DoorState case class DoorMoved(state: DoorState, times: Int) case class DoorNotMoved(state: DoorState, cmd: String) case class NotSupported(cmd: Any) class Door extends Actor with FSM[DoorState, Int] { this: Emitter => startWith(Closed, 0) when(Closed) { case Event("open", counter) => { emit(DoorMoved(Open, counter + 1)) goto(Open) using(counter + 1) } } when(Open) { case Event("close", counter) => { emit(DoorMoved(Closed, counter + 1)) goto(Closed) using(counter + 1) } } whenUnhandled { case Event(cmd @ ("open" | "close"), counter) => { emit(DoorNotMoved(stateName, "cannot %s door" format cmd)) stay } case Event(cmd, counter) => { emit(NotSupported(cmd)) stay } } def emit(event: Any) = emitter("destination") forwardEvent event }
来看看状态的变化情况, " 门 (door)" 向名为 "destination" 的信道发出 DoorMoved 事件。 DoorMoved 事件中包含当前状态和移动计数。对无效移动尝试,例如试图打开已经打开的门,会发出 DoorNotMoved 事件。 信道接收者只是简单的将事件信息打印到标准输出。
class Destination extends Actor { def receive = { case event => println("received event %s" format event) } }
应用程序配置:
val system: ActorSystem = … val extension: EventsourcingExtension = … val destination = system.actorOf(Props(new Destination with Receiver with Confirm)) extension.channelOf(DefaultChannelProps(1, destination).withName("destination")) extension.processorOf(Props(new Door with Emitter with Eventsourced { val id = 1 } )) extension.recover() val door = extension.processors(1)
我们就可以开始发送事件消息到 door :
door ! Message("open") door ! Message("close")
标准输出会打印出:
received event DoorMoved(Open,1) received event DoorMoved(Closed,2)
当试图尝试一个无效的状态与变化
door ! Message("close")
信道 destination 将获得DoorNotMoved事件:
received event DoorNotMoved(Closed,cannot close door)
重新启动示例应用程序将恢复门的状态,所以
door ! Message("open") door ! Message("close")
会产生
received event DoorMoved(Open,3) received event DoorMoved(Closed,4)
从本节中的代码在FsmExample.scala ( 有少量修改 ) 。
这一节使用 Akka 集群 技术将前面例子中的 Door 状态机改造为高可用性系统。 该Door状态机在整个集群中是单子对象,由一组NodeActor管理。每个集群节点都有一个NodeActor侦听集群中的事件。如果某一个NodeActor成为 主节点 (Master) 它创建并恢复一个 Door 实例。其他 NodeActor 保留一个主节点上 Door 实例的远程引用。
客户端与 Door 单子实例通过集群的 NodeActor 交互,发送("open"或"close" )命令。集群中任何节点上NodeActor都会接收命令,不仅仅是主节点。NodeActor将这些命令封装成Message转发给Door对象。 由Door发出的事件Message 由命名信道 "destination" 信道发送个目标接收者 (Destination) 。目标接收者 (Destination) 根据收到的事件创建相应,并发送该响应给初始发送者。 运行 Destination 的 Actor 的应用程序不是集群的一部分,但是是一个独立的远程应用程序。它也同集群节点一样使用日志(在本例中是 SPOF ,将来的版本会使用分布式日志)。
当主节点崩溃,集群中的另一个节点会成为主节点并恢复 Door 状态机。其余从节点更新其远程引用指向新的主节点上的Door 实例。
从本节代码包含在ClusterExample.scala ,使用的配置文件journal.conf和cluster.conf 。 对于示例代码的详细说明,请参考代码中的注释。 若要从 SBT 分布式示例应用程序,首先启动承载的应用程序 Destination 的 Actor 和日志:
> run-main org.eligosource.eventsourced.example.DestinationApp
然后启动集群的第一个种子节点
> run-main org.eligosource.eventsourced.example.NodeApp 2561
然后启动集群的第二个种子节点
> run-main org.eligosource.eventsourced.example.NodeApp 2562
最后启动的第三个集群种子节点
> run-main org.eligosource.eventsourced.example.NodeApp
上面的命令要求你在eventsourced-examples项目。你可以在SBT中切换到这个工程
> project eventsourced-examples
第一个种子节点最有可能成为主节点,它在标准输出打印:
MASTER: recovered door at akka://node@127.0.0.1:2561
其他从节点输出:
SLAVE: referenced door at akka://node@127.0.0.1:2561
所有节点都会提示用户输入一个门操作命令:
command (open|close):
现在,我们在最后启动的一个群集节点(从节点)上输入命令:
Door 单子实例的初始状态是关闭状态。敲入 open 命令打开它:
command (open|close): open moved 1 times: door now open
然后再关闭它:
command (open|close): close moved 2 times: door now closed
试图关闭一个已经关上的门会导致错误:
command (open|close): close cannot close door: door is closed
现在,使用 ctrl^c 杀掉主节点 。 Door 单子实例也会被破坏。经过 1-2 秒后,新的主节点已由集群决定。新的主节点会恢复 event-sourced Door 子实例。从节点将更新其对 Door 的远程引用 。 要验证 Door 已经恢复正常,再次打开门:
command (open|close): open moved 3 times: door now open
你可以看到, Door状态(其中包含了过去的移动次数)已从故障中正确恢复。
Eventsourced 库还定义了组播 (Multicast)处理器 , 用来将收到的事件消息转发给多个目标。 相比让多个 Eventsourced 处理器接收相同的事件消息,使用 多播 Multicast 处理器更为优化。 使用多播处理器,接收的事件消息只会记入日志一次,而使用 n Eventsourced 处理器的消息将会记入日志 n 次(每个处理器一次)。进行了如果有大量的接收目标,使用 Multicast 处理器可以显著节省磁盘空间并提高吞吐量。
应用程序可以用定义在包core 中的多播工厂方法创建 Multicast 处理器。
// … import org.eligosource.eventsourced.core._ val extension: EventsourcingExtension = … val processorId: Int = … val target1: ActorRef = … val target2: ActorRef = … val multicast = extension.processorOf(Props(multicast(processorId, List(target1, target2))))
这等效于
val multicast = extension.processorOf(Props(new Multicast(List(target1, target2), identity) with Eventsourced { val id = processorId } ))
应用程序如果想在MessageS 被转发到目标之前修改接收的事件,可以指定一个transformer的函数。
val transformer: Message => Any = msg => msg.event val multicast = extension.processorOf(Props(multicast(1, List(target1, target2), transformer)))
在上面的例子中,transformer 函数从接收到的事件Message提取的event。如果transformer函数没有指定,则默认为identity函数。还有一个 Multicast 工厂方法decorator,这个方法用来创建只有一个目标接收者的组播处理器。
Commercial support byEligotech B.V.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。