From 8afacd28e1d96a5ce4ee9ec9b91967ac99ddad5f Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Tue, 11 Dec 2018 15:56:04 -0500 Subject: [PATCH] drop internal `notify_error` API, use `close` instead --- base/channels.jl | 59 +++++++++++++-------------- stdlib/Distributed/src/Distributed.jl | 2 +- stdlib/Distributed/src/cluster.jl | 12 +++--- 3 files changed, 37 insertions(+), 36 deletions(-) diff --git a/base/channels.jl b/base/channels.jl index 5f9596c9da82f..eb82db3587824 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -108,24 +108,27 @@ isbuffered(c::Channel) = c.sz_max==0 ? false : true function check_channel_state(c::Channel) if !isopen(c) - c.excp !== nothing && throw(c.excp) + excp = c.excp + excp !== nothing && throw(excp) throw(closed_exception()) end end """ - close(c::Channel) + close(c::Channel[, excp::Exception]) -Close a channel. An exception is thrown by: +Close a channel. An exception (optionally given by `excp`), is thrown by: * [`put!`](@ref) on a closed channel. * [`take!`](@ref) and [`fetch`](@ref) on an empty, closed channel. """ -function close(c::Channel) +function close(c::Channel, excp::Exception=closed_exception()) lock(c) try c.state = :closed - c.excp = closed_exception() - notify_error(c) + c.excp = excp + notify_error(c.cond_take, excp) + notify_error(c.cond_wait, excp) + notify_error(c.cond_put, excp) finally unlock(c) end @@ -219,24 +222,28 @@ function close_chnl_on_taskdone(t::Task, ref::WeakRef) c = ref.value if c isa Channel isopen(c) || return - if !trylock(c) - # can't use `lock`, since attempts to task-switch to wait for it - # will just silently fail and leave us with broken state, - # so schedule this to happen once we are finished destroying our task - @async close_chnl_on_taskdone(t, ref) - return nothing - end - try - isopen(c) || return - if istaskfailed(t) - c.state = :closed - c.excp = task_result(t) - notify_error(c) - else + cleanup = () -> try + isopen(c) || return + if istaskfailed(t) + excp = task_result(t) + if excp isa Exception + close(c, excp) + return + end + end close(c) + return + finally + unlock(c) end - finally - unlock(c) + if trylock(c) + # can't use `lock`, since attempts to task-switch to wait for it + # will just silently fail and leave us with broken state + cleanup() + else + # so schedule this to happen once we are finished destroying our task + # (on a new Task) + @async (lock(c); cleanup()) end end nothing @@ -390,14 +397,6 @@ function wait(c::Channel) nothing end -function notify_error(c::Channel, err) - notify_error(c.cond_take, err) - notify_error(c.cond_wait, err) - notify_error(c.cond_put, err) - nothing -end -notify_error(c::Channel) = notify_error(c, c.excp) - eltype(::Type{Channel{T}}) where {T} = T show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))") diff --git a/stdlib/Distributed/src/Distributed.jl b/stdlib/Distributed/src/Distributed.jl index e0afb433a6f3f..1ab793f44d8ee 100644 --- a/stdlib/Distributed/src/Distributed.jl +++ b/stdlib/Distributed/src/Distributed.jl @@ -11,7 +11,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, # imports for use using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected, - VERSION_STRING, binding_module, notify_error, atexit, julia_exename, + VERSION_STRING, binding_module, atexit, julia_exename, julia_cmd, AsyncGenerator, acquire, release, invokelatest, shell_escape_posixly, uv_error, something, notnothing, isbuffered diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 747919e56b07e..7d1f47c2dd763 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -1046,12 +1046,12 @@ function deregister_worker(pg, pid) ids = [] tonotify = [] lock(client_refs) do - for (id,rv) in pg.refs - if in(pid,rv.clientset) + for (id, rv) in pg.refs + if in(pid, rv.clientset) push!(ids, id) end if rv.waitingfor == pid - push!(tonotify, (id,rv)) + push!(tonotify, (id, rv)) end end for id in ids @@ -1059,11 +1059,12 @@ function deregister_worker(pg, pid) end # throw exception to tasks waiting for this pid - for (id,rv) in tonotify - notify_error(rv.c, ProcessExitedException()) + for (id, rv) in tonotify + close(rv.c, ProcessExitedException()) delete!(pg.refs, id) end end + return end @@ -1073,6 +1074,7 @@ function interrupt(pid::Integer) if isa(w, Worker) manage(w.manager, w.id, w.config, :interrupt) end + return end """