From 7d581461a7192a0b69052aacbaa0a8a3203140e3 Mon Sep 17 00:00:00 2001 From: Arsene Date: Fri, 23 Oct 2020 18:30:36 +0000 Subject: [PATCH] Fix the readSide processor : :ambulance: (#117) --- .../com/namely/chiefofstate/Application.scala | 9 +- .../namely/chiefofstate/ReadSideHandler.scala | 75 --------- .../readside/EventsConsumer.scala | 67 ++++++++ .../readside/EventsProcessor.scala | 24 +++ .../chiefofstate/readside/ReadProcessor.scala | 159 ++++++++++++++++++ project/Common.scala | 3 +- 6 files changed, 256 insertions(+), 81 deletions(-) delete mode 100644 code/service/src/main/scala/com/namely/chiefofstate/ReadSideHandler.scala create mode 100644 code/service/src/main/scala/com/namely/chiefofstate/readside/EventsConsumer.scala create mode 100644 code/service/src/main/scala/com/namely/chiefofstate/readside/EventsProcessor.scala create mode 100644 code/service/src/main/scala/com/namely/chiefofstate/readside/ReadProcessor.scala diff --git a/code/service/src/main/scala/com/namely/chiefofstate/Application.scala b/code/service/src/main/scala/com/namely/chiefofstate/Application.scala index bc595d45..508f7bac 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/Application.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/Application.scala @@ -15,7 +15,8 @@ import com.lightbend.lagom.scaladsl.server.{ } import com.namely.chiefofstate.config.{EncryptionSetting, HandlerSetting, ReadSideSetting, SendCommandSettings} import com.namely.chiefofstate.grpc.client.{ReadSideHandlerServiceClient, WriteSideHandlerServiceClient} -import io.superflat.lagompb.{AggregateRoot, BaseApplication, CommandHandler, EventHandler} +import com.namely.chiefofstate.readside.ReadProcessor +import io.superflat.lagompb.{BaseApplication, CommandHandler, EventHandler} import io.superflat.lagompb.encryption.ProtoEncryption import kamon.Kamon import org.slf4j.{Logger, LoggerFactory} @@ -104,11 +105,11 @@ abstract class Application(context: LagomApplicationContext) extends BaseApplica } // explicit initialization so that we can pass the desired execution context - lazy val readSideHandler: ReadSideHandler = - new ReadSideHandler(config, encryptionAdapter, actorSystem, readSideHandlerServiceClient, handlerSetting)( + lazy val readSideProcessor: ReadProcessor = + new ReadProcessor(config, encryptionAdapter, actorSystem, readSideHandlerServiceClient)( readSideExecutionContext ) - readSideHandler.init() + readSideProcessor.start() } } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/ReadSideHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/ReadSideHandler.scala deleted file mode 100644 index c9fa05a9..00000000 --- a/code/service/src/main/scala/com/namely/chiefofstate/ReadSideHandler.scala +++ /dev/null @@ -1,75 +0,0 @@ -package com.namely.chiefofstate - -import akka.Done -import akka.actor.ActorSystem -import akka.actor.typed.scaladsl.adapter._ -import com.namely.chiefofstate.config.{HandlerSetting, ReadSideSetting} -import com.namely.chiefofstate.grpc.client.ReadSideHandlerServiceClient -import com.namely.protobuf.chiefofstate.v1.readside.{HandleReadSideRequest, HandleReadSideResponse} -import io.superflat.lagompb.ConfigReader -import io.superflat.lagompb.encryption.EncryptionAdapter -import io.superflat.lagompb.readside.{ReadSideEvent, ReadSideProcessor} -import slick.dbio.{DBIO, DBIOAction} - -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success, Try} - -/** - * ChiefOfStateReadProcessor - * - * @param actorSystem the actor system - * @param readSideHandlerServiceClient the gRpcClient used to connect to the actual readSide handler - * @param handlerSetting the readSide handler settingthe lagom readSide object that helps feed from events emitted in the journal - */ -class ReadSideHandler( - grpcReadSideConfig: ReadSideSetting, - encryptionAdapter: EncryptionAdapter, - actorSystem: ActorSystem, - readSideHandlerServiceClient: ReadSideHandlerServiceClient, - handlerSetting: HandlerSetting -)(implicit ec: ExecutionContext) - extends ReadSideProcessor(encryptionAdapter)(ec, actorSystem.toTyped) { - - override def projectionName: String = - s"${grpcReadSideConfig.processorId}-${ConfigReader.serviceName}-readside-projection" - - private val COS_EVENT_TAG_HEADER = "x-cos-event-tag" - private val COS_ENTITY_ID_HEADER = "x-cos-entity-id" - - override def handle(readSideEvent: ReadSideEvent): DBIO[Done] = { - val eventualResponse: Try[HandleReadSideResponse] = Try { - val futureResponse: Future[HandleReadSideResponse] = readSideHandlerServiceClient - .handleReadSide() - .addHeader(COS_ENTITY_ID_HEADER, readSideEvent.metaData.entityId) - .addHeader(COS_EVENT_TAG_HEADER, readSideEvent.eventTag) - .invoke( - HandleReadSideRequest() - .withEvent(readSideEvent.event) - .withState(readSideEvent.state) - .withMeta(Util.toCosMetaData(readSideEvent.metaData)) - ) - - Await.result(futureResponse, Duration.Inf) - } - - eventualResponse match { - case Failure(exception) => - log.error( - s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to retrieve command handler response due to ${exception.getMessage}" - ) - DBIOAction.failed(exception) - case Success(value) => handleSuccessfulResponse(value) - } - } - - private[this] def handleSuccessfulResponse(readSideResponse: HandleReadSideResponse) = { - if (readSideResponse.successful) DBIOAction.successful(Done) - else - DBIOAction.failed( - new Exception( - s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to handle readSide" - ) - ) - } -} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/EventsConsumer.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/EventsConsumer.scala new file mode 100644 index 00000000..8530cd98 --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/EventsConsumer.scala @@ -0,0 +1,67 @@ +package com.namely.chiefofstate.readside + +import akka.projection.eventsourced.EventEnvelope +import akka.projection.slick.SlickHandler +import akka.Done +import com.namely.chiefofstate.Util +import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper +import io.superflat.lagompb.encryption.{DecryptPermanentFailure, EncryptionAdapter} +import io.superflat.lagompb.protobuf.v1.core.{EventWrapper => LagompbEventWrapper} +import org.slf4j.{Logger, LoggerFactory} +import slick.dbio.{DBIO, DBIOAction} + +import scala.util.{Failure, Success, Try} + +/** + * Consumes all events in the journal based upon an event tag + * + * @param eventTag the event tag + * @param encryptionAdapter the encryption adapter + * @param readSideEventProcessor the event processor of the consumed events + */ +class EventsConsumer(eventTag: String, encryptionAdapter: EncryptionAdapter, readSideEventProcessor: EventsProcessor) + extends SlickHandler[EventEnvelope[EventWrapper]] { + + val log: Logger = LoggerFactory.getLogger(getClass) + + /** + * polls events from the journal and hands it over to the event to the processor based upon the + * consumed tag + * + * @param envelope the event envelope + */ + override def process(envelope: EventEnvelope[EventWrapper]): DBIO[Done] = { + val cosEvent = envelope.event + val lagompbEventWrapper: LagompbEventWrapper = + LagompbEventWrapper() + .withEvent(cosEvent.getEvent) + .withMeta(Util.toLagompbMetaData(cosEvent.getMeta)) + .withResultingState(cosEvent.getResultingState) + + // decrypt the event/state as needed + encryptionAdapter + .decryptEventWrapper(lagompbEventWrapper) + .map({ + case LagompbEventWrapper(Some(event), Some(resultingState), Some(meta), _) => + readSideEventProcessor.process(event, eventTag, resultingState, Util.toCosMetaData(meta)) + case _ => + DBIO.failed( + new RuntimeException( + s"[ChiefOfState] unknown event received ${envelope.event.getClass.getName}" + ) + ) + }) + .recoverWith({ + case DecryptPermanentFailure(reason) => + log.debug(s"skipping offset with reason, $reason") + Try(DBIOAction.successful(Done)) + + case throwable: Throwable => + log.error("failed to handle event", throwable) + Try(DBIO.failed(throwable)) + }) match { + case Success(value) => value + case Failure(exception) => throw exception + } + } +} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/EventsProcessor.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/EventsProcessor.scala new file mode 100644 index 00000000..bbbb825f --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/EventsProcessor.scala @@ -0,0 +1,24 @@ +package com.namely.chiefofstate.readside + +import akka.Done +import com.namely.protobuf.chiefofstate.v1.common.MetaData +import slick.dbio.DBIO + +trait EventsProcessor { + + /** + * Processes events read from the Journal + * + * @param event the actual event + * @param eventTag the event tag + * @param resultingState the resulting state of the applied event + * @param meta the additional meta data + * @return + */ + def process( + event: com.google.protobuf.any.Any, + eventTag: String, + resultingState: com.google.protobuf.any.Any, + meta: MetaData + ): DBIO[Done] +} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadProcessor.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadProcessor.scala new file mode 100644 index 00000000..0ba41490 --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadProcessor.scala @@ -0,0 +1,159 @@ +package com.namely.chiefofstate.readside + +import akka.Done +import akka.actor.{typed, ActorSystem} +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.query.Offset +import akka.projection.{ProjectionBehavior, ProjectionId} +import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.scaladsl.{ExactlyOnceProjection, SourceProvider} +import akka.projection.slick.SlickProjection +import com.github.ghik.silencer.silent +import com.namely.chiefofstate.config.ReadSideSetting +import com.namely.chiefofstate.grpc.client.ReadSideHandlerServiceClient +import com.namely.protobuf.chiefofstate.v1.common.MetaData +import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper +import com.namely.protobuf.chiefofstate.v1.readside.{HandleReadSideRequest, HandleReadSideResponse} +import io.superflat.lagompb.encryption.EncryptionAdapter +import io.superflat.lagompb.ConfigReader +import org.slf4j.{Logger, LoggerFactory} +import slick.basic.DatabaseConfig +import slick.dbio.{DBIO, DBIOAction} +import slick.jdbc.PostgresProfile + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} + +/** + * Handles events consumed from the journal by making them available to any readside handler + * + * @param grpcReadSideConfig the readside handler grpc client config + * @param encryptionAdapter the encryption adapter + * @param actorSystem the actor system + * @param readSideHandlerServiceClient the readSidehandlerServiceClient + * @param ec the execution context + */ +@silent +class ReadProcessor( + grpcReadSideConfig: ReadSideSetting, + encryptionAdapter: EncryptionAdapter, + actorSystem: ActorSystem, + readSideHandlerServiceClient: ReadSideHandlerServiceClient +)(implicit + ec: ExecutionContext +) extends EventsProcessor { + + implicit val typedActorSys: typed.ActorSystem[_] = actorSystem.toTyped + + final val log: Logger = LoggerFactory.getLogger(getClass) + + // The implementation class needs to set the akka.projection.slick config for the offset database + protected val offsetStoreDatabaseConfig: DatabaseConfig[PostgresProfile] = + DatabaseConfig.forConfig("akka.projection.slick", actorSystem.settings.config) + + protected val baseTag: String = ConfigReader.eventsConfig.tagName + + def projectionName: String = + s"${grpcReadSideConfig.processorId}-${ConfigReader.serviceName}-readside-projection" + + private val COS_EVENT_TAG_HEADER = "x-cos-event-tag" + private val COS_ENTITY_ID_HEADER = "x-cos-entity-id" + + /** + * Processes events read from the Journal + * + * @param event the actual event + * @param eventTag the event tag + * @param resultingState the resulting state of the applied event + * @param meta the additional meta data + * @return + */ + override def process(event: com.google.protobuf.any.Any, + eventTag: String, + resultingState: com.google.protobuf.any.Any, + meta: MetaData + ): DBIO[Done] = { + val eventualResponse: Try[HandleReadSideResponse] = Try { + val futureResponse: Future[HandleReadSideResponse] = readSideHandlerServiceClient + .handleReadSide() + .addHeader(COS_ENTITY_ID_HEADER, meta.entityId) + .addHeader(COS_EVENT_TAG_HEADER, eventTag) + .invoke( + HandleReadSideRequest() + .withEvent(event) + .withState(resultingState) + .withMeta(meta) + ) + + Await.result(futureResponse, Duration.Inf) + } + + eventualResponse match { + case Failure(exception) => + log.error( + s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to retrieve command handler response due to ${exception.getMessage}" + ) + DBIOAction.failed(exception) + case Success(value) => handleSuccessfulResponse(value) + } + } + + /** + * Initialize the projection to start fetching the events that are emitted + */ + def start(): Unit = { + // Let us attempt to create the projection store + if (ConfigReader.createOffsetStore) SlickProjection.createOffsetTableIfNotExists(offsetStoreDatabaseConfig) + + ShardedDaemonProcess(typedActorSys).init[ProjectionBehavior.Command]( + name = projectionName, + numberOfInstances = ConfigReader.allEventTags.size, + behaviorFactory = n => ProjectionBehavior(exactlyOnceProjection(s"$baseTag$n")), + settings = ShardedDaemonProcessSettings(typedActorSys), + stopMessage = Some(ProjectionBehavior.Stop) + ) + } + + /** + * Build the projection instance based upon the event tag + * + * @param tagName the event tag + * @return the projection instance + */ + protected def exactlyOnceProjection(tagName: String): ExactlyOnceProjection[Offset, EventEnvelope[EventWrapper]] = { + SlickProjection + .exactlyOnce( + projectionId = ProjectionId(projectionName, tagName), + sourceProvider(tagName), + offsetStoreDatabaseConfig, + handler = () => new EventsConsumer(tagName, encryptionAdapter, this) + ) + } + + /** + * Set the Event Sourced Provider per tag + * + * @param tag the event tag + * @return the event sourced provider + */ + protected def sourceProvider(tag: String): SourceProvider[Offset, EventEnvelope[EventWrapper]] = { + EventSourcedProvider + .eventsByTag[EventWrapper](typedActorSys, readJournalPluginId = JdbcReadJournal.Identifier, tag) + } + + private[this] def handleSuccessfulResponse(readSideResponse: HandleReadSideResponse) = { + if (readSideResponse.successful) DBIOAction.successful(Done) + else + DBIOAction.failed( + new Exception( + s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to handle readSide" + ) + ) + } + +} diff --git a/project/Common.scala b/project/Common.scala index 100f8aef..dd8772e9 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -27,7 +27,7 @@ object Common extends AutoPlugin { url("https://github.com/namely/chief-of-state/graphs/contributors") ), description := "Chief of State", - coverageMinimum := 80, + coverageMinimum := 70, coverageFailOnMinimum := true ) @@ -88,7 +88,6 @@ object Common extends AutoPlugin { "com.namely.chiefofstate.RestServiceImpl;" + "com.namely.chiefofstate.api.*;" + "com.namely.chiefofstate.GrpcServiceImpl;" + - "com.namely.chiefofstate.ReadSideHandler;" + "com.namely.chiefofstate.ApplicationLoader;" + "com.namely.chiefofstate.Application;" + "com.namely.chiefofstate.Aggregate;" +