赞
踩
-- Schema for the akka-persistence-sql-async plugin on MySQL (InnoDB).
-- Every statement is idempotent (IF NOT EXISTS), so the script can be re-run safely.
CREATE DATABASE IF NOT EXISTS akka_persistence_sql_async;

-- Select the database before creating tables. Without this the tables are
-- created in whatever database the session currently uses (or the script
-- fails with "No database selected"), while the application connects to
-- jdbc:mysql://localhost/akka_persistence_sql_async.
USE akka_persistence_sql_async;

-- One row per persistence_id; persistence_key is the surrogate key that the
-- journal and snapshot tables reference.
CREATE TABLE IF NOT EXISTS persistence_metadata (
    persistence_key BIGINT NOT NULL AUTO_INCREMENT,
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr BIGINT NOT NULL,
    PRIMARY KEY (persistence_key),
    UNIQUE (persistence_id)
) ENGINE = InnoDB;

-- Persisted messages, keyed by (persistence_key, sequence_nr).
CREATE TABLE IF NOT EXISTS persistence_journal (
    persistence_key BIGINT NOT NULL,
    sequence_nr BIGINT NOT NULL,
    message LONGBLOB NOT NULL,
    PRIMARY KEY (persistence_key, sequence_nr),
    FOREIGN KEY (persistence_key) REFERENCES persistence_metadata (persistence_key)
) ENGINE = InnoDB;

-- Stored snapshots, keyed by (persistence_key, sequence_nr).
CREATE TABLE IF NOT EXISTS persistence_snapshot (
    persistence_key BIGINT NOT NULL,
    sequence_nr BIGINT NOT NULL,
    created_at BIGINT NOT NULL,
    snapshot LONGBLOB NOT NULL,
    PRIMARY KEY (persistence_key, sequence_nr),
    FOREIGN KEY (persistence_key) REFERENCES persistence_metadata (persistence_key)
) ENGINE = InnoDB;
在build.sbt 中添加依赖
// sbt build definition for the persistence example.
name := "PersistTest"

version := "1.0"

scalaVersion := "2.12.6"

// Single source of truth for the version of every Akka module below.
lazy val akkaVersion = "2.6.0-M1"
lazy val sparkVersion = "2.4.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
  "org.scalatest" %% "scalatest" % "3.0.5" % "test",
  // Was pinned to "2.5.6": Akka modules must all be on the same version,
  // so akka-persistence now follows akkaVersion like akka-actor does.
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.okumin" %% "akka-persistence-sql-async" % "0.5.1",
  "com.github.mauricio" %% "mysql-async" % "0.2.21",
  // "com.github.mauricio" %% "postgresql-async" % "0.2.20", // for postgres, but this example is for mysql, so not needed
  "org.apache.spark" %% "spark-sql" % sparkVersion
)
新建文件src\main\resources\application.conf
在application.conf文件中添加配置信息,注意将password修改为mysql数据库对应的密码
# Route Akka Persistence's journal and snapshot store to the SQL async plugin.
akka {
  persistence {
    journal {
      plugin = "akka-persistence-sql-async.journal"
    }
    snapshot-store {
      plugin = "akka-persistence-sql-async.snapshot-store"
    }
  }
}

# Plugin settings (MySQL variant).
akka-persistence-sql-async {
  journal {
    class = "akka.persistence.journal.sqlasync.MySQLAsyncWriteJournal"
  }
  snapshot-store {
    class = "akka.persistence.snapshot.sqlasync.MySQLSnapshotStore"
  }

  # For PostgreSQL
  # journal.class = "akka.persistence.journal.sqlasync.PostgreSQLAsyncWriteJournal"
  # snapshot-store.class = "akka.persistence.snapshot.sqlasync.PostgreSQLSnapshotStore"

  # Connection settings: replace the password with the one of your MySQL server.
  user = "root"
  password = "123456"
  url = "jdbc:mysql://localhost/akka_persistence_sql_async"

  max-pool-size = 4
  wait-queue-capacity = 10000

  # Must match the table names created by the schema script.
  metadata-table-name = "persistence_metadata"
  journal-table-name = "persistence_journal"
  snapshot-table-name = "persistence_snapshot"

  connect-timeout = 5s
  query-timeout = 5s
}
package com.example

