diff --git a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala index bd470702..36f09c4e 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala @@ -11,7 +11,8 @@ final case class ReadSideConfig( host: String = "", port: Int = -1, useTls: Boolean = false, - settings: Map[String, String] = Map.empty[String, String]) { + settings: Map[String, String] = Map.empty[String, String], + useStreaming: Boolean = false) { /** * Adds a setting to the config diff --git a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala index d47ee421..dcb78c5b 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala @@ -12,6 +12,7 @@ object ReadSideConfigReader { val READ_SIDE_HOST_KEY: String = "HOST" val READ_SIDE_PORT_KEY: String = "PORT" val READ_SIDE_TLS_KEY: String = "USE_TLS" + val READ_SIDE_USE_STREAMING_KEY: String = "USE_STREAMING" val logger: Logger = LoggerFactory.getLogger(this.getClass) @@ -57,6 +58,9 @@ object ReadSideConfigReader { case (config, (READ_SIDE_TLS_KEY, value)) => config.copy(useTls = value.toBooleanOption.getOrElse(false)) + case (config, (READ_SIDE_USE_STREAMING_KEY, value)) => + config.copy(useStreaming = value.toBooleanOption.getOrElse(false)) + case (config, (key, value)) => config.addSetting(key, value) }) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala index c0ebd550..20f457be 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala @@ -8,8 +8,12 @@ package com.namely.chiefofstate.readside import akka.actor.typed.ActorSystem import com.namely.chiefofstate.NettyHelper -import com.namely.chiefofstate.config.{ ReadSideConfig, ReadSideConfigReader } -import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceBlockingStub +import com.namely.chiefofstate.config.{ GrpcConfig, ReadSideConfig, ReadSideConfigReader } +import com.namely.chiefofstate.readside.streaming.{ ReadSideStreamHandlerImpl, ReadSideStreamProjection } +import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.{ + ReadSideHandlerServiceBlockingStub, + ReadSideHandlerServiceStub +} import com.typesafe.config.Config import com.zaxxer.hikari.{ HikariConfig, HikariDataSource } import io.grpc.ClientInterceptor @@ -23,13 +27,15 @@ import org.slf4j.{ Logger, LoggerFactory } * @param dbConfig the DB config for creating a hikari data source * @param readSideConfigs sequence of configs for specific read sides * @param numShards number of shards for projections/tags + * @param grpcConfig the grpc configuration */ class ReadSideManager( system: ActorSystem[_], interceptors: Seq[ClientInterceptor], dbConfig: ReadSideManager.DbConfig, readSideConfigs: Seq[ReadSideConfig], - numShards: Int) { + numShards: Int, + grpcConfig: GrpcConfig) { private val logger: Logger = LoggerFactory.getLogger(this.getClass) @@ -44,18 +50,40 @@ class ReadSideManager( readSideConfigs.foreach(rsconfig => { logger.info(s"starting read side, id=${rsconfig.processorId}") - - // construct a remote gRPC read side client for this read side - // and register interceptors - val rpcClient: ReadSideHandlerServiceBlockingStub = new ReadSideHandlerServiceBlockingStub( - NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build).withInterceptors(interceptors: _*) - // instantiate a remote read side processor with the gRPC client - val remoteReadSideProcessor: ReadSideHandlerImpl = new ReadSideHandlerImpl(rsconfig.processorId, rpcClient) - // instantiate the read side projection with the remote processor - val projection = - new ReadSideProjection(system, rsconfig.processorId, dataSource, remoteReadSideProcessor, numShards) - // start the sharded daemon process - projection.start() + // FIXME draft implementation + if (rsconfig.useStreaming) { + // construct a remote gRPC streaming read side client + val rpcStreamingClient = + new ReadSideHandlerServiceStub(NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build) + .withInterceptors(interceptors: _*) + + // instantiate a remote read side stream processor with the gRPC client + val remoteReadSideStreamProcessor: ReadSideStreamHandlerImpl = + ReadSideStreamHandlerImpl(rsconfig.processorId, grpcConfig, rpcStreamingClient) + + // instantiate the read side projection with the remote processor + val projection: ReadSideStreamProjection = + new ReadSideStreamProjection( + system, + rsconfig.processorId, + dataSource, + remoteReadSideStreamProcessor, + numShards) + // start the sharded daemon process + projection.start() + } else { + // construct a remote gRPC read side client for this read side + // and register interceptors + val rpcClient: ReadSideHandlerServiceBlockingStub = new ReadSideHandlerServiceBlockingStub( + NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build).withInterceptors(interceptors: _*) + // instantiate a remote read side processor with the gRPC client + val remoteReadSideProcessor: ReadSideHandlerImpl = new ReadSideHandlerImpl(rsconfig.processorId, rpcClient) + // instantiate the read side projection with the remote processor + val projection = + new ReadSideProjection(system, rsconfig.processorId, dataSource, remoteReadSideProcessor, numShards) + // start the sharded daemon process + projection.start() + } }) } } @@ -71,6 +99,8 @@ object ReadSideManager { DbConfig(jdbcCfg) } + val grpcConfig: GrpcConfig = GrpcConfig(system.settings.config) + // get the individual read side configs val configs: Seq[ReadSideConfig] = ReadSideConfigReader.getReadSideSettings // make the manager @@ -79,7 +109,8 @@ object ReadSideManager { interceptors = interceptors, dbConfig = dbConfig, readSideConfigs = configs, - numShards = numShards) + numShards = numShards, + grpcConfig) } /** diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideJdbcStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideJdbcStreamHandler.scala new file mode 100644 index 00000000..cbef0885 --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideJdbcStreamHandler.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.readside.streaming + +import akka.projection.eventsourced.EventEnvelope +import akka.projection.jdbc.scaladsl.JdbcHandler +import akka.projection.jdbc.JdbcSession +import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper + +/** + * Implements the the akka projection JdbcHandler interface and forwards events to the + * stream readside handler + * + * @param processorId read side processor id + * @param readSideStreamHandler a remote handler implementation + */ +private[readside] class ReadSideJdbcStreamHandler(processorId: String, readSideStreamHandler: ReadSideStreamHandler) + extends JdbcHandler[Seq[EventEnvelope[EventWrapper]], JdbcSession] { + + override def process(session: JdbcSession, envelopes: Seq[EventEnvelope[EventWrapper]]): Unit = { + // construct the list of events to push out + val events = envelopes.map(envelope => { + (envelope.event.getEvent, envelope.event.getResultingState, envelope.event.getMeta) + }) + + // send to the remote gRPC stream handler + readSideStreamHandler.processEvents(events) + } +} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala new file mode 100644 index 00000000..c861589b --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala @@ -0,0 +1,136 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.readside.streaming + +import com.google.protobuf.any +import com.namely.chiefofstate.config.GrpcConfig +import com.namely.protobuf.chiefofstate.v1.common.MetaData +import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideStreamRequest, HandleReadSideStreamResponse } +import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceStub +import io.grpc.stub.StreamObserver +import io.grpc.Status +import org.slf4j.{ Logger, LoggerFactory } + +import java.util.concurrent.{ CountDownLatch, TimeUnit } +import scala.util.{ Failure, Success, Try } + +/** + * Processes events read from the Journal by sending them to the read side server + * as gRPC stream + */ +private[readside] trait ReadSideStreamHandler { + + /** + * handles a sequence of events that will be used to build a read model + * + * @param events the sequence of events to handle + */ + def processEvents(events: Seq[(any.Any, any.Any, MetaData)]): Unit +} + +/** + * Receives the readside response from an observable stream of messages. + * + * @param processorId the processor id + * @param doneSignal the async signal notification + */ +private[readside] case class HandleReadSideResponseStreamObserver(processorId: String, doneSignal: CountDownLatch) + extends StreamObserver[HandleReadSideStreamResponse] { + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + override def onNext(response: HandleReadSideStreamResponse): Unit = { + // onNext will be called only once after the server has finished processing the messages + logger.info("received a server response...") + if (!response.successful) { + val errMsg: String = + s"read side streaming message not handled, processor=$processorId" + logger.warn(errMsg) + throw new RuntimeException(errMsg) + } + } + + override def onError(t: Throwable): Unit = { + val status = Status.fromThrowable(t) + val errMsg: String = + s"read side streaming returned failure, processor=$processorId, cause=$status" + logger.warn(errMsg) + doneSignal.countDown() + } + + override def onCompleted(): Unit = { + // the server is done sending us data + // onCompleted will be called right after onNext() + doneSignal.countDown() + } +} + +/** + * read side processor that sends messages to a gRPC server that implements + * the ReadSideHandler service + * + * @param processorId the unique Id for this read side + * @param readSideHandlerServiceStub a non-blocking client for a ReadSideHandler + */ +private[readside] case class ReadSideStreamHandlerImpl( + processorId: String, + grpcConfig: GrpcConfig, + readSideHandlerServiceStub: ReadSideHandlerServiceStub, + doneSignal: CountDownLatch = new CountDownLatch(1)) + extends ReadSideStreamHandler { + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + /** + * handles a sequence of events that will be used to build a read model + * + * @param events the read event envelopes to handle + * @return true or false + */ + override def processEvents(events: Seq[(any.Any, any.Any, MetaData)]): Unit = { + // create an instance of the response stream observer + val readSideResponseStreamObserver: HandleReadSideResponseStreamObserver = + HandleReadSideResponseStreamObserver(processorId = processorId, doneSignal = doneSignal) + + // create the readSide request observer + val readSideRequestObserver: StreamObserver[HandleReadSideStreamRequest] = + readSideHandlerServiceStub + .withDeadlineAfter(grpcConfig.client.timeout, TimeUnit.MILLISECONDS) + .handleReadSideStream(readSideResponseStreamObserver) + + Try { + val proceed: Boolean = doneSignal.getCount == 0 + events.foreach(elt => { + if (proceed) { + val (event, resultingState, meta) = elt + val readSideRequest: HandleReadSideStreamRequest = + HandleReadSideStreamRequest() + .withEvent(event) + .withState(resultingState) + .withMeta(meta) + .withReadSideId(processorId) + + // send the request to the server + readSideRequestObserver.onNext(readSideRequest) + } + }) + } match { + case Failure(e) => + logger.error(s"read side processing failure, processor=$processorId, cause=${e.getMessage}") + // Cancel RPC call + readSideRequestObserver.onError(e) + throw e; + case Success(_) => + } + + // we tell the server that the client is done sending data + readSideRequestObserver.onCompleted() + + // Receiving happens asynchronously. + doneSignal.await() + } +} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamProjection.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamProjection.scala new file mode 100644 index 00000000..2406a06e --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamProjection.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.readside.streaming + +import akka.actor.typed.{ ActorSystem, Behavior } +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.query.Offset +import akka.projection.{ ProjectionBehavior, ProjectionId } +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.eventsourced.EventEnvelope +import akka.projection.jdbc.scaladsl.JdbcProjection +import akka.projection.scaladsl.{ GroupedProjection, SourceProvider } +import com.namely.chiefofstate.readside.{ ReadSideJdbcSession, ReadSideProjection } +import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper +import org.slf4j.{ Logger, LoggerFactory } + +import javax.sql.DataSource +import scala.concurrent.duration.DurationInt + +private[readside] class ReadSideStreamProjection( + actorSystem: ActorSystem[_], + val processorId: String, + val dataSource: DataSource, + readSideStreamHandler: ReadSideStreamHandler, + val numShards: Int) { + + final val log: Logger = LoggerFactory.getLogger(getClass) + + implicit val sys: ActorSystem[_] = actorSystem + + /** + * Initialize the projection to start fetching the events that are emitted + */ + def start(): Unit = { + ShardedDaemonProcess(actorSystem).init[ProjectionBehavior.Command]( + name = processorId, + numberOfInstances = numShards, + behaviorFactory = shardNumber => jdbcGroupedProjection(shardNumber.toString), + settings = ShardedDaemonProcessSettings(actorSystem), + stopMessage = Some(ProjectionBehavior.Stop)) + } + + /** + * creates a jdbc grouped projection + * + * @param tagName the event tag + * @return the jdbc grouped projection behavior + */ + private[readside] def jdbcGroupedProjection(tagName: String): Behavior[ProjectionBehavior.Command] = { + val projection: GroupedProjection[Offset, EventEnvelope[EventWrapper]] = + JdbcProjection + .groupedWithin( + projectionId = ProjectionId(processorId, tagName), + sourceProvider = ReadSideProjection.sourceProvider(actorSystem, tagName), + // defines a session factory that returns a jdbc + // session connected to the hikari pool + sessionFactory = () => new ReadSideJdbcSession(dataSource.getConnection()), + handler = () => new ReadSideJdbcStreamHandler(processorId, readSideStreamHandler)) + .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis) // FIXME set this values in configuration + + ProjectionBehavior(projection) + } +} + +private[readside] object ReadSideStreamProjection { + + /** + * Set the Event Sourced Provider per tag + * + * @param system the actor system + * @param tag the event tag + * @return the event sourced provider + */ + private[readside] def sourceProvider( + system: ActorSystem[_], + tag: String): SourceProvider[Offset, EventEnvelope[EventWrapper]] = { + EventSourcedProvider.eventsByTag[EventWrapper](system, readJournalPluginId = JdbcReadJournal.Identifier, tag) + } +} diff --git a/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala index d22330ae..b88d04c4 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala @@ -34,14 +34,18 @@ class ReadSideConfigReaderSpec extends BaseSpec { EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__HOST__RS3", "host3") EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__PORT__RS3", "3") EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__USE_TLS__RS3", "true") + EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__USE_STREAMING__RS3", "true") val grpcReadSideSetting1: ReadSideConfig = - ReadSideConfig("RS1", "host1", 1, false).addSetting("GRPC_SOME_SETTING", "setting1") + ReadSideConfig("RS1", "host1", 1, false, Map.empty[String, String], false) + .addSetting("GRPC_SOME_SETTING", "setting1") val grpcReadSideSetting2: ReadSideConfig = - ReadSideConfig("RS2", "host2", 2, false).addSetting("GRPC_SOME_SETTING", "setting2") + ReadSideConfig("RS2", "host2", 2, false, Map.empty[String, String], false) + .addSetting("GRPC_SOME_SETTING", "setting2") - val grpcReadSideSetting3: ReadSideConfig = ReadSideConfig("RS3", "host3", 3, true) + val grpcReadSideSetting3: ReadSideConfig = + ReadSideConfig("RS3", "host3", 3, true, Map.empty[String, String], true) val actual: Seq[ReadSideConfig] = ReadSideConfigReader.getReadSideSettings val expected: Seq[ReadSideConfig] = Seq(grpcReadSideSetting1, grpcReadSideSetting2, grpcReadSideSetting3) diff --git a/proto/chief-of-state-protos b/proto/chief-of-state-protos index 80c2d197..570fb7c1 160000 --- a/proto/chief-of-state-protos +++ b/proto/chief-of-state-protos @@ -1 +1 @@ -Subproject commit 80c2d19722b69dcce4f05efa6197b85862c81753 +Subproject commit 570fb7c1935dcf2153383f13fb7b533e1aaadf02