Skip to content

Commit

Permalink
lagom-pb 0.9.0+3, simplify command handler response (#92)
Browse files Browse the repository at this point in the history
* update lagom-pb 0.9.0+3
* update-protos and HandleCommandResponse
* remove empty ReadSideHandlerSpec for now
* override akka deps to make test pass
* fix coverage problems
  • Loading branch information
Zen Yui authored Sep 13, 2020
1 parent f9facc0 commit 08be370
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 742 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ class Aggregate(
eventHandler: EventHandler,
encryptionAdapter: EncryptionAdapter
) extends AggregateRoot(actorSystem, commandHandler, eventHandler, Empty.defaultInstance, encryptionAdapter) {
// $COVERAGE-OFF$

override def aggregateName: String = "chiefOfState"

// $COVERAGE-ON$
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import com.namely.protobuf.chiefofstate.v1.writeside.{
HandleCommandResponse,
WriteSideHandlerServiceClient
}
import com.namely.protobuf.chiefofstate.v1.writeside.HandleCommandResponse.ResponseType.{PersistAndReply, Reply}
import com.namely.chiefofstate.config.HandlerSetting
import io.grpc.{Status, StatusRuntimeException}
import io.superflat.lagompb.{CommandHandler, ProtosRegistry}
Expand Down Expand Up @@ -98,16 +97,10 @@ class AggregateCommandHandler(
if (priorEventMeta.revisionNumber > 0) {
log.debug(s"[ChiefOfState] found state for entity ${command.entityId}")
CommandHandlerResponse()
.withSuccessResponse(
SuccessCommandHandlerResponse()
.withNoEvent(com.google.protobuf.empty.Empty.defaultInstance)
)
} else {
log.error(s"[ChiefOfState] could not find state for entity ${command.entityId}")
CommandHandlerResponse()
.withFailedResponse(
AggregateCommandHandler.GET_STATE_NOT_FOUND_FAILURE
)
.withFailure(FailureResponse().withNotFound("entity not found"))
}
}

Expand All @@ -128,9 +121,10 @@ class AggregateCommandHandler(
// make blocking gRPC call to handler service
val responseAttempt: Try[HandleCommandResponse] = Try {
// construct the request message
val handleCommandRequest = HandleCommandRequest(command=remoteCommand.command)
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(priorEventMeta))
val handleCommandRequest =
HandleCommandRequest(command=remoteCommand.command)
.withCurrentState(priorState)
.withMeta(Util.toCosMetaData(priorEventMeta))

// create an akka gRPC request builder
val futureResponse: Future[HandleCommandResponse] =
Expand All @@ -144,7 +138,7 @@ class AggregateCommandHandler(
request.addHeader(header.key, value)
case RemoteCommand.Header.Value.BytesValue(value) =>
request.addHeader(header.key, akka.util.ByteString(value.toByteArray))
case unhandled => throw new Exception(s"unhandled gRPC header type, ${unhandled.getClass.getName}")
case _ => throw new Exception(s"header value must be string or bytes")
}
}
)
Expand All @@ -168,55 +162,29 @@ class AggregateCommandHandler(
* @return an instance of CommandHandlerResponse
*/
def handleRemoteResponseSuccess(response: HandleCommandResponse): CommandHandlerResponse = {
response.responseType match {
case PersistAndReply(persistAndReply) =>
response.event match {
case Some(event) =>

log.debug("[ChiefOfState] command handler return successfully. An event will be persisted...")
val eventFQN: String = Util.getProtoFullyQualifiedName(persistAndReply.getEvent)

log.debug(s"[ChiefOfState] command handler event to persist $eventFQN")
if (handlerSetting.enableProtoValidations) {
if (handlerSetting.eventFQNs.contains(eventFQN)) {
log.debug(s"[ChiefOfState] command handler event to persist $eventFQN is valid.")
CommandHandlerResponse()
.withSuccessResponse(
SuccessCommandHandlerResponse()
.withEvent(persistAndReply.getEvent)
)
} else {
log.error(
s"[ChiefOfState] command handler event to persist $eventFQN is not configured. Failing request"
)
CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(s"received unknown event type $eventFQN")
.withCause(FailureCause.VALIDATION_ERROR)
)
}
} else {
log.debug(s"[ChiefOfState] command handler event to persist $eventFQN. FQN validation skipped.")
val eventFQN: String = Util.getProtoFullyQualifiedName(event)

if (handlerSetting.enableProtoValidations && !handlerSetting.eventFQNs.contains(eventFQN)) {
log.error(s"[ChiefOfState] command handler returned unknown event type, $eventFQN")
CommandHandlerResponse()
.withSuccessResponse(
SuccessCommandHandlerResponse()
.withEvent(persistAndReply.getEvent)
.withFailure(
FailureResponse()
.withValidation(s"received unknown event type $eventFQN")
)
} else {
log.debug(s"[ChiefOfState] command handler event to persist $eventFQN is valid.")
CommandHandlerResponse()
.withEvent(event)
}

case Reply(_) =>
case None =>
log.debug("[ChiefOfState] command handler return successfully. No event will be persisted...")
CommandHandlerResponse()
.withSuccessResponse(
SuccessCommandHandlerResponse()
.withNoEvent(com.google.protobuf.empty.Empty.defaultInstance)
)

case unhandled =>
CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(s"command handler returned malformed event, ${unhandled.getClass.getName}")
.withCause(FailureCause.INTERNAL_ERROR)
)
}
}

Expand All @@ -235,38 +203,28 @@ class AggregateCommandHandler(
log.error(s"[ChiefOfState] $reason")

// handle specific gRPC error statuses
val cause =
val failureResponse =
if (GRPC_FAILED_VALIDATION_STATUSES.contains(status.getCode)) {
FailureCause.VALIDATION_ERROR
FailureResponse().withValidation(status.getDescription)
} else {
FailureCause.INTERNAL_ERROR
FailureResponse().withCritical(status.getDescription)
}

CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(reason)
.withCause(cause)
)
CommandHandlerResponse().withFailure(failureResponse)

case e: GrpcServiceException =>
log.error(s"[ChiefOfState] handler gRPC failed with ${e.status.toString}", e)
CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(e.getStatus.toString)
.withCause(FailureCause.INTERNAL_ERROR)
.withFailure(
FailureResponse().withCritical(e.getStatus.getDescription)
)

case e: Throwable =>
log.error(s"[ChiefOfState] gRPC handler critical failure", e)
CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(
s"Critical error occurred handling command, ${e.getMessage}"
)
.withCause(FailureCause.INTERNAL_ERROR)
.withFailure(
FailureResponse()
.withCritical(s"Critical error occurred handling command, ${e.getMessage}")
)
}
}
Expand All @@ -286,10 +244,4 @@ object AggregateCommandHandler {
Status.Code.OUT_OF_RANGE,
Status.Code.PERMISSION_DENIED
)

// constant failure for entity not found
val GET_STATE_NOT_FOUND_FAILURE: FailedCommandHandlerResponse =
FailedCommandHandlerResponse()
.withReason("entity not found")
.withCause(FailureCause.INTERNAL_ERROR)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.namely.protobuf.chiefofstate.v1.writeside.{
HandleEventResponse,
WriteSideHandlerServiceClient
}
import io.superflat.lagompb.{EventHandler, GlobalException}
import io.superflat.lagompb.EventHandler
import io.superflat.lagompb.protobuf.v1.core.MetaData
import org.slf4j.{Logger, LoggerFactory}
import scalapb.GeneratedMessage
Expand Down Expand Up @@ -44,13 +44,13 @@ class AggregateEventHandler(
) match {

case Failure(e) =>
throw new GlobalException(e.getMessage)
throw new Exception(e.getMessage)

case Success(eventualEventResponse: Future[HandleEventResponse]) =>
Try {
Await.result(eventualEventResponse, Duration.Inf)
} match {
case Failure(exception) => throw new GlobalException(exception.getMessage)
case Failure(exception) => throw new Exception(exception.getMessage)
case Success(handleEventResponse: HandleEventResponse) =>

val stateFQN: String = Util.getProtoFullyQualifiedName(handleEventResponse.getResultingState)
Expand All @@ -59,7 +59,7 @@ class AggregateEventHandler(
// if enabled, validate the state type url returned by event handler
if (handlerSetting.enableProtoValidations && !handlerSetting.stateFQNs.contains(stateFQN)) {
log.error(s"[ChiefOfState]: command handler state to persist $stateFQN is not configured. Failing request")
throw new GlobalException(s"received unknown state $stateFQN")
throw new Exception(s"received unknown state $stateFQN")
}

// pass through state returned by event handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.lightbend.lagom.internal.persistence.ReadSideConfig
* @param context the application context
*/
abstract class Application(context: LagomApplicationContext) extends BaseApplication(context) {
// $COVERAGE-OFF$

// reflect encryption from config
override def protoEncryption: Option[ProtoEncryption] = EncryptionSetting(config).encryption
Expand Down Expand Up @@ -79,22 +78,17 @@ abstract class Application(context: LagomApplicationContext) extends BaseApplica
chiefOfStateReadProcessor.init()
}
}
// $COVERAGE-ON$
}

/**
* ApplicationLoader boostraps the application at runtime
*/
class ApplicationLoader extends LagomApplicationLoader {

// $COVERAGE-OFF$
override def load(context: LagomApplicationContext): LagomApplication =
new Application(context) with AkkaDiscoveryComponents

override def loadDevMode(context: LagomApplicationContext): LagomApplication =
new Application(context) with LagomDevModeComponents

override def describeService: Option[Descriptor] = Some(readDescriptor[ChiefOfStateService])

// $COVERAGE-ON$
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import com.namely.chiefofstate.config.SendCommandSettings

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Try, Success, Failure}
import io.superflat.lagompb.GlobalException
import io.superflat.lagompb.protobuf.v1.core.StateWrapper

class GrpcServiceImpl(sys: ActorSystem,
Expand Down Expand Up @@ -106,24 +105,11 @@ class GrpcServiceImpl(sys: ActorSystem,
)
} else {
sendCommand(in.entityId, in, Map.empty[String, String])
.transform({
// transform success to a GetStateResponse
case Success(stateWrapper: StateWrapper) =>
Success(
GetStateResponse(
state = stateWrapper.state,
meta = stateWrapper.meta.map(Util.toCosMetaData)
)
)

// handle not-found errors specifically
case Failure(e) if e.getMessage == AggregateCommandHandler.GET_STATE_NOT_FOUND_FAILURE.reason =>
Failure(new GrpcServiceException(status = Status.NOT_FOUND.withDescription("COS could not find entity")))

// pass through other failures
case Failure(e) =>
log.error(s"unhandled error in getState", e)
Failure(e)
.map((stateWrapper: StateWrapper) => {
GetStateResponse(
state = stateWrapper.state,
meta = stateWrapper.meta.map(Util.toCosMetaData)
)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.namely.protobuf.chiefofstate.v1.readside.{
ReadSideHandlerServiceClient
}
import com.namely.chiefofstate.config.{HandlerSetting, ReadSideSetting}
import io.superflat.lagompb.{ConfigReader, GlobalException}
import io.superflat.lagompb.ConfigReader
import io.superflat.lagompb.encryption.EncryptionAdapter
import com.namely.chiefofstate.Util
import io.superflat.lagompb.readside.{ReadSideEvent, ReadSideProcessor}
Expand All @@ -36,13 +36,10 @@ class ReadSideHandler(
handlerSetting: HandlerSetting
)(implicit ec: ExecutionContext)
extends ReadSideProcessor(encryptionAdapter)(ec, actorSystem.toTyped) {
// $COVERAGE-OFF$

override def projectionName: String =
s"${grpcReadSideConfig.processorId}-${ConfigReader.serviceName}-readside-projection"

// $COVERAGE-ON$

private val COS_EVENT_TAG_HEADER = "x-cos-event-tag"
private val COS_ENTITY_ID_HEADER = "x-cos-entity-id"

Expand All @@ -63,22 +60,22 @@ class ReadSideHandler(
log.error(
s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to retrieve command handler response due to ${exception.getMessage}"
)
DBIOAction.failed(throw new GlobalException(exception.getMessage))
DBIOAction.failed(throw new Exception(exception.getMessage))
case Success(eventualReadSideResponse: Future[HandleReadSideResponse]) =>
Try {
Await.result(eventualReadSideResponse, Duration.Inf)
} match {
case Failure(exception) =>
DBIOAction.failed(
throw new GlobalException(
throw new Exception(
s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - ${exception.getMessage}"
)
)
case Success(value) =>
if (value.successful) DBIOAction.successful(Done)
else
DBIOAction.failed(
throw new GlobalException(
throw new Exception(
s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to handle readSide"
)
)
Expand Down
Loading

0 comments on commit 08be370

Please sign in to comment.