From 0003eaf87a07bad50c457a514fb7b3d78fae5da6 Mon Sep 17 00:00:00 2001 From: Zen Yui Date: Thu, 19 Nov 2020 15:44:15 -0500 Subject: [PATCH] Bug Fix: Write handler event metadata (#132) Adds the correct event metadata in the write handler handleEvent method --- .../namely/chiefofstate/AggregateRoot.scala | 33 ++++++---- .../chiefofstate/RemoteEventHandler.scala | 8 ++- .../chiefofstate/AggregrateRootSpec.scala | 61 ++++++++++++------- .../chiefofstate/RemoteEventHandlerSpec.scala | 25 +++++--- 4 files changed, 80 insertions(+), 47 deletions(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala b/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala index c2e40e0e..99f053ef 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala @@ -195,12 +195,20 @@ object AggregateRoot { }) .flatMap({ case WriteHandlerHelpers.NewEvent(newEvent) => + val newEventMeta: MetaData = MetaData() + .withRevisionNumber(priorState.getMeta.revisionNumber + 1) + .withRevisionDate(Instant.now().toTimestamp) + .withData(data) + .withEntityId(priorState.getMeta.entityId) + + val priorStateAny: com.google.protobuf.any.Any = priorState.getState + eventHandler - .handleEvent(newEvent, priorState) + .handleEvent(newEvent, priorStateAny, newEventMeta) .map(response => { require(response.resultingState.isDefined, "event handler replied with empty state") eventsAndStateProtoValidation.requireValidState(response.getResultingState) - WriteHandlerHelpers.NewState(newEvent, response.getResultingState) + WriteHandlerHelpers.NewState(newEvent, response.getResultingState, newEventMeta) }) case x => @@ -212,8 +220,8 @@ object AggregateRoot { case Success(NoOp) => Effect.reply(replyTo)(CommandReply().withState(priorState)) - case Success(NewState(event, newState)) => - persistEventAndReply(event, newState, priorState.getMeta, data, replyTo) + case Success(NewState(event, newState, eventMeta)) => + persistEventAndReply(event, newState, eventMeta, replyTo) case Failure(e: StatusException) => OpentracingHelpers @@ -309,22 +317,16 @@ object AggregateRoot { private[chiefofstate] def persistEventAndReply( event: any.Any, resultingState: any.Any, - priorMeta: MetaData, - data: Map[String, any.Any], + eventMeta: MetaData, replyTo: ActorRef[CommandReply] ): ReplyEffect[EventWrapper, StateWrapper] = { - val meta: MetaData = MetaData() - .withRevisionNumber(priorMeta.revisionNumber + 1) - .withRevisionDate(Instant.now().toTimestamp) - .withData(data) - .withEntityId(priorMeta.entityId) Effect .persist( EventWrapper() .withEvent(event) .withResultingState(resultingState) - .withMeta(meta) + .withMeta(eventMeta) ) .thenReply(replyTo)((updatedState: StateWrapper) => CommandReply().withState(updatedState)) } @@ -348,5 +350,10 @@ object WriteHandlerHelpers { sealed trait WriteTransitions case object NoOp extends WriteTransitions case class NewEvent(event: com.google.protobuf.any.Any) extends WriteTransitions - case class NewState(event: com.google.protobuf.any.Any, state: com.google.protobuf.any.Any) extends WriteTransitions + + case class NewState( + event: com.google.protobuf.any.Any, + state: com.google.protobuf.any.Any, + eventMeta: MetaData + ) extends WriteTransitions } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/RemoteEventHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/RemoteEventHandler.scala index 81727e11..7e09803e 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/RemoteEventHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/RemoteEventHandler.scala @@ -12,6 +12,8 @@ import scala.util.Try import io.opentracing.contrib.grpc.TracingClientInterceptor import io.opentracing.util.GlobalTracer import com.namely.chiefofstate.interceptors.ErrorsClientInterceptor +import com.google.protobuf.any +import com.namely.protobuf.chiefofstate.v1.common.MetaData /** * handles a given event by making a rpc call @@ -37,7 +39,7 @@ case class RemoteEventHandler(grpcConfig: GrpcConfig, writeHandlerServicetub: Wr * @param priorState the aggregate prior state * @return the eventual HandleEventResponse */ - def handleEvent(event: com.google.protobuf.any.Any, priorState: StateWrapper): Try[HandleEventResponse] = { + def handleEvent(event: any.Any, priorState: any.Any, eventMeta: MetaData): Try[HandleEventResponse] = { Try { log.debug( s"sending request to the event handler, ${event.typeUrl}" @@ -49,8 +51,8 @@ case class RemoteEventHandler(grpcConfig: GrpcConfig, writeHandlerServicetub: Wr .handleEvent( HandleEventRequest() .withEvent(event) - .withPriorState(priorState.getState) - .withEventMeta(priorState.getMeta) + .withPriorState(priorState) + .withEventMeta(eventMeta) ) } } diff --git a/code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala index 06953352..b60411ad 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala @@ -30,6 +30,8 @@ import com.namely.chiefofstate.helper.GrpcHelpers.Closeables import scala.concurrent.Future import com.namely.protobuf.chiefofstate.v1.writeside.WriteSideHandlerServiceGrpc.WriteSideHandlerServiceStub import io.grpc.ServerServiceDefinition +import scala.util.Try +import scala.util.Failure class AggregrateRootSpec extends BaseActorSpec(s""" akka.cluster.sharding.number-of-shards = 1 @@ -147,38 +149,52 @@ class AggregrateRootSpec extends BaseActorSpec(s""" ".handleCommand" should { "return as expected" in { + // define the ID's val aggregateId: String = UUID.randomUUID().toString val persistenceId: PersistenceId = PersistenceId("chiefofstate", aggregateId) - val state: Account = Account().withAccountUuid(aggregateId) - val stateWrapper: StateWrapper = StateWrapper() - .withState(any.Any.pack(Empty.defaultInstance)) - .withMeta( - MetaData.defaultInstance.withEntityId(getEntityId(persistenceId)) - ) - val command: Any = Any.pack(OpenAccount()) - val event: AccountOpened = AccountOpened() - val resultingState = com.google.protobuf.any.Any.pack(state.withBalance(200)) + // define prior state, command, and prior event meta + val priorState: Any = Any.pack(Empty.defaultInstance) + val command: Any = Any.pack(OpenAccount()) + val priorMeta: MetaData = MetaData.defaultInstance + .withRevisionNumber(0) + .withEntityId(aggregateId) - val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] + // define event to return and handle command response + val event: AccountOpened = AccountOpened() val handleCommandRequest = HandleCommandRequest() .withCommand(command) - .withPriorState(stateWrapper.getState) - .withPriorEventMeta(stateWrapper.getMeta) + .withPriorState(priorState) + .withPriorEventMeta(priorMeta) - val handleEventRequest = HandleEventRequest() - .withPriorState(stateWrapper.getState) - .withEventMeta(stateWrapper.getMeta) + val handleCommandResponse = HandleCommandResponse() .withEvent(Any.pack(event)) + // define a resulting state + val resultingState = Any.pack(Account().withAccountUuid(aggregateId).withBalance(200)) + + // mock the write handler + val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] + (serviceImpl.handleCommand _) .expects(handleCommandRequest) - .returning(Future.successful(HandleCommandResponse().withEvent(Any.pack(event)))) + .returning { + Future.successful(handleCommandResponse) + } (serviceImpl.handleEvent _) - .expects(handleEventRequest) - .returning(Future.successful(HandleEventResponse().withResultingState(resultingState))) + .expects(*) + .onCall((request: HandleEventRequest) => { + val output = Try { + require(request.getEventMeta.revisionNumber == priorMeta.revisionNumber + 1) + HandleEventResponse().withResultingState(resultingState) + } + .recoverWith({ + case e: Throwable => Failure(Util.makeStatusException(e)) + }) + Future.fromTry(output) + }) val service = WriteSideHandlerServiceGrpc.bindService(serviceImpl, global) val serverName = InProcessServerBuilder.generateName() @@ -223,13 +239,12 @@ class AggregrateRootSpec extends BaseActorSpec(s""" commandSender.receiveMessage(replyTimeout) match { case CommandReply(Reply.State(value: StateWrapper), _) => - val account: Account = value.getState.unpack[Account] - account.accountUuid shouldBe aggregateId - account.balance shouldBe 200 - value.getMeta.revisionNumber shouldBe 1 + value.getState shouldBe (resultingState) + value.getMeta.revisionNumber shouldBe priorMeta.revisionNumber + 1 value.getMeta.entityId shouldBe aggregateId - case _ => fail("unexpected message type") + case x => + fail("unexpected message type") } } "return as expected with no event to persist" in { diff --git a/code/service/src/test/scala/com/namely/chiefofstate/RemoteEventHandlerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/RemoteEventHandlerSpec.scala index 075c0e78..99e06fef 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/RemoteEventHandlerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/RemoteEventHandlerSpec.scala @@ -17,6 +17,8 @@ import com.namely.chiefofstate.helper.GrpcHelpers.Closeables import io.grpc.ServerServiceDefinition import io.grpc.inprocess._ import scala.concurrent.ExecutionContext.global +import com.namely.protobuf.chiefofstate.v1.common.MetaData +import com.google.protobuf.any class RemoteEventHandlerSpec extends BaseSpec { @@ -52,20 +54,22 @@ class RemoteEventHandlerSpec extends BaseSpec { "RemoteEventHandler" should { "handle event successfully" in { - val state: Account = Account().withAccountUuid("123") - - val stateWrapper: StateWrapper = StateWrapper().withState(com.google.protobuf.any.Any.pack(state)) + val state = Account().withAccountUuid("123") + val stateAny = any.Any.pack(state) val resultingState = com.google.protobuf.any.Any.pack(state.withBalance(200)) val event: any.Any = com.google.protobuf.any.Any.pack(AccountOpened()) + val eventMeta: MetaData = MetaData.defaultInstance + .withRevisionNumber(2) + val expected: HandleEventResponse = HandleEventResponse().withResultingState(resultingState) val request: HandleEventRequest = HandleEventRequest() - .withPriorState(stateWrapper.getState) - .withEventMeta(stateWrapper.getMeta) + .withPriorState(stateAny) + .withEventMeta(eventMeta) .withEvent(event) val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] @@ -82,20 +86,24 @@ class RemoteEventHandlerSpec extends BaseSpec { new WriteSideHandlerServiceBlockingStub(serverChannel) val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(grpcConfig, writeHandlerServicetub) - val triedHandleEventResponse: Try[HandleEventResponse] = remoteEventHandler.handleEvent(event, stateWrapper) + val triedHandleEventResponse: Try[HandleEventResponse] = + remoteEventHandler.handleEvent(event, stateAny, eventMeta) triedHandleEventResponse.success.value shouldBe (expected) } "handle event when there is a failure" in { val state: Account = Account().withAccountUuid("123") + val stateAny = any.Any.pack(state) val stateWrapper: StateWrapper = StateWrapper().withState(com.google.protobuf.any.Any.pack(state)) val event: any.Any = com.google.protobuf.any.Any.pack(AccountOpened()) + val eventMeta: MetaData = MetaData.defaultInstance.withRevisionNumber(3) + val request: HandleEventRequest = HandleEventRequest() .withPriorState(stateWrapper.getState) - .withEventMeta(stateWrapper.getMeta) + .withEventMeta(eventMeta) .withEvent(event) val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] @@ -112,7 +120,8 @@ class RemoteEventHandlerSpec extends BaseSpec { new WriteSideHandlerServiceBlockingStub(serverChannel) val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(grpcConfig, writeHandlerServicetub) - val triedHandleEventResponse: Try[HandleEventResponse] = remoteEventHandler.handleEvent(event, stateWrapper) + val triedHandleEventResponse: Try[HandleEventResponse] = + remoteEventHandler.handleEvent(event, stateAny, eventMeta) (triedHandleEventResponse.failure.exception should have).message("UNKNOWN") } }