From 6b0cde3f0660131258c3dec78c55ce05d649595d Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 17 Dec 2018 21:23:03 -0500 Subject: [PATCH] Condition/RecursiveLock: add ability to handle threads (JuliaLang/julia#30061) This extends Condition to assert that it may only be used in the single-threaded case (co-operatively scheduled), and then adds a thread-safe version of the same: `Threads.Condition`. Additionally, it also upgrades ReentrantLock, etc. to be thread-safe. --- docs/src/index.md | 4 ---- src/cluster.jl | 4 ++-- src/managers.jl | 4 ++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/docs/src/index.md b/docs/src/index.md index da0b822e1d026..4653a260badcb 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -18,7 +18,6 @@ Distributed.pmap Distributed.RemoteException Distributed.Future Distributed.RemoteChannel -Distributed.wait Distributed.fetch(::Any) Distributed.remotecall(::Any, ::Integer, ::Any...) Distributed.remotecall_wait(::Any, ::Integer, ::Any...) @@ -38,13 +37,10 @@ Distributed.remotecall(::Any, ::AbstractWorkerPool, ::Any...) Distributed.remotecall_wait(::Any, ::AbstractWorkerPool, ::Any...) Distributed.remotecall_fetch(::Any, ::AbstractWorkerPool, ::Any...) Distributed.remote_do(::Any, ::AbstractWorkerPool, ::Any...) -Distributed.timedwait Distributed.@spawn Distributed.@spawnat Distributed.@fetch Distributed.@fetchfrom -Distributed.@async -Distributed.@sync Distributed.@distributed Distributed.@everywhere Distributed.clear!(::Any, ::Any; ::Any) diff --git a/src/cluster.jl b/src/cluster.jl index 50f81251be71a..747919e56b07e 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -67,7 +67,7 @@ mutable struct Worker manager::ClusterManager config::WorkerConfig version::Union{VersionNumber, Nothing} # Julia version of the remote process - initialized::Threads.Event + initialized::Event function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager; version::Union{VersionNumber, Nothing}=nothing, @@ -91,7 +91,7 @@ mutable struct Worker return map_pid_wrkr[id] end w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func) - w.initialized = Threads.Event() + w.initialized = Event() register_worker(w) w end diff --git a/src/managers.jl b/src/managers.jl index 3bc838750ab71..f888b0a1ceeac 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -124,7 +124,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: # Wait for all launches to complete. launch_tasks = Vector{Any}(undef, length(manager.machines)) - for (i,(machine, cnt)) in enumerate(manager.machines) + for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt launch_tasks[i] = @async try launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy) @@ -135,7 +135,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: end for t in launch_tasks - wait(t) + wait(t::Task) end notify(launch_ntfy)