Skip to content

Commit

Permalink
Merge pull request #3 from NHDaly/condition-variables
Browse files Browse the repository at this point in the history
Support Threads.Condition variables in Select.
  • Loading branch information
NHDaly authored Sep 5, 2019
2 parents ba29a87 + a751207 commit a42449f
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 3 deletions.
62 changes: 59 additions & 3 deletions src/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ using Nullables

export @select

# =========================================================================================
# Custom concurrency primitives needed to support `@select`
# ------------------

function isready_put(c::Channel, sibling_tasks)
# TODO: To fix the circular dependency, I think it might be enough to just add a check
# here that there is at least one ready task that _isn't_ one of our siblings! We can
Expand Down Expand Up @@ -56,7 +60,59 @@ function wait_nosibs(c::Channel, sibling_tasks)
end
nothing
end
wait_nosibs(x, sibling_tasks) = wait(x)

wait_select(c::Channel, parent_task, sibling_tasks) = wait_nosibs(c, sibling_tasks)
wait_select(c::Base.GenericCondition, parent_task, sibling_tasks) = wait_from_parent(c, parent_task)
wait_select(x, parent_task, sibling_tasks) = wait(x)

# ---- Conditions ---------
assert_parent_haslock(c::Base.GenericCondition, parent_task) = assert_parent_haslock(c.lock, parent_task)
assert_parent_haslock(l::ReentrantLock, parent_task) =
(islocked(l) && l.locked_by === parent_task) ? nothing : Base.concurrency_violation()
assert_parent_haslock(l::Base.AlwaysLockedST, parent_task) =
(islocked(l) && l.ownertid === parent_task) ? nothing : Base.concurrency_violation()

function wait_from_parent(c::Base.GenericCondition, parent_task)
ct = current_task()
# Note that the parent task is guaranteed to be blocking on us, so this is okay.
assert_parent_haslock(c, parent_task)
push!(c.waitq, ct)
token = unlockall_from_parent(c.lock, parent_task)
try
return wait()
catch
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
rethrow()
finally
# Note that now _this task_ gets the lock, so we can execute the remaining body w/ the lock
Base.relockall(c.lock, token)
# eww, manually re-assign the parent to own this lock
c.lock.locked_by = parent_task
end
end

function unlockall_from_parent(rl::ReentrantLock, parent_task)
n = rl.reentrancy_cnt
rl.locked_by === parent_task || error("unlock from wrong thread")
n == 0 && error("unlock count must match lock count")
lock(rl.cond_wait)
rl.reentrancy_cnt = 0
rl.locked_by = nothing
if !isempty(rl.cond_wait.waitq)
try
notify(rl.cond_wait)
catch
unlock(rl.cond_wait)
rethrow()
end
end
unlock(rl.cond_wait)
return n
end


# =========================================================================================


## Implementation of 'select' mechanism to block on the disjunction of
## of 'waitable' objects.
Expand Down Expand Up @@ -283,7 +339,7 @@ function _select_block_macro(clauses)
bind_variable = :(nothing)
elseif clause.kind == SelectTake
isready_func = _isready
wait_for_channel = :(wait_nosibs($channel_var, tasks))
wait_for_channel = :(wait_select($channel_var, maintask, tasks))
mutate_channel = :(_take!($channel_var))
bind_variable = :($value_var = branch_val)
end
Expand Down Expand Up @@ -394,7 +450,7 @@ function _select_block(clauses)
if clause[1] == :put
wait_put(clause[2], tasks)
elseif clause[1] == :take
wait_nosibs(clause[2], tasks)
wait_select(clause[2], maintask, tasks)
end
catch err
if isa(err, SelectInterrupt)
Expand Down
70 changes: 70 additions & 0 deletions test/Select.jl
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,73 @@ end
end) == "timeout"
end
end

# Other waitable things
# Conditions
# Note that we may not support Base.Condition() because it cannot be used in multithreaded code.
## Simple condition test: wait until a condition is notified
#c = Condition()
#@async while notify(c) != 1 sleep(0.5) end
#@test @select(begin
# c => "c"
#end) == "c"

@sync begin
c = Threads.Condition()
t = @async begin
success = false
while true
lock(c)
success = notify(c) > 0
unlock(c)
if success break; end
sleep(0.5)
end
end
lock(c)
@test @select(begin
c => (unlock(c); "c")
end) == "c"
end

# Multiple waiters
@sync begin
coordinator = Channel()

c1 = Threads.Condition()
c2 = Threads.Condition()

wait_on_conds() = begin
lock(c1)
lock(c2)
put!(coordinator, 0)
@select begin
c1 => begin
put!(coordinator, 0)
unlock(c2); unlock(c1);
end
c2 => begin
put!(coordinator, 0)
unlock(c2); unlock(c1);
end
end
end
t1 = @async wait_on_conds()
t2 = @async wait_on_conds()

take!(coordinator); take!(coordinator) # Wait for both tasks to start waiting

# Now, everyone is waiting, so notifying c1 will wake up _both_ select tasks (and kill both c2 siblings)
lock(c1)
@test notify(c1) == 2
unlock(c1)
yield()
yield()

take!(coordinator); take!(coordinator) # Wait for both tasks to finish running

# Both select tasks will have killed their sibling clauses, so no one is listening on c2
lock(c2)
@test notify(c2) == 0
unlock(c2)
end

0 comments on commit a42449f

Please sign in to comment.