Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@select macro fixes #13853

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 76 additions & 70 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,13 @@ A select expression of the form:

```julia
@select begin
if clause
body
elseif clause
body
[else
default_body
]
clause1 => body1
clause2 => body2
_ => default_body
end
end
```

Wait for multiple clauses simultaneously using an if-else syntax, taking a different action depending on which clause is available first.
Wait for multiple clauses simultaneously using a pattern matching syntax, taking a different action depending on which clause is available first.

A clause has three possible forms:

Expand Down Expand Up @@ -241,14 +236,12 @@ channel2 = Channel()
task = @task ...

result = @select begin
if channel1 |> value
info("Took from channel1")
value
elseif channel2 <| :test
info("Put :test into channel2")
elseif task
info("task finished")
end
channel1 |> value => begin
info("Took from channel1")
value
end
channel2 <| :test => info("Put :test into channel2")
task => info("task finished")
end
```
"""
Expand All @@ -258,25 +251,23 @@ macro select(expr)
# an 'else' clause is present in the @select body (in which case it will be
# nonblocking).
mode = :blocking
while true
# Be robust to extraneous blocks caused by whitespace by skipping over them
while isa(expr, Expr) && expr.head == :block
expr = expr.args[2] # args[1] is a LineNumber node
end
if isa(expr, Expr) && expr.head == :if
push!(clauses, (parse_select_clause(expr.args[1]), expr.args[2]))
if length(expr.args) == 3
expr = expr.args[3] # The 'elseif' block
for se in expr.args
# skip line nodes
isa(se, Expr) || continue
# grab all the pairs
if se.head == :(=>)
if se.args[1] != :_
push!(clauses, (parse_select_clause(se.args[1]), se.args[2]))
else
break
# The defaule case (_). If present, the select
# statement is considered non-blocking and will return this
# section if none of the other conditions are immediately available.
push!(clauses, (SelectClause(SelectDefault, Nullable(), Nullable()), se.args[2]))
mode = :nonblocking
end
else
# The 'else' section of an if statement. If present, the select
# statement is considered non-blocking and will return this
# section if none of the other conditions are immediately available.
push!(clauses, (SelectClause(SelectDefault, Nullable(), Nullable()), expr))
mode = :nonblocking
break
elseif se.head != :block && se.head != :line
# if we run into an expression that is not a block. line or pair throw an error
throw(ErrorException("Selection expressions must be Pairs. Found: $(se.head)"))
end
end
if mode == :nonblocking
Expand All @@ -302,42 +293,48 @@ _take!(x) = wait(x)
_isready(c::AbstractChannel) = isready(c)
_isready(t::Task) = istaskdone(t)

# helper function to place the default case in the proper position
function set_default_first!(clauses)
default_pos = find(clauses) do x
clause, body = x
clause.kind == SelectDefault
end
l = length(default_pos)
l == 0 && return # bail out if there is no default case
if l > 1
d = [clauses[i] for i in default_pos]
err_str = reduce(* , ["\n _ => $(string(b))" for (c, b) in d])
throw(ErrorException("Select takes at most one default case. Found: $l $err_str"))
end
# swap elements to sure make SelectDefault comes first
clauses[1], clauses[default_pos[1]] = clauses[default_pos[1]], clauses[1]
clauses
end

function _select_nonblock_macro(clauses)
set_default_first!(clauses)
branches = Expr(:block)
for (clause, body) in clauses
local branch
branch =
if clause.kind == SelectPut
branch = quote
if isready_put($(clause.channel|>get|>esc))
put!($(clause.channel|>get|>esc), $(clause.value|>get|>esc))
ret = $(esc(body))
break
end
end
:(if isready_put($(clause.channel|>get|>esc))
put!($(clause.channel|>get|>esc), $(clause.value|>get|>esc))
$(esc(body))
end)
elseif clause.kind == SelectTake
branch = quote
if _isready($(clause.channel|>get|>esc))
$(clause.value|>get|>esc) =
_take!($(clause.channel|>get|>esc))
ret = $(esc(body))
break
end
end
:(if _isready($(clause.channel|>get|>esc))
$(clause.value|>get|>esc) = _take!($(clause.channel|>get|>esc))
$(esc(body))
end)
elseif clause.kind == SelectDefault
branch = quote
ret = $(esc(body))
break
end
:($(esc(body)))
end
push!(branches.args, branch)
end
quote
local ret
while true
$branches
end
ret

# the next two lines build an if / elseif chain from the bottom up
push!(branch.args, branches)
branches = branch
end
:($branches)
end

# The strategy for blocking select statements is to create a set of "rival"
Expand Down Expand Up @@ -370,13 +367,16 @@ end

function _select_block_macro(clauses)
branches = Expr(:block)
body_branches = Expr(:block)
for (i, (clause, body)) in enumerate(clauses)
if clause.kind == SelectPut
wait_for_channel = :(wait_put($(clause.channel|>get|>esc)))
mutate_channel = :(put!($(clause.channel|>get|>esc), $(clause.value|>get|>esc)))
bind_variable = :(nothing)
elseif clause.kind == SelectTake
wait_for_channel = :(wait($(clause.channel|>get|>esc)))
mutate_channel = :($(clause.value|>get|>esc) = _take!($(clause.channel|>get|>esc)))
mutate_channel = :(_take!($(clause.channel|>get|>esc)))
bind_variable = :($(clause.value|>get|>esc) = branch_val)
end
branch = quote
tasks[$i] = @schedule begin
Expand All @@ -394,21 +394,27 @@ function _select_block_macro(clauses)
end
end
select_kill_rivals(tasks, $i)
$mutate_channel
put!(winner_ch, $(esc(body)))
event_val = $mutate_channel
put!(winner_ch, ($i, event_val))
catch err
throwto(maintask, err)
Base.throwto(maintask, err)
end
end
end
end # if
end # for
push!(branches.args, branch)

body_branch = :(if branch_id == $i; $bind_variable; $(esc(body)); end)
# the next two lines build an if / elseif chain from the bottom up
push!(body_branch.args, body_branches)
body_branches = body_branch
end
quote
winner_ch = Channel(1)
tasks = Array(Task, $(length(clauses)))
maintask = current_task()
$branches
take!(winner_ch)
$branches # set up competing tasks
(branch_id, branch_val) = take!(winner_ch) # get the id of the winning task
$body_branches # execute the winning block in the original lexical context
end
end

Expand Down
7 changes: 2 additions & 5 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,8 @@ Consider this simple example which times out if a computation takes too long::
end

@select begin
if output_channel |> value
println("Calculation produced $value")
elseif timeout_channel
println("Timed out waiting for computation")
end
output_channel |> value => println("Calculation produced $value")
timeout_channel => println("Timed out waiting for computation")
end
end

Expand Down
21 changes: 7 additions & 14 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ function select_block_test(t1, t2, t3, t4)
end

@select begin
if c1 |> x
"Got $x from c1"
elseif c2
"Got a message from c2"
elseif c3 <| :write_test
"Wrote to c3"
elseif task |> z
c1 |> x => "Got $x from c1"
c2 => "Got a message from c2"
c3 <| :write_test => "Wrote to c3"
task |> z => begin
"Task finished with $z"
end
end
Expand All @@ -59,13 +56,9 @@ function select_nonblock_test(test)
end

@select begin
if c |> x
"Got $x from c"
elseif c2 <| 1
"Wrote to c2"
else
"Default case"
end
c |> x => "Got $x from c"
c2 <| 1 => "Wrote to c2"
_ => "Default case"
end
end

Expand Down