Skip to content

Commit

Permalink
Issue 45: rich gRPC errors (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zen Yui authored Jul 29, 2020
1 parent 8504a03 commit 82718ef
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.namely.protobuf.chief_of_state.writeside.{
WriteSideHandlerServiceClient
}
import com.namely.protobuf.chief_of_state.writeside.HandleCommandResponse.ResponseType.{Empty, PersistAndReply, Reply}
import io.grpc.Status
import io.grpc.{Status, StatusRuntimeException}
import io.superflat.lagompb.{Command, CommandHandler}
import io.superflat.lagompb.protobuf.core._
import org.slf4j.{Logger, LoggerFactory}
Expand All @@ -35,6 +35,8 @@ class AggregateCommandHandler(
handlerSetting: HandlerSetting
) extends CommandHandler[State](actorSystem) {

import AggregateCommandHandler.GRPC_FAILED_VALIDATION_STATUSES

final val log: Logger = LoggerFactory.getLogger(getClass)

/**
Expand Down Expand Up @@ -155,15 +157,36 @@ class AggregateCommandHandler(
)
}

case Failure(e: StatusRuntimeException) =>
val status: Status = e.getStatus()
val reason: String = s"command failed (${status.getCode.name}) ${status.getDescription()}"
log.error(s"[ChiefOfState] $reason")

// handle specific gRPC error statuses
val cause =
if (GRPC_FAILED_VALIDATION_STATUSES.contains(status.getCode)) {
FailureCause.ValidationError
} else {
FailureCause.InternalError
}

CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(reason)
.withCause(cause)
)

case Failure(e: GrpcServiceException) =>
log.error(s"[ChiefOfState] handler gRPC failed with ${e.status.toString()}", e)
CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(e.status.toString)
.withReason(e.getStatus.toString)
.withCause(FailureCause.InternalError)
)


case Failure(e: Throwable) =>
log.error(s"[ChiefOfState] gRPC handler critical failure", e)
CommandHandlerResponse()
Expand All @@ -175,3 +198,19 @@ class AggregateCommandHandler(
}
}
}

/**
* companion object
*/
object AggregateCommandHandler {

// statuses that should be considered validation errors
// in the command handler
val GRPC_FAILED_VALIDATION_STATUSES: Set[Status.Code] = Set(
Status.Code.INVALID_ARGUMENT,
Status.Code.ALREADY_EXISTS,
Status.Code.FAILED_PRECONDITION,
Status.Code.OUT_OF_RANGE,
Status.Code.PERMISSION_DENIED
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.namely.chiefofstate
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.grpc.scaladsl.Metadata
import akka.grpc.GrpcServiceException
import com.google.protobuf.any.Any
import com.namely.protobuf.chief_of_state.common
import com.namely.protobuf.chief_of_state.persistence.State
Expand All @@ -15,14 +16,18 @@ import com.namely.protobuf.chief_of_state.service.{
}
import io.superflat.lagompb.{AggregateRoot, BaseGrpcServiceImpl, StateAndMeta}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure
import org.slf4j.{Logger, LoggerFactory}


class GrpcServiceImpl(sys: ActorSystem, clusterSharding: ClusterSharding, aggregate: AggregateRoot[State])(implicit
ec: ExecutionContext
) extends AbstractChiefOfStateServicePowerApiRouter(sys)
with BaseGrpcServiceImpl {

private val log: Logger = LoggerFactory.getLogger(getClass)

override def aggregateRoot: AggregateRoot[_] = aggregate

override def aggregateStateCompanion: GeneratedMessageCompanion[_ <: GeneratedMessage] = State
Expand All @@ -35,13 +40,21 @@ class GrpcServiceImpl(sys: ActorSystem, clusterSharding: ClusterSharding, aggreg
*/
override def processCommand(in: ProcessCommandRequest, metadata: Metadata): Future[ProcessCommandResponse] = {

sendCommand[Any, State](clusterSharding, in.entityId, in.command.get, Map.empty[String, String])
.map((namelyState: StateAndMeta[State]) => {
ProcessCommandResponse(
state = namelyState.state.currentState,
meta = Some(Util.toCosMetaData(namelyState.metaData))
)
})
if(in.entityId.isEmpty()) {
val status = io.grpc.Status.INVALID_ARGUMENT.withDescription("empty entity ID")
val e: Throwable = new GrpcServiceException(status = status)
log.error(s"request missing entity id")
Future.fromTry(Failure(e))

} else {
sendCommand[Any, State](clusterSharding, in.entityId, in.command.get, Map.empty[String, String])
.map((namelyState: StateAndMeta[State]) => {
ProcessCommandResponse(
state = namelyState.state.currentState,
meta = Some(Util.toCosMetaData(namelyState.metaData))
)
})
}
}

/** gRPC GetState implementation
Expand All @@ -51,12 +64,20 @@ class GrpcServiceImpl(sys: ActorSystem, clusterSharding: ClusterSharding, aggreg
* @return future of GetStateResponse
*/
override def getState(in: GetStateRequest, metadata: Metadata): Future[GetStateResponse] = {
sendCommand[GetStateRequest, State](clusterSharding, in.entityId, in, Map.empty[String, String])
.map((namelyState: StateAndMeta[State]) => {
GetStateResponse(
state = namelyState.state.currentState,
meta = Some(Util.toCosMetaData(namelyState.metaData))
)
})
if(in.entityId.isEmpty()) {
val status = io.grpc.Status.INVALID_ARGUMENT.withDescription("empty entity ID")
val e: Throwable = new GrpcServiceException(status = status)
log.error(s"request missing entity id")
Future.fromTry(Failure(e))

} else {
sendCommand[GetStateRequest, State](clusterSharding, in.entityId, in, Map.empty[String, String])
.map((namelyState: StateAndMeta[State]) => {
GetStateResponse(
state = namelyState.state.currentState,
meta = Some(Util.toCosMetaData(namelyState.metaData))
)
})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.namely.protobuf.chief_of_state.tests.{Account, AccountOpened, OpenAcc
import com.namely.protobuf.chief_of_state.writeside._
import com.namely.protobuf.chief_of_state.writeside.HandleCommandResponse.ResponseType
import com.namely.protobuf.chief_of_state.service.GetStateRequest
import io.grpc.Status
import io.grpc.{Status, StatusRuntimeException}
import io.superflat.lagompb.protobuf.core._
import io.superflat.lagompb.testkit.LagompbSpec
import io.superflat.lagompb.Command
Expand Down Expand Up @@ -318,7 +318,7 @@ class AggregateCommandHandlerSpec extends LagompbSpec with MockFactory {
)
}

"handle grpc exception sent by command handler as expected" in {
"handle failed validations sent by command handler" in {
val priorState: State = State.defaultInstance
val priorEventMeta: MetaData = MetaData.defaultInstance
val stateProto: Seq[String] = Seq(Util.getProtoFullyQualifiedName(Any.pack(Account.defaultInstance)))
Expand All @@ -334,22 +334,84 @@ class AggregateCommandHandlerSpec extends LagompbSpec with MockFactory {
// let us create a mock instance of the handler service client
val mockGrpcClient = mock[WriteSideHandlerServiceClient]

val badStatus: Status = Status.INVALID_ARGUMENT.withDescription("very invalid")

(mockGrpcClient
.handleCommand(_: HandleCommandRequest))
.expects(*)
.throws(new GrpcServiceException(Status.INVALID_ARGUMENT))
.throws(new StatusRuntimeException(badStatus))

// let us execute the request
val cmdhandler = new AggregateCommandHandler(null, mockGrpcClient, handlerSetting)
val result: Try[CommandHandlerResponse] = cmdhandler.handle(cmd, priorState, priorEventMeta)
result shouldBe (Success(
val result: CommandHandlerResponse = cmdhandler.handleRemoteCommand(cmd, priorState, priorEventMeta)
result.getFailedResponse.reason.contains(badStatus.getDescription) shouldBe(true)
result.getFailedResponse.reason.contains(badStatus.getCode.name) shouldBe(true)
result.getFailedResponse.cause shouldBe(FailureCause.ValidationError)
}

"handle gRPC internal errors from command handler" in {
val priorState: State = State.defaultInstance
val priorEventMeta: MetaData = MetaData.defaultInstance
val stateProto: Seq[String] = Seq(Util.getProtoFullyQualifiedName(Any.pack(Account.defaultInstance)))
val eventsProtos: Seq[String] = Seq(Util.getProtoFullyQualifiedName(Any.pack(AccountOpened.defaultInstance)))
val handlerSetting: HandlerSetting = HandlerSetting(stateProto, eventsProtos)

val cmd = Command(
Any.pack(OpenAccount.defaultInstance),
null, // ignore the actor ref in this test
Map.empty
)

// let us create a mock instance of the handler service client
val mockGrpcClient = mock[WriteSideHandlerServiceClient]

val badStatus: Status = Status.INTERNAL.withDescription("super broken")

(mockGrpcClient
.handleCommand(_: HandleCommandRequest))
.expects(*)
.throws(new StatusRuntimeException(badStatus))

// let us execute the request
val cmdhandler = new AggregateCommandHandler(null, mockGrpcClient, handlerSetting)
val result: CommandHandlerResponse = cmdhandler.handleRemoteCommand(cmd, priorState, priorEventMeta)
result.getFailedResponse.reason.contains(badStatus.getDescription) shouldBe(true)
result.getFailedResponse.reason.contains(badStatus.getCode.name) shouldBe(true)
result.getFailedResponse.cause shouldBe(FailureCause.InternalError)
}

"handles akka gRPC exceptions" in {
val stateProto: Seq[String] = Seq(Util.getProtoFullyQualifiedName(Any.pack(Account.defaultInstance)))
val eventsProtos: Seq[String] = Seq(Util.getProtoFullyQualifiedName(Any.pack(AccountOpened.defaultInstance)))
val handlerSetting: HandlerSetting = HandlerSetting(stateProto, eventsProtos)

val cmd = Command(
Any.pack(OpenAccount.defaultInstance),
null, // ignore the actor ref in this test
Map.empty
)

// let us create a mock instance of the handler service client
val mockGrpcClient = mock[WriteSideHandlerServiceClient]
val badStatus: Status = Status.INTERNAL.withDescription("grpc broken")

(mockGrpcClient
.handleCommand(_: HandleCommandRequest))
.expects(*)
.throws(new GrpcServiceException(status=badStatus))

// let us execute the request
val cmdhandler = new AggregateCommandHandler(null, mockGrpcClient, handlerSetting)
val result: CommandHandlerResponse = cmdhandler.handleRemoteCommand(cmd, State.defaultInstance, MetaData.defaultInstance)

result shouldBe(
CommandHandlerResponse()
.withFailedResponse(
FailedCommandHandlerResponse()
.withReason(Status.INVALID_ARGUMENT.toString)
.withReason(badStatus.toString())
.withCause(FailureCause.InternalError)
)
))
)
}

"handles a critical grpc failure" in {
Expand Down

0 comments on commit 82718ef

Please sign in to comment.