Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate Java thread interruption in Dispatcher#unsafeRunSync #4167

Open
wants to merge 6 commits into
base: series/3.5.x
Choose a base branch
from

Conversation

kamilkloch
Copy link
Contributor

@kamilkloch kamilkloch commented Nov 12, 2024

Closes #4166.

Copy link
Member

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also make a similar change for IO#unsafeRunSync.

@kamilkloch
Copy link
Contributor Author

We should also make a similar change for IO#unsafeRunSync.

I got a bit lost trying to comprehend the nuanced

final def unsafeRunTimed(limit: FiniteDuration)(

@armanbilge
Copy link
Member

I got a bit lost trying to comprehend the nuanced

Actually, maybe let's hold off on that change. I forgot that the specification for this method is a bit unusual.

* Similar to `unsafeRunSync`, except with a bounded blocking duration when awaiting
* asynchronous results. As soon as an async blocking limit is hit, evaluation ''immediately''
* aborts and `None` is returned. Note that this does not run finalizers, which makes it quite
* different (and less safe) than other mechanisms for limiting evaluation time.


But if we did decide to move forward ...

You can factor out a package-private version of unsafeRunAsync that returns the Fiber.

def unsafeRunAsync(cb: Either[Throwable, A] => Unit)(
implicit runtime: unsafe.IORuntime): Unit = {
unsafeRunFiber(
cb(Left(new CancellationException("The fiber was canceled"))),
t => {
if (!NonFatal(t)) {
t.printStackTrace()
}
cb(Left(t))
},
a => cb(Right(a))
)
()
}

Then, you can use that for cancelation in unsafeRunTimed.

try {
val result = blocking(queue.poll(limit.toNanos, TimeUnit.NANOSECONDS))
if (result eq null) None else result.fold(throw _, Some(_))
} catch {
case _: InterruptedException =>
None
}

@kamilkloch
Copy link
Contributor Author

Actually, maybe let's hold off on that change. I forgot that the specification for this method is a bit unusual.

If only you are ok with the resulting discrepancy in sematics between Dispatcher#unsafeRunSync and IO#unsafeRunSync, I would hold off as well. (Actually, for unsafeRunTimed the semantics is already different)

@armanbilge armanbilge changed the title Propagate Java thread interruption in unsafeRunSync. Fixes #4166. Propagate Java thread interruption in Dispatcher#unsafeRunSync Nov 14, 2024
Copy link
Member

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants