Skip to content

Commit

Permalink
Bug Fix: Write handler event metadata (#132)
Browse files Browse the repository at this point in the history
Adds the correct event metadata in the write handler handleEvent method
  • Loading branch information
Zen Yui authored Nov 19, 2020
1 parent b59d423 commit 0003eaf
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,20 @@ object AggregateRoot {
})
.flatMap({
case WriteHandlerHelpers.NewEvent(newEvent) =>
val newEventMeta: MetaData = MetaData()
.withRevisionNumber(priorState.getMeta.revisionNumber + 1)
.withRevisionDate(Instant.now().toTimestamp)
.withData(data)
.withEntityId(priorState.getMeta.entityId)

val priorStateAny: com.google.protobuf.any.Any = priorState.getState

eventHandler
.handleEvent(newEvent, priorState)
.handleEvent(newEvent, priorStateAny, newEventMeta)
.map(response => {
require(response.resultingState.isDefined, "event handler replied with empty state")
eventsAndStateProtoValidation.requireValidState(response.getResultingState)
WriteHandlerHelpers.NewState(newEvent, response.getResultingState)
WriteHandlerHelpers.NewState(newEvent, response.getResultingState, newEventMeta)
})

case x =>
Expand All @@ -212,8 +220,8 @@ object AggregateRoot {
case Success(NoOp) =>
Effect.reply(replyTo)(CommandReply().withState(priorState))

case Success(NewState(event, newState)) =>
persistEventAndReply(event, newState, priorState.getMeta, data, replyTo)
case Success(NewState(event, newState, eventMeta)) =>
persistEventAndReply(event, newState, eventMeta, replyTo)

case Failure(e: StatusException) =>
OpentracingHelpers
Expand Down Expand Up @@ -309,22 +317,16 @@ object AggregateRoot {
private[chiefofstate] def persistEventAndReply(
event: any.Any,
resultingState: any.Any,
priorMeta: MetaData,
data: Map[String, any.Any],
eventMeta: MetaData,
replyTo: ActorRef[CommandReply]
): ReplyEffect[EventWrapper, StateWrapper] = {
val meta: MetaData = MetaData()
.withRevisionNumber(priorMeta.revisionNumber + 1)
.withRevisionDate(Instant.now().toTimestamp)
.withData(data)
.withEntityId(priorMeta.entityId)

Effect
.persist(
EventWrapper()
.withEvent(event)
.withResultingState(resultingState)
.withMeta(meta)
.withMeta(eventMeta)
)
.thenReply(replyTo)((updatedState: StateWrapper) => CommandReply().withState(updatedState))
}
Expand All @@ -348,5 +350,10 @@ object WriteHandlerHelpers {
sealed trait WriteTransitions
case object NoOp extends WriteTransitions
case class NewEvent(event: com.google.protobuf.any.Any) extends WriteTransitions
case class NewState(event: com.google.protobuf.any.Any, state: com.google.protobuf.any.Any) extends WriteTransitions

case class NewState(
event: com.google.protobuf.any.Any,
state: com.google.protobuf.any.Any,
eventMeta: MetaData
) extends WriteTransitions
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import scala.util.Try
import io.opentracing.contrib.grpc.TracingClientInterceptor
import io.opentracing.util.GlobalTracer
import com.namely.chiefofstate.interceptors.ErrorsClientInterceptor
import com.google.protobuf.any
import com.namely.protobuf.chiefofstate.v1.common.MetaData

/**
* handles a given event by making a rpc call
Expand All @@ -37,7 +39,7 @@ case class RemoteEventHandler(grpcConfig: GrpcConfig, writeHandlerServicetub: Wr
* @param priorState the aggregate prior state
* @return the eventual HandleEventResponse
*/
def handleEvent(event: com.google.protobuf.any.Any, priorState: StateWrapper): Try[HandleEventResponse] = {
def handleEvent(event: any.Any, priorState: any.Any, eventMeta: MetaData): Try[HandleEventResponse] = {
Try {
log.debug(
s"sending request to the event handler, ${event.typeUrl}"
Expand All @@ -49,8 +51,8 @@ case class RemoteEventHandler(grpcConfig: GrpcConfig, writeHandlerServicetub: Wr
.handleEvent(
HandleEventRequest()
.withEvent(event)
.withPriorState(priorState.getState)
.withEventMeta(priorState.getMeta)
.withPriorState(priorState)
.withEventMeta(eventMeta)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import com.namely.chiefofstate.helper.GrpcHelpers.Closeables
import scala.concurrent.Future
import com.namely.protobuf.chiefofstate.v1.writeside.WriteSideHandlerServiceGrpc.WriteSideHandlerServiceStub
import io.grpc.ServerServiceDefinition
import scala.util.Try
import scala.util.Failure

class AggregrateRootSpec extends BaseActorSpec(s"""
akka.cluster.sharding.number-of-shards = 1
Expand Down Expand Up @@ -147,38 +149,52 @@ class AggregrateRootSpec extends BaseActorSpec(s"""

".handleCommand" should {
"return as expected" in {
// define the ID's
val aggregateId: String = UUID.randomUUID().toString
val persistenceId: PersistenceId = PersistenceId("chiefofstate", aggregateId)
val state: Account = Account().withAccountUuid(aggregateId)
val stateWrapper: StateWrapper = StateWrapper()
.withState(any.Any.pack(Empty.defaultInstance))
.withMeta(
MetaData.defaultInstance.withEntityId(getEntityId(persistenceId))
)
val command: Any = Any.pack(OpenAccount())
val event: AccountOpened = AccountOpened()

val resultingState = com.google.protobuf.any.Any.pack(state.withBalance(200))
// define prior state, command, and prior event meta
val priorState: Any = Any.pack(Empty.defaultInstance)
val command: Any = Any.pack(OpenAccount())
val priorMeta: MetaData = MetaData.defaultInstance
.withRevisionNumber(0)
.withEntityId(aggregateId)

val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService]
// define event to return and handle command response
val event: AccountOpened = AccountOpened()

val handleCommandRequest = HandleCommandRequest()
.withCommand(command)
.withPriorState(stateWrapper.getState)
.withPriorEventMeta(stateWrapper.getMeta)
.withPriorState(priorState)
.withPriorEventMeta(priorMeta)

val handleEventRequest = HandleEventRequest()
.withPriorState(stateWrapper.getState)
.withEventMeta(stateWrapper.getMeta)
val handleCommandResponse = HandleCommandResponse()
.withEvent(Any.pack(event))

// define a resulting state
val resultingState = Any.pack(Account().withAccountUuid(aggregateId).withBalance(200))

// mock the write handler
val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService]

(serviceImpl.handleCommand _)
.expects(handleCommandRequest)
.returning(Future.successful(HandleCommandResponse().withEvent(Any.pack(event))))
.returning {
Future.successful(handleCommandResponse)
}

(serviceImpl.handleEvent _)
.expects(handleEventRequest)
.returning(Future.successful(HandleEventResponse().withResultingState(resultingState)))
.expects(*)
.onCall((request: HandleEventRequest) => {
val output = Try {
require(request.getEventMeta.revisionNumber == priorMeta.revisionNumber + 1)
HandleEventResponse().withResultingState(resultingState)
}
.recoverWith({
case e: Throwable => Failure(Util.makeStatusException(e))
})
Future.fromTry(output)
})

val service = WriteSideHandlerServiceGrpc.bindService(serviceImpl, global)
val serverName = InProcessServerBuilder.generateName()
Expand Down Expand Up @@ -223,13 +239,12 @@ class AggregrateRootSpec extends BaseActorSpec(s"""

commandSender.receiveMessage(replyTimeout) match {
case CommandReply(Reply.State(value: StateWrapper), _) =>
val account: Account = value.getState.unpack[Account]
account.accountUuid shouldBe aggregateId
account.balance shouldBe 200
value.getMeta.revisionNumber shouldBe 1
value.getState shouldBe (resultingState)
value.getMeta.revisionNumber shouldBe priorMeta.revisionNumber + 1
value.getMeta.entityId shouldBe aggregateId

case _ => fail("unexpected message type")
case x =>
fail("unexpected message type")
}
}
"return as expected with no event to persist" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import com.namely.chiefofstate.helper.GrpcHelpers.Closeables
import io.grpc.ServerServiceDefinition
import io.grpc.inprocess._
import scala.concurrent.ExecutionContext.global
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.google.protobuf.any

class RemoteEventHandlerSpec extends BaseSpec {

Expand Down Expand Up @@ -52,20 +54,22 @@ class RemoteEventHandlerSpec extends BaseSpec {

"RemoteEventHandler" should {
"handle event successfully" in {
val state: Account = Account().withAccountUuid("123")

val stateWrapper: StateWrapper = StateWrapper().withState(com.google.protobuf.any.Any.pack(state))
val state = Account().withAccountUuid("123")
val stateAny = any.Any.pack(state)

val resultingState = com.google.protobuf.any.Any.pack(state.withBalance(200))

val event: any.Any = com.google.protobuf.any.Any.pack(AccountOpened())

val eventMeta: MetaData = MetaData.defaultInstance
.withRevisionNumber(2)

val expected: HandleEventResponse =
HandleEventResponse().withResultingState(resultingState)

val request: HandleEventRequest = HandleEventRequest()
.withPriorState(stateWrapper.getState)
.withEventMeta(stateWrapper.getMeta)
.withPriorState(stateAny)
.withEventMeta(eventMeta)
.withEvent(event)

val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService]
Expand All @@ -82,20 +86,24 @@ class RemoteEventHandlerSpec extends BaseSpec {
new WriteSideHandlerServiceBlockingStub(serverChannel)

val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(grpcConfig, writeHandlerServicetub)
val triedHandleEventResponse: Try[HandleEventResponse] = remoteEventHandler.handleEvent(event, stateWrapper)
val triedHandleEventResponse: Try[HandleEventResponse] =
remoteEventHandler.handleEvent(event, stateAny, eventMeta)
triedHandleEventResponse.success.value shouldBe (expected)
}

"handle event when there is a failure" in {
val state: Account = Account().withAccountUuid("123")
val stateAny = any.Any.pack(state)

val stateWrapper: StateWrapper = StateWrapper().withState(com.google.protobuf.any.Any.pack(state))

val event: any.Any = com.google.protobuf.any.Any.pack(AccountOpened())

val eventMeta: MetaData = MetaData.defaultInstance.withRevisionNumber(3)

val request: HandleEventRequest = HandleEventRequest()
.withPriorState(stateWrapper.getState)
.withEventMeta(stateWrapper.getMeta)
.withEventMeta(eventMeta)
.withEvent(event)

val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService]
Expand All @@ -112,7 +120,8 @@ class RemoteEventHandlerSpec extends BaseSpec {
new WriteSideHandlerServiceBlockingStub(serverChannel)

val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(grpcConfig, writeHandlerServicetub)
val triedHandleEventResponse: Try[HandleEventResponse] = remoteEventHandler.handleEvent(event, stateWrapper)
val triedHandleEventResponse: Try[HandleEventResponse] =
remoteEventHandler.handleEvent(event, stateAny, eventMeta)
(triedHandleEventResponse.failure.exception should have).message("UNKNOWN")
}
}
Expand Down

0 comments on commit 0003eaf

Please sign in to comment.