当前位置:   article > 正文

akka PersistentActor(event sourcing)通过mysql持久化数据_akka-persistence mysql schema

akka-persistence mysql schema

akka PersistentActor(event sourcing)通过mysql持久化数据

一:新建mysql数据库表

create database if not exists akka_persistence_sql_async;

CREATE TABLE IF NOT EXISTS persistence_metadata (
  persistence_key BIGINT NOT NULL AUTO_INCREMENT,
  persistence_id VARCHAR(255) NOT NULL,
  sequence_nr BIGINT NOT NULL,
  PRIMARY KEY (persistence_key),
  UNIQUE (persistence_id)
) ENGINE = InnoDB;

CREATE TABLE IF NOT EXISTS persistence_journal (
  persistence_key BIGINT NOT NULL,
  sequence_nr BIGINT NOT NULL,
  message LONGBLOB NOT NULL,
  PRIMARY KEY (persistence_key, sequence_nr),
  FOREIGN KEY (persistence_key) REFERENCES persistence_metadata (persistence_key)
) ENGINE = InnoDB;

CREATE TABLE IF NOT EXISTS persistence_snapshot (
  persistence_key BIGINT NOT NULL,
  sequence_nr BIGINT NOT NULL,
  created_at BIGINT NOT NULL,
  snapshot LONGBLOB NOT NULL,
  PRIMARY KEY (persistence_key, sequence_nr),
  FOREIGN KEY (persistence_key) REFERENCES persistence_metadata (persistence_key)
) ENGINE = InnoDB;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

二:新建一个sbt项目PersistTest

在build.sbt 中添加依赖


name := "PersistTest"

version := "1.0"

scalaVersion := "2.12.6"

lazy val akkaVersion = "2.6.0-M1"

lazy val sparkVersion="2.4.3"



libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,

  "org.scalatest" %% "scalatest" % "3.0.5" % "test",

  "com.typesafe.akka" %% "akka-persistence" % "2.5.6",
  "com.okumin" %% "akka-persistence-sql-async" % "0.5.1",
  "com.github.mauricio" %% "mysql-async" % "0.2.21",
  // "com.github.mauricio" %% "postgresql-async" % "0.2.20", // for postgres, but this example is for mysql, so not needed

  "org.apache.spark" %% "spark-sql" % sparkVersion
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

三、配置数据库

新建文件src\main\resources\application.conf

在application.conf文件中添加配置信息,注意将password修改为mysql数据库对应的密码

akka {
  persistence {
    journal.plugin = "akka-persistence-sql-async.journal"
    snapshot-store.plugin = "akka-persistence-sql-async.snapshot-store"
  }
}

akka-persistence-sql-async {
  journal.class = "akka.persistence.journal.sqlasync.MySQLAsyncWriteJournal"
  snapshot-store.class = "akka.persistence.snapshot.sqlasync.MySQLSnapshotStore"

  # For PostgreSQL
  # journal.class = "akka.persistence.journal.sqlasync.PostgreSQLAsyncWriteJournal"
  # snapshot-store.class = "akka.persistence.snapshot.sqlasync.PostgreSQLSnapshotStore"

  user = "root"
  password = "123456"
  url = "jdbc:mysql://localhost/akka_persistence_sql_async"
  max-pool-size = 4
  wait-queue-capacity = 10000

  metadata-table-name = "persistence_metadata"
  journal-table-name = "persistence_journal"
  snapshot-table-name = "persistence_snapshot"

  connect-timeout = 5s
  query-timeout = 5s
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

四:编写代码

package com.example

import java.util

import akka.actor.{ActorSystem, Props}
import akka.persistence.{PersistentActor, SnapshotOffer}
import com.example._

sealed trait  Command
case class Add(str:String) extends Command
case class Get() extends Command
case class Clear() extends Command
case class Save() extends Command

case class Snapshot(Strs:util.ArrayList[String])



object Run extends App{
  val actorSystem = ActorSystem("myhellosys")
  val myakka = actorSystem.actorOf(Props[Mypersistence],"myakaactor")
  myakka ! Clear()
  myakka ! Get()
  for(i <- 1 to 10){
    myakka ! Add(i.toString())
    if( i==5 ) myakka ! Save() //第五的时候,保存snapshot
  }
  
  myakka ! Get()
}


class Mypersistence extends PersistentActor{

  val ID:Int = -1

  var c=0

  var Strs:util.ArrayList[String]=new util.ArrayList[String]


  override def persistenceId: String = s"this persistentId $ID"

  override def receiveRecover: Receive = {
    case Add(str) => {
      Strs.add(str)
      c+=1
    }
    case SnapshotOffer(_,snapshot: Snapshot)=>{
      Strs=snapshot.Strs
      println("snapshot")
    }

  }

  override def receiveCommand: Receive = {
    case Add(str) => persist(Add(str)){e=>
      Strs.add(str)
    }
    case Get() => {
      sender() ! Strs
      println("mystrs:"+Strs.toString)
      println("conut:"+c)
    }
    case Clear() =>{
      persist(Clear())(e=>Strs.clear())
    }
    case Save()=> saveSnapshot(Snapshot(Strs))
  }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

五:运行结果

mystrs:[]
conut:0
mystrs:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
conut:0
  • 1
  • 2
  • 3
  • 4

修改代码,注释部分代码

object Run extends App{
  val actorSystem = ActorSystem("myhellosys")
  val myakka = actorSystem.actorOf(Props[Mypersistence],"myakaactor")
//  myakka ! Clear()
//  myakka ! Get()
//  for(i <- 1 to 10){
//    myakka ! Add(i.toString())
//    if( i==5 ) myakka ! Save() //第五的时候,保存snapshot
//  }

  myakka ! Get()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

再次运行,得到结果:

snapshot
mystrs:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
conut:5
  • 1
  • 2
  • 3

可见,再次运行的时候先从snapshot载入,然后依次运行snapshot之后的动作

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/179721
推荐阅读
相关标签
  

闽ICP备14008679号