当前位置:   article > 正文

Akka-CQRS(1)- Write-side, Persisting event sources:CQRS存写端操作方式_akka persistasync

akka persistasync

上篇我们提到CQRS是一种读写分离式高并发、大流量数据录入体系,其中存写部分是通过event-sourcing+akka-persistence实现的。也可以这样理解:event-sourcing(事件源)是一种特殊数据录入模式,akka-persistence是这种模式的具体实现方式。事件源的核心思想是把某写发生的事件写入log(journal)。这些事件是能成功改变系统状态,并且时已经发生的事情。一开始我常常把事件源和命令源(command-sourcing)混为一谈。它们根本的区别事件event是已经发生的,命令command是待发生的。如果我们把命令存入journal,在对journal里的记录进行重新演算(replay)时就会执行命令并产生一些副作,如显示打印、发email等等。而系统状态和环境随着时间在不断变化,这些副作用也会在不同的时间产生不同的影响,这肯定是我们不想看见的。

事件源模式中,在内存里保存能代表程序状态的对象state-objects,这些状态对象与数据库表model之间建立了对应关系。假设程序中支持某些指令command,它们会改变程序的状态并且还可能还会产生一些副作用,那么用事件源做法的操作顺序应该是:产生副作用->存写事件->改变内存里的状态对象。其中任何一个环节失败都会放弃下面的环节。另一方面,在用journal中记录进行重新演算时,就需要先把发生的事件还原成改变状态的命令,人为的免去副作用,因为它已经在正确的时间产生过了,然后只要更新数据库model状态就算完成了。所以,实现persistence包括object和model之间对应、state-objects维护方式以及command和event之间的转换。

首先分析一下command与event之间的转换:我们还是用上一篇的POS收银系统做示范。下面是几个收银操作指令:

  1. case class GetItemInfo(itemcode: String) extends Command
  2. case class AddItem(item: Item, qty: Int) extends Command
  3. case class AliPay(amount: Double) extends Command

上面三个典型command可以下面的方式转换成event:

GetItemInfo:这是一个查询商品资料的指令,不影响交易状态,不必转换

AddItem: 这个指令只影响交易状态,没有副作用,转换成 :ItemAdded(item: Item, qty: Int) extends Event

AliPay:改变交易状态并且产生副作用,因为要即时从支付宝扣款。做法:先确定支付成功,然后转成: AliPaid(amount  Double) extends Event

  1. case class ItemAdded(item: Item, qty: Int) extends Event
  2. case class AliPaid(amount: Double) extends Event

POS收银交易状态是一张未结算账单内容,是个简单的交易记录清单SalesMemo:

  1. //交易记录
  2. case class TxnItem(
  3. num: Int //销售单号
  4. ,seq: Int //交易序号
  5. ,txntype: Int //交易类型编号
  6. ,code: String //编号(商品、账号...)
  7. ,qty: Int //交易数量
  8. ,price: Int //单价(分)
  9. ,amount: Int //金额(分)
  10. )
  11. case class SalesMemo(salesnum: Int, txnitems: List[TxnItem] = Nil) {
  12. def itemAdded(evt: Event): SalesMemo = evt match {
  13. case ItemAdded(item,qty) =>
  14. copy(txnitems = TxnItem(salesnum, txnitems.length+1,0,item.code,qty,item.price,qty * item.price) :: txnitems)
  15. case _ => this
  16. }
  17. def aliPaid(evt: Event) = evt match {
  18. case AliPaid(amt) =>
  19. copy(txnitems = TxnItem(salesnum,txnitems.length+1,0,'ali',1,amt,amt) :: items)
  20. case _ => this
  21. }
  22. }

itemAdded,aliPaid这两个函数分别代表AddItem和AliPay对状态对象的转变处理。

上面提到persistenceActor存写journal时对事件发生的顺序有严格要求,否则无法实现读取端正确恢复原始状态。这项要求的实现是通过persist/persistAsync这两种函数来实现的。下面是这几类函数的款式:

  1. //无干扰存写,后面进来的消息先存放在内部的临时存放点 message-stashing
  2. def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
  3. internalPersist(event)(handler)
  4. }
  5. //同时存写多个事件
  6. def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
  7. internalPersistAll(events)(handler)
  8. }
  9. //异步存写事件,没有临时存放点机制 no-message-stashing
  10. def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
  11. internalPersistAsync(event)(handler)
  12. }
  13. //异步存写多项事件
  14. def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
  15. internalPersistAllAsync(events)(handler)
  16. }
  17. //不存写事件,利用内部临时存放点机制来保证handler执行顺序
  18. def defer[A](event: A)(handler: A ⇒ Unit): Unit = {
  19. internalDefer(event)(handler)
  20. }
  21. //不存写事件,只保证handler运行顺序
  22. def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
  23. internalDeferAsync(event)(handler)
  24. }

