From a75120728b4de5fdfcaf68495acb839050ec6348 Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Wed, 4 Sep 2019 19:41:03 -0400 Subject: [PATCH] Support Threads.Condition variables in Select. Note that you need to unlock the condition inside the body of your select, which is super annoying and kind of makes it not-worth-it to support Condition variables. It would probably be better to just have an assertion error if you try to wait on a condition variable... BUT, anyway, this commit fixes the waiting to support condition variables correctly, I'm pretty sure! :) --- src/Select.jl | 62 +++++++++++++++++++++++++++++++++++++++++--- test/Select.jl | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/src/Select.jl b/src/Select.jl index 159c3be..a043339 100644 --- a/src/Select.jl +++ b/src/Select.jl @@ -4,6 +4,10 @@ using Nullables export @select +# ========================================================================================= +# Custom concurrency primitives needed to support `@select` +# ------------------ + function isready_put(c::Channel, sibling_tasks) # TODO: To fix the circular dependency, I think it might be enough to just add a check # here that there is at least one ready task that _isn't_ one of our siblings! We can @@ -56,7 +60,59 @@ function wait_nosibs(c::Channel, sibling_tasks) end nothing end -wait_nosibs(x, sibling_tasks) = wait(x) + +wait_select(c::Channel, parent_task, sibling_tasks) = wait_nosibs(c, sibling_tasks) +wait_select(c::Base.GenericCondition, parent_task, sibling_tasks) = wait_from_parent(c, parent_task) +wait_select(x, parent_task, sibling_tasks) = wait(x) + +# ---- Conditions --------- +assert_parent_haslock(c::Base.GenericCondition, parent_task) = assert_parent_haslock(c.lock, parent_task) +assert_parent_haslock(l::ReentrantLock, parent_task) = + (islocked(l) && l.locked_by === parent_task) ? nothing : Base.concurrency_violation() +assert_parent_haslock(l::Base.AlwaysLockedST, parent_task) = + (islocked(l) && l.ownertid === parent_task) ? nothing : Base.concurrency_violation() + +function wait_from_parent(c::Base.GenericCondition, parent_task) + ct = current_task() + # Note that the parent task is guaranteed to be blocking on us, so this is okay. + assert_parent_haslock(c, parent_task) + push!(c.waitq, ct) + token = unlockall_from_parent(c.lock, parent_task) + try + return wait() + catch + ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct) + rethrow() + finally + # Note that now _this task_ gets the lock, so we can execute the remaining body w/ the lock + Base.relockall(c.lock, token) + # eww, manually re-assign the parent to own this lock + c.lock.locked_by = parent_task + end +end + +function unlockall_from_parent(rl::ReentrantLock, parent_task) + n = rl.reentrancy_cnt + rl.locked_by === parent_task || error("unlock from wrong thread") + n == 0 && error("unlock count must match lock count") + lock(rl.cond_wait) + rl.reentrancy_cnt = 0 + rl.locked_by = nothing + if !isempty(rl.cond_wait.waitq) + try + notify(rl.cond_wait) + catch + unlock(rl.cond_wait) + rethrow() + end + end + unlock(rl.cond_wait) + return n +end + + +# ========================================================================================= + ## Implementation of 'select' mechanism to block on the disjunction of ## of 'waitable' objects. @@ -283,7 +339,7 @@ function _select_block_macro(clauses) bind_variable = :(nothing) elseif clause.kind == SelectTake isready_func = _isready - wait_for_channel = :(wait_nosibs($channel_var, tasks)) + wait_for_channel = :(wait_select($channel_var, maintask, tasks)) mutate_channel = :(_take!($channel_var)) bind_variable = :($value_var = branch_val) end @@ -394,7 +450,7 @@ function _select_block(clauses) if clause[1] == :put wait_put(clause[2], tasks) elseif clause[1] == :take - wait_nosibs(clause[2], tasks) + wait_select(clause[2], maintask, tasks) end catch err if isa(err, SelectInterrupt) diff --git a/test/Select.jl b/test/Select.jl index ce4f53d..39eca00 100644 --- a/test/Select.jl +++ b/test/Select.jl @@ -104,3 +104,73 @@ end end) == "timeout" end end + +# Other waitable things +# Conditions + # Note that we may not support Base.Condition() because it cannot be used in multithreaded code. + ## Simple condition test: wait until a condition is notified + #c = Condition() + #@async while notify(c) != 1 sleep(0.5) end + #@test @select(begin + # c => "c" + #end) == "c" + +@sync begin + c = Threads.Condition() + t = @async begin + success = false + while true + lock(c) + success = notify(c) > 0 + unlock(c) + if success break; end + sleep(0.5) + end + end + lock(c) + @test @select(begin + c => (unlock(c); "c") + end) == "c" +end + +# Multiple waiters +@sync begin + coordinator = Channel() + + c1 = Threads.Condition() + c2 = Threads.Condition() + + wait_on_conds() = begin + lock(c1) + lock(c2) + put!(coordinator, 0) + @select begin + c1 => begin + put!(coordinator, 0) + unlock(c2); unlock(c1); + end + c2 => begin + put!(coordinator, 0) + unlock(c2); unlock(c1); + end + end + end + t1 = @async wait_on_conds() + t2 = @async wait_on_conds() + + take!(coordinator); take!(coordinator) # Wait for both tasks to start waiting + + # Now, everyone is waiting, so notifying c1 will wake up _both_ select tasks (and kill both c2 siblings) + lock(c1) + @test notify(c1) == 2 + unlock(c1) + yield() + yield() + + take!(coordinator); take!(coordinator) # Wait for both tasks to finish running + + # Both select tasks will have killed their sibling clauses, so no one is listening on c2 + lock(c2) + @test notify(c2) == 0 + unlock(c2) +end