Skip to content

Commit

Permalink
Fix remaining tests: Fix fetching from Tasks, fix non-blocking select…
Browse files Browse the repository at this point in the history
… after latest changes
  • Loading branch information
NHDaly committed Sep 4, 2019
1 parent 1e55025 commit de1ba80
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 18 deletions.
55 changes: 43 additions & 12 deletions src/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@ using Nullables

export @select

function isready_put(c::Channel)
function isready_put(c::Channel, sibling_tasks)
# TODO: To fix the circular dependency, I think it might be enough to just add a check
# here that there is at least one ready task that _isn't_ one of our siblings! We can
# take another argument to this function, which is the list of tasks, and cross-reference it?
return if Base.isbuffered(c)
length(c.data) != c.sz_max
else
!isempty(c.cond_take.waitq)
# TODO: No this isn't enough. I need to do it for the _wait_ function, not the wait_put. :'(
#@info sibling_tasks
#@info "isready_put:" (!isempty(c.cond_take.waitq))#, collect(c.cond_take.waitq))
!isempty(c.cond_take.waitq) && any(t->!occursin(t, sibling_tasks), c.cond_take.waitq)
end
end

function wait_put(c::Channel)
function wait_put(c::Channel, sibling_tasks)
#isready_put(c, sibling_tasks) && return
# TODO: Is this sufficiently thread-safe?
lock(c)
try
while !isready_put(c)
while !isready_put(c, sibling_tasks)
Base.check_channel_state(c)
wait(c.cond_put) # Can be cancelled while waiting here...
end
Expand All @@ -26,6 +33,30 @@ function wait_put(c::Channel)
nothing
end

isready_wait_nosibs(c::Channel, sibling_tasks) = n_avail_nosibs(c, sibling_tasks) > 0
function n_avail_nosibs(c::Channel, sibling_tasks)
if Base.isbuffered(c)
length(c.data)
else
#@info "isready_wait_nosibs:" (isempty(c.cond_put.waitq), collect(c.cond_put.waitq))
length(filter(t->0==count(x->x==t, sibling_tasks), collect(c.cond_put.waitq)))
end
end
function wait_nosibs(c::Channel, sibling_tasks)
# I don't understand why its okay to access this outside the lock...?
#isready_wait_nosibs(c, sibling_tasks) && return
lock(c)
try
while !isready_wait_nosibs(c, sibling_tasks)
Base.check_channel_state(c)
wait(c.cond_wait)
end
finally
unlock(c)
end
nothing
end
wait_nosibs(x, sibling_tasks) = wait(x)

## Implementation of 'select' mechanism to block on the disjunction of
## of 'waitable' objects.
Expand Down Expand Up @@ -149,7 +180,7 @@ end
# with select.
# @select if x |> value ... will ultimately insert an expression value=_take!(x).
_take!(c::AbstractChannel) = take!(c)
_take!(x) = wait(x)
_take!(x) = fetch(x)
# @select if x <| value .... will ultimately inset value=put!(x), which currently
# is only meanginful for channels and so no underscore varirant is used here.
# These are used with the non-blocking variant of select, which will
Expand Down Expand Up @@ -183,7 +214,7 @@ function _select_nonblock_macro(clauses)
if clause.kind == SelectPut
channel_var = gensym("channel")
channel_assignment_expr = :($channel_var = $(clause.channel|>get|>esc))
:(if ($channel_assignment_expr; isready_put($channel_var))
:(if ($channel_assignment_expr; isready_put($channel_var, []))
put!($channel_var, $(clause.value|>get|>esc))
$(esc(body))
end)
Expand Down Expand Up @@ -247,17 +278,17 @@ function _select_block_macro(clauses)
channel_assignment_expr = :($channel_var = $(clause.channel|>get|>esc))
if clause.kind == SelectPut
isready_func = isready_put
wait_for_channel = :(wait_put($channel_var))
wait_for_channel = :(wait_put($channel_var, tasks))
mutate_channel = :(put!($channel_var, $value_var))
bind_variable = :(nothing)
elseif clause.kind == SelectTake
isready_func = _isready
wait_for_channel = :(wait($channel_var))
wait_for_channel = :(wait_nosibs($channel_var, tasks))
mutate_channel = :(_take!($channel_var))
bind_variable = :($value_var = branch_val)
end
branch = quote
tasks[$i] = Threads.@spawn begin
tasks[$i] = @async begin
$channel_declaration_expr
try # Listen for genuine errors to throw to the main task
$channel_assignment_expr
Expand Down Expand Up @@ -339,7 +370,7 @@ end
function _select_nonblock(clauses)
for (i, clause) in enumerate(clauses)
if clause[1] == :put
if isready_put(clause[2])
if isready_put(clause[2], [])
return (i, put!(clause[2], clause[3]))
end
elseif clause[1] == :take
Expand All @@ -361,9 +392,9 @@ function _select_block(clauses)
try
try
if clause[1] == :put
wait_put(clause[2])
wait_put(clause[2], tasks)
elseif clause[1] == :take
wait(clause[2])
wait_nosibs(clause[2], tasks)
end
catch err
if isa(err, SelectInterrupt)
Expand Down
23 changes: 17 additions & 6 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ using Select
using Test

@testset "self-references" begin
ch = Channel()
@test Select.@select begin
ch <| "hi" => "put"
ch |> x => "take! |> $x"
@async(sleep(1)) => "timeout"
end == "timeout"
# Test multiple times because the deadlock was only triggered if Task 1 is run before
# Task 2 or Task 3. Running it multiple times will hopefully cover all the cases.
for _ in 1:10
# Test that the two clauses in `@select` can't trigger eachother (Here, the problem
# would be if Task 1 puts into ch, then Task 2 sees Task 1 waiting, and thinks ch is
# ready to take! and attempts to incorrectly proceed w/ the take!.) Instead, this should
# timeout, since no one else is putting or taking on `ch`.
ch = Channel()
@test @select(begin
ch <| "hi" => "put"
# This take!(ch) should not be triggered by the put
ch |> x => "take! |> $x"
# This wait(ch) should also not be triggered by the put
ch => "waiting on ch"
@async(sleep(0.3)) => "timeout"
end) == "timeout"
end
end

function select_block_test(t1, t2, t3, t4)
Expand Down

0 comments on commit de1ba80

Please sign in to comment.