import java.util
import akka.actor.{ActorSystem, Props}
import akka.persistence.{PersistentActor, SnapshotOffer}
import com.example._

/** Messages understood by [[Mypersistence]]. */
sealed trait Command

/** Append `str` to the list; persisted as an event. */
case class Add(str: String) extends Command

/** Print the current list and the recovery counter, and reply with the list. */
case class Get() extends Command

/** Empty the list; persisted as an event. */
case class Clear() extends Command

/** Save a snapshot of the current list. */
case class Save() extends Command

/** Snapshot payload: the list contents at the time of saving. */
case class Snapshot(Strs: util.ArrayList[String])

/** Demo driver: clears the state, adds "1".."10" and snapshots after the fifth Add. */
object Run extends App {
  val actorSystem = ActorSystem("myhellosys")
  val myakka = actorSystem.actorOf(Props[Mypersistence], "myakaactor")

  myakka ! Clear()
  myakka ! Get()
  for (i <- 1 to 10) {
    myakka ! Add(i.toString())
    if (i == 5) myakka ! Save() // after the fifth Add, save a snapshot
  }
  myakka ! Get()
}

/**
 * Persistent actor that keeps a list of strings.
 *
 * `Add` and `Clear` are persisted as events and replayed on recovery;
 * `Save` stores the current list as a snapshot.
 */
class Mypersistence extends PersistentActor {
  val ID: Int = -1

  // Number of Add events replayed during recovery. It is intentionally not
  // incremented in receiveCommand, so it shows how many events were replayed.
  var c = 0

  var Strs: util.ArrayList[String] = new util.ArrayList[String]

  override def persistenceId: String = s"this persistentId $ID"

  /** Rebuilds the state from the offered snapshot and the replayed events. */
  override def receiveRecover: Receive = {
    case Add(str) =>
      Strs.add(str)
      c += 1
    case Clear() =>
      // Clear is persisted by receiveCommand, so it must be replayed as well;
      // otherwise a recovered actor keeps entries that had been cleared.
      Strs.clear()
    case SnapshotOffer(_, snapshot: Snapshot) =>
      Strs = snapshot.Strs
      println("snapshot")
  }

  /** Handles commands; state is only changed after the event has been persisted. */
  override def receiveCommand: Receive = {
    case Add(str) =>
      persist(Add(str)) { e => Strs.add(e.str) }
    case Get() =>
      // Reply with a copy so the receiver never shares the actor's mutable list.
      sender() ! new util.ArrayList[String](Strs)
      println("mystrs:" + Strs.toString)
      println("conut:" + c)
    case Clear() =>
      persist(Clear())(_ => Strs.clear())
    case Save() =>
      // Snapshot a copy: the live list keeps changing with later Add/Clear
      // commands after it has been handed to the snapshot store.
      saveSnapshot(Snapshot(new util.ArrayList[String](Strs)))
  }
}
mystrs:[]
conut:0
mystrs:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
conut:0
修改代码,注释部分代码
object Run extends App {
  val actorSystem = ActorSystem("myhellosys")
  val myakka = actorSystem.actorOf(Props[Mypersistence], "myakaactor")

  // The writes from the first run are disabled on purpose, so that this run
  // only shows the state the actor recovers from the database:
  //   myakka ! Clear()
  //   myakka ! Get()
  //   for (i <- 1 to 10) {
  //     myakka ! Add(i.toString())
  //     if (i == 5) myakka ! Save() // after the fifth Add, save a snapshot
  //   }

  myakka ! Get()
}
再次运行,得到结果:
snapshot
mystrs:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
conut:5
可见,再次运行时会先从 snapshot 恢复状态(此时列表为 [1, 2, 3, 4, 5],并打印 "snapshot"),然后依次重放 snapshot 之后持久化的 5 个 Add 事件(6 到 10),因此最终列表为 [1, ..., 10],conut 为 5。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。