Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming readside #378

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ final case class ReadSideConfig(
host: String = "",
port: Int = -1,
useTls: Boolean = false,
settings: Map[String, String] = Map.empty[String, String]) {
settings: Map[String, String] = Map.empty[String, String],
useStreaming: Boolean = false) {

/**
* Adds a setting to the config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ object ReadSideConfigReader {
val READ_SIDE_HOST_KEY: String = "HOST"
val READ_SIDE_PORT_KEY: String = "PORT"
val READ_SIDE_TLS_KEY: String = "USE_TLS"
val READ_SIDE_USE_STREAMING_KEY: String = "USE_STREAMING"

val logger: Logger = LoggerFactory.getLogger(this.getClass)

Expand Down Expand Up @@ -57,6 +58,9 @@ object ReadSideConfigReader {
case (config, (READ_SIDE_TLS_KEY, value)) =>
config.copy(useTls = value.toBooleanOption.getOrElse(false))

case (config, (READ_SIDE_USE_STREAMING_KEY, value)) =>
config.copy(useStreaming = value.toBooleanOption.getOrElse(false))

case (config, (key, value)) =>
config.addSetting(key, value)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ package com.namely.chiefofstate.readside

import akka.actor.typed.ActorSystem
import com.namely.chiefofstate.NettyHelper
import com.namely.chiefofstate.config.{ ReadSideConfig, ReadSideConfigReader }
import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceBlockingStub
import com.namely.chiefofstate.config.{ GrpcConfig, ReadSideConfig, ReadSideConfigReader }
import com.namely.chiefofstate.readside.streaming.{ ReadSideStreamHandlerImpl, ReadSideStreamProjection }
import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.{
ReadSideHandlerServiceBlockingStub,
ReadSideHandlerServiceStub
}
import com.typesafe.config.Config
import com.zaxxer.hikari.{ HikariConfig, HikariDataSource }
import io.grpc.ClientInterceptor
Expand All @@ -23,13 +27,15 @@ import org.slf4j.{ Logger, LoggerFactory }
* @param dbConfig the DB config for creating a hikari data source
* @param readSideConfigs sequence of configs for specific read sides
* @param numShards number of shards for projections/tags
* @param grpcConfig the grpc configuration
*/
class ReadSideManager(
system: ActorSystem[_],
interceptors: Seq[ClientInterceptor],
dbConfig: ReadSideManager.DbConfig,
readSideConfigs: Seq[ReadSideConfig],
numShards: Int) {
numShards: Int,
grpcConfig: GrpcConfig) {

private val logger: Logger = LoggerFactory.getLogger(this.getClass)

Expand All @@ -44,18 +50,40 @@ class ReadSideManager(
readSideConfigs.foreach(rsconfig => {

logger.info(s"starting read side, id=${rsconfig.processorId}")

// construct a remote gRPC read side client for this read side
// and register interceptors
val rpcClient: ReadSideHandlerServiceBlockingStub = new ReadSideHandlerServiceBlockingStub(
NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build).withInterceptors(interceptors: _*)
// instantiate a remote read side processor with the gRPC client
val remoteReadSideProcessor: ReadSideHandlerImpl = new ReadSideHandlerImpl(rsconfig.processorId, rpcClient)
// instantiate the read side projection with the remote processor
val projection =
new ReadSideProjection(system, rsconfig.processorId, dataSource, remoteReadSideProcessor, numShards)
// start the sharded daemon process
projection.start()
// FIXME draft implementation
if (rsconfig.useStreaming) {
// construct a remote gRPC streaming read side client
val rpcStreamingClient =
new ReadSideHandlerServiceStub(NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build)
.withInterceptors(interceptors: _*)

// instantiate a remote read side stream processor with the gRPC client
val remoteReadSideStreamProcessor: ReadSideStreamHandlerImpl =
ReadSideStreamHandlerImpl(rsconfig.processorId, grpcConfig, rpcStreamingClient)

// instantiate the read side projection with the remote processor
val projection: ReadSideStreamProjection =
new ReadSideStreamProjection(
system,
rsconfig.processorId,
dataSource,
remoteReadSideStreamProcessor,
numShards)
// start the sharded daemon process
projection.start()
} else {
// construct a remote gRPC read side client for this read side
// and register interceptors
val rpcClient: ReadSideHandlerServiceBlockingStub = new ReadSideHandlerServiceBlockingStub(
NettyHelper.builder(rsconfig.host, rsconfig.port, rsconfig.useTls).build).withInterceptors(interceptors: _*)
// instantiate a remote read side processor with the gRPC client
val remoteReadSideProcessor: ReadSideHandlerImpl = new ReadSideHandlerImpl(rsconfig.processorId, rpcClient)
// instantiate the read side projection with the remote processor
val projection =
new ReadSideProjection(system, rsconfig.processorId, dataSource, remoteReadSideProcessor, numShards)
// start the sharded daemon process
projection.start()
}
})
}
}
Expand All @@ -71,6 +99,8 @@ object ReadSideManager {
DbConfig(jdbcCfg)
}

val grpcConfig: GrpcConfig = GrpcConfig(system.settings.config)

// get the individual read side configs
val configs: Seq[ReadSideConfig] = ReadSideConfigReader.getReadSideSettings
// make the manager
Expand All @@ -79,7 +109,8 @@ object ReadSideManager {
interceptors = interceptors,
dbConfig = dbConfig,
readSideConfigs = configs,
numShards = numShards)
numShards = numShards,
grpcConfig)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020 Namely Inc.
*
* SPDX-License-Identifier: MIT
*/

