当前位置:   article > 正文

Akka-CQRS(1)- Write-side, Persisting event sources:CQRS存写端操作方式_akka persistasync

akka persistasync




  1. case class GetItemInfo(itemcode: String) extends Command
  2. case class AddItem(item: Item, qty: Int) extends Command
  3. case class AliPay(amount: Double) extends Command



AddItem: 这个指令只影响交易状态,没有副作用,转换成 :ItemAdded(item: Item, qty: Int) extends Event

AliPay:改变交易状态并且产生副作用,因为要即时从支付宝扣款。做法:先确定支付成功,然后转成: AliPaid(amount  Double) extends Event

  1. case class ItemAdded(item: Item, qty: Int) extends Event
  2. case class AliPaid(amount: Double) extends Event


  1. //交易记录
  2. case class TxnItem(
  3. num: Int //销售单号
  4. ,seq: Int //交易序号
  5. ,txntype: Int //交易类型编号
  6. ,code: String //编号(商品、账号...)
  7. ,qty: Int //交易数量
  8. ,price: Int //单价(分)
  9. ,amount: Int //金额(分)
  10. )
  11. case class SalesMemo(salesnum: Int, txnitems: List[TxnItem] = Nil) {
  12. def itemAdded(evt: Event): SalesMemo = evt match {
  13. case ItemAdded(item,qty) =>
  14. copy(txnitems = TxnItem(salesnum, txnitems.length+1,0,item.code,qty,item.price,qty * item.price) :: txnitems)
  15. case _ => this
  16. }
  17. def aliPaid(evt: Event) = evt match {
  18. case AliPaid(amt) =>
  19. copy(txnitems = TxnItem(salesnum,txnitems.length+1,0,'ali',1,amt,amt) :: items)
  20. case _ => this
  21. }
  22. }



  1. //无干扰存写,后面进来的消息先存放在内部的临时存放点 message-stashing
  2. def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
  3. internalPersist(event)(handler)
  4. }
  5. //同时存写多个事件
  6. def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
  7. internalPersistAll(events)(handler)
  8. }
  9. //异步存写事件,没有临时存放点机制 no-message-stashing
  10. def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
  11. internalPersistAsync(event)(handler)
  12. }
  13. //异步存写多项事件
  14. def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
  15. internalPersistAllAsync(events)(handler)
  16. }
  17. //不存写事件,利用内部临时存放点机制来保证handler执行顺序
  18. def defer[A](event: A)(handler: A ⇒ Unit): Unit = {
  19. internalDefer(event)(handler)
  20. }
  21. //不存写事件,只保证handler运行顺序
  22. def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
  23. internalDeferAsync(event)(handler)
  24. }


  1. override def receiveCommand: Receive = {
  2. case c: String ⇒ {
  3. sender() ! c
  4. persist(s"evt-$c-1") { e ⇒ sender() ! e }
  5. persist(s"evt-$c-2") { e ⇒ sender() ! e }
  6. defer(s"evt-$c-3") { e ⇒ sender() ! e }
  7. }
  8. }
  9. //有内部临存 with message stashing
  10. persistentActor ! "a"
  11. persistentActor ! "b"
  12. // order of received messages:
  13. // a
  14. // evt-a-1
  15. // evt-a-2
  16. // evt-a-3
  17. // b
  18. // evt-b-1
  19. // evt-b-2
  20. // evt-b-3
  21. ----------------------------------
  22. override def receiveCommand: Receive = {
  23. case c: String ⇒ {
  24. sender() ! c
  25. persistAsync(s"evt-$c-1") { e ⇒ sender() ! e }
  26. persistAsync(s"evt-$c-2") { e ⇒ sender() ! e }
  27. deferAsync(s"evt-$c-3") { e ⇒ sender() ! e }
  28. }
  29. }
  30. persistentActor ! "a"
  31. persistentActor ! "b"
  32. // order of received messages:
  33. // a
  34. // b //无临存机制,外部信息立即处理了
  35. // evt-a-1
  36. // evt-a-2
  37. // evt-a-3
  38. // evt-b-1
  39. // evt-b-2
  40. // evt-b-3


  1. override def receiveCommand: Receive = {
  2. case c: String
  3. sender() ! c
  4. persist(s"$c-1-outer") { outer1
  5. sender() ! outer1
  6. persist(s"$c-1-inner") { inner1
  7. sender() ! inner1
  8. }
  9. }
  10. persist(s"$c-2-outer") { outer2
  11. sender() ! outer2
  12. persist(s"$c-2-inner") { inner2
  13. sender() ! inner2
  14. }
  15. }
  16. }
  17. persistentActor ! "a"
  18. persistentActor ! "b"
  19. // order of received messages:
  20. // a
  21. // a-outer-1
  22. // a-outer-2
  23. // a-inner-1
  24. // a-inner-2
  25. // and only then process "b"
  26. // b
  27. // b-outer-1
  28. // b-outer-2
  29. // b-inner-1
  30. // b-inner-2
  31. --------------------------------
  32. override def receiveCommand: Receive = {
  33. case c: String
  34. sender() ! c
  35. persistAsync(c + "-outer-1") { outer ⇒
  36. sender() ! outer
  37. persistAsync(c + "-inner-1") { inner ⇒ sender() ! inner }
  38. }
  39. persistAsync(c + "-outer-2") { outer ⇒
  40. sender() ! outer
  41. persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner }
  42. }
  43. }
  44. persistentActor ! "a"
  45. persistentActor ! "b"
  46. // order of received messages:
  47. // a
  48. // b
  49. // a-outer-1
  50. // a-outer-2
  51. // b-outer-1
  52. // b-outer-2
  53. // a-inner-1
  54. // a-inner-2
  55. // b-inner-1
  56. // b-inner-2
  57. // which can be seen as the following causal relationship:
  58. // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
  59. // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2


  1. override def receiveCommand: Receive = {
  2. case AddItem(item,qty) =>
  3. persist(ItemAdded(item,qty))(salesMemo.itemAdded)
  4. case AliPay(amt) =>
  5. try {
  6. if (aliOnlinePay(amt)) //先产生副作用
  7. persist(AliPaid(amt))(salesMemo.alipaid(_))
  8. } catch {
  9. case _ > Throw new OnlinePayExecption("boom!!!")
  10. }
  11. ...


  1. override def receiveRecover: Receive = {
  2. case evt: Event =>
  3. salesMemo = salesMemo.updateMemo(evt)
  4. case SnapshotOffer(_,loggedItems: SalesMemo) =>
  5. salesMemo = loggedItems
  6. }


  1. def saveSnapshot(snapshot: Any): Unit = {
  2. snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  3. }
  4. /**
  5. * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
  6. * before any further replayed messages.
  7. */
  8. @SerialVersionUID(1L)
  9. final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
  10. /**
  11. * Snapshot metadata.
  12. *
  13. * @param persistenceId id of persistent actor from which the snapshot was taken.
  14. * @param sequenceNr sequence number at which the snapshot was taken.
  15. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
  16. */
  17. @SerialVersionUID(1L) //#snapshot-metadata
  18. final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
  19. //#snapshot-metadata


  1. /**
  2. * Called whenever a message replay fails. By default it logs the error.
  3. *
  4. * Subclass may override to customize logging.
  5. *
  6. * The actor is always stopped after this method has been invoked.
  7. *
  8. * @param cause failure cause.
  9. * @param event the event that was processed in `receiveRecover`, if the exception
  10. * was thrown there
  11. */
  12. protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
  13. ...
  14. /**
  15. * Called when persist fails. By default it logs the error.
  16. * Subclass may override to customize logging and for example send negative
  17. * acknowledgment to sender.
  18. *
  19. * The actor is always stopped after this method has been invoked.
  20. *
  21. * Note that the event may or may not have been saved, depending on the type of
  22. * failure.
  23. *
  24. * @param cause failure cause.
  25. * @param event the event that was to be persisted
  26. */
  27. protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
  28. ...
  29. /**
  30. * Called when the journal rejected `persist` of an event. The event was not
  31. * stored. By default this method logs the problem as a warning, and the actor continues.
  32. * The callback handler that was passed to the `persist` method will not be invoked.
  33. *
  34. * @param cause failure cause
  35. * @param event the event that was to be persisted
  36. */
  37. protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
  38. ...


  1. trait PersistenceRecovery {
  2. //#persistence-recovery
  3. /**
  4. * Called when the persistent actor is started for the first time.
  5. * The returned [[Recovery]] object defines how the Actor will recover its persistent state before
  6. * handling the first incoming message.
  7. *
  8. * To skip recovery completely return `Recovery.none`.
  9. */
  10. def recovery: Recovery = Recovery()
  11. //#persistence-recovery
  12. }


  1. override def stateReceive(receive: Receive, message: Any) = try message match {
  2. case ReplayedMessage(p) ⇒
  3. try {
  4. eventSeenInInterval = true
  5. updateLastSequenceNr(p)
  6. Eventsourced.super.aroundReceive(recoveryBehavior, p)
  7. } catch {
  8. case NonFatal(t) ⇒
  9. timeoutCancellable.cancel()
  10. try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
  11. returnRecoveryPermit()
  12. }
  13. case RecoverySuccess(highestSeqNr) ⇒
  14. timeoutCancellable.cancel()
  15. onReplaySuccess() // callback for subclass implementation
  16. sequenceNr = highestSeqNr
  17. setLastSequenceNr(highestSeqNr)
  18. _recoveryRunning = false
  19. try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
  20. finally transitToProcessingState()
  21. case ReplayMessagesFailure(cause) ⇒
  22. timeoutCancellable.cancel()
  23. try onRecoveryFailure(cause, event = None) finally context.stop(self)
  24. case RecoveryTick(false) if !eventSeenInInterval ⇒
  25. timeoutCancellable.cancel()
  26. try onRecoveryFailure(
  27. new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"),
  28. event = None)
  29. finally context.stop(self)
  30. case RecoveryTick(false) ⇒
  31. eventSeenInInterval = false
  32. case RecoveryTick(true) ⇒
  33. // snapshot tick, ignore
  34. case other ⇒
  35. stashInternally(other)
  36. } catch {
  37. case NonFatal(e) ⇒
  38. returnRecoveryPermit()
  39. throw e
  40. }


  1. /**
  3. *
  4. * Can be overridden to intercept calls to this actor's current behavior.
  5. *
  6. * @param receive current behavior.
  7. * @param msg current message.
  8. */
  9. protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
  10. // optimization: avoid allocation of lambda
  11. if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
  12. unhandled(msg)
  13. }
  14. }