无论如何,handler函数都保证在事件存写动作成功后才能运行。我们用一些伪代码来示范有临存stash及无临存no-stash时handler运行的顺序:

  1. override def receiveCommand: Receive = {
  2. case c: String ⇒ {
  3. sender() ! c
  4. persist(s"evt-$c-1") { e ⇒ sender() ! e }
  5. persist(s"evt-$c-2") { e ⇒ sender() ! e }
  6. defer(s"evt-$c-3") { e ⇒ sender() ! e }
  7. }
  8. }
  9. //有内部临存 with message stashing
  10. persistentActor ! "a"
  11. persistentActor ! "b"
  12. // order of received messages:
  13. // a
  14. // evt-a-1
  15. // evt-a-2
  16. // evt-a-3
  17. // b
  18. // evt-b-1
  19. // evt-b-2
  20. // evt-b-3
  21. ----------------------------------
  22. override def receiveCommand: Receive = {
  23. case c: String ⇒ {
  24. sender() ! c
  25. persistAsync(s"evt-$c-1") { e ⇒ sender() ! e }
  26. persistAsync(s"evt-$c-2") { e ⇒ sender() ! e }
  27. deferAsync(s"evt-$c-3") { e ⇒ sender() ! e }
  28. }
  29. }
  30. persistentActor ! "a"
  31. persistentActor ! "b"
  32. // order of received messages:
  33. // a
  34. // b //无临存机制,外部信息立即处理了
  35. // evt-a-1
  36. // evt-a-2
  37. // evt-a-3
  38. // evt-b-1
  39. // evt-b-2
  40. // evt-b-3

如果发生内嵌多层persist时,正确的顺序如下:

  1. override def receiveCommand: Receive = {
  2. case c: String
  3. sender() ! c
  4. persist(s"$c-1-outer") { outer1
  5. sender() ! outer1
  6. persist(s"$c-1-inner") { inner1
  7. sender() ! inner1
  8. }
  9. }
  10. persist(s"$c-2-outer") { outer2
  11. sender() ! outer2
  12. persist(s"$c-2-inner") { inner2
  13. sender() ! inner2
  14. }
  15. }
  16. }
  17. persistentActor ! "a"
  18. persistentActor ! "b"
  19. // order of received messages:
  20. // a
  21. // a-outer-1
  22. // a-outer-2
  23. // a-inner-1
  24. // a-inner-2
  25. // and only then process "b"
  26. // b
  27. // b-outer-1
  28. // b-outer-2
  29. // b-inner-1
  30. // b-inner-2
  31. --------------------------------
  32. override def receiveCommand: Receive = {
  33. case c: String
  34. sender() ! c
  35. persistAsync(c + "-outer-1") { outer ⇒
  36. sender() ! outer
  37. persistAsync(c + "-inner-1") { inner ⇒ sender() ! inner }
  38. }
  39. persistAsync(c + "-outer-2") { outer ⇒
  40. sender() ! outer
  41. persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner }
  42. }
  43. }
  44. persistentActor ! "a"
  45. persistentActor ! "b"
  46. // order of received messages:
  47. // a
  48. // b
  49. // a-outer-1
  50. // a-outer-2
  51. // b-outer-1
  52. // b-outer-2
  53. // a-inner-1
  54. // a-inner-2
  55. // b-inner-1
  56. // b-inner-2
  57. // which can be seen as the following causal relationship:
  58. // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
  59. // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2

