-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathCounterSpec.scala
157 lines (118 loc) · 4.62 KB
/
CounterSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package com.evolutiongaming.safeakka.persistence
import java.util.UUID
import akka.actor.Status
import com.evolutiongaming.nel.Nel
import com.evolutiongaming.safeakka.actor.util.ActorSpec
import com.evolutiongaming.safeakka.actor.{ActorCtx, ActorLog, SafeActorRef, Signal}
import com.evolutiongaming.safeakka.persistence.PersistentBehavior as Behavior
import scala.concurrent.duration.*
import org.scalatest.wordspec.AnyWordSpec
/** Spec for a counter actor created through `PersistentActorRef`.
  *
  * The actor under test turns `Cmd.Inc` / `Cmd.Dec` into events, persists them and replies to the
  * sender with both the event and the updated [[CounterSpec.Counter]]. System signals received by
  * the behavior are forwarded to `testActor` so the tests can assert on them.
  */
class CounterSpec extends AnyWordSpec with ActorSpec {
  import CounterSpec.*

  "Counter" should {

    "handle commands and persist events" in new Scope {
      ref ! Cmd.Get
      expectMsg(Counter(0, 0))

      // The behavior forwards system signals to testActor; the receive timeout
      // configured in `persistenceSetup` (300 ms) shows up here.
      expectMsg(PersistenceSignal.Sys(Signal.RcvTimeout))

      // Each mutating command yields two replies: the event (from the record callback),
      // then the updated counter (from the `onPersisted` callback).
      ref ! Cmd.Inc
      expectMsg(Event.Inc)
      expectMsg(Counter(1, 1))

      ref ! Cmd.Dec
      expectMsg(Event.Dec)
      expectMsg(Counter(0, 2))
      // `Cmd.Dec` additionally triggers `snapshotter.save`, whose outcome arrives as a signal.
      expectMsgType[PersistenceSignal.SaveSnapshotSuccess]

      ref ! Cmd.Stop
      expectMsg(PersistenceSignal.Sys(Signal.PostStop))
    }

    "handle many commands" ignore new Scope {
      val n = 10000
      // Send exactly `n` increments so the terminal state is Counter(n, n).
      // (An inclusive `0 to n` range would send n + 1 commands.)
      (1 to n).foreach { _ => ref ! Cmd.Inc }

      val expected = Counter(n, n.toLong)
      fishForMessage() {
        case Event.Inc  => false
        case `expected` => true
        case _: Counter => false
      }
    }
  }

  /** Per-test fixture: builds a fresh persistent counter actor and exposes it as `ref`. */
  private trait Scope extends ActorScope {

    /** Creates the persistence setup for the counter actor.
      *
      * Sets a 300 ms receive timeout on `ctx` and uses a random UUID as the persistence id.
      */
    private def persistenceSetup(ctx: ActorCtx) = {
      ctx.setReceiveTimeout(300.millis)

      new PersistenceSetup[Counter, Counter, Cmd, Event] {

        override val persistenceId: String = UUID.randomUUID().toString

        override val log: ActorLog = ActorLog(system, classOf[CounterSpec]).prefixed(persistenceId)

        override def journalId: Option[String] = None

        override def snapshotId: Option[String] = None

        override def onRecoveryStarted(
          offer: Option[SnapshotOffer[Counter]],
          journaller: Journaller,
          snapshotter: Snapshotter[Counter]): Recovering = new Recovering {

          // Start from the offered snapshot when present, otherwise from an empty counter.
          override def state: Counter = offer.map(_.snapshot).getOrElse(Counter(0, 0))

          override def eventHandler(state: Counter, event: Event, seqNr: SeqNr): Counter = state(event, seqNr)

          override def onCompleted(state: Counter, seqNr: SeqNr): Behavior[Cmd, Event] = {

            /** Behavior holding the current `counter`; every transition returns the next behavior. */
            def behavior(counter: Counter): Behavior[Cmd, Event] = Behavior[Cmd, Event] { (signal, _) =>
              signal match {
                case signal: PersistenceSignal.System =>
                  // Forward every system signal to the test probe.
                  testActor.tell(signal, ctx.self)
                  signal match {
                    // After the first timeout, switch the receive timeout to infinite.
                    case PersistenceSignal.Sys(Signal.RcvTimeout) => ctx.setReceiveTimeout(Duration.Inf)
                    case _                                        =>
                  }
                  behavior(counter)

                case PersistenceSignal.Cmd(cmd, sender) =>

                  /** Persists `event`, echoing the event and then the updated counter to `sender`. */
                  def onEvent(event: Event) = {
                    val record = Record.of(event)(_ => sender.tell(event, ctx.self))

                    val onPersisted = (seqNr: SeqNr) => {
                      val newCounter = counter(event, seqNr)
                      sender.tell(newCounter, ctx.self)
                      // Only decrements trigger a snapshot save.
                      if (cmd == Cmd.Dec) snapshotter.save(seqNr, newCounter)
                      behavior(newCounter)
                    }

                    val onFailure = (failure: Throwable) => {
                      sender.tell(Status.Failure(failure), ctx.self)
                    }

                    Behavior.persist(Nel(record), onPersisted, onFailure)
                  }

                  cmd match {
                    case Cmd.Inc  => onEvent(Event.Inc)
                    case Cmd.Dec  => onEvent(Event.Dec)
                    case Cmd.Stop => Behavior.stop
                    case Cmd.Get  =>
                      sender.tell(counter, ctx.self)
                      behavior(counter)
                  }
              }
            }

            behavior(state)
          }

          override def onStopped(state: Counter, seqNr: SeqNr): Unit = {}
        }

        override def onStopped(seqNr: SeqNr): Unit = {}
      }
    }

    /** Typed reference to the counter actor under test. */
    val ref: SafeActorRef[Cmd] = PersistentActorRef(persistenceSetup)
  }
}
/** Domain model used by the counter spec: the counter state plus its commands and events. */
object CounterSpec {

  /** Counter state.
    *
    * @param value current counter value
    * @param seqNr sequence number recorded together with the value
    */
  case class Counter(value: Int, seqNr: SeqNr) {

    /** Returns a new counter with `event` applied to `value` and the given `seqNr` recorded.
      *
      * `Event.Inc` adds one, `Event.Dec` subtracts one.
      */
    def apply(event: Event, seqNr: SeqNr): Counter =
      event match {
        case Event.Inc => Counter(value + 1, seqNr)
        case Event.Dec => Counter(value + -1, seqNr)
      }
  }

  /** Commands accepted by the counter actor. */
  sealed trait Cmd

  object Cmd {
    case object Inc extends Cmd
    case object Dec extends Cmd
    case object Get extends Cmd
    case object Stop extends Cmd
  }

  /** Events persisted by the counter actor. */
  sealed trait Event

  object Event {
    case object Inc extends Event
    case object Dec extends Event
  }
}