Skip to content

Commit

Permalink
HandleCommandRequest/HandleEventRequest proto fields renamed ✨ (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 28, 2020
1 parent 8dbf99b commit 0d056ab
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ class AggregateCommandHandler(
final val log: Logger = LoggerFactory.getLogger(getClass)

/**
* entrypoint command handler that unpacks the command proto and calls
* the typed parameter
*
* @param command
* @param priorState
* @param priorEventMeta
* @return
*/
* entrypoint command handler that unpacks the command proto and calls
* the typed parameter
*
* @param command
* @param priorState
* @param priorEventMeta
* @return
*/
final def handle(command: Any, priorState: Any, priorEventMeta: MetaData): Try[CommandHandlerResponse] = {
ProtosRegistry.unpackAny(command) match {
case Failure(exception) =>
Expand All @@ -69,7 +69,10 @@ class AggregateCommandHandler(
* @param priorEventMeta the priorEventMeta
* @return
*/
def handleTyped(command: scalapb.GeneratedMessage, priorState: Any, priorEventMeta: MetaData): Try[CommandHandlerResponse] = {
def handleTyped(command: scalapb.GeneratedMessage,
priorState: Any,
priorEventMeta: MetaData
): Try[CommandHandlerResponse] = {
command match {
// handle get requests locally
case getStateRequest: GetStateRequest => Try(handleGetCommand(getStateRequest, priorEventMeta))
Expand All @@ -88,9 +91,7 @@ class AggregateCommandHandler(
* @param priorEventMeta the prior event meta
* @return a command handler response indicating Reply or failure
*/
def handleGetCommand(command: GetStateRequest,
priorEventMeta: MetaData
): CommandHandlerResponse = {
def handleGetCommand(command: GetStateRequest, priorEventMeta: MetaData): CommandHandlerResponse = {
log.debug("[ChiefOfState] handling GetStateRequest")

// use revision number to determine if there was a prior state
Expand Down Expand Up @@ -122,9 +123,9 @@ class AggregateCommandHandler(
val responseAttempt: Try[HandleCommandResponse] = Try {
// construct the request message
val handleCommandRequest =
HandleCommandRequest(command=remoteCommand.command)
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(priorEventMeta))
HandleCommandRequest(command = remoteCommand.command)
.withPriorState(priorState)
.withPriorEventMeta(Util.toCosMetaData(priorEventMeta))

// create an akka gRPC request builder
val futureResponse: Future[HandleCommandResponse] =
Expand Down Expand Up @@ -164,7 +165,6 @@ class AggregateCommandHandler(
def handleRemoteResponseSuccess(response: HandleCommandResponse): CommandHandlerResponse = {
response.event match {
case Some(event) =>

log.debug("[ChiefOfState] command handler return successfully. An event will be persisted...")

val eventFQN: String = Util.getProtoFullyQualifiedName(event)
Expand All @@ -178,8 +178,8 @@ class AggregateCommandHandler(
)
} else {
log.debug(s"[ChiefOfState] command handler event to persist $eventFQN is valid.")
CommandHandlerResponse()
.withEvent(event)
CommandHandlerResponse()
.withEvent(event)
}

case None =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.namely.chiefofstate

import com.namely.chiefofstate.config.HandlerSetting
import akka.actor.ActorSystem
import com.namely.protobuf.chiefofstate.v1.common
import com.google.protobuf.any.Any
import com.namely.chiefofstate.config.HandlerSetting
import com.namely.protobuf.chiefofstate.v1.writeside.{
HandleEventRequest,
HandleEventResponse,
Expand All @@ -11,8 +11,6 @@ import com.namely.protobuf.chiefofstate.v1.writeside.{
import io.superflat.lagompb.EventHandler
import io.superflat.lagompb.protobuf.v1.core.MetaData
import org.slf4j.{Logger, LoggerFactory}
import scalapb.GeneratedMessage
import com.google.protobuf.any.Any

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
Expand All @@ -38,8 +36,8 @@ class AggregateEventHandler(
writeSideHandlerServiceClient.handleEvent(
HandleEventRequest()
.withEvent(event)
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
) match {

Expand All @@ -52,13 +50,14 @@ class AggregateEventHandler(
} match {
case Failure(exception) => throw new Exception(exception.getMessage)
case Success(handleEventResponse: HandleEventResponse) =>

val stateFQN: String = Util.getProtoFullyQualifiedName(handleEventResponse.getResultingState)
log.debug(s"[ChiefOfState]: received event handler state $stateFQN")

// if enabled, validate the state type url returned by event handler
if (handlerSetting.enableProtoValidations && !handlerSetting.stateFQNs.contains(stateFQN)) {
log.error(s"[ChiefOfState]: command handler state to persist $stateFQN is not configured. Failing request")
log.error(
s"[ChiefOfState]: command handler state to persist $stateFQN is not configured. Failing request"
)
throw new Exception(s"received unknown state $stateFQN")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class AggregateCommandHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleCommandRequest()
.withCommand(cmd.getCommand)
.withCurrentState(currentState)
.withMeta(Util.toCosMetaData(currentMeta))
.withPriorState(currentState)
.withPriorEventMeta(Util.toCosMetaData(currentMeta))
)
.returning(
Future.successful(
Expand Down Expand Up @@ -254,8 +254,8 @@ class AggregateCommandHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleCommandRequest()
.withCommand(innerCmd)
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(priorEventMeta))
.withPriorState(priorState)
.withPriorEventMeta(Util.toCosMetaData(priorEventMeta))
)
.returning(
Future.successful(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class AggregateEventHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleEventRequest()
.withEvent(Any.pack(event))
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
.returning(
Future.successful(
Expand Down Expand Up @@ -95,8 +95,8 @@ class AggregateEventHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleEventRequest()
.withEvent(Any.pack(event))
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
.returning(
Future.successful(
Expand Down Expand Up @@ -136,8 +136,8 @@ class AggregateEventHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleEventRequest()
.withEvent(Any.pack(event))
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
.returning(
Future.successful(
Expand Down Expand Up @@ -176,8 +176,8 @@ class AggregateEventHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleEventRequest()
.withEvent(Any.pack(event))
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
.returning(
Future.successful(
Expand Down Expand Up @@ -217,8 +217,8 @@ class AggregateEventHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleEventRequest()
.withEvent(Any.pack(event))
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
.returning(Future.failed(new GrpcServiceException(Status.INTERNAL)))

Expand Down Expand Up @@ -252,8 +252,8 @@ class AggregateEventHandlerSpec extends TestSpec with MockFactory {
.expects(
HandleEventRequest()
.withEvent(Any.pack(event))
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(eventMeta))
.withPriorState(priorState)
.withEventMeta(Util.toCosMetaData(eventMeta))
)
.throws(new RuntimeException("broken"))

Expand Down

0 comments on commit 0d056ab

Please sign in to comment.