Skip to content

Commit

Permalink
Support Threads.Condition variables in Select.
Browse files Browse the repository at this point in the history
Note that you need to unlock the condition inside the body of your
select, which is super annoying and kind of makes it not-worth-it to
support Condition variables.

It would probably be better to just have an assertion error if you try
to wait on a condition variable...

BUT, anyway, this commit fixes the waiting to support condition
variables correctly, I'm pretty sure! :)
  • Loading branch information
NHDaly committed Sep 4, 2019
1 parent ba29a87 commit a751207
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 3 deletions.
62 changes: 59 additions & 3 deletions src/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ using Nullables

export @select

# =========================================================================================
# Custom concurrency primitives needed to support `@select`
# ------------------

function isready_put(c::Channel, sibling_tasks)
# TODO: To fix the circular dependency, I think it might be enough to just add a check
# here that there is at least one ready task that _isn't_ one of our siblings! We can
Expand Down Expand Up @@ -56,7 +60,59 @@ function wait_nosibs(c::Channel, sibling_tasks)
end
nothing
end
wait_nosibs(x, sibling_tasks) = wait(x)

wait_select(c::Channel, parent_task, sibling_tasks) = wait_nosibs(c, sibling_tasks)
wait_select(c::Base.GenericCondition, parent_task, sibling_tasks) = wait_from_parent(c, parent_task)
wait_select(x, parent_task, sibling_tasks) = wait(x)

# ---- Conditions ---------
assert_parent_haslock(c::Base.GenericCondition, parent_task) = assert_parent_haslock(c.lock, parent_task)
assert_parent_haslock(l::ReentrantLock, parent_task) =
(islocked(l) && l.locked_by === parent_task) ? nothing : Base.concurrency_violation()
assert_parent_haslock(l::Base.AlwaysLockedST, parent_task) =
(islocked(l) && l.ownertid === parent_task) ? nothing : Base.concurrency_violation()

function wait_from_parent(c::Base.GenericCondition, parent_task)
ct = current_task()
# Note that the parent task is guaranteed to be blocking on us, so this is okay.
assert_parent_haslock(c, parent_task)
push!(c.waitq, ct)
token = unlockall_from_parent(c.lock, parent_task)
try
return wait()
catch
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
rethrow()
finally
# Note that now _this task_ gets the lock, so we can execute the remaining body w/ the lock
Base.relockall(c.lock, token)
# eww, manually re-assign the parent to own this lock
c.lock.locked_by = parent_task
end
end

function unlockall_from_parent(rl::ReentrantLock, parent_task)
n = rl.reentrancy_cnt
rl.locked_by === parent_task || error("unlock from wrong thread")
n == 0 && error("unlock count must match lock count")
lock(rl.cond_wait)
rl.reentrancy_cnt = 0
rl.locked_by = nothing
if !isempty(rl.cond_wait.waitq)
try
notify(rl.cond_wait)
catch
unlock(rl.cond_wait)
rethrow()
end
end
unlock(rl.cond_wait)
return n
end


# =========================================================================================


## Implementation of 'select' mechanism to block on the disjunction of
## of 'waitable' objects.
Expand Down Expand Up @@ -283,7 +339,7 @@ function _select_block_macro(clauses)
bind_variable = :(nothing)
elseif clause.kind == SelectTake
isready_func = _isready
wait_for_channel = :(wait_nosibs($channel_var, tasks))
wait_for_channel = :(wait_select($channel_var, maintask, tasks))
mutate_channel = :(_take!($channel_var))
bind_variable = :($value_var = branch_val)
end
Expand Down Expand Up @@ -394,7 +450,7 @@ function _select_block(clauses)
if clause[1] == :put
wait_put(clause[2], tasks)
elseif clause[1] == :take
wait_nosibs(clause[2], tasks)
wait_select(clause[2], maintask, tasks)
end
catch err
if isa(err, SelectInterrupt)
Expand Down
70 changes: 70 additions & 0 deletions test/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,73 @@ end
end) == "timeout"
end
end

# Other waitable things
# Conditions
# Note that we may not support Base.Condition() because it cannot be used in multithreaded code.
## Simple condition test: wait until a condition is notified
#c = Condition()
#@async while notify(c) != 1 sleep(0.5) end
#@test @select(begin
# c => "c"
#end) == "c"

@sync begin
c = Threads.Condition()
t = @async begin
success = false
while true
lock(c)
success = notify(c) > 0
unlock(c)
if success break; end
sleep(0.5)
end
end
lock(c)
@test @select(begin
c => (unlock(c); "c")
end) == "c"
end

# Multiple waiters
@sync begin
coordinator = Channel()

c1 = Threads.Condition()
c2 = Threads.Condition()

wait_on_conds() = begin
lock(c1)
lock(c2)
put!(coordinator, 0)
@select begin
c1 => begin
put!(coordinator, 0)
unlock(c2); unlock(c1);
end
c2 => begin
put!(coordinator, 0)
unlock(c2); unlock(c1);
end
end
end
t1 = @async wait_on_conds()
t2 = @async wait_on_conds()

take!(coordinator); take!(coordinator) # Wait for both tasks to start waiting

# Now, everyone is waiting, so notifying c1 will wake up _both_ select tasks (and kill both c2 siblings)
lock(c1)
@test notify(c1) == 2
unlock(c1)
yield()
yield()

take!(coordinator); take!(coordinator) # Wait for both tasks to finish running

# Both select tasks will have killed their sibling clauses, so no one is listening on c2
lock(c2)
@test notify(c2) == 0
unlock(c2)
end

0 comments on commit a751207

Please sign in to comment.