值得注意的是这个handler函数只会在事件存写成功后才运行,失败则否。也就是说确认了事件已经安全存写后才更新state-objects状态(model状态在CQRS读取时再相应更新)。针对上面的POS例子里可以用下面的代码处理方式:

  1. override def receiveCommand: Receive = {
  2. case AddItem(item,qty) =>
  3. persist(ItemAdded(item,qty))(salesMemo.itemAdded)
  4. case AliPay(amt) =>
  5. try {
  6. if (aliOnlinePay(amt)) //先产生副作用
  7. persist(AliPaid(amt))(salesMemo.alipaid(_))
  8. } catch {
  9. case _ > Throw new OnlinePayExecption("boom!!!")
  10. }
  11. ...

akka-persistence代表CQRS模式中以事件源方式存写数据的具体实现。我们提到过,数据存写具体做法是向一个journal里写入发生的改变状态目标state-objects的事件。每次PersistenceActor启动时都会从journal里读取之前写入的事件、还原成指令command、然后逐步把state-objects恢复到上次停止时的状态,不管是因异常还是正常停止的。这个恢复状态的过程是由PersistenceActor的receiveRecovery函数实现的,如下:

  1. override def receiveRecover: Receive = {
  2. case evt: Event =>
  3. salesMemo = salesMemo.updateMemo(evt)
  4. case SnapshotOffer(_,loggedItems: SalesMemo) =>
  5. salesMemo = loggedItems
  6. }

按理来说恢复状态即是把事件从头到尾再演算一遍。不过这种方式效率是个大问题,试想每次启动都需要先读取几十万条数据会是怎样的感受。效率问题的解决方法就是通过存写快照方式把之前的事件总结成快照snapshot形式的阶段状态,然后存入快照库(snapshot-store)。这样在PersistenceActor启动时先用最后一个快照把状态恢复到一个阶段,然后再读取快照产生之后的所有事件对阶段性状态再转换成最新状态。快照的读写函数如下:

  1. def saveSnapshot(snapshot: Any): Unit = {
  2. snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  3. }
  4. /**
  5. * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
  6. * before any further replayed messages.
  7. */
  8. @SerialVersionUID(1L)
  9. final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
  10. /**
  11. * Snapshot metadata.
  12. *
  13. * @param persistenceId id of persistent actor from which the snapshot was taken.
  14. * @param sequenceNr sequence number at which the snapshot was taken.
  15. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
  16. */
  17. @SerialVersionUID(1L) //#snapshot-metadata
  18. final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
  19. //#snapshot-metadata

PersistenceActor里有receiveRecover和receiveCommand两个抽象函数,必须由用户提供具体的实现。这两个函数代表了PersistentActor的两大功能:状态复原和消息处理。状态复原是通过receiveRecover对snapshot-store和journal里的记录处理实现的。而PersistentActor的receiveCommand就是普通Actor的receive消息处理函数。用户可以通过PersistentActor提供的回调(callback)函数来进行事件读取过程前的事前准备和后面的事后处理。可以对这些callback函数进行重载(override)来自定义这些处理程序,如:

  1. /**
  2. * Called whenever a message replay fails. By default it logs the error.
  3. *
  4. * Subclass may override to customize logging.
  5. *
  6. * The actor is always stopped after this method has been invoked.
  7. *
  8. * @param cause failure cause.
  9. * @param event the event that was processed in `receiveRecover`, if the exception
  10. * was thrown there
  11. */
  12. protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
  13. ...
  14. /**
  15. * Called when persist fails. By default it logs the error.
  16. * Subclass may override to customize logging and for example send negative
  17. * acknowledgment to sender.
  18. *
  19. * The actor is always stopped after this method has been invoked.
  20. *
  21. * Note that the event may or may not have been saved, depending on the type of
  22. * failure.
  23. *
  24. * @param cause failure cause.
  25. * @param event the event that was to be persisted
  26. */
  27. protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
  28. ...
  29. /**
  30. * Called when the journal rejected `persist` of an event. The event was not
  31. * stored. By default this method logs the problem as a warning, and the actor continues.
  32. * The callback handler that was passed to the `persist` method will not be invoked.
  33. *
  34. * @param cause failure cause
  35. * @param event the event that was to be persisted
  36. */
  37. protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
  38. ...

也可以通过函数重载来自定义状态恢复行为:

  1. trait PersistenceRecovery {
  2. //#persistence-recovery
  3. /**
  4. * Called when the persistent actor is started for the first time.
  5. * The returned [[Recovery]] object defines how the Actor will recover its persistent state before
  6. * handling the first incoming message.
  7. *
  8. * To skip recovery completely return `Recovery.none`.
  9. */
  10. def recovery: Recovery = Recovery()
  11. //#persistence-recovery
  12. }

整个状态恢复过程是在EventSourced.scala里下面这个函数实现的:

  1. override def stateReceive(receive: Receive, message: Any) = try message match {
  2. case ReplayedMessage(p) ⇒
  3. try {
  4. eventSeenInInterval = true
  5. updateLastSequenceNr(p)
  6. Eventsourced.super.aroundReceive(recoveryBehavior, p)
  7. } catch {
  8. case NonFatal(t) ⇒
  9. timeoutCancellable.cancel()
  10. try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
  11. returnRecoveryPermit()
  12. }
  13. case RecoverySuccess(highestSeqNr) ⇒
  14. timeoutCancellable.cancel()
  15. onReplaySuccess() // callback for subclass implementation
  16. sequenceNr = highestSeqNr
  17. setLastSequenceNr(highestSeqNr)
  18. _recoveryRunning = false
  19. try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
  20. finally transitToProcessingState()
  21. case ReplayMessagesFailure(cause) ⇒
  22. timeoutCancellable.cancel()
  23. try onRecoveryFailure(cause, event = None) finally context.stop(self)
  24. case RecoveryTick(false) if !eventSeenInInterval ⇒
  25. timeoutCancellable.cancel()
  26. try onRecoveryFailure(
  27. new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"),
  28. event = None)
  29. finally context.stop(self)
  30. case RecoveryTick(false) ⇒
  31. eventSeenInInterval = false
  32. case RecoveryTick(true) ⇒
  33. // snapshot tick, ignore
  34. case other ⇒
  35. stashInternally(other)
  36. } catch {
  37. case NonFatal(e) ⇒
  38. returnRecoveryPermit()
  39. throw e
  40. }

函数通过super.aroundReceive把消息传给了receiveRecovery:

  1. /**
  2. * INTERNAL API.
  3. *
  4. * Can be overridden to intercept calls to this actor's current behavior.
  5. *
  6. * @param receive current behavior.
  7. * @param msg current message.
  8. */
  9. protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
  10. // optimization: avoid allocation of lambda
  11. if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
  12. unhandled(msg)
  13. }
  14. }

