diff --git a/.travis.yml b/.travis.yml index 399549b..670a7bf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,13 @@ os: - linux - osx julia: - - release + - 1.3 - nightly +env: + - JULIA_NUM_THREADS=1 + - JULIA_NUM_THREADS=6 + + notifications: email: false # uncomment the following lines to override the default test script diff --git a/Project.toml b/Project.toml new file mode 100644 index 0000000..ecc5079 --- /dev/null +++ b/Project.toml @@ -0,0 +1,17 @@ +name = "Select" +uuid = "ac7b3b08-bf78-4e82-aa6e-b038cb67d740" +authors = ["Nathan Daly ", "Brett Cornell ", + "Jon Malmaud "] +version = "0.1.0" + +[deps] +Nullables = "4d1e1d77-625e-5b40-9113-a560ec7a8ecd" + +[compat] +julia = "1.3" + +[extras] +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[targets] +test = ["Test"] diff --git a/README.md b/README.md index ba627df..9686d43 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,17 @@ # Select -This is copy of [Jon Malmaud's](https://github.com/malmaud) go inspired select macro for the Julia programming language. I have made a slight syntax modification, but essentially all the code is his. - -This packege is currently not registered and can be installed with: +This repo is branched from https://github.com/durcan/Select.jl, which was originally a copy of [Jon Malmaud's](https://github.com/malmaud) go-inspired select macro for the Julia programming language. I have updated the repo for Julia 1.3+, multithreaded the Select macro, and hardened the code a bit. +Install this package via: +```julia +julia> Pkg.add("https://github.com/NHDaly/Select.jl") ``` -Pkg.clone("https://github.com/durcan/Select.jl.git") -``` + +The original README from [durcan/Select.jl](https://github.com/durcan/Select.jl) follows: + +----------------------------------------------------- + +This is copy of [Jon Malmaud's](https://github.com/malmaud) go inspired select macro for the Julia programming language. I have made a slight syntax modification, but essentially all the code is his. A select expression is for waiting on multiple communication operations and is of the form: diff --git a/examples/default.jl b/examples/default.jl new file mode 100644 index 0000000..7fcc87c --- /dev/null +++ b/examples/default.jl @@ -0,0 +1,35 @@ +module Default +using Select + +doevery(secs) = Channel{Int}() do ch + while true + sleep(secs) + put!(ch, 1) + end +end +doafter(secs) = Channel{Int}() do ch + sleep(secs) + put!(ch, 1) +end + +function main() + tick = doevery(0.1) + boom = doafter(0.5) + while true + @select begin + tick => println("tick.") + boom => begin + println("BOOM!") + return + end + _ => begin + println(" .") + sleep(0.05) + end + end + end +end + +main() + +end diff --git a/examples/fibonacci.jl b/examples/fibonacci.jl new file mode 100644 index 0000000..8ab43c4 --- /dev/null +++ b/examples/fibonacci.jl @@ -0,0 +1,38 @@ +# Translated from the Golang Tour's select intro: https://tour.golang.org/concurrency/5 +module Fibonacci + +using Select +import Base.Threads: @spawn + +function fibonacci(c::Channel{>:Int}, quit::Channel{>:Int}) + x, y = 0, 1 + while true + @select begin + c <| x => begin + x, y = y, x+y + end + quit => begin + println("quit") + return + end + end + end +end + +function main() + @sync begin + c = Channel{Int}() + quit = Channel{Int}() + @spawn begin + for i in 0:9 + println(take!(c)) + end + put!(quit, 0) + end + fibonacci(c, quit) + end +end + +main() + +end diff --git a/src/Select.jl b/src/Select.jl index d85a504..159c3be 100644 --- a/src/Select.jl +++ b/src/Select.jl @@ -1,23 +1,62 @@ module Select +using Nullables + export @select -function isready_put(c::Channel) - d = c.take_pos - c.put_pos - if (d == 1) || (d == -(c.szp1-1)) - if (c.szp1 - 1) ≥ c.sz_max - return false - end +function isready_put(c::Channel, sibling_tasks) + # TODO: To fix the circular dependency, I think it might be enough to just add a check + # here that there is at least one ready task that _isn't_ one of our siblings! We can + # take another argument to this function, which is the list of tasks, and cross-reference it? + return if Base.isbuffered(c) + length(c.data) != c.sz_max + else + # TODO: No this isn't enough. I need to do it for the _wait_ function, not the wait_put. :'( + #@info sibling_tasks + #@info "isready_put:" (!isempty(c.cond_take.waitq))#, collect(c.cond_take.waitq)) + !isempty(c.cond_take.waitq) && any(t->!in(t, sibling_tasks), c.cond_take.waitq) end - return true end -function wait_put(c::Channel) - while !isready_put(c) - wait(c.cond_put) +function wait_put(c::Channel, sibling_tasks) + #isready_put(c, sibling_tasks) && return + # TODO: Is this sufficiently thread-safe? + lock(c) + try + while !isready_put(c, sibling_tasks) + Base.check_channel_state(c) + wait(c.cond_put) # Can be cancelled while waiting here... + end + finally + unlock(c) end + nothing end +isready_wait_nosibs(c::Channel, sibling_tasks) = n_avail_nosibs(c, sibling_tasks) > 0 +function n_avail_nosibs(c::Channel, sibling_tasks) + if Base.isbuffered(c) + length(c.data) + else + #@info "isready_wait_nosibs:" (isempty(c.cond_put.waitq), collect(c.cond_put.waitq)) + length(filter(t->0==count(x->x==t, sibling_tasks), collect(c.cond_put.waitq))) + end +end +function wait_nosibs(c::Channel, sibling_tasks) + # I don't understand why its okay to access this outside the lock...? + #isready_wait_nosibs(c, sibling_tasks) && return + lock(c) + try + while !isready_wait_nosibs(c, sibling_tasks) + Base.check_channel_state(c) + wait(c.cond_wait) + end + finally + unlock(c) + end + nothing +end +wait_nosibs(x, sibling_tasks) = wait(x) ## Implementation of 'select' mechanism to block on the disjunction of ## of 'waitable' objects. @@ -30,7 +69,7 @@ end # println(value) # ... # end -immutable SelectClause{ChannelT, ValueT} +struct SelectClause{ChannelT, ValueT} kind::SelectClauseKind channel::Nullable{ChannelT} value::Nullable{ValueT} @@ -116,14 +155,14 @@ macro select(expr) # skip line nodes isa(se, Expr) || continue # grab all the pairs - if se.head == :(=>) - if se.args[1] != :_ - push!(clauses, (parse_select_clause(se.args[1]), se.args[2])) + if se.head == :call && se.args[1] == :(=>) + if se.args[2] != :_ + push!(clauses, (parse_select_clause(se.args[2]), se.args[3])) else # The defaule case (_). If present, the select # statement is considered non-blocking and will return this # section if none of the other conditions are immediately available. - push!(clauses, (SelectClause(SelectDefault, Nullable(), Nullable()), se.args[2])) + push!(clauses, (SelectClause(SelectDefault, Nullable(), Nullable()), se.args[3])) mode = :nonblocking end elseif se.head != :block && se.head != :line @@ -141,7 +180,7 @@ end # with select. # @select if x |> value ... will ultimately insert an expression value=_take!(x). _take!(c::AbstractChannel) = take!(c) -_take!(x) = wait(x) +_take!(x) = fetch(x) # @select if x <| value .... will ultimately inset value=put!(x), which currently # is only meanginful for channels and so no underscore varirant is used here. # These are used with the non-blocking variant of select, which will @@ -150,9 +189,12 @@ _take!(x) = wait(x) _isready(c::AbstractChannel) = isready(c) _isready(t::Task) = istaskdone(t) +_wait_condition(c::AbstractChannel) = c.cond_wait +_wait_condition(x) = x + # helper function to place the default case in the proper position function set_default_first!(clauses) - default_pos = find(clauses) do x + default_pos = findall(clauses) do x clause, body = x clause.kind == SelectDefault end @@ -170,13 +212,17 @@ function _select_nonblock_macro(clauses) for (clause, body) in clauses branch = if clause.kind == SelectPut - :(if isready_put($(clause.channel|>get|>esc)) - put!($(clause.channel|>get|>esc), $(clause.value|>get|>esc)) + channel_var = gensym("channel") + channel_assignment_expr = :($channel_var = $(clause.channel|>get|>esc)) + :(if ($channel_assignment_expr; isready_put($channel_var, [])) + put!($channel_var, $(clause.value|>get|>esc)) $(esc(body)) end) elseif clause.kind == SelectTake - :(if _isready($(clause.channel|>get|>esc)) - $(clause.value|>get|>esc) = _take!($(clause.channel|>get|>esc)) + channel_var = gensym("channel") + channel_assignment_expr = :($channel_var = $(clause.channel|>get|>esc)) + :(if ($channel_assignment_expr; _isready($channel_var)) + $(clause.value|>get|>esc) = _take!($channel_var) $(esc(body)) end) elseif clause.kind == SelectDefault @@ -195,58 +241,108 @@ end # the first available, it sends a special interrupt to its rivals to kill them. # The interrupt includes the task where control should be resumed # once the rival has shut itself down. -immutable SelectInterrupt <: Exception +struct SelectInterrupt <: Exception parent::Task end # Kill all tasks in "tasks" besides a given task. Used for killing the rivals # of the winning waiting task. function select_kill_rivals(tasks, myidx) + #@info myidx for (taskidx, task) in enumerate(tasks) taskidx == myidx && continue - if task.state==:waiting + #@info taskidx, task + #if task.state == :waiting || task.state == :queued # Rival is blocked waiting for its channel; send it a message that it's # lost the race. - Base.throwto(task, SelectInterrupt(current_task())) - elseif task.state==:queued - # Rival hasn't starting running yet and so hasn't blocked or set up - # a try-catch block to listen for SelectInterrupt. - # Just delete it from the workqueue. - queueidx = findfirst(Base.Workqueue.==task) - deleteat!(Base.Workqueue, queueidx) - end + Base.schedule(task, SelectInterrupt(current_task()), error=true) + # TODO: Is this still a legit optimization?: + # elseif task.state==:queued + # # Rival hasn't starting running yet and so hasn't blocked or set up + # # a try-catch block to listen for SelectInterrupt. + # # Just delete it from the workqueue. + # queueidx = findfirst(Base.Workqueue.==task) + # deleteat!(Base.Workqueue, queueidx) + # end end + #@info "done killing" end function _select_block_macro(clauses) branches = Expr(:block) body_branches = Expr(:block) + clause_lock = gensym("clause_lock") + lock_assignment_expr = :($clause_lock = Base.ReentrantLock()) for (i, (clause, body)) in enumerate(clauses) + channel_var = gensym("channel") + value_var = clause.value|>get|>esc + channel_declaration_expr = :(local $channel_var) + channel_assignment_expr = :($channel_var = $(clause.channel|>get|>esc)) if clause.kind == SelectPut - wait_for_channel = :(wait_put($(clause.channel|>get|>esc))) - mutate_channel = :(put!($(clause.channel|>get|>esc), $(clause.value|>get|>esc))) + isready_func = isready_put + wait_for_channel = :(wait_put($channel_var, tasks)) + mutate_channel = :(put!($channel_var, $value_var)) bind_variable = :(nothing) elseif clause.kind == SelectTake - wait_for_channel = :(wait($(clause.channel|>get|>esc))) - mutate_channel = :(_take!($(clause.channel|>get|>esc))) - bind_variable = :($(clause.value|>get|>esc) = branch_val) + isready_func = _isready + wait_for_channel = :(wait_nosibs($channel_var, tasks)) + mutate_channel = :(_take!($channel_var)) + bind_variable = :($value_var = branch_val) end branch = quote - tasks[$i] = @schedule begin + tasks[$i] = @async begin + $channel_declaration_expr try # Listen for genuine errors to throw to the main task + $channel_assignment_expr + + # ---- Begin the actual `wait_and_select` algorithm ---- + # TODO: Is this sufficiently thread-safe? + # Listen for SelectInterrupt messages so we can shutdown + # if a rival's channel unblocks first. try - # Listen for SelectInterrupt messages so we can shutdown - # if a rival's channel unblocks first. + #@info "Task $($i) about to wait" $wait_for_channel + + # TODO: Because of this gap, where no locks are held, it's possible + # that multiple tasks can be woken-up due to a `put!` or `take!` on + # a channel they were waiting for. Only once will proceed in this + # @select, but a channel running _outside this macro_ may yet proceed + # and cause a problem.. I think this is bad. Fix this (probably) by + # returning the lock to unlock from `wait_for_channel`. + + # NOTE: This is _not a deadock_ because there is a global ordering + # to the locks: we _ALWAYS_ wait on the channel before waiting on + # the clause_lock. This invariant must not be violated. + #@info "Task $($i) about to lock" + lock($clause_lock) + # We got the lock, so run this task to completion. + try + #@info "Task $($i): got lock" + # This block is atomic, so it _shouldn't_ matter whether we kill + # rivals first or mutate_channel first. It only matters if one + # case is accidentally synchronizing w/ another case, which + # should be specifically prohibited (somehow). + # For now, I'm killing rivals first so that at least we'll get + # an exception, rather than a deadlock, if we end up waiting on + # our rival, sibling cases. + #@info "Task $($i): killing rivals" + select_kill_rivals(tasks, $i) + + #@info "Task $($i): mutating" + event_val = $mutate_channel + #@info "Got event_val: $event_val" + put!(winner_ch, ($i, event_val)) + finally + #@info "Task $($i)) unlock" + unlock($clause_lock) + end catch err if isa(err, SelectInterrupt) - yieldto(err.parent) + #@info "CAUGHT SelectInterrupt: $err" + #yieldto(err.parent) # TODO: is this still a thing we should do? return else rethrow() end end - select_kill_rivals(tasks, $i) - event_val = $mutate_channel - put!(winner_ch, ($i, event_val)) catch err Base.throwto(maintask, err) end @@ -261,8 +357,9 @@ function _select_block_macro(clauses) end quote winner_ch = Channel(1) - tasks = Array(Task, $(length(clauses))) + tasks = Array{Task}(undef, $(length(clauses))) maintask = current_task() + $lock_assignment_expr $branches # set up competing tasks (branch_id, branch_val) = take!(winner_ch) # get the id of the winning task $body_branches # execute the winning block in the original lexical context @@ -273,7 +370,7 @@ end function _select_nonblock(clauses) for (i, clause) in enumerate(clauses) if clause[1] == :put - if isready_put(clause[2]) + if isready_put(clause[2], []) return (i, put!(clause[2], clause[3])) end elseif clause[1] == :take @@ -288,16 +385,16 @@ function _select_nonblock(clauses) end function _select_block(clauses) winner_ch = Channel{Tuple{Int, Any}}(1) - tasks = Array(Task, length(clauses)) + tasks = Array{Task}(undef, length(clauses)) maintask = current_task() for (i, clause) in enumerate(clauses) - tasks[i] = @async begin + tasks[i] = Threads.@spawn begin try try if clause[1] == :put - wait_put(clause[2]) + wait_put(clause[2], tasks) elseif clause[1] == :take - wait(clause[2]) + wait_nosibs(clause[2], tasks) end catch err if isa(err, SelectInterrupt) diff --git a/test/Select.jl b/test/Select.jl new file mode 100644 index 0000000..ce4f53d --- /dev/null +++ b/test/Select.jl @@ -0,0 +1,106 @@ +using Select +using Test + +function select_block_test(t1, t2, t3, t4) + c1 = Channel{Symbol}(1) + c2 = Channel{Int}(1) + c3 = Channel(1) + + put!(c3,1) + + @async begin + sleep(t1) + put!(c1,:a) + end + + @async begin + sleep(t2) + put!(c2,1) + end + + @async begin + sleep(t3) + take!(c3) + end + + task = @async begin + sleep(t4) + :task_done + end + + @select begin + c1 |> x => "Got $x from c1" + c2 => "Got a message from c2" + c3 <| :write_test => "Wrote to c3" + task |> z => begin + "Task finished with $z" + end + end +end + +@testset "@select blocking" begin + @test select_block_test(.5, 1, 1, 1) == "Got a from c1" + @test select_block_test(1, .5, 1, 1) == "Got a message from c2" + @test select_block_test(1, 1, .5, 1) == "Wrote to c3" + @test select_block_test(1, 1, 1, .5) == "Task finished with task_done" +end + +@testset "@select blocking, already ready" begin + ch = Channel(1) + put!(ch, 1) + @test (@select begin + ch |> x => x + @async(sleep(0.3)) => "timeout" + end) == 1 + + # Can immediately put into a buffered channel + ch = Channel(1) + @test (@select begin + ch <| 1 => take!(ch) + @async(sleep(0.3)) => "timeout" + end) == 1 +end + +function select_nonblock_test(test) + c = Channel(1) + c2 = Channel(1) + put!(c2, 1) + if test == :take + put!(c, 1) + elseif test == :put + take!(c2) + elseif test == :default + end + + @select begin + c |> x => "Got $x from c" + c2 <| 1 => "Wrote to c2" + _ => "Default case" + end +end + +@testset "@select non-blocking" begin + @test select_nonblock_test(:take) == "Got 1 from c" + @test select_nonblock_test(:put) == "Wrote to c2" + @test select_nonblock_test(:default) == "Default case" +end + +@testset "self-references" begin + # Test multiple times because the deadlock was only triggered if Task 1 is run before + # Task 2 or Task 3. Running it multiple times will hopefully cover all the cases. + for _ in 1:10 + # Test that the two clauses in `@select` can't trigger eachother (Here, the problem + # would be if Task 1 puts into ch, then Task 2 sees Task 1 waiting, and thinks ch is + # ready to take! and attempts to incorrectly proceed w/ the take!.) Instead, this should + # timeout, since no one else is putting or taking on `ch`. + ch = Channel() + @test @select(begin + ch <| "hi" => "put" + # This take!(ch) should not be triggered by the put + ch |> x => "take! |> $x" + # This wait(ch) should also not be triggered by the put + ch => "waiting on ch" + @async(sleep(0.3)) => "timeout" + end) == "timeout" + end +end diff --git a/test/runtests.jl b/test/runtests.jl index 43c31e7..d66542c 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,66 +1,5 @@ -using Select -using Base.Test +using Test -function select_block_test(t1, t2, t3, t4) - c1 = Channel{Symbol}(1) - c2 = Channel{Int}(1) - c3 = Channel(1) - - put!(c3,1) - - @schedule begin - sleep(t1) - put!(c1,:a) - end - - @schedule begin - sleep(t2) - put!(c2,1) - end - - @schedule begin - sleep(t3) - take!(c3) - end - - task = @schedule begin - sleep(t4) - :task_done - end - - @select begin - c1 |> x => "Got $x from c1" - c2 => "Got a message from c2" - c3 <| :write_test => "Wrote to c3" - task |> z => begin - "Task finished with $z" - end - end +@testset "Select.jl" begin + include("Select.jl") end - -@test select_block_test(.5, 1, 1, 1) == "Got a from c1" -@test select_block_test(1, .5, 1, 1) == "Got a message from c2" -@test select_block_test(1, 1, .5, 1) == "Wrote to c3" -@test select_block_test(1, 1, 1, .5) == "Task finished with task_done" - -function select_nonblock_test(test) - c = Channel(1) - c2 = Channel(1) - put!(c2, 1) - if test == :take - put!(c, 1) - elseif test == :put - take!(c2) - elseif test == :default - end - - @select begin - c |> x => "Got $x from c" - c2 <| 1 => "Wrote to c2" - _ => "Default case" - end -end - -@test select_nonblock_test(:take) == "Got 1 from c" -@test select_nonblock_test(:put) == "Wrote to c2" -@test select_nonblock_test(:default) == "Default case"