Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recursive read side jdbc handler #281

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ import akka.projection.eventsourced.EventEnvelope
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
import akka.projection.jdbc.JdbcSession
import com.google.protobuf.any.{Any => ProtoAny}

import scala.util.{Failure, Success, Try}
import org.slf4j.{Logger, LoggerFactory}
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.namely.protobuf.chiefofstate.v1.readside.HandleReadSideResponse

import java.time.Duration
import scala.annotation.tailrec

/**
* Implements the akka JdbcHandler interface and forwards events to the
* provided remote read side processor
Expand All @@ -26,19 +30,36 @@ import com.namely.protobuf.chiefofstate.v1.readside.HandleReadSideResponse
*/
private[readside] class ReadSideJdbcHandler(eventTag: String,
processorId: String,
remoteReadProcessor: RemoteReadSideProcessor
remoteReadProcessor: RemoteReadSideProcessor,
backOffSecondsMin: Long,
backOffSecondsMax: Long
) extends JdbcHandler[EventEnvelope[EventWrapper], JdbcSession] {

private val logger: Logger = LoggerFactory.getLogger(this.getClass)

val backOffMultiplier: Double = Math.random()

/**
* @param session a JdbcSession implementation
* @param envelope the wrapped event to process
*/
def process(session: JdbcSession, envelope: EventEnvelope[EventWrapper]): Unit = {
recursiveProcess(session, envelope)
}

/**
* process an event inside the jdbc session by invoking the remote
* read processor
* read processor. In the failure state, backs off and tries again.
*
* @param session a JdbcSession implementation
* @param envelope the wrapped event to process
* @param numAttempts the number of attempts
*/
def process(session: JdbcSession, envelope: EventEnvelope[EventWrapper]): Unit = {
@tailrec
private[this] def recursiveProcess(session: JdbcSession,
envelope: EventEnvelope[EventWrapper],
numAttempts: Int = 0
): Unit = {
// extract required arguments
val event: ProtoAny = envelope.event.getEvent
val resultingState: ProtoAny = envelope.event.getResultingState
Expand All @@ -56,18 +77,23 @@ private[readside] class ReadSideJdbcHandler(eventTag: String,
// handle successful gRPC call where server indicated "successful = false"
case Success(_) =>
val errMsg: String =
s"read side returned failure, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}"
s"read side returned failure, attempt=${numAttempts}, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}"
logger.warn(errMsg)
throw new RuntimeException(errMsg)

// handle failed gRPC call
case Failure(exception) =>
logger.error(
s"read side processing failure, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}, cause=${exception.getMessage()}"
s"read side processing failure, attempt=${numAttempts}, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}, cause=${exception.getMessage}"
)
// for debug purposes, log the stack trace as well
logger.debug("remote handler failure", exception)
throw exception

val backoffSeconds: Long = Math.min(backOffSecondsMax, (backOffSecondsMin * Math.pow(1.1, numAttempts)).toLong)

Thread.sleep(Duration.ofSeconds(backoffSeconds).toMillis)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we avoid this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


recursiveProcess(session, envelope, numAttempts + 1)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private[readside] class ReadSideProcessor(
)
}

// TODO: Pass in the back off seconds min and max from config
private[readside] def jdbcProjection(tagName: String): ExactlyOnceProjection[Offset, EventEnvelope[EventWrapper]] = {
JdbcProjection
.exactlyOnce(
Expand All @@ -68,7 +69,7 @@ private[readside] class ReadSideProcessor(
// defines a session factory that returns a jdbc
// session connected to the hikari pool
sessionFactory = () => new ReadSideJdbcSession(dataSource.getConnection()),
handler = () => new ReadSideJdbcHandler(tagName, processorId, remoteReadProcessor)
handler = () => new ReadSideJdbcHandler(tagName, processorId, remoteReadProcessor, 1, 30)
)

}
Expand Down