Skip to content

Commit

Permalink
Merge pull request #9 from dvgica/add-future-jdk
Browse files Browse the repository at this point in the history
Add Future support to JDK implementation
  • Loading branch information
dvgica authored Oct 22, 2023
2 parents b1bcb68 + e1239cf commit 1e18136
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 25 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import java.time.Instant

def updateData(): String = Instant.now.toString

val data = AutoUpdatingVar.jdk(
val data = AutoUpdatingVar.jdk( // or `AutoUpdatingVar.jdkFuture` if `updateData` returns a `scala.concurrent.Future`
updateData(),
// can also be dynamic based on the last data
UpdateInterval.Static(1.second),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package ca.dvgi.periodic

import scala.reflect.ClassTag
import org.slf4j.LoggerFactory
import ca.dvgi.periodic.jdk.Identity
import ca.dvgi.periodic.jdk.JdkAutoUpdater
import ca.dvgi.periodic.jdk._
import scala.concurrent.duration.Duration
import scala.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
Expand Down Expand Up @@ -122,7 +121,34 @@ object AutoUpdatingVar {
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = {
new AutoUpdatingVar(
new JdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
new IdentityJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
)(
updateVar,
updateInterval,
updateAttemptStrategy,
handleInitializationError,
varNameOverride
)
}

/** An AutoUpdatingVar based on only the JDK, for use when `updateVar` returns a `Future`.
*
* @see
* [[ca.dvgi.periodic.jdk.JdkAutoUpdater]]
* @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def jdkFuture[T](
updateVar: => Future[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, Future[T]] = PartialFunction.empty,
varNameOverride: Option[String] = None,
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = {
new AutoUpdatingVar(
new FutureJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
)(
updateVar,
updateInterval,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ca.dvgi.periodic.jdk

import scala.concurrent.duration.Duration
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.Future
import scala.concurrent.Await

class FutureJdkAutoUpdater[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends JdkAutoUpdater[Future, T](blockUntilReadyTimeout, executorOverride) {
override protected def evalUpdate(ut: Future[T]): T = Await.result(ut, Duration.Inf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ca.dvgi.periodic.jdk

import scala.concurrent.duration.Duration
import java.util.concurrent.ScheduledExecutorService

class IdentityJdkAutoUpdater[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends JdkAutoUpdater[Identity, T](blockUntilReadyTimeout, executorOverride) {
override protected def evalUpdate(ut: Identity[T]): T = ut
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import java.util.concurrent.ScheduledFuture
* @param executorOverride
* If present, will be used instead of starting a new thread.
*/
class JdkAutoUpdater[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends AutoUpdater[Identity, Future, T] {
abstract class JdkAutoUpdater[U[T], T](
blockUntilReadyTimeout: Option[Duration],
executorOverride: Option[ScheduledExecutorService]
) extends AutoUpdater[U, Future, T] {

private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1))

Expand All @@ -45,10 +45,10 @@ class JdkAutoUpdater[T](

override def start(
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, T]
handleInitializationError: PartialFunction[Throwable, U[T]]
): Future[Unit] = {
executor.schedule(
new Runnable {
Expand All @@ -57,13 +57,13 @@ class JdkAutoUpdater[T](
Try(try {
try {
log.info("Attempting initialization...")
updateVar()
evalUpdate(updateVar())
} catch {
case NonFatal(e) =>
log.error("Failed to initialize var", e)
throw e
}
} catch (handleInitializationError))
} catch (handleInitializationError.andThen(evalUpdate _)))

tryV match {
case Success(value) =>
Expand Down Expand Up @@ -108,9 +108,11 @@ class JdkAutoUpdater[T](
()
}

protected def evalUpdate(ut: U[T]): T

private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy
): Unit = {
Expand All @@ -131,14 +133,14 @@ class JdkAutoUpdater[T](

private class UpdateVar(attempt: Int)(implicit
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy
) extends Runnable {
def run(): Unit = {
log.info("Attempting var update...")
try {
val newV = updateVar()
val newV = evalUpdate(updateVar())
variable = Some(newV)
log.info("Successfully updated")
scheduleUpdate(updateInterval.duration(newV))
Expand All @@ -159,7 +161,7 @@ class JdkAutoUpdater[T](

private def reattempt(e: Throwable, delay: FiniteDuration)(implicit
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy
): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
FunFixture(
_ => {
val holder = new VarHolder
val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(5.seconds)))(
val v = new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(5.seconds)))(
holder.get,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand Down Expand Up @@ -66,7 +66,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
FunFixture(
_ => {
val holder = new VarHolder
val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
val v = new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Dynamic((i: Int) => i * 1.second),
UpdateAttemptStrategy.Infinite(1.second)
Expand Down Expand Up @@ -98,7 +98,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int]())(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int]())(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand All @@ -111,7 +111,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int]())(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int]())(
{
Thread.sleep(1000)
1
Expand All @@ -129,7 +129,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
"returns a failed future from constructor if the first update fails and instructed to block"
) {
intercept[TestException.type] {
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand All @@ -139,7 +139,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second),
Expand All @@ -159,7 +159,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
_ => {
val holder = new VarErrorHolder
val v =
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Static(1.second),
UpdateAttemptStrategy.Infinite(1.second),
Expand Down Expand Up @@ -190,7 +190,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
_ => {
val holder = new VarErrorHolder
val v =
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Static(1.second),
UpdateAttemptStrategy
Expand Down Expand Up @@ -229,7 +229,9 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
val holder = new VarHolder
val ses = Executors.newScheduledThreadPool(1)
val v =
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses)))(
new AutoUpdatingVar(
new IdentityJdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses))
)(
holder.get,
UpdateInterval.Static(2.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand Down

0 comments on commit 1e18136

Please sign in to comment.