Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Threads.Condition variables in Select. #3

Merged
merged 1 commit into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions src/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ using Nullables

export @select

# =========================================================================================
# Custom concurrency primitives needed to support `@select`
# ------------------

function isready_put(c::Channel, sibling_tasks)
# TODO: To fix the circular dependency, I think it might be enough to just add a check
# here that there is at least one ready task that _isn't_ one of our siblings! We can
Expand Down Expand Up @@ -56,7 +60,59 @@ function wait_nosibs(c::Channel, sibling_tasks)
end
nothing
end
wait_nosibs(x, sibling_tasks) = wait(x)

wait_select(c::Channel, parent_task, sibling_tasks) = wait_nosibs(c, sibling_tasks)
wait_select(c::Base.GenericCondition, parent_task, sibling_tasks) = wait_from_parent(c, parent_task)
wait_select(x, parent_task, sibling_tasks) = wait(x)

# ---- Conditions ---------
assert_parent_haslock(c::Base.GenericCondition, parent_task) = assert_parent_haslock(c.lock, parent_task)
assert_parent_haslock(l::ReentrantLock, parent_task) =
(islocked(l) && l.locked_by === parent_task) ? nothing : Base.concurrency_violation()
assert_parent_haslock(l::Base.AlwaysLockedST, parent_task) =
(islocked(l) && l.ownertid === parent_task) ? nothing : Base.concurrency_violation()

function wait_from_parent(c::Base.GenericCondition, parent_task)
ct = current_task()
# Note that the parent task is guaranteed to be blocking on us, so this is okay.
assert_parent_haslock(c, parent_task)
push!(c.waitq, ct)
token = unlockall_from_parent(c.lock, parent_task)
try
return wait()
catch
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
rethrow()
finally
# Note that now _this task_ gets the lock, so we can execute the remaining body w/ the lock
Base.relockall(c.lock, token)
# eww, manually re-assign the parent to own this lock
c.lock.locked_by = parent_task
end
end

function unlockall_from_parent(rl::ReentrantLock, parent_task)
n = rl.reentrancy_cnt
rl.locked_by === parent_task || error("unlock from wrong thread")
n == 0 && error("unlock count must match lock count")
lock(rl.cond_wait)
rl.reentrancy_cnt = 0
rl.locked_by = nothing
if !isempty(rl.cond_wait.waitq)
try
notify(rl.cond_wait)
catch
unlock(rl.cond_wait)
rethrow()
end
end
unlock(rl.cond_wait)
return n
end


# =========================================================================================


## Implementation of 'select' mechanism to block on the disjunction of
## of 'waitable' objects.
Expand Down Expand Up @@ -283,7 +339,7 @@ function _select_block_macro(clauses)
bind_variable = :(nothing)
elseif clause.kind == SelectTake
isready_func = _isready
wait_for_channel = :(wait_nosibs($channel_var, tasks))
wait_for_channel = :(wait_select($channel_var, maintask, tasks))
mutate_channel = :(_take!($channel_var))
bind_variable = :($value_var = branch_val)
end
Expand Down Expand Up @@ -394,7 +450,7 @@ function _select_block(clauses)
if clause[1] == :put
wait_put(clause[2], tasks)
elseif clause[1] == :take
wait_nosibs(clause[2], tasks)
wait_select(clause[2], maintask, tasks)
end
catch err
if isa(err, SelectInterrupt)
Expand Down
70 changes: 70 additions & 0 deletions test/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,73 @@ end
end) == "timeout"
end
end

# Other waitable things
# Conditions
# Note that we may not support Base.Condition() because it cannot be used in multithreaded code.
## Simple condition test: wait until a condition is notified
#c = Condition()
#@async while notify(c) != 1 sleep(0.5) end
#@test @select(begin
# c => "c"
#end) == "c"

@sync begin
c = Threads.Condition()
t = @async begin
success = false
while true
lock(c)
success = notify(c) > 0
unlock(c)
if success break; end
sleep(0.5)
end
end
lock(c)
@test @select(begin
c => (unlock(c); "c")
end) == "c"
end

# Multiple waiters
@sync begin
coordinator = Channel()

c1 = Threads.Condition()
c2 = Threads.Condition()

wait_on_conds() = begin
lock(c1)
lock(c2)
put!(coordinator, 0)
@select begin
c1 => begin
put!(coordinator, 0)
unlock(c2); unlock(c1);
end
c2 => begin
put!(coordinator, 0)
unlock(c2); unlock(c1);
end
end
end
t1 = @async wait_on_conds()
t2 = @async wait_on_conds()

take!(coordinator); take!(coordinator) # Wait for both tasks to start waiting

# Now, everyone is waiting, so notifying c1 will wake up _both_ select tasks (and kill both c2 siblings)
lock(c1)
@test notify(c1) == 2
unlock(c1)
yield()
yield()

take!(coordinator); take!(coordinator) # Wait for both tasks to finish running

# Both select tasks will have killed their sibling clauses, so no one is listening on c2
lock(c2)
@test notify(c2) == 0
unlock(c2)
end