From 904e64130e4400dca90a82a5ba798455560a0ff4 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Thu, 19 Dec 2024 19:49:19 -0500 Subject: [PATCH 1/3] Add timeout parameter to `wait(::Condition)` --- base/boot.jl | 6 +++++- base/condition.jl | 43 ++++++++++++++++++++++++++++++++++++++++--- test/channels.jl | 17 +++++++++++++++++ 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/base/boot.jl b/base/boot.jl index 4652524530703..9caf1136f18ef 100644 --- a/base/boot.jl +++ b/base/boot.jl @@ -224,7 +224,7 @@ export InterruptException, InexactError, OutOfMemoryError, ReadOnlyMemoryError, OverflowError, StackOverflowError, SegmentationFault, UndefRefError, UndefVarError, TypeError, ArgumentError, MethodError, AssertionError, LoadError, InitError, - UndefKeywordError, ConcurrencyViolationError, FieldError, + UndefKeywordError, ConcurrencyViolationError, FieldError, TimeoutError, # AST representation Expr, QuoteNode, LineNumberNode, GlobalRef, # object model functions @@ -473,6 +473,10 @@ end struct PrecompilableError <: Exception end +struct TimeoutError <: Exception + timeout::Real +end + String(s::String) = s # no constructor yet const Cvoid = Nothing diff --git a/base/condition.jl b/base/condition.jl index fd771c9be346a..36eea4edb5943 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -126,19 +126,56 @@ proceeding. function wait end """ - wait(c::GenericCondition; first::Bool=false) + wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`. If the keyword `first` is set to `true`, the waiter will be put _first_ in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. + +If `timeout` is specified, cancel the `wait` when it expires and throw a +`TimeoutError` to the waiting task. The minimum value for `timeout` +is 0.001 seconds, i.e. 1 millisecond. """ -function wait(c::GenericCondition; first::Bool=false) +function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond")) + ct = current_task() _wait2(c, ct, first) token = unlockall(c.lock) + + timer::Union{Timer, Nothing} = nothing + if timeout > 0.0 + timer = Timer(timeout) + # start a task to wait on the timer + t = Task() do + try + wait(timer) + catch e + # if the timer was closed, the waiting task has been scheduled; do nothing + e isa EOFError && return + end + dosched = false + lock(c.lock) + # Confirm that the waiting task is still in the wait queue and remove it. If + # the task is not in the wait queue, it must have been notified already so we + # don't do anything here. + if ct.queue == c.waitq + dosched = true + Base.list_deletefirst!(c.waitq, ct) + end + unlock(c.lock) + # send the waiting task a timeout + dosched && schedule(ct, TimeoutError(timeout); error=true) + end + t.sticky = false + schedule(t) + end + try - return wait() + res = wait() + timer === nothing || close(timer) + return res catch q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) rethrow() diff --git a/test/channels.jl b/test/channels.jl index eed7a7ecc0566..89e716e613b72 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -1,6 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license using Random +using Base.Threads using Base: Experimental using Base: n_avail @@ -39,6 +40,22 @@ end @test fetch(t) == "finished" end +@testset "timed wait on Condition" begin + a = Condition() + @test_throws ArgumentError wait(a; timeout=0.0005) + @test_throws TimeoutError wait(a; timeout=0.1) + @spawn begin + sleep(0.01) + notify(a) + end + @test try + wait(a; timeout=2) + true + catch + false + end +end + @testset "various constructors" begin c = Channel() @test eltype(c) == Any From d989c765c174e4b56c3d0b2d7de89f41be2b87b8 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Mon, 6 Jan 2025 16:22:06 -0500 Subject: [PATCH 2/3] Prevent ABA problem --- base/condition.jl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/base/condition.jl b/base/condition.jl index 36eea4edb5943..f0d59da5b6de6 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -145,6 +145,7 @@ function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) token = unlockall(c.lock) timer::Union{Timer, Nothing} = nothing + waiter_left = Threads.Atomic{Bool}(false) if timeout > 0.0 timer = Timer(timeout) # start a task to wait on the timer @@ -160,7 +161,7 @@ function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) # Confirm that the waiting task is still in the wait queue and remove it. If # the task is not in the wait queue, it must have been notified already so we # don't do anything here. - if ct.queue == c.waitq + if !waiter_left[] && ct.queue == c.waitq dosched = true Base.list_deletefirst!(c.waitq, ct) end @@ -174,7 +175,10 @@ function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) try res = wait() - timer === nothing || close(timer) + if timer !== nothing + close(timer) + waiter_left[] = true + end return res catch q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) From 53a5d21da604938754df379ebc902e81147abe45 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Wed, 8 Jan 2025 12:05:58 -0500 Subject: [PATCH 3/3] Make the timeout task an interactive task --- base/condition.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/base/condition.jl b/base/condition.jl index f0d59da5b6de6..7aaf3d3aded98 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -170,6 +170,7 @@ function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) dosched && schedule(ct, TimeoutError(timeout); error=true) end t.sticky = false + Threads._spawn_set_thrpool(t, :interactive) schedule(t) end