因为EventSourced继承了PersistenceRecovery trait,所以重载recovery函数可以改变状态恢复行为。默认的模式是:

  1. /**
  2. * Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
  3. *
  4. * By default recovers from latest snapshot replays through to the last available event (last sequenceId).
  5. *
  6. * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
  7. * and at least one of these snapshots matches the specified `fromSnapshot` criteria.
  8. * Otherwise, recovery will start from scratch by replaying all stored events.
  9. *
  10. * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
  11. * message, followed by replayed messages, if any, that are younger than the snapshot, up to the
  12. * specified upper sequence number bound (`toSequenceNr`).
  13. *
  14. * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
  15. * is latest (= youngest) snapshot.
  16. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
  17. * @param replayMax maximum number of messages to replay. Default is no limit.
  18. */
  19. @SerialVersionUID(1L)
  20. final case class Recovery(
  21. fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  22. toSequenceNr: Long = Long.MaxValue,
  23. replayMax: Long = Long.MaxValue)

下面是状态恢复过程中产生的消息:

  1. /**
  2. * Sent to a [[PersistentActor]] when the journal replay has been finished.
  3. */
  4. @SerialVersionUID(1L)
  5. case object RecoveryCompleted extends RecoveryCompleted {
  6. ...
  7. final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace

状态恢复的进程可用用下面的方法检测:

  1. /**
  2. * Returns `true` if this persistent actor is currently recovering.
  3. */
  4. def recoveryRunning: Boolean = {
  5. // currentState is null if this is called from constructor
  6. if (currentState == null) true else currentState.recoveryRunning
  7. }
  8. /**
  9. * Returns `true` if this persistent actor has successfully finished recovery.
  10. */
  11. def recoveryFinished: Boolean = !recoveryRunning

用户也可以删除journal里的事件。虽然应该作为原始资料完整保存不应该鼓励这么做: 

  1. /**
  2. * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
  3. *
  4. * If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor.
  5. * If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor.
  6. *
  7. * @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
  8. */
  9. def deleteMessages(toSequenceNr: Long): Unit =
  10. journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)

删除事件范围是用SequenceNr来代表的,下面是一些可用的序号:

  1. /**
  2. * Returns `persistenceId`.
  3. */
  4. override def snapshotterId: String = persistenceId
  5. /**
  6. * Highest received sequence number so far or `0L` if this actor hasn't replayed
  7. * or stored any persistent events yet.
  8. */
  9. def lastSequenceNr: Long = _lastSequenceNr
  10. /**
  11. * Returns `lastSequenceNr`.
  12. */
  13. def snapshotSequenceNr: Long = lastSequenceNr

事件删除过程可用下面的消息监控:

  1. /**
  2. * Reply message to a successful [[Eventsourced#deleteMessages]] request.
  3. */
  4. final case class DeleteMessagesSuccess(toSequenceNr: Long)
  5. /**
  6. * Reply message to a failed [[Eventsourced#deleteMessages]] request.
  7. */
  8. final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)

下面是一些快照的持久化维护方法:

  1. /**
  2. * Snapshotter id.
  3. */
  4. def snapshotterId: String
  5. /**
  6. * Sequence number to use when taking a snapshot.
  7. */
  8. def snapshotSequenceNr: Long
  9. /**
  10. * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
  11. * to the running [[PersistentActor]].
  12. */
  13. def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
  14. snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)
  15. /**
  16. * Saves a `snapshot` of this snapshotter's state.
  17. *
  18. * The [[PersistentActor]] will be notified about the success or failure of this
  19. * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message.
  20. */
  21. def saveSnapshot(snapshot: Any): Unit = {
  22. snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  23. }
  24. /**
  25. * Deletes the snapshot identified by `sequenceNr`.
  26. *
  27. * The [[PersistentActor]] will be notified about the status of the deletion
  28. * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message.
  29. */
  30. def deleteSnapshot(sequenceNr: Long): Unit = {
  31. snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr))
  32. }
  33. /**
  34. * Deletes all snapshots matching `criteria`.
  35. *
  36. * The [[PersistentActor]] will be notified about the status of the deletion
  37. * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message.
  38. */
  39. def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
  40. snapshotStore ! DeleteSnapshots(snapshotterId, criteria)
  41. }

