diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala index abf3cc31..85c81c49 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala @@ -11,11 +11,15 @@ import akka.projection.eventsourced.EventEnvelope import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper import akka.projection.jdbc.JdbcSession import com.google.protobuf.any.{Any => ProtoAny} + import scala.util.{Failure, Success, Try} import org.slf4j.{Logger, LoggerFactory} import com.namely.protobuf.chiefofstate.v1.common.MetaData import com.namely.protobuf.chiefofstate.v1.readside.HandleReadSideResponse +import java.time.Duration +import scala.annotation.tailrec + /** * Implements the akka JdbcHandler interface and forwards events to the * provided remote read side processor @@ -26,19 +30,36 @@ import com.namely.protobuf.chiefofstate.v1.readside.HandleReadSideResponse */ private[readside] class ReadSideJdbcHandler(eventTag: String, processorId: String, - remoteReadProcessor: RemoteReadSideProcessor + remoteReadProcessor: RemoteReadSideProcessor, + backOffSecondsMin: Long, + backOffSecondsMax: Long ) extends JdbcHandler[EventEnvelope[EventWrapper], JdbcSession] { private val logger: Logger = LoggerFactory.getLogger(this.getClass) + val backOffMultiplier: Double = Math.random() + + /** + * @param session a JdbcSession implementation + * @param envelope the wrapped event to process + */ + def process(session: JdbcSession, envelope: EventEnvelope[EventWrapper]): Unit = { + recursiveProcess(session, envelope) + } + /** * process an event inside the jdbc session by invoking the remote - * read processor + * read processor. In the failure state, backs off and tries again. * * @param session a JdbcSession implementation * @param envelope the wrapped event to process + * @param numAttempts the number of attempts */ - def process(session: JdbcSession, envelope: EventEnvelope[EventWrapper]): Unit = { + @tailrec + private[this] def recursiveProcess(session: JdbcSession, + envelope: EventEnvelope[EventWrapper], + numAttempts: Int = 0 + ): Unit = { // extract required arguments val event: ProtoAny = envelope.event.getEvent val resultingState: ProtoAny = envelope.event.getResultingState @@ -56,18 +77,23 @@ private[readside] class ReadSideJdbcHandler(eventTag: String, // handle successful gRPC call where server indicated "successful = false" case Success(_) => val errMsg: String = - s"read side returned failure, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}" + s"read side returned failure, attempt=${numAttempts}, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}" logger.warn(errMsg) throw new RuntimeException(errMsg) // handle failed gRPC call case Failure(exception) => logger.error( - s"read side processing failure, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}, cause=${exception.getMessage()}" + s"read side processing failure, attempt=${numAttempts}, processor=${processorId}, id=${meta.entityId}, revisionNumber=${meta.revisionNumber}, cause=${exception.getMessage}" ) // for debug purposes, log the stack trace as well logger.debug("remote handler failure", exception) - throw exception + + val backoffSeconds: Long = Math.min(backOffSecondsMax, (backOffSecondsMin * Math.pow(1.1, numAttempts)).toLong) + + Thread.sleep(Duration.ofSeconds(backoffSeconds).toMillis) + + recursiveProcess(session, envelope, numAttempts + 1) } } } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProcessor.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProcessor.scala index 3461bd67..7ac6c38a 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProcessor.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProcessor.scala @@ -60,6 +60,7 @@ private[readside] class ReadSideProcessor( ) } + // TODO: Pass in the back off seconds min and max from config private[readside] def jdbcProjection(tagName: String): ExactlyOnceProjection[Offset, EventEnvelope[EventWrapper]] = { JdbcProjection .exactlyOnce( @@ -68,7 +69,7 @@ private[readside] class ReadSideProcessor( // defines a session factory that returns a jdbc // session connected to the hikari pool sessionFactory = () => new ReadSideJdbcSession(dataSource.getConnection()), - handler = () => new ReadSideJdbcHandler(tagName, processorId, remoteReadProcessor) + handler = () => new ReadSideJdbcHandler(tagName, processorId, remoteReadProcessor, 1, 30) ) }