Skip to content

Commit

Permalink
drop internal notify_error API, use close instead
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Dec 11, 2018
1 parent 385afe8 commit 8afacd2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 36 deletions.
59 changes: 29 additions & 30 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,27 @@ isbuffered(c::Channel) = c.sz_max==0 ? false : true

function check_channel_state(c::Channel)
if !isopen(c)
c.excp !== nothing && throw(c.excp)
excp = c.excp
excp !== nothing && throw(excp)
throw(closed_exception())
end
end
"""
close(c::Channel)
close(c::Channel[, excp::Exception])
Close a channel. An exception is thrown by:
Close a channel. An exception (optionally given by `excp`), is thrown by:
* [`put!`](@ref) on a closed channel.
* [`take!`](@ref) and [`fetch`](@ref) on an empty, closed channel.
"""
function close(c::Channel)
function close(c::Channel, excp::Exception=closed_exception())
lock(c)
try
c.state = :closed
c.excp = closed_exception()
notify_error(c)
c.excp = excp
notify_error(c.cond_take, excp)
notify_error(c.cond_wait, excp)
notify_error(c.cond_put, excp)
finally
unlock(c)
end
Expand Down Expand Up @@ -219,24 +222,28 @@ function close_chnl_on_taskdone(t::Task, ref::WeakRef)
c = ref.value
if c isa Channel
isopen(c) || return
if !trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state,
# so schedule this to happen once we are finished destroying our task
@async close_chnl_on_taskdone(t, ref)
return nothing
end
try
isopen(c) || return
if istaskfailed(t)
c.state = :closed
c.excp = task_result(t)
notify_error(c)
else
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
end
close(c)
return
finally
unlock(c)
end
finally
unlock(c)
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
end
end
nothing
Expand Down Expand Up @@ -390,14 +397,6 @@ function wait(c::Channel)
nothing
end

function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_wait, err)
notify_error(c.cond_put, err)
nothing
end
notify_error(c::Channel) = notify_error(c, c.excp)

eltype(::Type{Channel{T}}) where {T} = T

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, binding_module, notify_error, atexit, julia_exename,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, something, notnothing, isbuffered

Expand Down
12 changes: 7 additions & 5 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1046,24 +1046,25 @@ function deregister_worker(pg, pid)
ids = []
tonotify = []
lock(client_refs) do
for (id,rv) in pg.refs
if in(pid,rv.clientset)
for (id, rv) in pg.refs
if in(pid, rv.clientset)
push!(ids, id)
end
if rv.waitingfor == pid
push!(tonotify, (id,rv))
push!(tonotify, (id, rv))
end
end
for id in ids
del_client(pg, id, pid)
end

# throw exception to tasks waiting for this pid
for (id,rv) in tonotify
notify_error(rv.c, ProcessExitedException())
for (id, rv) in tonotify
close(rv.c, ProcessExitedException())
delete!(pg.refs, id)
end
end
return
end


Expand All @@ -1073,6 +1074,7 @@ function interrupt(pid::Integer)
if isa(w, Worker)
manage(w.manager, w.id, w.config, :interrupt)
end
return
end

"""
Expand Down

0 comments on commit 8afacd2

Please sign in to comment.