因为EventSourced继承了PersistenceRecovery trait,所以重载recovery函数可以改变状态恢复行为。默认的模式是:

  1. /**
  2. * Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
  3. *
  4. * By default recovers from latest snapshot replays through to the last available event (last sequenceId).
  5. *
  6. * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
  7. * and at least one of these snapshots matches the specified `fromSnapshot` criteria.
  8. * Otherwise, recovery will start from scratch by replaying all stored events.
  9. *
  10. * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
  11. * message, followed by replayed messages, if any, that are younger than the snapshot, up to the
  12. * specified upper sequence number bound (`toSequenceNr`).
  13. *
  14. * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
  15. * is latest (= youngest) snapshot.
  16. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
  17. * @param replayMax maximum number of messages to replay. Default is no limit.
  18. */
  19. @SerialVersionUID(1L)
  20. final case class Recovery(
  21. fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  22. toSequenceNr: Long = Long.MaxValue,
  23. replayMax: Long = Long.MaxValue)


  1. /**
  2. * Sent to a [[PersistentActor]] when the journal replay has been finished.
  3. */
  4. @SerialVersionUID(1L)
  5. case object RecoveryCompleted extends RecoveryCompleted {
  6. ...
  7. final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace


  1. /**
  2. * Returns `true` if this persistent actor is currently recovering.
  3. */
  4. def recoveryRunning: Boolean = {
  5. // currentState is null if this is called from constructor
  6. if (currentState == null) true else currentState.recoveryRunning
  7. }
  8. /**
  9. * Returns `true` if this persistent actor has successfully finished recovery.
  10. */
  11. def recoveryFinished: Boolean = !recoveryRunning


  1. /**
  2. * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
  3. *
  4. * If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor.
  5. * If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor.
  6. *
  7. * @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
  8. */
  9. def deleteMessages(toSequenceNr: Long): Unit =
  10. journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)


  1. /**
  2. * Returns `persistenceId`.
  3. */
  4. override def snapshotterId: String = persistenceId
  5. /**
  6. * Highest received sequence number so far or `0L` if this actor hasn't replayed
  7. * or stored any persistent events yet.
  8. */
  9. def lastSequenceNr: Long = _lastSequenceNr
  10. /**
  11. * Returns `lastSequenceNr`.
  12. */
  13. def snapshotSequenceNr: Long = lastSequenceNr


  1. /**
  2. * Reply message to a successful [[Eventsourced#deleteMessages]] request.
  3. */
  4. final case class DeleteMessagesSuccess(toSequenceNr: Long)
  5. /**
  6. * Reply message to a failed [[Eventsourced#deleteMessages]] request.
  7. */
  8. final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)


  1. /**
  2. * Snapshotter id.
  3. */
  4. def snapshotterId: String
  5. /**
  6. * Sequence number to use when taking a snapshot.
  7. */
  8. def snapshotSequenceNr: Long
  9. /**
  10. * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
  11. * to the running [[PersistentActor]].
  12. */
  13. def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
  14. snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)
  15. /**
  16. * Saves a `snapshot` of this snapshotter's state.
  17. *
  18. * The [[PersistentActor]] will be notified about the success or failure of this
  19. * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message.
  20. */
  21. def saveSnapshot(snapshot: Any): Unit = {
  22. snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  23. }
  24. /**
  25. * Deletes the snapshot identified by `sequenceNr`.
  26. *
  27. * The [[PersistentActor]] will be notified about the status of the deletion
  28. * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message.
  29. */
  30. def deleteSnapshot(sequenceNr: Long): Unit = {
  31. snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr))
  32. }
  33. /**
  34. * Deletes all snapshots matching `criteria`.
  35. *
  36. * The [[PersistentActor]] will be notified about the status of the deletion
  37. * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message.
  38. */
  39. def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
  40. snapshotStore ! DeleteSnapshots(snapshotterId, criteria)
  41. }


  1. /**
  2. * Snapshot metadata.
  3. *
  4. * @param persistenceId id of persistent actor from which the snapshot was taken.
  5. * @param sequenceNr sequence number at which the snapshot was taken.
  6. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
  7. */
  8. @SerialVersionUID(1L) //#snapshot-metadata
  9. final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
  10. //#snapshot-metadata
  11. object SnapshotMetadata {
  12. implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) ⇒
  13. if (a eq b) false
  14. else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0
  15. else if (a.sequenceNr != b.sequenceNr) a.sequenceNr < b.sequenceNr
  16. else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp
  17. else false
  18. }
  19. }
  20. /**
  21. * Sent to a [[PersistentActor]] after successful saving of a snapshot.
  22. *
  23. * @param metadata snapshot metadata.
  24. */
  25. @SerialVersionUID(1L)
  26. final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
  27. extends SnapshotProtocol.Response
  28. /**
  29. * Sent to a [[PersistentActor]] after successful deletion of a snapshot.
  30. *
  31. * @param metadata snapshot metadata.
  32. */
  33. @SerialVersionUID(1L)
  34. final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
  35. extends SnapshotProtocol.Response
  36. /**
  37. * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots.
  38. *
  39. * @param criteria snapshot selection criteria.
  40. */
  41. @SerialVersionUID(1L)
  42. final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
  43. extends SnapshotProtocol.Response
  44. /**
  45. * Sent to a [[PersistentActor]] after failed saving of a snapshot.
  46. *
  47. * @param metadata snapshot metadata.
  48. * @param cause failure cause.
  49. */
  50. @SerialVersionUID(1L)
  51. final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  52. extends SnapshotProtocol.Response
  53. /**
  54. * Sent to a [[PersistentActor]] after failed deletion of a snapshot.
  55. *
  56. * @param metadata snapshot metadata.
  57. * @param cause failure cause.
  58. */
  59. @SerialVersionUID(1L)
  60. final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  61. extends SnapshotProtocol.Response
  62. /**
  63. * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots.
  64. *
  65. * @param criteria snapshot selection criteria.
  66. * @param cause failure cause.
  67. */
  68. @SerialVersionUID(1L)
  69. final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)
  70. extends SnapshotProtocol.Response
  71. /**
  72. * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
  73. * before any further replayed messages.
  74. */
  75. @SerialVersionUID(1L)
  76. final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
  77. /**
  78. * Selection criteria for loading and deleting snapshots.
  79. *
  80. * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound,
  81. * i.e. `Long.MaxValue`
  82. * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound,
  83. * i.e. `Long.MaxValue`
  84. * @param minSequenceNr lower bound for a selected snapshot's sequence number. Default is no lower bound,
  85. * i.e. `0L`
  86. * @param minTimestamp lower bound for a selected snapshot's timestamp. Default is no lower bound,
  87. * i.e. `0L`
  88. *
  89. * @see [[Recovery]]
  90. */
  91. @SerialVersionUID(1L)
  92. final case class SnapshotSelectionCriteria(
  93. maxSequenceNr: Long = Long.MaxValue,
  94. maxTimestamp: Long = Long.MaxValue,
  95. minSequenceNr: Long = 0L,
  96. minTimestamp: Long = 0L) {
  97. /**
  99. */
  100. private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
  101. if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this
  102. /**
  103. * INTERNAL API.
  104. */
  105. private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
  106. metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp &&
  107. metadata.sequenceNr >= minSequenceNr && metadata.timestamp >= minTimestamp
  108. }
  109. object SnapshotSelectionCriteria {
  110. /**
  111. * The latest saved snapshot.
  112. */
  113. val Latest = SnapshotSelectionCriteria()
  114. /**
  115. * No saved snapshot matches.
  116. */
  117. val None = SnapshotSelectionCriteria(0L, 0L)
  118. /**
  119. * Java API.
  120. */
  121. def create(maxSequenceNr: Long, maxTimestamp: Long) =
  122. SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp)
  123. /**
  124. * Java API.
  125. */
  126. def create(maxSequenceNr: Long, maxTimestamp: Long,
  127. minSequenceNr: Long, minTimestamp: Long) =
  128. SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)
  129. /**
  130. * Java API.
  131. */
  132. def latest() = Latest
  133. /**
  134. * Java API.
  135. */
  136. def none() = None
  137. }
  138. /**
  139. * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
  140. *
  141. * @param metadata snapshot metadata.
  142. * @param snapshot snapshot.
  143. */
  144. final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
  145. object SelectedSnapshot {
  146. /**
  147. * Java API, Plugin API.
  148. */
  149. def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
  150. SelectedSnapshot(metadata, snapshot)
  151. }
  152. /**
  153. * INTERNAL API.
  154. *
  155. * Defines messages exchanged between persistent actors and a snapshot store.
  156. */
  157. private[persistence] object SnapshotProtocol {
  158. /** Marker trait shared by internal snapshot messages. */
  159. sealed trait Message extends Protocol.Message
  160. /** Internal snapshot command. */
  161. sealed trait Request extends Message
  162. /** Internal snapshot acknowledgement. */
  163. sealed trait Response extends Message
  164. /**
  165. * Instructs a snapshot store to load a snapshot.
  166. *
  167. * @param persistenceId persistent actor id.
  168. * @param criteria criteria for selecting a snapshot from which recovery should start.
  169. * @param toSequenceNr upper sequence number bound (inclusive) for recovery.
  170. */
  171. final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
  172. extends Request
  173. /**
  174. * Response message to a [[LoadSnapshot]] message.
  175. *
  176. * @param snapshot loaded snapshot, if any.
  177. */
  178. final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
  179. extends Response
  180. /**
  181. * Reply message to a failed [[LoadSnapshot]] request.
  182. * @param cause failure cause.
  183. */
  184. final case class LoadSnapshotFailed(cause: Throwable) extends Response
  185. /**
  186. * Instructs snapshot store to save a snapshot.
  187. *
  188. * @param metadata snapshot metadata.
  189. * @param snapshot snapshot.
  190. */
  191. final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
  192. extends Request
  193. /**
  194. * Instructs snapshot store to delete a snapshot.
  195. *
  196. * @param metadata snapshot metadata.
  197. */
  198. final case class DeleteSnapshot(metadata: SnapshotMetadata)
  199. extends Request
  200. /**
  201. * Instructs snapshot store to delete all snapshots that match `criteria`.
  202. *
  203. * @param persistenceId persistent actor id.
  204. * @param criteria criteria for selecting snapshots to be deleted.
  205. */
  206. final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
  207. extends Request
  208. }


