From a0b7a7610334d0925f126ae20f5ff2d074e3ba69 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 17 Dec 2018 21:23:03 -0500 Subject: [PATCH] Condition/RecursiveLock: add ability to handle threads (#30061) This extends Condition to assert that it may only be used in the single-threaded case (co-operatively scheduled), and then adds a thread-safe version of the same: `Threads.Condition`. Additionally, it also upgrades ReentrantLock, etc. to be thread-safe. --- NEWS.md | 6 + base/event.jl | 117 ++++++++-- base/exports.jl | 1 + base/lock.jl | 194 ++++++++++++---- base/locks-mt.jl | 137 +++++++++++ base/locks.jl | 297 ------------------------ base/stream.jl | 8 +- base/sysimg.jl | 2 +- base/threadcall.jl | 4 +- base/threads.jl | 5 +- base/weakkeydict.jl | 4 +- doc/src/base/multi-threading.md | 19 +- doc/src/base/parallel.md | 38 ++- doc/src/manual/faq.md | 3 +- stdlib/Distributed/docs/src/index.md | 4 - stdlib/Distributed/src/cluster.jl | 4 +- stdlib/Distributed/src/managers.jl | 4 +- stdlib/FileWatching/src/FileWatching.jl | 6 +- stdlib/Sockets/test/runtests.jl | 87 +++---- test/channels.jl | 10 + test/read.jl | 5 +- test/spawn.jl | 15 +- test/threads.jl | 18 +- 23 files changed, 523 insertions(+), 465 deletions(-) create mode 100644 base/locks-mt.jl delete mode 100644 base/locks.jl diff --git a/NEWS.md b/NEWS.md index ebb6d68faa426..1bdb74e324f23 100644 --- a/NEWS.md +++ b/NEWS.md @@ -7,6 +7,12 @@ New language features * The `extrema` function now accepts a function argument in the same manner as `minimum` and `maximum` ([#30323]). +Multi-threading changes +----------------------- + + * The `Condition` type now has a thread-safe replacement, accessed as `Threads.Condition`. + With that addition, task scheduling primitives such as `ReentrantLock` are now thread-safe ([#30061]). + Language changes ---------------- diff --git a/base/event.jl b/base/event.jl index cf5e93cc25934..0c73e09e3deea 100644 --- a/base/event.jl +++ b/base/event.jl @@ -1,23 +1,75 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license +## thread/task locking abstraction + +""" + AbstractLock + +Abstract supertype describing types that +implement the synchronization primitives: +[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref). +""" +abstract type AbstractLock end +function lock end +function unlock end +function trylock end +function islocked end +unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wait` +relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait` +assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid()) +assert_havelock(l::AbstractLock, tid::Integer) = + (islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected") +assert_havelock(l::AbstractLock, tid::Task) = + (islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected") +assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected") + +""" + AlwaysLockedST + +This struct does not implement a real lock, but instead +pretends to be always locked on the original thread it was allocated on, +and simply ignores all other interactions. +It also does not synchronize tasks; for that use a real lock such as [`RecursiveLock`](@ref). +This can be used in the place of a real lock to, instead, simply and cheaply assert +that the operation is only occurring on a single cooperatively-scheduled thread. +It is thus functionally equivalent to allocating a real, recursive, task-unaware lock +immediately calling `lock` on it, and then never calling a matching `unlock`, +except that calling `lock` from another thread will throw a concurrency violation exception. +""" +struct AlwaysLockedST <: AbstractLock + ownertid::Int16 + AlwaysLockedST() = new(Threads.threadid()) +end +assert_havelock(l::AlwaysLockedST) = assert_havelock(l, l.ownertid) +lock(l::AlwaysLockedST) = assert_havelock(l) +unlock(l::AlwaysLockedST) = assert_havelock(l) +trylock(l::AlwaysLockedST) = l.ownertid == Threads.threadid() +islocked(::AlwaysLockedST) = true + + ## condition variables """ - Condition() + GenericCondition -Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a -`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on -the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is -called can be woken up. For level-triggered notifications, you must keep extra state to keep -track of whether a notification has happened. The [`Channel`](@ref) type does -this, and so can be used for level-triggered events. +Abstract implementation of a condition object +for synchonizing tasks objects with a given lock. """ -mutable struct Condition +struct GenericCondition{L<:AbstractLock} waitq::Vector{Any} + lock::L - Condition() = new([]) + GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L()) + GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l) + GenericCondition(l::AbstractLock) = new{typeof(l)}([], l) end +assert_havelock(c::GenericCondition) = assert_havelock(c.lock) +lock(c::GenericCondition) = lock(c.lock) +unlock(c::GenericCondition) = unlock(c.lock) +trylock(c::GenericCondition) = trylock(c.lock) +islocked(c::GenericCondition) = islocked(c.lock) + """ wait([x]) @@ -37,16 +89,19 @@ restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref). Often `wait` is called within a `while` loop to ensure a waited-for condition is met before proceeding. """ -function wait(c::Condition) +function wait(c::GenericCondition) ct = current_task() - + assert_havelock(c) push!(c.waitq, ct) + token = unlockall(c.lock) try return wait() catch filter!(x->x!==ct, c.waitq) rethrow() + finally + relockall(c.lock, token) end end @@ -59,26 +114,52 @@ is raised as an exception in the woken tasks. Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`. """ -notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error) -function notify(c::Condition, arg, all, error) +notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error) +function notify(c::GenericCondition, @nospecialize(arg), all, error) + assert_havelock(c) cnt = 0 if all cnt = length(c.waitq) for t in c.waitq - error ? schedule(t, arg, error=error) : schedule(t, arg) + schedule(t, arg, error=error) end empty!(c.waitq) elseif !isempty(c.waitq) cnt = 1 t = popfirst!(c.waitq) - error ? schedule(t, arg, error=error) : schedule(t, arg) + schedule(t, arg, error=error) end - cnt + return cnt end -notify_error(c::Condition, err) = notify(c, err, true, true) +notify_error(c::GenericCondition, err) = notify(c, err, true, true) + +n_waiters(c::GenericCondition) = length(c.waitq) + +""" + isempty(condition) + +Return `true` if no tasks are waiting on the condition, `false` otherwise. +""" +isempty(c::GenericCondition) = isempty(c.waitq) + + +# default (Julia v1.0) is currently single-threaded +# (although it uses MT-safe versions, when possible) +""" + Condition() + +Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a +`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on +the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is +called can be woken up. For level-triggered notifications, you must keep extra state to keep +track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do +this, and can be used for level-triggered events. + +This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version. +""" +const Condition = GenericCondition{AlwaysLockedST} -n_waiters(c::Condition) = length(c.waitq) ## scheduler and work queue diff --git a/base/exports.jl b/base/exports.jl index 3163d4914d3dc..74ef52ea28e77 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -641,6 +641,7 @@ export # tasks and conditions Condition, + Event, current_task, islocked, istaskdone, diff --git a/base/lock.jl b/base/lock.jl index f7ab63337e1b5..aef65052cdf8e 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -4,20 +4,19 @@ """ ReentrantLock() -Creates a reentrant lock for synchronizing [`Task`](@ref)s. +Creates a re-entrant lock for synchronizing [`Task`](@ref)s. The same task can acquire the lock as many times as required. Each [`lock`](@ref) must be matched with an [`unlock`](@ref). - -This lock is NOT threadsafe. See [`Threads.Mutex`](@ref) for a threadsafe lock. """ -mutable struct ReentrantLock +mutable struct ReentrantLock <: AbstractLock locked_by::Union{Task, Nothing} - cond_wait::Condition + cond_wait::GenericCondition{Threads.SpinLock} reentrancy_cnt::Int - ReentrantLock() = new(nothing, Condition(), 0) + ReentrantLock() = new(nothing, GenericCondition{Threads.SpinLock}(), 0) end + """ islocked(lock) -> Status (Boolean) @@ -40,15 +39,20 @@ Each successful `trylock` must be matched by an [`unlock`](@ref). """ function trylock(rl::ReentrantLock) t = current_task() - if rl.reentrancy_cnt == 0 - rl.locked_by = t - rl.reentrancy_cnt = 1 - return true - elseif t == notnothing(rl.locked_by) - rl.reentrancy_cnt += 1 - return true + lock(rl.cond_wait) + try + if rl.reentrancy_cnt == 0 + rl.locked_by = t + rl.reentrancy_cnt = 1 + return true + elseif t == notnothing(rl.locked_by) + rl.reentrancy_cnt += 1 + return true + end + return false + finally + unlock(rl.cond_wait) end - return false end """ @@ -62,16 +66,21 @@ Each `lock` must be matched by an [`unlock`](@ref). """ function lock(rl::ReentrantLock) t = current_task() - while true - if rl.reentrancy_cnt == 0 - rl.locked_by = t - rl.reentrancy_cnt = 1 - return - elseif t == notnothing(rl.locked_by) - rl.reentrancy_cnt += 1 - return + lock(rl.cond_wait) + try + while true + if rl.reentrancy_cnt == 0 + rl.locked_by = t + rl.reentrancy_cnt = 1 + return + elseif t == notnothing(rl.locked_by) + rl.reentrancy_cnt += 1 + return + end + wait(rl.cond_wait) end - wait(rl.cond_wait) + finally + unlock(rl.cond_wait) end end @@ -84,18 +93,48 @@ If this is a recursive lock which has been acquired before, decrement an internal counter and return immediately. """ function unlock(rl::ReentrantLock) - if rl.reentrancy_cnt == 0 - error("unlock count must match lock count") + t = current_task() + rl.reentrancy_cnt == 0 && error("unlock count must match lock count") + rl.locked_by == t || error("unlock from wrong thread") + lock(rl.cond_wait) + try + rl.reentrancy_cnt -= 1 + if rl.reentrancy_cnt == 0 + rl.locked_by = nothing + notify(rl.cond_wait) + end + finally + unlock(rl.cond_wait) end - rl.reentrancy_cnt -= 1 - if rl.reentrancy_cnt == 0 + return +end + +function unlockall(rl::ReentrantLock) + t = current_task() + n = rl.reentrancy_cnt + rl.locked_by == t || error("unlock from wrong thread") + n == 0 && error("unlock count must match lock count") + lock(rl.cond_wait) + try + rl.reentrancy_cnt = 0 rl.locked_by = nothing notify(rl.cond_wait) + finally + unlock(rl.cond_wait) end + return n +end + +function relockall(rl::ReentrantLock, n::Int) + t = current_task() + lock(rl) + n1 = rl.reentrancy_cnt + rl.reentrancy_cnt = n + n1 == 1 || error("concurrency violation detected") return end -function lock(f, l) +function lock(f, l::AbstractLock) lock(l) try return f() @@ -104,7 +143,7 @@ function lock(f, l) end end -function trylock(f, l) +function trylock(f, l::AbstractLock) if trylock(l) try return f() @@ -115,20 +154,41 @@ function trylock(f, l) return false end +@eval Threads begin + """ + Threads.Condition([lock]) + + A thread-safe version of [`Base.Condition`](@ref). + + !!! compat "Julia 1.2" + This functionality requires at least Julia 1.2. + """ + const Condition = Base.GenericCondition{Base.ReentrantLock} + + """ + Special note for [`Threads.Condition`](@ref): + + The caller must be holding the [`lock`](@ref) that owns `c` before calling this method. + The calling task will be blocked until some other task wakes it, + usually by calling [`notify`](@ref)` on the same Condition object. + The lock will be atomically released when blocking (even if it was locked recursively), + and will be reacquired before returning. + """ + wait(c::Condition) +end + """ Semaphore(sem_size) Create a counting semaphore that allows at most `sem_size` acquires to be in use at any time. Each acquire must be matched with a release. - -This construct is NOT threadsafe. """ mutable struct Semaphore sem_size::Int curr_cnt::Int - cond_wait::Condition - Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, Condition()) : throw(ArgumentError("Semaphore size must be > 0")) + cond_wait::Threads.Condition + Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, Threads.Condition()) : throw(ArgumentError("Semaphore size must be > 0")) end """ @@ -138,14 +198,16 @@ Wait for one of the `sem_size` permits to be available, blocking until one can be acquired. """ function acquire(s::Semaphore) - while true - if s.curr_cnt < s.sem_size - s.curr_cnt = s.curr_cnt + 1 - return - else + lock(s.cond_wait) + try + while s.curr_cnt >= s.sem_size wait(s.cond_wait) end + s.curr_cnt = s.curr_cnt + 1 + finally + unlock(s.cond_wait) end + return end """ @@ -156,7 +218,57 @@ possibly allowing another task to acquire it and resume execution. """ function release(s::Semaphore) - @assert s.curr_cnt > 0 "release count must match acquire count" - s.curr_cnt -= 1 - notify(s.cond_wait; all=false) + lock(s.cond_wait) + try + s.curr_cnt > 0 || error("release count must match acquire count") + s.curr_cnt -= 1 + notify(s.cond_wait; all=false) + finally + unlock(s.cond_wait) + end + return +end + + +""" + Event() + +Create a level-triggered event source. Tasks that call [`wait`](@ref) on an +`Event` are suspended and queued until `notify` is called on the `Event`. +After `notify` is called, the `Event` remains in a signaled state and +tasks will no longer block when waiting for it. + +!!! compat "Julia 1.1" + This functionality requires at least Julia 1.1. +""" +mutable struct Event + notify::Threads.Condition + set::Bool + Event() = new(Threads.Condition(), false) +end + +function wait(e::Event) + e.set && return + lock(e.notify) + try + while !e.set + wait(e.notify) + end + finally + unlock(e.notify) + end + nothing +end + +function notify(e::Event) + lock(e.notify) + try + if !e.set + e.set = true + notify(e.notify) + end + finally + unlock(e.notify) + end + nothing end diff --git a/base/locks-mt.jl b/base/locks-mt.jl new file mode 100644 index 0000000000000..397d41543da9a --- /dev/null +++ b/base/locks-mt.jl @@ -0,0 +1,137 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +import .Base: _uv_hook_close, unsafe_convert, + lock, trylock, unlock, islocked, wait, notify, + AbstractLock + +# Important Note: these low-level primitives defined here +# are typically not for general usage + +########################################## +# Atomic Locks +########################################## + +# Test-and-test-and-set spin locks are quickest up to about 30ish +# contending threads. If you have more contention than that, perhaps +# a lock is the wrong way to synchronize. +""" + SpinLock() + +Create a non-reentrant lock. +Recursive use will result in a deadlock. +Each [`lock`](@ref) must be matched with an [`unlock`](@ref). + +Test-and-test-and-set spin locks are quickest up to about 30ish +contending threads. If you have more contention than that, perhaps +a lock is the wrong way to synchronize. + +See also [`Mutex`](@ref) for a more efficient version on one core or if the +lock may be held for a considerable length of time. +""" +struct SpinLock <: AbstractLock + handle::Atomic{Int} + SpinLock() = new(Atomic{Int}(0)) +end + +function lock(l::SpinLock) + while true + if l.handle[] == 0 + p = atomic_xchg!(l.handle, 1) + if p == 0 + return + end + end + ccall(:jl_cpu_pause, Cvoid, ()) + # Temporary solution before we have gc transition support in codegen. + ccall(:jl_gc_safepoint, Cvoid, ()) + end +end + +function trylock(l::SpinLock) + if l.handle[] == 0 + return atomic_xchg!(l.handle, 1) == 0 + end + return false +end + +function unlock(l::SpinLock) + l.handle[] = 0 + ccall(:jl_cpu_wake, Cvoid, ()) + return +end + +function islocked(l::SpinLock) + return l.handle[] != 0 +end + + +########################################## +# System Mutexes +########################################## + +# These are mutexes from libuv. +const UV_MUTEX_SIZE = ccall(:jl_sizeof_uv_mutex, Cint, ()) + +""" + Mutex() + +These are standard system mutexes for locking critical sections of logic. + +On Windows, this is a critical section object, +on pthreads, this is a `pthread_mutex_t`. + +See also [`SpinLock`](@ref) for a lighter-weight lock. +""" +mutable struct Mutex <: AbstractLock + ownertid::Int16 + handle::Ptr{Cvoid} + function Mutex() + m = new(zero(Int16), Libc.malloc(UV_MUTEX_SIZE)) + ccall(:uv_mutex_init, Cvoid, (Ptr{Cvoid},), m.handle) + finalizer(_uv_hook_close, m) + return m + end +end + +unsafe_convert(::Type{Ptr{Cvoid}}, m::Mutex) = m.handle + +function _uv_hook_close(x::Mutex) + h = x.handle + if h != C_NULL + x.handle = C_NULL + ccall(:uv_mutex_destroy, Cvoid, (Ptr{Cvoid},), h) + Libc.free(h) + nothing + end +end + +function lock(m::Mutex) + m.ownertid == threadid() && error("concurrency violation detected") # deadlock + # Temporary solution before we have gc transition support in codegen. + # This could mess up gc state when we add codegen support. + gc_state = ccall(:jl_gc_safe_enter, Int8, ()) + ccall(:uv_mutex_lock, Cvoid, (Ptr{Cvoid},), m) + ccall(:jl_gc_safe_leave, Cvoid, (Int8,), gc_state) + m.ownertid = threadid() + return +end + +function trylock(m::Mutex) + m.ownertid == threadid() && error("concurrency violation detected") # deadlock + r = ccall(:uv_mutex_trylock, Cint, (Ptr{Cvoid},), m) + if r == 0 + m.ownertid = threadid() + end + return r == 0 +end + +function unlock(m::Mutex) + m.ownertid == threadid() || error("concurrency violation detected") + m.ownertid = 0 + ccall(:uv_mutex_unlock, Cvoid, (Ptr{Cvoid},), m) + return +end + +function islocked(m::Mutex) + return m.ownertid != 0 +end diff --git a/base/locks.jl b/base/locks.jl deleted file mode 100644 index 78454f3d38779..0000000000000 --- a/base/locks.jl +++ /dev/null @@ -1,297 +0,0 @@ -# This file is a part of Julia. License is MIT: https://julialang.org/license - -import .Base: _uv_hook_close, unsafe_convert, - lock, trylock, unlock, islocked, wait, notify - -export SpinLock, RecursiveSpinLock, Mutex, Event - - -########################################## -# Atomic Locks -########################################## - -""" - AbstractLock - -Abstract supertype describing types that -implement the thread-safe synchronization primitives: -[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref). -""" -abstract type AbstractLock end - -# Test-and-test-and-set spin locks are quickest up to about 30ish -# contending threads. If you have more contention than that, perhaps -# a lock is the wrong way to synchronize. -""" - TatasLock() - -See [`SpinLock`](@ref). -""" -struct TatasLock <: AbstractLock - handle::Atomic{Int} - TatasLock() = new(Atomic{Int}(0)) -end - -""" - SpinLock() - -Create a non-reentrant lock. -Recursive use will result in a deadlock. -Each [`lock`](@ref) must be matched with an [`unlock`](@ref). - -Test-and-test-and-set spin locks are quickest up to about 30ish -contending threads. If you have more contention than that, perhaps -a lock is the wrong way to synchronize. - -See also [`RecursiveSpinLock`](@ref) for a version that permits recursion. - -See also [`Mutex`](@ref) for a more efficient version on one core or if the -lock may be held for a considerable length of time. -""" -const SpinLock = TatasLock - -function lock(l::TatasLock) - while true - if l.handle[] == 0 - p = atomic_xchg!(l.handle, 1) - if p == 0 - return - end - end - ccall(:jl_cpu_pause, Cvoid, ()) - # Temporary solution before we have gc transition support in codegen. - ccall(:jl_gc_safepoint, Cvoid, ()) - end -end - -function trylock(l::TatasLock) - if l.handle[] == 0 - return atomic_xchg!(l.handle, 1) == 0 - end - return false -end - -function unlock(l::TatasLock) - l.handle[] = 0 - ccall(:jl_cpu_wake, Cvoid, ()) - return -end - -function islocked(l::TatasLock) - return l.handle[] != 0 -end - - -""" - RecursiveTatasLock() - -See [`RecursiveSpinLock`](@ref). -""" -struct RecursiveTatasLock <: AbstractLock - ownertid::Atomic{Int16} - handle::Atomic{Int} - RecursiveTatasLock() = new(Atomic{Int16}(0), Atomic{Int}(0)) -end - -""" - RecursiveSpinLock() - -Creates a reentrant lock. -The same thread can acquire the lock as many times as required. -Each [`lock`](@ref) must be matched with an [`unlock`](@ref). - -See also [`SpinLock`](@ref) for a slightly faster version. - -See also [`Mutex`](@ref) for a more efficient version on one core or if the lock -may be held for a considerable length of time. -""" -const RecursiveSpinLock = RecursiveTatasLock - -function lock(l::RecursiveTatasLock) - if l.ownertid[] == threadid() - l.handle[] += 1 - return - end - while true - if l.handle[] == 0 - if atomic_cas!(l.handle, 0, 1) == 0 - l.ownertid[] = threadid() - return - end - end - ccall(:jl_cpu_pause, Cvoid, ()) - # Temporary solution before we have gc transition support in codegen. - ccall(:jl_gc_safepoint, Cvoid, ()) - end -end - -function trylock(l::RecursiveTatasLock) - if l.ownertid[] == threadid() - l.handle[] += 1 - return true - end - if l.handle[] == 0 - if atomic_cas!(l.handle, 0, 1) == 0 - l.ownertid[] = threadid() - return true - end - return false - end - return false -end - -function unlock(l::RecursiveTatasLock) - @assert(l.ownertid[] == threadid(), "unlock from wrong thread") - @assert(l.handle[] != 0, "unlock count must match lock count") - if l.handle[] == 1 - l.ownertid[] = 0 - l.handle[] = 0 - ccall(:jl_cpu_wake, Cvoid, ()) - else - l.handle[] -= 1 - end - return -end - -function islocked(l::RecursiveTatasLock) - return l.handle[] != 0 -end - - -########################################## -# System Mutexes -########################################## - -# These are mutexes from libuv. We're doing some error checking (and -# paying for it in overhead), but regardless, in some situations, -# passing a bad parameter will cause an abort. - -# TODO: how defensive to get, and how to turn it off? -# TODO: how to catch an abort? - -const UV_MUTEX_SIZE = ccall(:jl_sizeof_uv_mutex, Cint, ()) - -""" - Mutex() - -These are standard system mutexes for locking critical sections of logic. - -On Windows, this is a critical section object, -on pthreads, this is a `pthread_mutex_t`. - -See also [`SpinLock`](@ref) for a lighter-weight lock. -""" -mutable struct Mutex <: AbstractLock - ownertid::Int16 - handle::Ptr{Cvoid} - function Mutex() - m = new(zero(Int16), Libc.malloc(UV_MUTEX_SIZE)) - ccall(:uv_mutex_init, Cvoid, (Ptr{Cvoid},), m.handle) - finalizer(_uv_hook_close, m) - return m - end -end - -unsafe_convert(::Type{Ptr{Cvoid}}, m::Mutex) = m.handle - -function _uv_hook_close(x::Mutex) - h = x.handle - if h != C_NULL - x.handle = C_NULL - ccall(:uv_mutex_destroy, Cvoid, (Ptr{Cvoid},), h) - Libc.free(h) - nothing - end -end - -function lock(m::Mutex) - if m.ownertid == threadid() - return - end - # Temporary solution before we have gc transition support in codegen. - # This could mess up gc state when we add codegen support. - gc_state = ccall(:jl_gc_safe_enter, Int8, ()) - ccall(:uv_mutex_lock, Cvoid, (Ptr{Cvoid},), m) - ccall(:jl_gc_safe_leave, Cvoid, (Int8,), gc_state) - m.ownertid = threadid() - return -end - -function trylock(m::Mutex) - if m.ownertid == threadid() - return true - end - r = ccall(:uv_mutex_trylock, Cint, (Ptr{Cvoid},), m) - if r == 0 - m.ownertid = threadid() - end - return r == 0 -end - -function unlock(m::Mutex) - @assert(m.ownertid == threadid(), "unlock from wrong thread") - m.ownertid = 0 - ccall(:uv_mutex_unlock, Cvoid, (Ptr{Cvoid},), m) - return -end - -function islocked(m::Mutex) - return m.ownertid != 0 -end - -""" - Event() - -Create a level-triggered event source. Tasks that call [`wait`](@ref) on an -`Event` are suspended and queued until `notify` is called on the `Event`. -After `notify` is called, the `Event` remains in a signaled state and -tasks will no longer block when waiting for it. - -!!! compat "Julia 1.1" - This functionality requires at least Julia 1.1. -""" -mutable struct Event - lock::Mutex - q::Vector{Task} - set::Bool - # TODO: use a Condition with its paired lock - Event() = new(Mutex(), Task[], false) -end - -function wait(e::Event) - e.set && return - lock(e.lock) - while !e.set - ct = current_task() - push!(e.q, ct) - unlock(e.lock) - try - wait() - catch - filter!(x->x!==ct, e.q) - rethrow() - end - lock(e.lock) - end - unlock(e.lock) - return nothing -end - -function notify(e::Event) - lock(e.lock) - if !e.set - e.set = true - for t in e.q - schedule(t) - end - empty!(e.q) - end - unlock(e.lock) - return nothing -end - -# TODO: decide what to call this -#function clear(e::Event) -# e.set = false -# return nothing -#end diff --git a/base/stream.jl b/base/stream.jl index 71fe818482026..6bf914fc05d77 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -345,7 +345,7 @@ function wait_readbyte(x::LibuvStream, c::UInt8) wait(x.readnotify) end finally - if isempty(x.readnotify.waitq) + if isempty(x.readnotify) stop_reading(x) # stop reading iff there are currently no other read clients of the stream end unpreserve_handle(x) @@ -368,7 +368,7 @@ function wait_readnb(x::LibuvStream, nb::Int) wait(x.readnotify) end finally - if isempty(x.readnotify.waitq) + if isempty(x.readnotify) stop_reading(x) # stop reading iff there are currently no other read clients of the stream end if oldthrottle <= x.throttle <= nb @@ -774,7 +774,7 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) return bytesavailable(newbuf) finally s.buffer = sbuf - if !isempty(s.readnotify.waitq) + if !isempty(s.readnotify) start_reading(s) # resume reading iff there are currently other read clients of the stream end end @@ -810,7 +810,7 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) nb == bytesavailable(newbuf) || throw(EOFError()) finally s.buffer = sbuf - if !isempty(s.readnotify.waitq) + if !isempty(s.readnotify) start_reading(s) # resume reading iff there are currently other read clients of the stream end end diff --git a/base/sysimg.jl b/base/sysimg.jl index 8e0dd138b9cea..48920935e95d4 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -314,8 +314,8 @@ include("env.jl") include("libuv.jl") include("event.jl") include("task.jl") -include("lock.jl") include("threads.jl") +include("lock.jl") include("weakkeydict.jl") # Logging diff --git a/base/threadcall.jl b/base/threadcall.jl index 1dd48b4097f7a..4754afec0d6ac 100644 --- a/base/threadcall.jl +++ b/base/threadcall.jl @@ -1,7 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license const max_ccall_threads = parse(Int, get(ENV, "UV_THREADPOOL_SIZE", "4")) -const thread_notifiers = Union{Condition, Nothing}[nothing for i in 1:max_ccall_threads] +const thread_notifiers = Union{Base.Condition, Nothing}[nothing for i in 1:max_ccall_threads] const threadcall_restrictor = Semaphore(max_ccall_threads) """ @@ -81,7 +81,7 @@ function do_threadcall(fun_ptr::Ptr{Cvoid}, rettype::Type, argtypes::Vector, arg # wait for a worker thread to be available acquire(threadcall_restrictor) idx = findfirst(isequal(nothing), thread_notifiers)::Int - thread_notifiers[idx] = Condition() + thread_notifiers[idx] = Base.Condition() GC.@preserve args_arr ret_arr roots begin # queue up the work to be done diff --git a/base/threads.jl b/base/threads.jl index 0fa1981ea17c2..292513418525b 100644 --- a/base/threads.jl +++ b/base/threads.jl @@ -5,9 +5,12 @@ Experimental multithreading support. """ module Threads +global Condition # we'll define this later, make sure we don't import Base.Condition + include("threadingconstructs.jl") include("atomics.jl") -include("locks.jl") +include("locks-mt.jl") + """ resize_nthreads!(A, copyvalue=A[1]) diff --git a/base/weakkeydict.jl b/base/weakkeydict.jl index 09199e6ce5c40..4b6dab33aca54 100644 --- a/base/weakkeydict.jl +++ b/base/weakkeydict.jl @@ -14,12 +14,12 @@ See [`Dict`](@ref) for further help. Note, unlike [`Dict`](@ref), """ mutable struct WeakKeyDict{K,V} <: AbstractDict{K,V} ht::Dict{WeakRef,V} - lock::Threads.RecursiveSpinLock + lock::ReentrantLock finalizer::Function # Constructors mirror Dict's function WeakKeyDict{K,V}() where V where K - t = new(Dict{Any,V}(), Threads.RecursiveSpinLock(), identity) + t = new(Dict{Any,V}(), ReentrantLock(), identity) t.finalizer = function (k) # when a weak key is finalized, remove from dictionary if it is still there if islocked(t) diff --git a/doc/src/base/multi-threading.md b/doc/src/base/multi-threading.md index 508c2f348057e..93f36b54760e3 100644 --- a/doc/src/base/multi-threading.md +++ b/doc/src/base/multi-threading.md @@ -1,4 +1,3 @@ - # Multi-Threading This experimental interface supports Julia's multi-threading capabilities. Types and functions @@ -8,6 +7,9 @@ described here might (and likely will) change in the future. Base.Threads.threadid Base.Threads.nthreads Base.Threads.@threads +``` + +```@docs Base.Threads.Atomic Base.Threads.atomic_cas! Base.Threads.atomic_xchg! @@ -28,20 +30,11 @@ Base.Threads.atomic_fence Base.@threadcall ``` -## Synchronization Primitives +# Low-level synchronization primitives + +These building blocks are used to create the regular synchronization objects. ```@docs -Base.Threads.AbstractLock -Base.lock -Base.unlock -Base.trylock -Base.islocked -Base.ReentrantLock Base.Threads.Mutex Base.Threads.SpinLock -Base.Threads.RecursiveSpinLock -Base.Semaphore -Base.acquire -Base.release ``` - diff --git a/doc/src/base/parallel.md b/doc/src/base/parallel.md index cd689e15dfd33..5a669b2eaf75a 100644 --- a/doc/src/base/parallel.md +++ b/doc/src/base/parallel.md @@ -2,19 +2,47 @@ ```@docs Core.Task +Base.@task +Base.@async +Base.@sync +Base.asyncmap +Base.asyncmap! +Base.fetch(t::Task) Base.current_task Base.istaskdone Base.istaskstarted -Base.yield -Base.yieldto Base.task_local_storage(::Any) Base.task_local_storage(::Any, ::Any) Base.task_local_storage(::Function, ::Any, ::Any) +``` + +# Scheduling + +```@docs +Base.yield +Base.yieldto +Base.sleep +Base.wait +Base.timedwait + Base.Condition +Base.Threads.Condition Base.notify Base.schedule -Base.@task -Base.sleep + +Base.Event + +Base.Semaphore +Base.acquire +Base.release + +Base.AbstractLock +Base.lock +Base.unlock +Base.trylock +Base.islocked +Base.ReentrantLock + Base.Channel Base.put!(::Channel, ::Any) Base.take!(::Channel) @@ -22,6 +50,4 @@ Base.isready(::Channel) Base.fetch(::Channel) Base.close(::Channel) Base.bind(c::Channel, task::Task) -Base.asyncmap -Base.asyncmap! ``` diff --git a/doc/src/manual/faq.md b/doc/src/manual/faq.md index e6ae749fd1b5a..4db755c652d89 100644 --- a/doc/src/manual/faq.md +++ b/doc/src/manual/faq.md @@ -774,8 +774,7 @@ julia> @sync for i in 1:3 You can lock your writes with a `ReentrantLock` like this: ```jldoctest -julia> l = ReentrantLock() -ReentrantLock(nothing, Condition(Any[]), 0) +julia> l = ReentrantLock(); julia> @sync for i in 1:3 @async begin diff --git a/stdlib/Distributed/docs/src/index.md b/stdlib/Distributed/docs/src/index.md index da0b822e1d026..4653a260badcb 100644 --- a/stdlib/Distributed/docs/src/index.md +++ b/stdlib/Distributed/docs/src/index.md @@ -18,7 +18,6 @@ Distributed.pmap Distributed.RemoteException Distributed.Future Distributed.RemoteChannel -Distributed.wait Distributed.fetch(::Any) Distributed.remotecall(::Any, ::Integer, ::Any...) Distributed.remotecall_wait(::Any, ::Integer, ::Any...) @@ -38,13 +37,10 @@ Distributed.remotecall(::Any, ::AbstractWorkerPool, ::Any...) Distributed.remotecall_wait(::Any, ::AbstractWorkerPool, ::Any...) Distributed.remotecall_fetch(::Any, ::AbstractWorkerPool, ::Any...) Distributed.remote_do(::Any, ::AbstractWorkerPool, ::Any...) -Distributed.timedwait Distributed.@spawn Distributed.@spawnat Distributed.@fetch Distributed.@fetchfrom -Distributed.@async -Distributed.@sync Distributed.@distributed Distributed.@everywhere Distributed.clear!(::Any, ::Any; ::Any) diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 50f81251be71a..747919e56b07e 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -67,7 +67,7 @@ mutable struct Worker manager::ClusterManager config::WorkerConfig version::Union{VersionNumber, Nothing} # Julia version of the remote process - initialized::Threads.Event + initialized::Event function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager; version::Union{VersionNumber, Nothing}=nothing, @@ -91,7 +91,7 @@ mutable struct Worker return map_pid_wrkr[id] end w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func) - w.initialized = Threads.Event() + w.initialized = Event() register_worker(w) w end diff --git a/stdlib/Distributed/src/managers.jl b/stdlib/Distributed/src/managers.jl index 3bc838750ab71..f888b0a1ceeac 100644 --- a/stdlib/Distributed/src/managers.jl +++ b/stdlib/Distributed/src/managers.jl @@ -124,7 +124,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: # Wait for all launches to complete. launch_tasks = Vector{Any}(undef, length(manager.machines)) - for (i,(machine, cnt)) in enumerate(manager.machines) + for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt launch_tasks[i] = @async try launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy) @@ -135,7 +135,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: end for t in launch_tasks - wait(t) + wait(t::Task) end notify(launch_ntfy) diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index 484540ba1c547..86aec8c997c84 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -338,7 +338,7 @@ function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32) else t.events |= events if t.active[1] || t.active[2] - if isempty(t.notify.waitq) + if isempty(t.notify) # if we keep hearing about events when nobody appears to be listening, # stop the poll to save cycles t.active = (false, false) @@ -400,7 +400,7 @@ function start_watching(t::PollingFileWatcher) end function stop_watching(t::PollingFileWatcher) - if t.active && isempty(t.notify.waitq) + if t.active && isempty(t.notify) t.active = false uv_error("PollingFileWatcher (stop)", ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle)) @@ -420,7 +420,7 @@ function start_watching(t::FileMonitor) end function stop_watching(t::FileMonitor) - if t.active && isempty(t.notify.waitq) + if t.active && isempty(t.notify) t.active = false uv_error("FileMonitor (stop)", ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle)) diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 6065debb2b8b7..d38acfea3c9c5 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -64,10 +64,8 @@ end inet = Sockets.InetAddr(IPv4(127,0,0,1), 1024) @test inet.host == ip"127.0.0.1" @test inet.port == 1024 - io = IOBuffer() - show(io, inet) str = "Sockets.InetAddr{$(isdefined(Main, :IPv4) ? "" : "Sockets.")IPv4}(ip\"127.0.0.1\", 1024)" - @test String(take!(io)) == str + @test sprint(show, inet) == str end @testset "InetAddr invalid port" begin @test_throws InexactError Sockets.InetAddr(IPv4(127,0,0,1), -1) @@ -134,23 +132,20 @@ defaultport = rand(2000:4000) @test read(client, String) == "Hello World\n" * ("a1\n"^100) end end - Base.wait(tsk) + wait(tsk) end mktempdir() do tmpdir socketname = Sys.iswindows() ? ("\\\\.\\pipe\\uv-test-" * randstring(6)) : joinpath(tmpdir, "socket") - c = Condition() + s = listen(socketname) tsk = @async begin - s = listen(socketname) - notify(c) sock = accept(s) - write(sock,"Hello World\n") + write(sock, "Hello World\n") close(s) close(sock) end - wait(c) @test read(connect(socketname), String) == "Hello World\n" - Base.wait(tsk) + wait(tsk) end end @@ -210,7 +205,7 @@ end end @test fetch(r) === :start close(server) - Base.wait(tsk) + wait(tsk) end # test connecting to a named port @@ -231,42 +226,34 @@ end @testset "UDPSocket" begin # test show() function for UDPSocket() - @test endswith(repr(UDPSocket()), "UDPSocket(init)") - a = UDPSocket() - b = UDPSocket() - bind(a, ip"127.0.0.1", randport) - bind(b, ip"127.0.0.1", randport + 1) + @test repr(UDPSocket()) ∈ ("Sockets.UDPSocket(init)", "UDPSocket(init)") - c = Condition() - tsk = @async begin - @test String(recv(a)) == "Hello World" - # Issue 6505 - tsk2 = @async begin - @test String(recv(a)) == "Hello World" - notify(c) + let + a = UDPSocket() + b = UDPSocket() + bind(a, ip"127.0.0.1", randport) + bind(b, ip"127.0.0.1", randport + 1) + + @sync begin + # FIXME: check that we received all messages + for i = 1:3 + @async send(b, ip"127.0.0.1", randport, "Hello World") + @async String(recv(a)) == "Hello World" + end end - send(b, ip"127.0.0.1", randport, "Hello World") - Base.wait(tsk2) - end - send(b, ip"127.0.0.1", randport, "Hello World") - wait(c) - Base.wait(tsk) - tsk = @async begin - @test begin - (addr,data) = recvfrom(a) - addr == ip"127.0.0.1" && String(data) == "Hello World" - end + tsk = @async send(b, ip"127.0.0.1", randport, "Hello World") + (addr, data) = recvfrom(a) + @test addr == ip"127.0.0.1" && String(data) == "Hello World" + wait(tsk) + close(a) + close(b) end - send(b, ip"127.0.0.1", randport, "Hello World") - Base.wait(tsk) @test_throws MethodError bind(UDPSocket(), randport) - close(a) - close(b) - if !Sys.iswindows() || Sys.windows_version() >= Sys.WINDOWS_VISTA_VER + let a = UDPSocket() b = UDPSocket() bind(a, ip"::1", UInt16(randport)) @@ -279,9 +266,10 @@ end end end send(b, ip"::1", randport, "Hello World") - Base.wait(tsk) + wait(tsk) send(b, ip"::1", randport, "Hello World") - Base.wait(tsk) + wait(tsk) + end end end @@ -339,7 +327,7 @@ end sleep(0.05) end length(recvs_check) > 0 && error("timeout") - map(Base.wait, recvs) + map(wait, recvs) end a, b, c = [create_socket() for i = 1:3] @@ -385,12 +373,12 @@ end # on windows, the kernel fails to do even that # causing the `write` call to freeze # so we end up forced to do a slightly weaker test here - Sys.iswindows() || Base.wait(t) + Sys.iswindows() || wait(t) @test isopen(P) # without an active uv_reader, P shouldn't be closed yet @test !eof(P) # should already know this, @test isopen(P) # so it still shouldn't have an active uv_reader @test readuntil(P, 'w') == "llo" - Sys.iswindows() && Base.wait(t) + Sys.iswindows() && wait(t) @test eof(P) @test !isopen(P) # eof test should have closed this by now close(P) # should be a no-op, just make sure @@ -402,15 +390,10 @@ end # test the method matching connect!(::TCPSocket, ::Sockets.InetAddr{T<:Base.IPAddr}) let addr = Sockets.InetAddr(ip"127.0.0.1", 4444) srv = listen(addr) - c = Condition() - r = @async try; close(accept(srv)); finally; notify(c); end - try - close(connect(addr)) - fetch(c) - finally - close(srv) - end + r = @async close(accept(srv)) + close(connect(addr)) fetch(r) + close(srv) end let addr = Sockets.InetAddr(ip"127.0.0.1", 4444) diff --git a/test/channels.jl b/test/channels.jl index a2dcf2c4ea2cf..91e420e0519d3 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -2,6 +2,16 @@ using Random +@testset "single-threaded Condition usage" begin + a = Condition() + t = @async begin + Base.notify(a, "success") + "finished" + end + @test wait(a) == "success" + @test fetch(t) == "finished" +end + @testset "various constructors" begin c = Channel(1) @test eltype(c) == Any diff --git a/test/read.jl b/test/read.jl index 944b911748b63..8e74e038553bf 100644 --- a/test/read.jl +++ b/test/read.jl @@ -559,9 +559,12 @@ let p = Pipe() t = @async read(p) @sync begin @async write(p, zeros(UInt16, 660_000)) + yield() # TODO: need to add an Event to the previous line + order::UInt16 = 0 for i = 1:typemax(UInt16) - @async write(p, UInt16(i)) + @async (order += 1; write(p, order); nothing) end + yield() # TODO: need to add an Event to the previous line @async close(p.in) end s = reinterpret(UInt16, fetch(t)) diff --git a/test/spawn.jl b/test/spawn.jl index 2840f5ecea030..746ff9de41548 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -58,9 +58,9 @@ out = read(`$echocmd hello` & `$echocmd world`, String) Sys.isunix() && run(pipeline(yescmd, `head`, devnull)) let p = run(pipeline(yescmd, devnull), wait=false) - t = @async !success(p) - kill(p) - @test fetch(t) + t = @async kill(p) + @test !success(p) + wait(t) end if valgrind_off @@ -164,16 +164,19 @@ let r, t t = @async begin try wait(r) - catch + @test false + catch ex + @test isa(ex, InterruptException) end - p = run(`$sleepcmd 1`, wait=false); wait(p) + p = run(`$sleepcmd 1`, wait=false) + wait(p) @test p.exitcode == 0 return true end yield() schedule(t, InterruptException(), error=true) yield() - put!(r,11) + put!(r, 11) yield() @test fetch(t) end diff --git a/test/threads.jl b/test/threads.jl index 7b79b141b660b..b2063f7b244fa 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -2,6 +2,7 @@ using Test using Base.Threads +using Base.Threads: SpinLock, Mutex # threading constructs @@ -90,13 +91,13 @@ function threaded_add_locked(::Type{LockT}, x, n) where LockT end @test threaded_add_locked(SpinLock, 0, 10000) == 10000 -@test threaded_add_locked(RecursiveSpinLock, 0, 10000) == 10000 +@test threaded_add_locked(ReentrantLock, 0, 10000) == 10000 @test threaded_add_locked(Mutex, 0, 10000) == 10000 # Check if the recursive lock can be locked and unlocked correctly. -let critical = RecursiveSpinLock() +let critical = ReentrantLock() @test !islocked(critical) - @test_throws AssertionError unlock(critical) + @test_throws ErrorException("unlock count must match lock count") unlock(critical) @test lock(critical) === nothing @test islocked(critical) @test lock(critical) === nothing @@ -108,12 +109,12 @@ let critical = RecursiveSpinLock() @test islocked(critical) @test unlock(critical) === nothing @test !islocked(critical) - @test_throws AssertionError unlock(critical) + @test_throws ErrorException("unlock count must match lock count") unlock(critical) @test trylock(critical) == true @test islocked(critical) @test unlock(critical) === nothing @test !islocked(critical) - @test_throws AssertionError unlock(critical) + @test_throws ErrorException("unlock count must match lock count") unlock(critical) @test !islocked(critical) end @@ -131,7 +132,7 @@ function threaded_gc_locked(::Type{LockT}) where LockT end threaded_gc_locked(SpinLock) -threaded_gc_locked(Threads.RecursiveSpinLock) +threaded_gc_locked(Threads.ReentrantLock) threaded_gc_locked(Mutex) # Issue 14726 @@ -504,9 +505,10 @@ function test_thread_too_few_iters() end test_thread_too_few_iters() -let e = Event() +let e = Event(), started = Event() done = false - t = @async (wait(e); done = true) + t = @async (notify(started); wait(e); done = true) + wait(started) sleep(0.1) @test done == false notify(e)