快照维护数据结构和消息如下:

  1. /**
  2. * Snapshot metadata.
  3. *
  4. * @param persistenceId id of persistent actor from which the snapshot was taken.
  5. * @param sequenceNr sequence number at which the snapshot was taken.
  6. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
  7. */
  8. @SerialVersionUID(1L) //#snapshot-metadata
  9. final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
  10. //#snapshot-metadata
  11. object SnapshotMetadata {
  12. implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) ⇒
  13. if (a eq b) false
  14. else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0
  15. else if (a.sequenceNr != b.sequenceNr) a.sequenceNr < b.sequenceNr
  16. else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp
  17. else false
  18. }
  19. }
  20. /**
  21. * Sent to a [[PersistentActor]] after successful saving of a snapshot.
  22. *
  23. * @param metadata snapshot metadata.
  24. */
  25. @SerialVersionUID(1L)
  26. final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
  27. extends SnapshotProtocol.Response
  28. /**
  29. * Sent to a [[PersistentActor]] after successful deletion of a snapshot.
  30. *
  31. * @param metadata snapshot metadata.
  32. */
  33. @SerialVersionUID(1L)
  34. final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
  35. extends SnapshotProtocol.Response
  36. /**
  37. * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots.
  38. *
  39. * @param criteria snapshot selection criteria.
  40. */
  41. @SerialVersionUID(1L)
  42. final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
  43. extends SnapshotProtocol.Response
  44. /**
  45. * Sent to a [[PersistentActor]] after failed saving of a snapshot.
  46. *
  47. * @param metadata snapshot metadata.
  48. * @param cause failure cause.
  49. */
  50. @SerialVersionUID(1L)
  51. final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  52. extends SnapshotProtocol.Response
  53. /**
  54. * Sent to a [[PersistentActor]] after failed deletion of a snapshot.
  55. *
  56. * @param metadata snapshot metadata.
  57. * @param cause failure cause.
  58. */
  59. @SerialVersionUID(1L)
  60. final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  61. extends SnapshotProtocol.Response
  62. /**
  63. * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots.
  64. *
  65. * @param criteria snapshot selection criteria.
  66. * @param cause failure cause.
  67. */
  68. @SerialVersionUID(1L)
  69. final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)
  70. extends SnapshotProtocol.Response
  71. /**
  72. * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
  73. * before any further replayed messages.
  74. */
  75. @SerialVersionUID(1L)
  76. final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
  77. /**
  78. * Selection criteria for loading and deleting snapshots.
  79. *
  80. * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound,
  81. * i.e. `Long.MaxValue`
  82. * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound,
  83. * i.e. `Long.MaxValue`
  84. * @param minSequenceNr lower bound for a selected snapshot's sequence number. Default is no lower bound,
  85. * i.e. `0L`
  86. * @param minTimestamp lower bound for a selected snapshot's timestamp. Default is no lower bound,
  87. * i.e. `0L`
  88. *
  89. * @see [[Recovery]]
  90. */
  91. @SerialVersionUID(1L)
  92. final case class SnapshotSelectionCriteria(
  93. maxSequenceNr: Long = Long.MaxValue,
  94. maxTimestamp: Long = Long.MaxValue,
  95. minSequenceNr: Long = 0L,
  96. minTimestamp: Long = 0L) {
  97. /**
  98. * INTERNAL API.
  99. */
  100. private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
  101. if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this
  102. /**
  103. * INTERNAL API.
  104. */
  105. private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
  106. metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp &&
  107. metadata.sequenceNr >= minSequenceNr && metadata.timestamp >= minTimestamp
  108. }
  109. object SnapshotSelectionCriteria {
  110. /**
  111. * The latest saved snapshot.
  112. */
  113. val Latest = SnapshotSelectionCriteria()
  114. /**
  115. * No saved snapshot matches.
  116. */
  117. val None = SnapshotSelectionCriteria(0L, 0L)
  118. /**
  119. * Java API.
  120. */
  121. def create(maxSequenceNr: Long, maxTimestamp: Long) =
  122. SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp)
  123. /**
  124. * Java API.
  125. */
  126. def create(maxSequenceNr: Long, maxTimestamp: Long,
  127. minSequenceNr: Long, minTimestamp: Long) =
  128. SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)
  129. /**
  130. * Java API.
  131. */
  132. def latest() = Latest
  133. /**
  134. * Java API.
  135. */
  136. def none() = None
  137. }
  138. /**
  139. * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
  140. *
  141. * @param metadata snapshot metadata.
  142. * @param snapshot snapshot.
  143. */
  144. final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
  145. object SelectedSnapshot {
  146. /**
  147. * Java API, Plugin API.
  148. */
  149. def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
  150. SelectedSnapshot(metadata, snapshot)
  151. }
  152. /**
  153. * INTERNAL API.
  154. *
  155. * Defines messages exchanged between persistent actors and a snapshot store.
  156. */
  157. private[persistence] object SnapshotProtocol {
  158. /** Marker trait shared by internal snapshot messages. */
  159. sealed trait Message extends Protocol.Message
  160. /** Internal snapshot command. */
  161. sealed trait Request extends Message
  162. /** Internal snapshot acknowledgement. */
  163. sealed trait Response extends Message
  164. /**
  165. * Instructs a snapshot store to load a snapshot.
  166. *
  167. * @param persistenceId persistent actor id.
  168. * @param criteria criteria for selecting a snapshot from which recovery should start.
  169. * @param toSequenceNr upper sequence number bound (inclusive) for recovery.
  170. */
  171. final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
  172. extends Request
  173. /**
  174. * Response message to a [[LoadSnapshot]] message.
  175. *
  176. * @param snapshot loaded snapshot, if any.
  177. */
  178. final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
  179. extends Response
  180. /**
  181. * Reply message to a failed [[LoadSnapshot]] request.
  182. * @param cause failure cause.
  183. */
  184. final case class LoadSnapshotFailed(cause: Throwable) extends Response
  185. /**
  186. * Instructs snapshot store to save a snapshot.
  187. *
  188. * @param metadata snapshot metadata.
  189. * @param snapshot snapshot.
  190. */
  191. final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
  192. extends Request
  193. /**
  194. * Instructs snapshot store to delete a snapshot.
  195. *
  196. * @param metadata snapshot metadata.
  197. */
  198. final case class DeleteSnapshot(metadata: SnapshotMetadata)
  199. extends Request
  200. /**
  201. * Instructs snapshot store to delete all snapshots that match `criteria`.
  202. *
  203. * @param persistenceId persistent actor id.
  204. * @param criteria criteria for selecting snapshots to be deleted.
  205. */
  206. final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
  207. extends Request
  208. }

篇幅所限,我们将在下一篇用一个具体的应用例子来进行akka-CQRS写端示范。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/179712?site
推荐阅读
相关标签
  

闽ICP备14008679号