From 953f7612f656bc37dffa9865ed8949531a414bf5 Mon Sep 17 00:00:00 2001 From: Ido David Date: Mon, 24 May 2021 03:30:47 -0400 Subject: [PATCH 1/2] support handling multiple events --- .../namely/chiefofstate/AggregateRoot.scala | 55 +-- ...RootSpec.scala => AggregateRootSpec.scala} | 324 +++++++++++++++++- proto/chief-of-state-protos | 2 +- 3 files changed, 359 insertions(+), 22 deletions(-) rename code/service/src/test/scala/com/namely/chiefofstate/{AggregrateRootSpec.scala => AggregateRootSpec.scala} (69%) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala b/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala index 698cb629..508dabbf 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala @@ -178,18 +178,29 @@ object AggregateRoot { protosValidator: ProtosValidator, data: Map[String, com.google.protobuf.any.Any]): ReplyEffect[EventWrapper, StateWrapper] = { - val handlerOutput: Try[WriteHandlerHelpers.WriteTransitions] = commandHandler - .handleCommand(command, priorState) - .map(_.event match { - case Some(newEvent) => - protosValidator.requireValidEvent(newEvent) - WriteHandlerHelpers.NewEvent(newEvent) - - case None => - WriteHandlerHelpers.NoOp - }) - .flatMap({ - case WriteHandlerHelpers.NewEvent(newEvent) => + // state will be updated with the resulting state of each HandleEventResponse + var state = priorState.getState + + val handlerOutput: Try[WriteHandlerHelpers.WriteTransitions] = + commandHandler + .handleCommand(command, priorState) + // use events or fallback to event in case events is empty + .map(response => + response.events.isEmpty match { + case true => + response.event match { + case Some(newEvent) => Seq(newEvent) + case None => Seq.empty[any.Any] + } + case _ => response.events + }) + .map(_.filter(x => x != any.Any()) // ignore empty events and validate types + .map(newEvent => { + protosValidator.requireValidEvent(newEvent) + newEvent + })) + // call handleEvent for each event, while passing the updated resulting state from previous call + .map(_.map(newEvent => { val newEventMeta: MetaData = MetaData() .withRevisionNumber(priorState.getMeta.revisionNumber + 1) .withRevisionDate(Instant.now().toTimestamp) @@ -197,21 +208,25 @@ object AggregateRoot { .withEntityId(priorState.getMeta.entityId) .withHeaders(command.persistedHeaders) - val priorStateAny: com.google.protobuf.any.Any = priorState.getState + val priorStateAny: com.google.protobuf.any.Any = state eventHandler .handleEvent(newEvent, priorStateAny, newEventMeta) .map(response => { require(response.resultingState.isDefined, "event handler replied with empty state") protosValidator.requireValidState(response.getResultingState) - WriteHandlerHelpers.NewState(newEvent, response.getResultingState, newEventMeta) + state = response.getResultingState + WriteHandlerHelpers.NewState(newEvent, state, newEventMeta) }) - - case x => - Success(x) - }) - .recoverWith(makeFailedStatusPf) - + }) + // extract WriteTransitions from Try or fail and get the last one (if any exists) + .map(_.get).lastOption) + .map(x => + x match { + case None => WriteHandlerHelpers.NoOp + case Some(newState) => newState + }) + .recoverWith(makeFailedStatusPf) handlerOutput match { case Success(NoOp) => Effect.reply(replyTo)(CommandReply().withState(priorState)) diff --git a/code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/AggregateRootSpec.scala similarity index 69% rename from code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala rename to code/service/src/test/scala/com/namely/chiefofstate/AggregateRootSpec.scala index f51d03ad..a48b376c 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/AggregrateRootSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/AggregateRootSpec.scala @@ -19,7 +19,7 @@ import com.namely.protobuf.chiefofstate.v1.common.{ Header, MetaData } import com.namely.protobuf.chiefofstate.v1.internal._ import com.namely.protobuf.chiefofstate.v1.internal.CommandReply.Reply import com.namely.protobuf.chiefofstate.v1.persistence.StateWrapper -import com.namely.protobuf.chiefofstate.v1.tests.{ Account, AccountOpened, OpenAccount } +import com.namely.protobuf.chiefofstate.v1.tests.{ Account, AccountDebited, AccountOpened, OpenAccount } import com.namely.protobuf.chiefofstate.v1.writeside._ import com.namely.protobuf.chiefofstate.v1.writeside.WriteSideHandlerServiceGrpc.WriteSideHandlerServiceBlockingStub import com.typesafe.config.{ Config, ConfigFactory } @@ -205,6 +205,128 @@ class AggregrateRootSpec extends BaseActorSpec(s""" fail("unexpected message type") } } + "return as expected for multiple events" in { + // define the ID's + val aggregateId: String = UUID.randomUUID().toString + val persistenceId: PersistenceId = PersistenceId.ofUniqueId(aggregateId) + + // define prior state, command, and prior event meta + var priorState: Any = Any.pack(Empty.defaultInstance) + val command: Any = Any.pack(OpenAccount()) + val priorMeta: MetaData = MetaData.defaultInstance.withRevisionNumber(0).withEntityId(aggregateId) + + // define events to return and handle command response, include events that should get filtered out + val events = Seq(AccountOpened(), Any(), AccountDebited()) + + // single event that should get ignored because events is populated + val event = Any.pack(AccountDebited().withBalance(900)) + + val handleCommandRequest = + HandleCommandRequest().withCommand(command).withPriorState(priorState).withPriorEventMeta(priorMeta) + + val handleCommandResponse = + HandleCommandResponse() + .withEvents(events.map(x => + x == Any() match { // replace null message for an empty Any + case true => x.asInstanceOf[Any] + case _ => Any.pack(x.asInstanceOf[scalapb.GeneratedMessage]) + })) + .withEvent(event) + + // define resulting states evolution after each event handling + val resultingStates = Array[any.Any]( + Any.pack(Account().withAccountUuid(aggregateId).withBalance(200)), + Any.pack(Account().withAccountUuid(aggregateId).withBalance(400))) + + // mock the write handler + val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] + + (serviceImpl.handleCommand _).expects(handleCommandRequest).returning { + Future.successful(handleCommandResponse) + } + + // set expect handleEvent call for each event + handleCommandResponse.events + .filter(x => x != Any()) + .lazyZip(resultingStates) + .foreach((curEvent, curResultingState) => + (serviceImpl.handleEvent _) + .expects(*) + .onCall((request: HandleEventRequest) => { + val output = Try { + require(request.getEventMeta.revisionNumber == priorMeta.revisionNumber + 1) + require(request.event.get == curEvent) + require(request.priorState.get == priorState) + priorState = curResultingState // set prior state for next expectation + HandleEventResponse().withResultingState(curResultingState) + }.recoverWith({ case e: Throwable => + Failure(Util.makeStatusException(e)) + }) + Future.fromTry(output) + }) + .once()) + + // set expectation that should never get called + (serviceImpl.handleEvent _) + .expects(*) + .onCall((request: HandleEventRequest) => { + val output = Try { + require(request.event.get == event) + HandleEventResponse() + }.recoverWith({ case e: Throwable => + Failure(Util.makeStatusException(e)) + }) + Future.fromTry(output) + }) + .never() + + val service = WriteSideHandlerServiceGrpc.bindService(serviceImpl, global) + val serverName = InProcessServerBuilder.generateName() + + createServer(serverName, service) + val serverChannel = getChannel(serverName) + + val writeHandlerServicetub: WriteSideHandlerServiceBlockingStub = + new WriteSideHandlerServiceBlockingStub(serverChannel) + + // Let us create the sender of commands + val commandSender: TestProbe[GeneratedMessage] = + createTestProbe[GeneratedMessage]() + + val remoteCommandHandler: RemoteCommandHandler = + RemoteCommandHandler(cosConfig.grpcConfig, writeHandlerServicetub) + val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(cosConfig.grpcConfig, writeHandlerServicetub) + val shardIndex = 0 + val eventsAndStateProtosValidation: ProtosValidator = + ProtosValidator(cosConfig.writeSideConfig) + + val aggregateRoot = AggregateRoot( + persistenceId, + shardIndex, + cosConfig, + remoteCommandHandler, + remoteEventHandler, + eventsAndStateProtosValidation) + + val aggregateRef: ActorRef[MessageWithActorRef] = spawn(aggregateRoot) + + val remoteCommand = RemoteCommand() + .withCommand(command) + .addPropagatedHeaders(Header().withKey("header-1").withStringValue("header-value-1")) + .withEntityId(aggregateId) + + aggregateRef ! MessageWithActorRef(SendCommand().withRemoteCommand(remoteCommand), commandSender.ref) + + commandSender.receiveMessage(replyTimeout) match { + case CommandReply(Reply.State(value: StateWrapper), _) => + value.getState shouldBe resultingStates.last + value.getMeta.revisionNumber shouldBe priorMeta.revisionNumber + 1 + value.getMeta.entityId shouldBe aggregateId + + case x => + fail("unexpected message type") + } + } "return as expected with no event to persist" in { val aggregateId: String = UUID.randomUUID().toString val persistenceId: PersistenceId = PersistenceId.ofUniqueId(aggregateId) @@ -269,6 +391,72 @@ class AggregrateRootSpec extends BaseActorSpec(s""" case _ => fail("unexpected message type") } } + "return as expected with no event to persist when events is only empty Anys" in { + val aggregateId: String = UUID.randomUUID().toString + val persistenceId: PersistenceId = PersistenceId.ofUniqueId(aggregateId) + val stateWrapper: StateWrapper = StateWrapper() + .withState(any.Any.pack(Empty.defaultInstance)) + .withMeta(MetaData.defaultInstance.withEntityId(persistenceId.id)) + val command: Any = Any.pack(OpenAccount()) + + val request = HandleCommandRequest() + .withCommand(command) + .withPriorState(stateWrapper.getState) + .withPriorEventMeta(stateWrapper.getMeta) + + val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] + + (serviceImpl.handleCommand _) + .expects(request) + .returning(Future.successful(HandleCommandResponse().withEvents(Seq().padTo(3, Any())))) + + val service = WriteSideHandlerServiceGrpc.bindService(serviceImpl, global) + + val serverName = InProcessServerBuilder.generateName() + + createServer(serverName, service) + val serverChannel = getChannel(serverName) + + val writeHandlerServicetub: WriteSideHandlerServiceBlockingStub = + new WriteSideHandlerServiceBlockingStub(serverChannel) + + // Let us create the sender of commands + val commandSender: TestProbe[GeneratedMessage] = + createTestProbe[GeneratedMessage]() + + val remoteCommandHandler: RemoteCommandHandler = + RemoteCommandHandler(cosConfig.grpcConfig, writeHandlerServicetub) + val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(cosConfig.grpcConfig, writeHandlerServicetub) + val shardIndex = 0 + val eventsAndStateProtosValidation: ProtosValidator = + ProtosValidator(cosConfig.writeSideConfig) + + val aggregateRoot = AggregateRoot( + persistenceId, + shardIndex, + cosConfig, + remoteCommandHandler, + remoteEventHandler, + eventsAndStateProtosValidation) + + val aggregateRef: ActorRef[MessageWithActorRef] = spawn(aggregateRoot) + + val remoteCommand = RemoteCommand() + .withCommand(command) + .addPropagatedHeaders(Header().withKey("header-1").withStringValue("header-value-1")) + .withEntityId(aggregateId) + + aggregateRef ! MessageWithActorRef(SendCommand().withRemoteCommand(remoteCommand), commandSender.ref) + + commandSender.receiveMessage(replyTimeout) match { + case CommandReply(Reply.State(value: StateWrapper), _) => + value.getState shouldBe Any.pack(Empty.defaultInstance) + value.getMeta.revisionNumber shouldBe 0 + value.getMeta.entityId shouldBe aggregateId + + case _ => fail("unexpected message type") + } + } "return a failure when an empty command is sent" in { val aggregateId: String = UUID.randomUUID().toString val persistenceId: PersistenceId = PersistenceId.ofUniqueId(aggregateId) @@ -434,6 +622,73 @@ class AggregrateRootSpec extends BaseActorSpec(s""" case _ => fail("unexpected message type") } } + "return a failure when event handler failed for any event" in { + val aggregateId: String = UUID.randomUUID().toString + val persistenceId: PersistenceId = PersistenceId.ofUniqueId(aggregateId) + val command: Any = Any.pack(OpenAccount()) + + val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] + + // define a resulting state + val temporaryResultingState = Any.pack(Account().withAccountUuid(aggregateId).withBalance(200)) + + (serviceImpl.handleCommand _) + .expects(*) + .returning(Future.successful( + HandleCommandResponse().withEvents(Seq(Any.pack(AccountOpened()), Any.pack(AccountDebited()))))) + + (serviceImpl.handleEvent _) + .expects(*) + .returning(Future.successful(HandleEventResponse().withResultingState(temporaryResultingState))) + .once() + (serviceImpl.handleEvent _).expects(*).returning(Future.failed(Status.UNKNOWN.asException())).once() + + val service = WriteSideHandlerServiceGrpc.bindService(serviceImpl, global) + + val serverName = InProcessServerBuilder.generateName() + + createServer(serverName, service) + val serverChannel = getChannel(serverName) + + val writeHandlerServicetub: WriteSideHandlerServiceBlockingStub = + new WriteSideHandlerServiceBlockingStub(serverChannel) + + // Let us create the sender of commands + val commandSender: TestProbe[GeneratedMessage] = + createTestProbe[GeneratedMessage]() + + val remoteCommandHandler: RemoteCommandHandler = + RemoteCommandHandler(cosConfig.grpcConfig, writeHandlerServicetub) + val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(cosConfig.grpcConfig, writeHandlerServicetub) + val shardIndex = 0 + val eventsAndStateProtosValidation: ProtosValidator = + ProtosValidator(cosConfig.writeSideConfig) + + val aggregateRoot = AggregateRoot( + persistenceId, + shardIndex, + cosConfig, + remoteCommandHandler, + remoteEventHandler, + eventsAndStateProtosValidation) + + val aggregateRef: ActorRef[MessageWithActorRef] = spawn(aggregateRoot) + + val remoteCommand = RemoteCommand() + .withCommand(command) + .addPropagatedHeaders(Header().withKey("header-1").withStringValue("header-value-1")) + .withEntityId(aggregateId) + + aggregateRef ! MessageWithActorRef(SendCommand().withRemoteCommand(remoteCommand), commandSender.ref) + + commandSender.receiveMessage(replyTimeout) match { + case CommandReply(Reply.Error(status), _) => + status.code shouldBe (Status.Code.UNKNOWN.value) + Option(status.message) shouldBe (Some("")) + + case _ => fail("unexpected message type") + } + } "return a failure when an invalid event is received" in { val writeSideConfig = cosConfig.writeSideConfig.copy(enableProtoValidation = true, eventsProtos = Seq(), statesProtos = Seq()) @@ -496,6 +751,73 @@ class AggregrateRootSpec extends BaseActorSpec(s""" case _ => fail("unexpected message type") } + } + "return a failure when an invalid event is received in 2nd call" in { + val writeSideConfig = + cosConfig.writeSideConfig.copy( + enableProtoValidation = true, + eventsProtos = Seq("chief_of_state.v1.AccountOpened"), + statesProtos = Seq()) + + val mainConfig = cosConfig.copy(writeSideConfig = writeSideConfig) + + val aggregateId: String = UUID.randomUUID().toString + val persistenceId: PersistenceId = PersistenceId.ofUniqueId(aggregateId) + val command: Any = Any.pack(OpenAccount()) + + val serviceImpl = mock[WriteSideHandlerServiceGrpc.WriteSideHandlerService] + + (serviceImpl.handleCommand _) + .expects(*) + .returning(Future.successful( + HandleCommandResponse().withEvents(Seq(Any.pack(AccountOpened()), Any.pack(AccountDebited()))))) + + val service = WriteSideHandlerServiceGrpc.bindService(serviceImpl, global) + + val serverName = InProcessServerBuilder.generateName() + + createServer(serverName, service) + val serverChannel = getChannel(serverName) + + val writeHandlerServicetub: WriteSideHandlerServiceBlockingStub = + new WriteSideHandlerServiceBlockingStub(serverChannel) + + // Let us create the sender of commands + val commandSender: TestProbe[GeneratedMessage] = + createTestProbe[GeneratedMessage]() + + val remoteCommandHandler: RemoteCommandHandler = + RemoteCommandHandler(mainConfig.grpcConfig, writeHandlerServicetub) + val remoteEventHandler: RemoteEventHandler = RemoteEventHandler(mainConfig.grpcConfig, writeHandlerServicetub) + val shardIndex = 0 + val eventsAndStateProtosValidation: ProtosValidator = + ProtosValidator(mainConfig.writeSideConfig) + + val aggregateRoot = AggregateRoot( + persistenceId, + shardIndex, + mainConfig, + remoteCommandHandler, + remoteEventHandler, + eventsAndStateProtosValidation) + + val aggregateRef: ActorRef[MessageWithActorRef] = spawn(aggregateRoot) + + val remoteCommand = RemoteCommand() + .withCommand(command) + .addPropagatedHeaders(Header().withKey("header-1").withStringValue("header-value-1")) + .withEntityId(aggregateId) + + aggregateRef ! MessageWithActorRef(SendCommand().withRemoteCommand(remoteCommand), commandSender.ref) + + commandSender.receiveMessage(replyTimeout) match { + case CommandReply(Reply.Error(status), _) => + status.code shouldBe (Status.Code.INVALID_ARGUMENT.value) + Option(status.message) shouldBe (Some("invalid event: type.googleapis.com/chief_of_state.v1.AccountDebited")) + + case _ => fail("unexpected message type") + } + } "return a failure when an invalid state is received" in { val writeSideConfig = cosConfig.writeSideConfig.copy( diff --git a/proto/chief-of-state-protos b/proto/chief-of-state-protos index 80c2d197..03949a66 160000 --- a/proto/chief-of-state-protos +++ b/proto/chief-of-state-protos @@ -1 +1 @@ -Subproject commit 80c2d19722b69dcce4f05efa6197b85862c81753 +Subproject commit 03949a66ec8b674fbd59cd97dbb1f464fbe7116b From 3ccac7a3797e70502af08c530f73640a471d0b53 Mon Sep 17 00:00:00 2001 From: Ido David Date: Mon, 24 May 2021 03:37:59 -0400 Subject: [PATCH 2/2] update comment --- .../src/main/scala/com/namely/chiefofstate/AggregateRoot.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala b/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala index 508dabbf..ed0ed56c 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/AggregateRoot.scala @@ -219,7 +219,7 @@ object AggregateRoot { WriteHandlerHelpers.NewState(newEvent, state, newEventMeta) }) }) - // extract WriteTransitions from Try or fail and get the last one (if any exists) + // extract WriteTransitions of each Try or fail, then get the last one (if any exists) .map(_.get).lastOption) .map(x => x match {