Skip to content

Commit

Permalink
cluster manager fixes (#30172)
Browse files Browse the repository at this point in the history
* kill workers which don't launch properly

* don't emit spurious error messages

* document how to asynchronously launch workers
  • Loading branch information
bjarthur authored and ViralBShah committed Dec 29, 2018
1 parent 6b0cde3 commit 50c8e62
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
51 changes: 44 additions & 7 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ function redirect_worker_output(ident, stream)
end
end

struct LaunchWorkerError <: Exception
msg::String
end

Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg)

# The default TCP transport relies on the worker listening on a free
# port available and printing its bind address and port.
Expand Down Expand Up @@ -272,7 +277,7 @@ function read_worker_host_port(io::IO)

conninfo = fetch(readtask)
if isempty(conninfo) && !isopen(io)
error("Unable to read host:port string from worker. Launch command exited with error?")
throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?"))
end

ntries -= 1
Expand All @@ -286,13 +291,13 @@ function read_worker_host_port(io::IO)
end
close(io)
if ntries > 0
error("Timed out waiting to read host:port string from worker.")
throw(LaunchWorkerError("Timed out waiting to read host:port string from worker."))
else
error("Unexpected output from worker launch command. Host:port string not found.")
throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found."))
end
finally
for line in leader
println("\tFrom failed worker startup:\t", line)
println("\tFrom worker startup:\t", line)
end
end
end
Expand Down Expand Up @@ -354,6 +359,34 @@ the package `ClusterManagers.jl`.
The number of seconds a newly launched worker waits for connection establishment from the
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
environment. Relevant only when using TCP/IP as transport.
To launch workers without blocking the REPL, or the containing function
if launching workers programmatically, execute `addprocs` in its own task.
# Examples
```
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
```
```
# Utilize workers as and when they come online
if nprocs() > 1 # Ensure at least one new worker is available
.... # perform distributed execution
end
```
```
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
```
"""
function addprocs(manager::ClusterManager; kwargs...)
init_multi()
Expand Down Expand Up @@ -499,9 +532,13 @@ function create_worker(manager, wconfig)
local r_s, w_s
try
(r_s, w_s) = connect(manager, w.id, wconfig)
catch
deregister_worker(w.id)
rethrow()
catch ex
try
deregister_worker(w.id)
kill(manager, w.id, wconfig)
finally
rethrow(ex)
end
end

w = Worker(w.id, r_s, w_s, manager; config=wconfig)
Expand Down
2 changes: 1 addition & 1 deletion test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ for (addp_testf, expected_errstr, env) in testruns
old_stdout = stdout
stdout_out, stdout_in = redirect_stdout()
stdout_txt = @async filter!(readlines(stdout_out)) do s
return !startswith(s, "\tFrom failed worker startup:\t")
return !startswith(s, "\tFrom worker startup:\t")
end
try
withenv(env...) do
Expand Down

0 comments on commit 50c8e62

Please sign in to comment.