package com.namely.chiefofstate.readside.streaming

import akka.projection.eventsourced.EventEnvelope
import akka.projection.jdbc.scaladsl.JdbcHandler
import akka.projection.jdbc.JdbcSession
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper

/**
 * Akka projection JdbcHandler implementation that unwraps journal envelopes and
 * forwards their contents, in batches, to the streaming read side handler.
 *
 * @param processorId read side processor id
 * @param readSideStreamHandler a remote handler implementation
 */
private[readside] class ReadSideJdbcStreamHandler(processorId: String, readSideStreamHandler: ReadSideStreamHandler)
    extends JdbcHandler[Seq[EventEnvelope[EventWrapper]], JdbcSession] {

  override def process(session: JdbcSession, envelopes: Seq[EventEnvelope[EventWrapper]]): Unit = {
    // unwrap every envelope into the (event, resulting state, metadata) triple
    // expected by the stream handler
    val eventTriples = envelopes.map { envelope =>
      val wrapper = envelope.event
      (wrapper.getEvent, wrapper.getResultingState, wrapper.getMeta)
    }

    // push the whole batch to the remote gRPC stream handler
    readSideStreamHandler.processEvents(eventTriples)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2020 Namely Inc.
*
* SPDX-License-Identifier: MIT
*/

package com.namely.chiefofstate.readside.streaming

import com.google.protobuf.any
import com.namely.chiefofstate.config.GrpcConfig
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideStreamRequest, HandleReadSideStreamResponse }
import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceStub
import io.grpc.stub.StreamObserver
import io.grpc.Status
import org.slf4j.{ Logger, LoggerFactory }

import java.util.concurrent.{ CountDownLatch, TimeUnit }
import scala.util.{ Failure, Success, Try }

/**
 * Processes events read from the journal by sending them to the read side server
 * as a gRPC stream.
 */
private[readside] trait ReadSideStreamHandler {

  /**
   * handles a sequence of events that will be used to build a read model
   *
   * @param events the sequence of events to handle; each element is a tuple of
   *               (event, resulting state, metadata) where event and state are
   *               packed protobuf Any values
   */
  def processEvents(events: Seq[(any.Any, any.Any, MetaData)]): Unit
}

/**
 * Receives the readside response from an observable stream of messages.
 *
 * The doneSignal latch is released whenever the stream terminates, whether by
 * completion, by an error from the server, or by an unsuccessful response —
 * so that the caller blocked on the latch is always woken up.
 *
 * @param processorId the processor id
 * @param doneSignal the async signal notification
 */
private[readside] case class HandleReadSideResponseStreamObserver(processorId: String, doneSignal: CountDownLatch)
    extends StreamObserver[HandleReadSideStreamResponse] {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  override def onNext(response: HandleReadSideStreamResponse): Unit = {
    // onNext will be called only once after the server has finished processing the messages
    logger.info("received a server response...")
    if (!response.successful) {
      val errMsg: String =
        s"read side streaming message not handled, processor=$processorId"
      logger.warn(errMsg)
      // FIX: release the latch before throwing, otherwise the caller waiting on
      // doneSignal.await() would block forever when the server reports failure
      // (the exception may prevent onCompleted from ever running)
      doneSignal.countDown()
      throw new RuntimeException(errMsg)
    }
  }

  override def onError(t: Throwable): Unit = {
    val status = Status.fromThrowable(t)
    val errMsg: String =
      s"read side streaming returned failure, processor=$processorId, cause=$status"
    logger.warn(errMsg)
    // release the waiting caller; the failure itself is only logged here
    doneSignal.countDown()
  }

  override def onCompleted(): Unit = {
    // the server is done sending us data
    // onCompleted will be called right after onNext()
    doneSignal.countDown()
  }
}

/**
 * read side processor that sends messages to a gRPC server that implements
 * the ReadSideHandler service
 *
 * @param processorId the unique Id for this read side
 * @param grpcConfig the gRPC client configuration (supplies the call deadline)
 * @param readSideHandlerServiceStub a non-blocking client for a ReadSideHandler
 * @param doneSignal latch released by the response observer when the stream terminates
 */
private[readside] case class ReadSideStreamHandlerImpl(
    processorId: String,
    grpcConfig: GrpcConfig,
    readSideHandlerServiceStub: ReadSideHandlerServiceStub,
    doneSignal: CountDownLatch = new CountDownLatch(1))
    extends ReadSideStreamHandler {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  /**
   * handles a sequence of events that will be used to build a read model by
   * streaming them to the remote read side server and awaiting the terminal response
   *
   * NOTE(review): the instance-level latch is single-use — after the first
   * completed batch its count stays at zero, so a second call to processEvents
   * would skip sending. A per-call latch is likely needed — confirm intended
   * batching semantics before promoting this draft.
   *
   * @param events the read event envelopes to handle as (event, resulting state, metadata) tuples
   */
  override def processEvents(events: Seq[(any.Any, any.Any, MetaData)]): Unit = {
    // create an instance of the response stream observer
    val readSideResponseStreamObserver: HandleReadSideResponseStreamObserver =
      HandleReadSideResponseStreamObserver(processorId = processorId, doneSignal = doneSignal)

    // create the readSide request observer with the configured call deadline
    val readSideRequestObserver: StreamObserver[HandleReadSideStreamRequest] =
      readSideHandlerServiceStub
        .withDeadlineAfter(grpcConfig.client.timeout, TimeUnit.MILLISECONDS)
        .handleReadSideStream(readSideResponseStreamObserver)

    Try {
      events.foreach { case (event, resultingState, meta) =>
        // FIX: re-check the latch before every send; a released latch (count == 0)
        // means the server already terminated the stream, so further sends are
        // pointless. The previous draft computed `doneSignal.getCount == 0` once
        // before the loop, which was inverted (the latch starts at 1) and meant
        // no event was ever sent.
        if (doneSignal.getCount > 0) {
          val readSideRequest: HandleReadSideStreamRequest =
            HandleReadSideStreamRequest()
              .withEvent(event)
              .withState(resultingState)
              .withMeta(meta)
              .withReadSideId(processorId)

          // send the request to the server
          readSideRequestObserver.onNext(readSideRequest)
        }
      }
    } match {
      case Failure(e) =>
        logger.error(s"read side processing failure, processor=$processorId, cause=${e.getMessage}")
        // Cancel RPC call
        readSideRequestObserver.onError(e)
        throw e
      case Success(_) =>
    }

    // we tell the server that the client is done sending data
    readSideRequestObserver.onCompleted()

    // Receiving happens asynchronously.
    doneSignal.await()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2020 Namely Inc.
*
* SPDX-License-Identifier: MIT
*/

package com.namely.chiefofstate.readside.streaming

import akka.actor.typed.{ ActorSystem, Behavior }
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.Offset
import akka.projection.{ ProjectionBehavior, ProjectionId }
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.eventsourced.EventEnvelope
import akka.projection.jdbc.scaladsl.JdbcProjection
import akka.projection.scaladsl.{ GroupedProjection, SourceProvider }
import com.namely.chiefofstate.readside.{ ReadSideJdbcSession, ReadSideProjection }
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
import org.slf4j.{ Logger, LoggerFactory }

import javax.sql.DataSource
import scala.concurrent.duration.DurationInt

/**
 * Runs a sharded, grouped JDBC projection that reads tagged events from the
 * journal and hands them in batches to the streaming read side handler.
 *
 * @param actorSystem the actor system
 * @param processorId the unique read side processor id (also the projection name)
 * @param dataSource the data source backing the jdbc sessions
 * @param readSideStreamHandler the remote stream handler implementation
 * @param numShards number of shards (one projection instance per tag)
 */
private[readside] class ReadSideStreamProjection(
    actorSystem: ActorSystem[_],
    val processorId: String,
    val dataSource: DataSource,
    readSideStreamHandler: ReadSideStreamHandler,
    val numShards: Int) {

  final val log: Logger = LoggerFactory.getLogger(getClass)

  implicit val sys: ActorSystem[_] = actorSystem

  /**
   * Initialize the projection to start fetching the events that are emitted
   */
  def start(): Unit = {
    ShardedDaemonProcess(actorSystem).init[ProjectionBehavior.Command](
      name = processorId,
      numberOfInstances = numShards,
      behaviorFactory = shardNumber => jdbcGroupedProjection(shardNumber.toString),
      settings = ShardedDaemonProcessSettings(actorSystem),
      stopMessage = Some(ProjectionBehavior.Stop))
  }

  /**
   * creates a jdbc grouped projection
   *
   * @param tagName the event tag
   * @return the jdbc grouped projection behavior
   */
  private[readside] def jdbcGroupedProjection(tagName: String): Behavior[ProjectionBehavior.Command] = {
    // session factory returning a jdbc session connected to the hikari pool
    val sessionFactory = () => new ReadSideJdbcSession(dataSource.getConnection())
    // handler factory that forwards each batch to the stream handler
    val handlerFactory = () => new ReadSideJdbcStreamHandler(processorId, readSideStreamHandler)

    val groupedProjection: GroupedProjection[Offset, EventEnvelope[EventWrapper]] =
      JdbcProjection
        .groupedWithin(
          projectionId = ProjectionId(processorId, tagName),
          // NOTE(review): this reuses the non-streaming ReadSideProjection's source
          // provider while the companion ReadSideStreamProjection.sourceProvider
          // appears unused — confirm which one is intended
          sourceProvider = ReadSideProjection.sourceProvider(actorSystem, tagName),
          sessionFactory = sessionFactory,
          handler = handlerFactory)
        .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis) // FIXME set this values in configuration

    ProjectionBehavior(groupedProjection)
  }
}

// NOTE(review): this companion's sourceProvider appears unused — the class above
// calls ReadSideProjection.sourceProvider instead; confirm which is intended.
private[readside] object ReadSideStreamProjection {

  /**
   * Set the Event Sourced Provider per tag
   *
   * @param system the actor system
   * @param tag the event tag
   * @return the event sourced provider reading tagged EventWrapper events from
   *         the JDBC read journal
   */
  private[readside] def sourceProvider(
      system: ActorSystem[_],
      tag: String): SourceProvider[Offset, EventEnvelope[EventWrapper]] = {
    EventSourcedProvider.eventsByTag[EventWrapper](system, readJournalPluginId = JdbcReadJournal.Identifier, tag)
  }
}
Loading