From 97d6f753ad3f39a2649f9363b3ea0f341f521b54 Mon Sep 17 00:00:00 2001 From: Arsene Tochemey Gandote Date: Fri, 11 Jun 2021 16:54:54 +0000 Subject: [PATCH 1/5] streaming readside initial commit --- .../readside/ReadSideJdbcStreamHandler.scala | 32 +++++ .../readside/ReadSideStreamHandler.scala | 128 ++++++++++++++++++ .../readside/ReadSideStreamProjection.scala | 3 + 3 files changed, 163 insertions(+) create mode 100644 code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala create mode 100644 code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala create mode 100644 code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala new file mode 100644 index 00000000..01b09c67 --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.readside +import akka.projection.eventsourced.EventEnvelope +import akka.projection.jdbc.scaladsl.JdbcHandler +import akka.projection.jdbc.JdbcSession +import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper + +/** + * Implements the the akka projection JdbcHandler interface and forwards events to the + * stream readside handler + * + * @param processorId read side processor id + * @param readSideStreamHandler a remote handler implementation + */ +private[readside] class ReadSideJdbcStreamHandler(processorId: String, readSideStreamHandler: ReadSideStreamHandler) + extends JdbcHandler[Seq[EventEnvelope[EventWrapper]], JdbcSession] { + + override def process(session: JdbcSession, envelopes: Seq[EventEnvelope[EventWrapper]]): Unit = { + // construct the list of events to push out + val events = envelopes.map(envelope => { + (envelope.event.getEvent, envelope.event.getResultingState, envelope.event.getMeta) + }) + + // send to the remote gRPC stream handler + readSideStreamHandler.processEvents(events) + } +} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala new file mode 100644 index 00000000..063faded --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala @@ -0,0 +1,128 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.readside +import com.google.protobuf.any +import com.namely.chiefofstate.config.GrpcConfig +import com.namely.protobuf.chiefofstate.v1.common.MetaData +import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideRequest, HandleReadSideResponse } +import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceStub +import io.grpc.stub.StreamObserver +import io.grpc.Status +import org.slf4j.{ Logger, LoggerFactory } + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +/** + * Processes events read from the Journal by sending them to the read side server + * as gRPC stream + */ +private[readside] trait ReadSideStreamHandler { + + /** + * handles a sequence of events that will be used to build a read model + * + * @param events the sequence of events to handle + */ + def processEvents(events: Seq[(any.Any, any.Any, MetaData)]): Unit +} + +/** + * Receives the readside response from an observable stream of messages. + * + * @param processorId the processor id + * @param doneSignal the async signal notification + */ +private[readside] case class HandleReadSideResponseStreamObserver(processorId: String, doneSignal: CountDownLatch) + extends StreamObserver[HandleReadSideResponse] { + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + override def onNext(response: HandleReadSideResponse): Unit = { + // onNext will be called only once after the server has finished processing the messages + logger.info("received a server response...") + if (!response.successful) { + val errMsg: String = + s"read side streaming message not handled, processor=$processorId" + logger.warn(errMsg) + throw new RuntimeException(errMsg) + } + } + + override def onError(t: Throwable): Unit = { + val status = Status.fromThrowable(t) + val errMsg: String = + s"read side streaming returned failure, processor=$processorId, cause=$status" + logger.warn(errMsg) + doneSignal.countDown() + } + + override def onCompleted(): Unit = { + // the server is done sending us data + // onCompleted will be called right after onNext() + doneSignal.countDown() + } +} + +/** + * read side processor that sends messages to a gRPC server that implements + * the ReadSideHandler service + * + * @param processorId the unique Id for this read side + * @param readSideHandlerServiceStub a non-blocking client for a ReadSideHandler + */ +private[readside] case class ReadSideStreamHandlerImpl( + processorId: String, + grpcConfig: GrpcConfig, + readSideHandlerServiceStub: ReadSideHandlerServiceStub, + doneSignal: CountDownLatch = new CountDownLatch(1)) + extends ReadSideStreamHandler { + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + /** + * handles a sequence of events that will be used to build a read model + * + * @param events the read event envelopes to handle + * @return true or false + */ + override def processEvents(events: Seq[(any.Any, any.Any, MetaData)]): Unit = { + // create an instance of the response stream observer + val readSideResponseStreamObserver: HandleReadSideResponseStreamObserver = + HandleReadSideResponseStreamObserver(processorId = processorId, doneSignal = doneSignal) + + // create the readSide request observer + val readSideRequestObserver: StreamObserver[HandleReadSideRequest] = + readSideHandlerServiceStub + .withDeadlineAfter(grpcConfig.client.timeout, TimeUnit.MILLISECONDS) + .handleReadSideStream(readSideResponseStreamObserver) + + try { + val it = events.iterator + val proceed: Boolean = doneSignal.getCount == 0 + while (proceed && it.hasNext) { + val (event, resultingState, meta) = it.next() + val readSideRequest: HandleReadSideRequest = + HandleReadSideRequest().withEvent(event).withState(resultingState).withMeta(meta).withReadSideId(processorId) + + // send the request to the server + readSideRequestObserver.onNext(readSideRequest) + } + } catch { + case e: RuntimeException => + logger.error(s"read side processing failure, processor=$processorId, cause=${e.getMessage}") + // Cancel RPC call + readSideRequestObserver.onError(e) + throw e; + } + + // we tell the server that the client is done sending data + readSideRequestObserver.onCompleted() + + // Receiving happens asynchronously. + doneSignal.await() + } +} diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala new file mode 100644 index 00000000..ea9f2855 --- /dev/null +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala @@ -0,0 +1,3 @@ +package com.namely.chiefofstate.readside class ReadSideStreamProjection { + +} From a3c672635fab3a49a558b543b12234cf727f8720 Mon Sep 17 00:00:00 2001 From: Arsene Tochemey Gandote Date: Fri, 11 Jun 2021 16:55:39 +0000 Subject: [PATCH 2/5] checkin --- .../chiefofstate/config/ReadSideConfig.scala | 3 +- .../config/ReadSideConfigReader.scala | 4 + .../readside/ReadSideManager.scala | 62 ++++++++++---- .../readside/ReadSideStreamProjection.scala | 82 ++++++++++++++++++- .../config/ReadSideConfigReaderSpec.scala | 10 ++- proto/chief-of-state-protos | 2 +- 6 files changed, 141 insertions(+), 22 deletions(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala index bd470702..36f09c4e 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfig.scala @@ -11,7 +11,8 @@ final case class ReadSideConfig( host: String = "", port: Int = -1, useTls: Boolean = false, - settings: Map[String, String] = Map.empty[String, String]) { + settings: Map[String, String] = Map.empty[String, String], + useStreaming: Boolean = false) { /** * Adds a setting to the config diff --git a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala index d47ee421..dcb78c5b 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/config/ReadSideConfigReader.scala @@ -12,6 +12,7 @@ object ReadSideConfigReader { val READ_SIDE_HOST_KEY: String = "HOST" val READ_SIDE_PORT_KEY: String = "PORT" val READ_SIDE_TLS_KEY: String = "USE_TLS" + val READ_SIDE_USE_STREAMING_KEY: String = "USE_STREAMING" val logger: Logger = LoggerFactory.getLogger(this.getClass) @@ -57,6 +58,9 @@ object ReadSideConfigReader { case (config, (READ_SIDE_TLS_KEY, value)) => config.copy(useTls = value.toBooleanOption.getOrElse(false)) + case (config, (READ_SIDE_USE_STREAMING_KEY, value)) => + config.copy(useStreaming = value.toBooleanOption.getOrElse(false)) + case (config, (key, value)) => config.addSetting(key, value) }) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala index c0ebd550..435733d9 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala @@ -8,8 +8,11 @@ package com.namely.chiefofstate.readside import akka.actor.typed.ActorSystem import com.namely.chiefofstate.NettyHelper -import com.namely.chiefofstate.config.{ ReadSideConfig, ReadSideConfigReader } -import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceBlockingStub +import com.namely.chiefofstate.config.{ GrpcConfig, ReadSideConfig, ReadSideConfigReader } +import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.{ + ReadSideHandlerServiceBlockingStub, + ReadSideHandlerServiceStub +} import com.typesafe.config.Config import com.zaxxer.hikari.{ HikariConfig, HikariDataSource } import io.grpc.ClientInterceptor @@ -23,13 +26,15 @@ import org.slf4j.{ Logger, LoggerFactory } * @param dbConfig the DB config for creating a hikari data source * @param readSideConfigs sequence of configs for specific read sides * @param numShards number of shards for projections/tags + * @param grpcConfig the grpc configuration */ class ReadSideManager( system: ActorSystem[_], interceptors: Seq[ClientInterceptor], dbConfig: ReadSideManager.DbConfig, readSideConfigs: Seq[ReadSideConfig], - numShards: Int) { + numShards: Int, + grpcConfig: GrpcConfig) { private val logger: Logger = LoggerFactory.getLogger(this.getClass) @@ -44,18 +49,40 @@ class ReadSideManager( readSideConfigs.foreach(rsconfig => { logger.info(s"starting read side, id=${rsconfig.processorId}") - - // construct a remote gRPC read side client for this read side - // and register interceptors - val rpcClient: ReadSideHandlerServiceBlockingStub = new ReadSideHandlerServiceBlockingStub( - NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build).withInterceptors(interceptors: _*) - // instantiate a remote read side processor with the gRPC client - val remoteReadSideProcessor: ReadSideHandlerImpl = new ReadSideHandlerImpl(rsconfig.processorId, rpcClient) - // instantiate the read side projection with the remote processor - val projection = - new ReadSideProjection(system, rsconfig.processorId, dataSource, remoteReadSideProcessor, numShards) - // start the sharded daemon process - projection.start() + // FIXME draft implementation + if (rsconfig.useStreaming) { + // construct a remote gRPC streaming read side client + val rpcStreamingClient = + new ReadSideHandlerServiceStub(NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build) + .withInterceptors(interceptors: _*) + + // instantiate a remote read side stream processor with the gRPC client + val remoteReadSideStreamProcessor: ReadSideStreamHandlerImpl = + ReadSideStreamHandlerImpl(rsconfig.processorId, grpcConfig, rpcStreamingClient) + + // instantiate the read side projection with the remote processor + val projection: ReadSideStreamProjection = + new ReadSideStreamProjection( + system, + rsconfig.processorId, + dataSource, + remoteReadSideStreamProcessor, + numShards) + // start the sharded daemon process + projection.start() + } else { + // construct a remote gRPC read side client for this read side + // and register interceptors + val rpcClient: ReadSideHandlerServiceBlockingStub = new ReadSideHandlerServiceBlockingStub( + NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build).withInterceptors(interceptors: _*) + // instantiate a remote read side processor with the gRPC client + val remoteReadSideProcessor: ReadSideHandlerImpl = new ReadSideHandlerImpl(rsconfig.processorId, rpcClient) + // instantiate the read side projection with the remote processor + val projection = + new ReadSideProjection(system, rsconfig.processorId, dataSource, remoteReadSideProcessor, numShards) + // start the sharded daemon process + projection.start() + } }) } } @@ -71,6 +98,8 @@ object ReadSideManager { DbConfig(jdbcCfg) } + val grpcConfig: GrpcConfig = GrpcConfig(system.settings.config) + // get the individual read side configs val configs: Seq[ReadSideConfig] = ReadSideConfigReader.getReadSideSettings // make the manager @@ -79,7 +108,8 @@ object ReadSideManager { interceptors = interceptors, dbConfig = dbConfig, readSideConfigs = configs, - numShards = numShards) + numShards = numShards, + grpcConfig) } /** diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala index ea9f2855..1ce8843d 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala @@ -1,3 +1,83 @@ -package com.namely.chiefofstate.readside class ReadSideStreamProjection { +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ +package com.namely.chiefofstate.readside +import akka.actor.typed.{ ActorSystem, Behavior } +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings +import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal +import akka.persistence.query.Offset +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.eventsourced.EventEnvelope +import akka.projection.jdbc.scaladsl.JdbcProjection +import akka.projection.scaladsl.{ GroupedProjection, SourceProvider } +import akka.projection.{ ProjectionBehavior, ProjectionId } +import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper +import org.slf4j.{ Logger, LoggerFactory } + +import javax.sql.DataSource +import scala.concurrent.duration.DurationInt + +private[readside] class ReadSideStreamProjection( + actorSystem: ActorSystem[_], + val processorId: String, + val dataSource: DataSource, + readSideStreamHandler: ReadSideStreamHandler, + val numShards: Int) { + + final val log: Logger = LoggerFactory.getLogger(getClass) + + implicit val sys: ActorSystem[_] = actorSystem + + /** + * Initialize the projection to start fetching the events that are emitted + */ + def start(): Unit = { + ShardedDaemonProcess(actorSystem).init[ProjectionBehavior.Command]( + name = processorId, + numberOfInstances = numShards, + behaviorFactory = shardNumber => jdbcGroupedProjection(shardNumber.toString), + settings = ShardedDaemonProcessSettings(actorSystem), + stopMessage = Some(ProjectionBehavior.Stop)) + } + + /** + * creates a jdbc grouped projection + * + * @param tagName the event tag + * @return the jdbc grouped projection behavior + */ + private[readside] def jdbcGroupedProjection(tagName: String): Behavior[ProjectionBehavior.Command] = { + val projection: GroupedProjection[Offset, EventEnvelope[EventWrapper]] = + JdbcProjection + .groupedWithin( + projectionId = ProjectionId(processorId, tagName), + sourceProvider = ReadSideProjection.sourceProvider(actorSystem, tagName), + // defines a session factory that returns a jdbc + // session connected to the hikari pool + sessionFactory = () => new ReadSideJdbcSession(dataSource.getConnection()), + handler = () => new ReadSideJdbcStreamHandler(processorId, readSideStreamHandler)) + .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis) // FIXME set this values in configuration + + ProjectionBehavior(projection) + } +} + +private[readside] object ReadSideStreamProjection { + + /** + * Set the Event Sourced Provider per tag + * + * @param system the actor system + * @param tag the event tag + * @return the event sourced provider + */ + private[readside] def sourceProvider( + system: ActorSystem[_], + tag: String): SourceProvider[Offset, EventEnvelope[EventWrapper]] = { + EventSourcedProvider.eventsByTag[EventWrapper](system, readJournalPluginId = JdbcReadJournal.Identifier, tag) + } } diff --git a/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala index d22330ae..b88d04c4 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/config/ReadSideConfigReaderSpec.scala @@ -34,14 +34,18 @@ class ReadSideConfigReaderSpec extends BaseSpec { EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__HOST__RS3", "host3") EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__PORT__RS3", "3") EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__USE_TLS__RS3", "true") + EnvironmentHelper.setEnv("COS_READ_SIDE_CONFIG__USE_STREAMING__RS3", "true") val grpcReadSideSetting1: ReadSideConfig = - ReadSideConfig("RS1", "host1", 1, false).addSetting("GRPC_SOME_SETTING", "setting1") + ReadSideConfig("RS1", "host1", 1, false, Map.empty[String, String], false) + .addSetting("GRPC_SOME_SETTING", "setting1") val grpcReadSideSetting2: ReadSideConfig = - ReadSideConfig("RS2", "host2", 2, false).addSetting("GRPC_SOME_SETTING", "setting2") + ReadSideConfig("RS2", "host2", 2, false, Map.empty[String, String], false) + .addSetting("GRPC_SOME_SETTING", "setting2") - val grpcReadSideSetting3: ReadSideConfig = ReadSideConfig("RS3", "host3", 3, true) + val grpcReadSideSetting3: ReadSideConfig = + ReadSideConfig("RS3", "host3", 3, true, Map.empty[String, String], true) val actual: Seq[ReadSideConfig] = ReadSideConfigReader.getReadSideSettings val expected: Seq[ReadSideConfig] = Seq(grpcReadSideSetting1, grpcReadSideSetting2, grpcReadSideSetting3) diff --git a/proto/chief-of-state-protos b/proto/chief-of-state-protos index 80c2d197..42d42320 160000 --- a/proto/chief-of-state-protos +++ b/proto/chief-of-state-protos @@ -1 +1 @@ -Subproject commit 80c2d19722b69dcce4f05efa6197b85862c81753 +Subproject commit 42d42320c55e98570d5978b641f782d56884b0fd From afb99b05c98cd0d430777767f7c9ca4686ca2484 Mon Sep 17 00:00:00 2001 From: Arsene Tochemey Gandote Date: Fri, 11 Jun 2021 17:18:43 +0000 Subject: [PATCH 3/5] checkin --- .../readside/ReadSideStreamHandler.scala | 16 ++++++++++------ proto/chief-of-state-protos | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala index 063faded..7d8f6b66 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala @@ -8,7 +8,7 @@ package com.namely.chiefofstate.readside import com.google.protobuf.any import com.namely.chiefofstate.config.GrpcConfig import com.namely.protobuf.chiefofstate.v1.common.MetaData -import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideRequest, HandleReadSideResponse } +import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideStreamRequest, HandleReadSideStreamResponse } import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceStub import io.grpc.stub.StreamObserver import io.grpc.Status @@ -37,11 +37,11 @@ private[readside] trait ReadSideStreamHandler { * @param doneSignal the async signal notification */ private[readside] case class HandleReadSideResponseStreamObserver(processorId: String, doneSignal: CountDownLatch) - extends StreamObserver[HandleReadSideResponse] { + extends StreamObserver[HandleReadSideStreamResponse] { private val logger: Logger = LoggerFactory.getLogger(this.getClass) - override def onNext(response: HandleReadSideResponse): Unit = { + override def onNext(response: HandleReadSideStreamResponse): Unit = { // onNext will be called only once after the server has finished processing the messages logger.info("received a server response...") if (!response.successful) { @@ -95,7 +95,7 @@ private[readside] case class ReadSideStreamHandlerImpl( HandleReadSideResponseStreamObserver(processorId = processorId, doneSignal = doneSignal) // create the readSide request observer - val readSideRequestObserver: StreamObserver[HandleReadSideRequest] = + val readSideRequestObserver: StreamObserver[HandleReadSideStreamRequest] = readSideHandlerServiceStub .withDeadlineAfter(grpcConfig.client.timeout, TimeUnit.MILLISECONDS) .handleReadSideStream(readSideResponseStreamObserver) @@ -105,8 +105,12 @@ private[readside] case class ReadSideStreamHandlerImpl( val proceed: Boolean = doneSignal.getCount == 0 while (proceed && it.hasNext) { val (event, resultingState, meta) = it.next() - val readSideRequest: HandleReadSideRequest = - HandleReadSideRequest().withEvent(event).withState(resultingState).withMeta(meta).withReadSideId(processorId) + val readSideRequest: HandleReadSideStreamRequest = + HandleReadSideStreamRequest() + .withEvent(event) + .withState(resultingState) + .withMeta(meta) + .withReadSideId(processorId) // send the request to the server readSideRequestObserver.onNext(readSideRequest) diff --git a/proto/chief-of-state-protos b/proto/chief-of-state-protos index 42d42320..570fb7c1 160000 --- a/proto/chief-of-state-protos +++ b/proto/chief-of-state-protos @@ -1 +1 @@ -Subproject commit 42d42320c55e98570d5978b641f782d56884b0fd +Subproject commit 570fb7c1935dcf2153383f13fb7b533e1aaadf02 From 41a02f89bd54ac4bcb61d7b40f7b875e53e88d21 Mon Sep 17 00:00:00 2001 From: Arsene Tochemey Gandote Date: Fri, 11 Jun 2021 17:23:43 +0000 Subject: [PATCH 4/5] checkin --- .../com/namely/chiefofstate/readside/ReadSideManager.scala | 1 + .../{ => streaming}/ReadSideJdbcStreamHandler.scala | 3 ++- .../readside/{ => streaming}/ReadSideStreamHandler.scala | 3 ++- .../readside/{ => streaming}/ReadSideStreamProjection.scala | 6 ++++-- 4 files changed, 9 insertions(+), 4 deletions(-) rename code/service/src/main/scala/com/namely/chiefofstate/readside/{ => streaming}/ReadSideJdbcStreamHandler.scala (95%) rename code/service/src/main/scala/com/namely/chiefofstate/readside/{ => streaming}/ReadSideStreamHandler.scala (98%) rename code/service/src/main/scala/com/namely/chiefofstate/readside/{ => streaming}/ReadSideStreamProjection.scala (95%) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala index 435733d9..20f457be 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala @@ -9,6 +9,7 @@ package com.namely.chiefofstate.readside import akka.actor.typed.ActorSystem import com.namely.chiefofstate.NettyHelper import com.namely.chiefofstate.config.{ GrpcConfig, ReadSideConfig, ReadSideConfigReader } +import com.namely.chiefofstate.readside.streaming.{ ReadSideStreamHandlerImpl, ReadSideStreamProjection } import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.{ ReadSideHandlerServiceBlockingStub, ReadSideHandlerServiceStub diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideJdbcStreamHandler.scala similarity index 95% rename from code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala rename to code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideJdbcStreamHandler.scala index 01b09c67..cbef0885 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcStreamHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideJdbcStreamHandler.scala @@ -4,7 +4,8 @@ * SPDX-License-Identifier: MIT */ -package com.namely.chiefofstate.readside +package com.namely.chiefofstate.readside.streaming + import akka.projection.eventsourced.EventEnvelope import akka.projection.jdbc.scaladsl.JdbcHandler import akka.projection.jdbc.JdbcSession diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala similarity index 98% rename from code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala rename to code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala index 7d8f6b66..9bd88969 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala @@ -4,7 +4,8 @@ * SPDX-License-Identifier: MIT */ -package com.namely.chiefofstate.readside +package com.namely.chiefofstate.readside.streaming + import com.google.protobuf.any import com.namely.chiefofstate.config.GrpcConfig import com.namely.protobuf.chiefofstate.v1.common.MetaData diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamProjection.scala similarity index 95% rename from code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala rename to code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamProjection.scala index 1ce8843d..2406a06e 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideStreamProjection.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamProjection.scala @@ -4,17 +4,19 @@ * SPDX-License-Identifier: MIT */ -package com.namely.chiefofstate.readside +package com.namely.chiefofstate.readside.streaming + import akka.actor.typed.{ ActorSystem, Behavior } import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess import akka.cluster.sharding.typed.ShardedDaemonProcessSettings import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal import akka.persistence.query.Offset +import akka.projection.{ ProjectionBehavior, ProjectionId } import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.eventsourced.EventEnvelope import akka.projection.jdbc.scaladsl.JdbcProjection import akka.projection.scaladsl.{ GroupedProjection, SourceProvider } -import akka.projection.{ ProjectionBehavior, ProjectionId } +import com.namely.chiefofstate.readside.{ ReadSideJdbcSession, ReadSideProjection } import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper import org.slf4j.{ Logger, LoggerFactory } From 9eeb5e5131e0e53ce654d6345f8f36a6d2a9701b Mon Sep 17 00:00:00 2001 From: Arsene Tochemey Gandote Date: Fri, 11 Jun 2021 17:37:31 +0000 Subject: [PATCH 5/5] checkin --- .../streaming/ReadSideStreamHandler.scala | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala index 9bd88969..c861589b 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/streaming/ReadSideStreamHandler.scala @@ -16,6 +16,7 @@ import io.grpc.Status import org.slf4j.{ Logger, LoggerFactory } import java.util.concurrent.{ CountDownLatch, TimeUnit } +import scala.util.{ Failure, Success, Try } /** * Processes events read from the Journal by sending them to the read side server @@ -101,27 +102,29 @@ private[readside] case class ReadSideStreamHandlerImpl( .withDeadlineAfter(grpcConfig.client.timeout, TimeUnit.MILLISECONDS) .handleReadSideStream(readSideResponseStreamObserver) - try { - val it = events.iterator + Try { val proceed: Boolean = doneSignal.getCount == 0 - while (proceed && it.hasNext) { - val (event, resultingState, meta) = it.next() - val readSideRequest: HandleReadSideStreamRequest = - HandleReadSideStreamRequest() - .withEvent(event) - .withState(resultingState) - .withMeta(meta) - .withReadSideId(processorId) - - // send the request to the server - readSideRequestObserver.onNext(readSideRequest) - } - } catch { - case e: RuntimeException => + events.foreach(elt => { + if (proceed) { + val (event, resultingState, meta) = elt + val readSideRequest: HandleReadSideStreamRequest = + HandleReadSideStreamRequest() + .withEvent(event) + .withState(resultingState) + .withMeta(meta) + .withReadSideId(processorId) + + // send the request to the server + readSideRequestObserver.onNext(readSideRequest) + } + }) + } match { + case Failure(e) => logger.error(s"read side processing failure, processor=$processorId, cause=${e.getMessage}") // Cancel RPC call readSideRequestObserver.onError(e) throw e; + case Success(_) => } // we tell the server that the client is done sending data