Skip to content

Commit

Permalink
Condition/RecursiveLock: add ability to handle threads (#30061)
Browse files Browse the repository at this point in the history
This extends Condition to assert that it may only be used
in the single-threaded case (co-operatively scheduled),
and then adds a thread-safe version of the same:
`Threads.Condition`.

Additionally, it also upgrades ReentrantLock, etc. to be thread-safe.
  • Loading branch information
vtjnash authored Dec 18, 2018
1 parent 5e72a49 commit 6b0cde3
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 8 deletions.
4 changes: 0 additions & 4 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ Distributed.pmap
Distributed.RemoteException
Distributed.Future
Distributed.RemoteChannel
Distributed.wait
Distributed.fetch(::Any)
Distributed.remotecall(::Any, ::Integer, ::Any...)
Distributed.remotecall_wait(::Any, ::Integer, ::Any...)
Expand All @@ -38,13 +37,10 @@ Distributed.remotecall(::Any, ::AbstractWorkerPool, ::Any...)
Distributed.remotecall_wait(::Any, ::AbstractWorkerPool, ::Any...)
Distributed.remotecall_fetch(::Any, ::AbstractWorkerPool, ::Any...)
Distributed.remote_do(::Any, ::AbstractWorkerPool, ::Any...)
Distributed.timedwait
Distributed.@spawn
Distributed.@spawnat
Distributed.@fetch
Distributed.@fetchfrom
Distributed.@async
Distributed.@sync
Distributed.@distributed
Distributed.@everywhere
Distributed.clear!(::Any, ::Any; ::Any)
Expand Down
4 changes: 2 additions & 2 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mutable struct Worker
manager::ClusterManager
config::WorkerConfig
version::Union{VersionNumber, Nothing} # Julia version of the remote process
initialized::Threads.Event
initialized::Event

function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager;
version::Union{VersionNumber, Nothing}=nothing,
Expand All @@ -91,7 +91,7 @@ mutable struct Worker
return map_pid_wrkr[id]
end
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
w.initialized = Threads.Event()
w.initialized = Event()
register_worker(w)
w
end
Expand Down
4 changes: 2 additions & 2 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
launch_tasks = Vector{Any}(undef, length(manager.machines))

for (i,(machine, cnt)) in enumerate(manager.machines)
for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
launch_tasks[i] = @async try
launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy)
Expand All @@ -135,7 +135,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
end

for t in launch_tasks
wait(t)
wait(t::Task)
end

notify(launch_ntfy)
Expand Down

0 comments on commit 6b0cde3

Please sign in to comment.