Skip to content

Commit

Permalink
Fix the readSide processor : 🚑 (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Oct 23, 2020
1 parent 8cb16de commit 7d58146
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import com.lightbend.lagom.scaladsl.server.{
}
import com.namely.chiefofstate.config.{EncryptionSetting, HandlerSetting, ReadSideSetting, SendCommandSettings}
import com.namely.chiefofstate.grpc.client.{ReadSideHandlerServiceClient, WriteSideHandlerServiceClient}
import io.superflat.lagompb.{AggregateRoot, BaseApplication, CommandHandler, EventHandler}
import com.namely.chiefofstate.readside.ReadProcessor
import io.superflat.lagompb.{BaseApplication, CommandHandler, EventHandler}
import io.superflat.lagompb.encryption.ProtoEncryption
import kamon.Kamon
import org.slf4j.{Logger, LoggerFactory}
Expand Down Expand Up @@ -104,11 +105,11 @@ abstract class Application(context: LagomApplicationContext) extends BaseApplica
}

// explicit initialization so that we can pass the desired execution context
lazy val readSideHandler: ReadSideHandler =
new ReadSideHandler(config, encryptionAdapter, actorSystem, readSideHandlerServiceClient, handlerSetting)(
lazy val readSideProcessor: ReadProcessor =
new ReadProcessor(config, encryptionAdapter, actorSystem, readSideHandlerServiceClient)(
readSideExecutionContext
)
readSideHandler.init()
readSideProcessor.start()
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.namely.chiefofstate.readside

import akka.projection.eventsourced.EventEnvelope
import akka.projection.slick.SlickHandler
import akka.Done
import com.namely.chiefofstate.Util
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
import io.superflat.lagompb.encryption.{DecryptPermanentFailure, EncryptionAdapter}
import io.superflat.lagompb.protobuf.v1.core.{EventWrapper => LagompbEventWrapper}
import org.slf4j.{Logger, LoggerFactory}
import slick.dbio.{DBIO, DBIOAction}

import scala.util.{Failure, Success, Try}

/**
* Consumes all events in the journal based upon an event tag
*
* @param eventTag the event tag
* @param encryptionAdapter the encryption adapter
* @param readSideEventProcessor the event processor of the consumed events
*/
class EventsConsumer(eventTag: String, encryptionAdapter: EncryptionAdapter, readSideEventProcessor: EventsProcessor)
extends SlickHandler[EventEnvelope[EventWrapper]] {

val log: Logger = LoggerFactory.getLogger(getClass)

/**
* polls events from the journal and hands it over to the event to the processor based upon the
* consumed tag
*
* @param envelope the event envelope
*/
override def process(envelope: EventEnvelope[EventWrapper]): DBIO[Done] = {
val cosEvent = envelope.event
val lagompbEventWrapper: LagompbEventWrapper =
LagompbEventWrapper()
.withEvent(cosEvent.getEvent)
.withMeta(Util.toLagompbMetaData(cosEvent.getMeta))
.withResultingState(cosEvent.getResultingState)

// decrypt the event/state as needed
encryptionAdapter
.decryptEventWrapper(lagompbEventWrapper)
.map({
case LagompbEventWrapper(Some(event), Some(resultingState), Some(meta), _) =>
readSideEventProcessor.process(event, eventTag, resultingState, Util.toCosMetaData(meta))
case _ =>
DBIO.failed(
new RuntimeException(
s"[ChiefOfState] unknown event received ${envelope.event.getClass.getName}"
)
)
})
.recoverWith({
case DecryptPermanentFailure(reason) =>
log.debug(s"skipping offset with reason, $reason")
Try(DBIOAction.successful(Done))

case throwable: Throwable =>
log.error("failed to handle event", throwable)
Try(DBIO.failed(throwable))
}) match {
case Success(value) => value
case Failure(exception) => throw exception
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.namely.chiefofstate.readside

import akka.Done
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import slick.dbio.DBIO

trait EventsProcessor {

/**
* Processes events read from the Journal
*
* @param event the actual event
* @param eventTag the event tag
* @param resultingState the resulting state of the applied event
* @param meta the additional meta data
* @return
*/
def process(
event: com.google.protobuf.any.Any,
eventTag: String,
resultingState: com.google.protobuf.any.Any,
meta: MetaData
): DBIO[Done]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.namely.chiefofstate.readside

import akka.Done
import akka.actor.{typed, ActorSystem}
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.Offset
import akka.projection.{ProjectionBehavior, ProjectionId}
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.{ExactlyOnceProjection, SourceProvider}
import akka.projection.slick.SlickProjection
import com.github.ghik.silencer.silent
import com.namely.chiefofstate.config.ReadSideSetting
import com.namely.chiefofstate.grpc.client.ReadSideHandlerServiceClient
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
import com.namely.protobuf.chiefofstate.v1.readside.{HandleReadSideRequest, HandleReadSideResponse}
import io.superflat.lagompb.encryption.EncryptionAdapter
import io.superflat.lagompb.ConfigReader
import org.slf4j.{Logger, LoggerFactory}
import slick.basic.DatabaseConfig
import slick.dbio.{DBIO, DBIOAction}
import slick.jdbc.PostgresProfile

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

/**
* Handles events consumed from the journal by making them available to any readside handler
*
* @param grpcReadSideConfig the readside handler grpc client config
* @param encryptionAdapter the encryption adapter
* @param actorSystem the actor system
* @param readSideHandlerServiceClient the readSidehandlerServiceClient
* @param ec the execution context
*/
@silent
class ReadProcessor(
grpcReadSideConfig: ReadSideSetting,
encryptionAdapter: EncryptionAdapter,
actorSystem: ActorSystem,
readSideHandlerServiceClient: ReadSideHandlerServiceClient
)(implicit
ec: ExecutionContext
) extends EventsProcessor {

implicit val typedActorSys: typed.ActorSystem[_] = actorSystem.toTyped

final val log: Logger = LoggerFactory.getLogger(getClass)

// The implementation class needs to set the akka.projection.slick config for the offset database
protected val offsetStoreDatabaseConfig: DatabaseConfig[PostgresProfile] =
DatabaseConfig.forConfig("akka.projection.slick", actorSystem.settings.config)

protected val baseTag: String = ConfigReader.eventsConfig.tagName

def projectionName: String =
s"${grpcReadSideConfig.processorId}-${ConfigReader.serviceName}-readside-projection"

private val COS_EVENT_TAG_HEADER = "x-cos-event-tag"
private val COS_ENTITY_ID_HEADER = "x-cos-entity-id"

/**
* Processes events read from the Journal
*
* @param event the actual event
* @param eventTag the event tag
* @param resultingState the resulting state of the applied event
* @param meta the additional meta data
* @return
*/
override def process(event: com.google.protobuf.any.Any,
eventTag: String,
resultingState: com.google.protobuf.any.Any,
meta: MetaData
): DBIO[Done] = {
val eventualResponse: Try[HandleReadSideResponse] = Try {
val futureResponse: Future[HandleReadSideResponse] = readSideHandlerServiceClient
.handleReadSide()
.addHeader(COS_ENTITY_ID_HEADER, meta.entityId)
.addHeader(COS_EVENT_TAG_HEADER, eventTag)
.invoke(
HandleReadSideRequest()
.withEvent(event)
.withState(resultingState)
.withMeta(meta)
)

Await.result(futureResponse, Duration.Inf)
}

eventualResponse match {
case Failure(exception) =>
log.error(
s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to retrieve command handler response due to ${exception.getMessage}"
)
DBIOAction.failed(exception)
case Success(value) => handleSuccessfulResponse(value)
}
}

/**
* Initialize the projection to start fetching the events that are emitted
*/
def start(): Unit = {
// Let us attempt to create the projection store
if (ConfigReader.createOffsetStore) SlickProjection.createOffsetTableIfNotExists(offsetStoreDatabaseConfig)

ShardedDaemonProcess(typedActorSys).init[ProjectionBehavior.Command](
name = projectionName,
numberOfInstances = ConfigReader.allEventTags.size,
behaviorFactory = n => ProjectionBehavior(exactlyOnceProjection(s"$baseTag$n")),
settings = ShardedDaemonProcessSettings(typedActorSys),
stopMessage = Some(ProjectionBehavior.Stop)
)
}

/**
* Build the projection instance based upon the event tag
*
* @param tagName the event tag
* @return the projection instance
*/
protected def exactlyOnceProjection(tagName: String): ExactlyOnceProjection[Offset, EventEnvelope[EventWrapper]] = {
SlickProjection
.exactlyOnce(
projectionId = ProjectionId(projectionName, tagName),
sourceProvider(tagName),
offsetStoreDatabaseConfig,
handler = () => new EventsConsumer(tagName, encryptionAdapter, this)
)
}

/**
* Set the Event Sourced Provider per tag
*
* @param tag the event tag
* @return the event sourced provider
*/
protected def sourceProvider(tag: String): SourceProvider[Offset, EventEnvelope[EventWrapper]] = {
EventSourcedProvider
.eventsByTag[EventWrapper](typedActorSys, readJournalPluginId = JdbcReadJournal.Identifier, tag)
}

private[this] def handleSuccessfulResponse(readSideResponse: HandleReadSideResponse) = {
if (readSideResponse.successful) DBIOAction.successful(Done)
else
DBIOAction.failed(
new Exception(
s"[ChiefOfState]: ${grpcReadSideConfig.processorId} - unable to handle readSide"
)
)
}

}
3 changes: 1 addition & 2 deletions project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object Common extends AutoPlugin {
url("https://github.com/namely/chief-of-state/graphs/contributors")
),
description := "Chief of State",
coverageMinimum := 80,
coverageMinimum := 70,
coverageFailOnMinimum := true
)

Expand Down Expand Up @@ -88,7 +88,6 @@ object Common extends AutoPlugin {
"com.namely.chiefofstate.RestServiceImpl;" +
"com.namely.chiefofstate.api.*;" +
"com.namely.chiefofstate.GrpcServiceImpl;" +
"com.namely.chiefofstate.ReadSideHandler;" +
"com.namely.chiefofstate.ApplicationLoader;" +
"com.namely.chiefofstate.Application;" +
"com.namely.chiefofstate.Aggregate;" +
Expand Down

0 comments on commit 7d58146

Please sign in to comment.