diff --git a/base/event.jl b/base/event.jl index 1a5e12d321316..e3888529ca43a 100644 --- a/base/event.jl +++ b/base/event.jl @@ -92,7 +92,7 @@ called can be woken up. For level-triggered notifications, you must keep extra s track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do this, and can be used for level-triggered events. -This object is NOT thread-safe. See [`Threads.ConditionMT`](@ref) for a thread-safe version. +This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version. """ struct GenericCondition{L<:AbstractLock} waitq::Vector{Any} @@ -183,52 +183,9 @@ Return `true` if no tasks are waiting on the condition, `false` otherwise. isempty(c::GenericCondition) = isempty(c.waitq) -""" - Event() - -Create a level-triggered event source. Tasks that call [`wait`](@ref) on an -`Event` are suspended and queued until `notify` is called on the `Event`. -After `notify` is called, the `Event` remains in a signaled state and -tasks will no longer block when waiting for it. - -!!! compat "Julia 1.1" - This functionality requires at least Julia 1.1. -""" -mutable struct GenericEvent{L<:AbstractLock} - notify::GenericCondition{L} - set::Bool - GenericEvent{L}() where {L<:AbstractLock} = new{L}(GenericCondition{L}(), false) -end - -function wait(e::GenericEvent) - e.set && return - lock(e.notify) - try - while !e.set - wait(e.notify) - end - finally - unlock(e.notify) - end - nothing -end - -function notify(e::GenericEvent) - lock(e.notify) - try - if !e.set - e.set = true - notify(e.notify) - end - finally - unlock(e.notify) - end - nothing -end - - -const ConditionST = GenericCondition{AlwaysLockedST} -const EventST = GenericEvent{CooperativeLock} +# default (Julia v1.0) is currently single-threaded +# (although it uses MT-safe versions, when possible) +const Condition = GenericCondition{AlwaysLockedST} ## scheduler and work queue @@ -433,11 +390,11 @@ Use [`isopen`](@ref) to check whether it is still active. """ mutable struct AsyncCondition handle::Ptr{Cvoid} - cond::ConditionST + cond::Condition isopen::Bool function AsyncCondition() - this = new(Libc.malloc(_sizeof_uv_async), ConditionST(), true) + this = new(Libc.malloc(_sizeof_uv_async), Condition(), true) associate_julia_struct(this.handle, this) finalizer(uvfinalize, this) err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), @@ -491,14 +448,14 @@ to check whether a timer is still active. """ mutable struct Timer handle::Ptr{Cvoid} - cond::ConditionST + cond::Condition isopen::Bool function Timer(timeout::Real; interval::Real = 0.0) timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds")) interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds")) - this = new(Libc.malloc(_sizeof_uv_timer), ConditionST(), true) + this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true) err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this) if err != 0 #TODO: this codepath is currently not tested diff --git a/base/lock.jl b/base/lock.jl index 23fe2114a0070..b19ac8bd67f2b 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -8,17 +8,14 @@ Creates a re-entrant lock for synchronizing [`Task`](@ref)s. The same task can acquire the lock as many times as required. Each [`lock`](@ref) must be matched with an [`unlock`](@ref). """ -mutable struct GenericReentrantLock{ThreadLock<:AbstractLock} <: AbstractLock +mutable struct ReentrantLock <: AbstractLock locked_by::Union{Task, Nothing} - cond_wait::GenericCondition{ThreadLock} + cond_wait::GenericCondition{Threads.SpinLock} reentrancy_cnt::Int - GenericReentrantLock{ThreadLock}() where {ThreadLock<:AbstractLock} = new(nothing, GenericCondition{ThreadLock}(), 0) + ReentrantLock() = new(nothing, GenericCondition{Threads.SpinLock}(), 0) end -# A basic single-threaded, Julia-aware lock: -const ReentrantLockST = GenericReentrantLock{CooperativeLock} - """ islocked(lock) -> Status (Boolean) @@ -26,7 +23,7 @@ const ReentrantLockST = GenericReentrantLock{CooperativeLock} Check whether the `lock` is held by any task/thread. This should not be used for synchronization (see instead [`trylock`](@ref)). """ -function islocked(rl::GenericReentrantLock) +function islocked(rl::ReentrantLock) return rl.reentrancy_cnt != 0 end @@ -40,7 +37,7 @@ return `false`. Each successful `trylock` must be matched by an [`unlock`](@ref). """ -function trylock(rl::GenericReentrantLock) +function trylock(rl::ReentrantLock) t = current_task() lock(rl.cond_wait) try @@ -67,7 +64,7 @@ wait for it to become available. Each `lock` must be matched by an [`unlock`](@ref). """ -function lock(rl::GenericReentrantLock) +function lock(rl::ReentrantLock) t = current_task() lock(rl.cond_wait) try @@ -95,7 +92,7 @@ Releases ownership of the `lock`. If this is a recursive lock which has been acquired before, decrement an internal counter and return immediately. """ -function unlock(rl::GenericReentrantLock) +function unlock(rl::ReentrantLock) t = current_task() rl.reentrancy_cnt == 0 && error("unlock count must match lock count") rl.locked_by == t || error("unlock from wrong thread") @@ -112,7 +109,7 @@ function unlock(rl::GenericReentrantLock) return end -function unlockall(rl::GenericReentrantLock) +function unlockall(rl::ReentrantLock) t = current_task() n = rl.reentrancy_cnt rl.locked_by == t || error("unlock from wrong thread") @@ -128,7 +125,7 @@ function unlockall(rl::GenericReentrantLock) return n end -function relockall(rl::GenericReentrantLock, n::Int) +function relockall(rl::ReentrantLock, n::Int) t = current_task() lock(rl) n1 = rl.reentrancy_cnt @@ -157,20 +154,33 @@ function trylock(f, l::AbstractLock) return false end +@eval Threads begin + const Condition = Base.GenericCondition{ReentrantLock} + + """ + Special note for [`Threads.Condition`](@ref): + + The caller must be holding the [`lock`](@ref) that owns `c` before calling this method. + The calling task will be blocked until some other task wakes it, + usually by calling [`notify`](@ref)` on the same Condition object. + The lock will be atomically released when blocking (even if it was locked recursively), + and will be reacquired before returning. + """ + wait(c::Threads.Condition) +end + """ Semaphore(sem_size) Create a counting semaphore that allows at most `sem_size` acquires to be in use at any time. Each acquire must be matched with a release. - -This construct is NOT threadsafe. """ mutable struct Semaphore sem_size::Int curr_cnt::Int - cond_wait::ConditionST - Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, ConditionST()) : throw(ArgumentError("Semaphore size must be > 0")) + cond_wait::Threads.Condition + Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, Threads.Condition()) : throw(ArgumentError("Semaphore size must be > 0")) end """ @@ -180,14 +190,16 @@ Wait for one of the `sem_size` permits to be available, blocking until one can be acquired. """ function acquire(s::Semaphore) - while true - if s.curr_cnt < s.sem_size - s.curr_cnt = s.curr_cnt + 1 - return - else + lock(s.cond_wait) + try + while s.curr_cnt >= s.sem_size wait(s.cond_wait) end + s.curr_cnt = s.curr_cnt + 1 + finally + unlock(s.cond_wait) end + return end """ @@ -198,8 +210,57 @@ possibly allowing another task to acquire it and resume execution. """ function release(s::Semaphore) - @assert s.curr_cnt > 0 "release count must match acquire count" - s.curr_cnt -= 1 - notify(s.cond_wait; all=false) + lock(s.cond_wait) + try + s.curr_cnt > 0 || error("release count must match acquire count") + s.curr_cnt -= 1 + notify(s.cond_wait; all=false) + finally + unlock(s.cond_wait) + end return end + + +""" + Event() + +Create a level-triggered event source. Tasks that call [`wait`](@ref) on an +`Event` are suspended and queued until `notify` is called on the `Event`. +After `notify` is called, the `Event` remains in a signaled state and +tasks will no longer block when waiting for it. + +!!! compat "Julia 1.1" + This functionality requires at least Julia 1.1. +""" +mutable struct Event + notify::Threads.Condition + set::Bool + Event() = new(Threads.Condition(), false) +end + +function wait(e::Event) + e.set && return + lock(e.notify) + try + while !e.set + wait(e.notify) + end + finally + unlock(e.notify) + end + nothing +end + +function notify(e::Event) + lock(e.notify) + try + if !e.set + e.set = true + notify(e.notify) + end + finally + unlock(e.notify) + end + nothing +end diff --git a/base/locks-mt.jl b/base/locks-mt.jl index 09c49cf08c55f..40c8ce288aef5 100644 --- a/base/locks-mt.jl +++ b/base/locks-mt.jl @@ -2,13 +2,7 @@ import .Base: _uv_hook_close, unsafe_convert, lock, trylock, unlock, islocked, wait, notify, - AbstractLock, GenericCondition, GenericReentrantLock, GenericEvent - -export ConditionMT, EventMT, ReentrantLockMT - -# Important Note: these low-level primitives exported here -# are typically not for general usage -export SpinLock, RecursiveSpinLock, Mutex + AbstractLock ########################################## # Atomic Locks @@ -17,16 +11,6 @@ export SpinLock, RecursiveSpinLock, Mutex # Test-and-test-and-set spin locks are quickest up to about 30ish # contending threads. If you have more contention than that, perhaps # a lock is the wrong way to synchronize. -""" - TatasLock() - -See [`SpinLock`](@ref). -""" -struct TatasLock <: AbstractLock - handle::Atomic{Int} - TatasLock() = new(Atomic{Int}(0)) -end - """ SpinLock() @@ -38,14 +22,15 @@ Test-and-test-and-set spin locks are quickest up to about 30ish contending threads. If you have more contention than that, perhaps a lock is the wrong way to synchronize. -See also [`RecursiveSpinLock`](@ref) for a version that permits recursion. - See also [`Mutex`](@ref) for a more efficient version on one core or if the lock may be held for a considerable length of time. """ -const SpinLock = TatasLock +struct SpinLock <: AbstractLock + handle::Atomic{Int} + SpinLock() = new(Atomic{Int}(0)) +end -function lock(l::TatasLock) +function lock(l::SpinLock) while true if l.handle[] == 0 p = atomic_xchg!(l.handle, 1) @@ -59,116 +44,20 @@ function lock(l::TatasLock) end end -function trylock(l::TatasLock) +function trylock(l::SpinLock) if l.handle[] == 0 return atomic_xchg!(l.handle, 1) == 0 end return false end -function unlock(l::TatasLock) - l.handle[] = 0 - ccall(:jl_cpu_wake, Cvoid, ()) - return -end - -function islocked(l::TatasLock) - return l.handle[] != 0 -end - - -""" - RecursiveTatasLock() - -See [`RecursiveSpinLock`](@ref). -""" -struct RecursiveTatasLock <: AbstractLock - ownertid::Atomic{Int16} - handle::Atomic{Int} - RecursiveTatasLock() = new(Atomic{Int16}(0), Atomic{Int}(0)) -end - -""" - RecursiveSpinLock() - -Creates a reentrant lock. -The same thread can acquire the lock as many times as required. -Each [`lock`](@ref) must be matched with an [`unlock`](@ref). - -See also [`SpinLock`](@ref) for a slightly faster version. - -See also [`Mutex`](@ref) for a more efficient version on one core or if the lock -may be held for a considerable length of time. -""" -const RecursiveSpinLock = RecursiveTatasLock - - -function lock(l::RecursiveTatasLock) - if l.ownertid[] == threadid() - l.handle[] += 1 - return - end - while true - if l.handle[] == 0 - if atomic_cas!(l.handle, 0, 1) == 0 - l.ownertid[] = threadid() - return - end - end - ccall(:jl_cpu_pause, Cvoid, ()) - # Temporary solution before we have gc transition support in codegen. - ccall(:jl_gc_safepoint, Cvoid, ()) - end -end - -function trylock(l::RecursiveTatasLock) - if l.ownertid[] == threadid() - l.handle[] += 1 - return true - end - if l.handle[] == 0 - if atomic_cas!(l.handle, 0, 1) == 0 - l.ownertid[] = threadid() - return true - end - return false - end - return false -end - -function unlock(l::RecursiveTatasLock) - l.ownertid[] == threadid() || error("unlock from wrong thread") - n = l.handle[] - n != 0 || error("unlock count must match lock count") - if l.handle[] == 1 - l.ownertid[] = 0 - l.handle[] = 0 - ccall(:jl_cpu_wake, Cvoid, ()) - else - l.handle[] = n - 1 - end - return -end - -function unlockall(l::RecursiveTatasLock) - l.ownertid[] == threadid() || error("unlock from wrong thread") - n = l.handle[] - n != 0 || error("unlock count must match lock count") - l.ownertid[] = 0 +function unlock(l::SpinLock) l.handle[] = 0 ccall(:jl_cpu_wake, Cvoid, ()) - return n -end - -function relockall(l::RecursiveTatasLock, n::Int) - lock(l) - n1 = l.handle[] - l.handle[] = n - n1 == 1 || error("concurrency violation detected") return end -function islocked(l::RecursiveTatasLock) +function islocked(l::SpinLock) return l.handle[] != 0 end @@ -243,44 +132,3 @@ end function islocked(m::Mutex) return m.ownertid != 0 end - -""" - ReentrantLockMT() - -A thread-safe version of [`ReentrantLock`](@ref). - -!!! compat "Julia 1.1" - This functionality requires at least Julia 1.1. -""" -const ReentrantLockMT = GenericReentrantLock{TatasLock} - -""" - ConditionMT([lock-mt]) - -A thread-safe version of [`Condition`](@ref). - -!!! compat "Julia 1.1" - This functionality requires at least Julia 1.1. -""" -const ConditionMT = GenericCondition{ReentrantLockMT} - -""" - EventMT() - -A thread-safe version of [`Event`](@ref). - -!!! compat "Julia 1.1" - This functionality requires at least Julia 1.1. -""" -const EventMT = GenericEvent{ReentrantLockMT} - -""" -Special note for [`Threads.ConditionMT`](@ref): - -The caller must be holding the [`lock`](@ref) that owns `c` before calling this method. -The calling task will be blocked until some other task wakes it, -usually by calling [`notify`](@ref)` on the same ConditionMT object. -The lock will be atomically released when blocking (even if it was locked recursively), -and will be reacquired before returning. -""" -wait(c::ConditionMT) diff --git a/base/sysimg.jl b/base/sysimg.jl index 42baa8e2f2899..48920935e95d4 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -314,18 +314,10 @@ include("env.jl") include("libuv.jl") include("event.jl") include("task.jl") -include("lock.jl") include("threads.jl") +include("lock.jl") include("weakkeydict.jl") -# default (Julia v1.0) is currently single-threaded -const Condition = GenericCondition{Union{AlwaysLockedST, Threads.ReentrantLockMT}} -Condition() = Condition(AlwaysLockedST()) -Condition(threadsafe::Bool) = Condition(threadsafe ? Threads.ReentrantLockMT() : AlwaysLockedST()) -# but uses MT-safe versions, when possible -const ReentrantLock = Threads.ReentrantLockMT -const Event = Threads.EventMT - # Logging include("logging.jl") using .CoreLogging diff --git a/base/threadcall.jl b/base/threadcall.jl index 1dd48b4097f7a..4754afec0d6ac 100644 --- a/base/threadcall.jl +++ b/base/threadcall.jl @@ -1,7 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license const max_ccall_threads = parse(Int, get(ENV, "UV_THREADPOOL_SIZE", "4")) -const thread_notifiers = Union{Condition, Nothing}[nothing for i in 1:max_ccall_threads] +const thread_notifiers = Union{Base.Condition, Nothing}[nothing for i in 1:max_ccall_threads] const threadcall_restrictor = Semaphore(max_ccall_threads) """ @@ -81,7 +81,7 @@ function do_threadcall(fun_ptr::Ptr{Cvoid}, rettype::Type, argtypes::Vector, arg # wait for a worker thread to be available acquire(threadcall_restrictor) idx = findfirst(isequal(nothing), thread_notifiers)::Int - thread_notifiers[idx] = Condition() + thread_notifiers[idx] = Base.Condition() GC.@preserve args_arr ret_arr roots begin # queue up the work to be done diff --git a/base/threads.jl b/base/threads.jl index c5332ca9458e2..292513418525b 100644 --- a/base/threads.jl +++ b/base/threads.jl @@ -5,6 +5,8 @@ Experimental multithreading support. """ module Threads +global Condition # we'll define this later, make sure we don't import Base.Condition + include("threadingconstructs.jl") include("atomics.jl") include("locks-mt.jl") diff --git a/base/weakkeydict.jl b/base/weakkeydict.jl index 09199e6ce5c40..4b6dab33aca54 100644 --- a/base/weakkeydict.jl +++ b/base/weakkeydict.jl @@ -14,12 +14,12 @@ See [`Dict`](@ref) for further help. Note, unlike [`Dict`](@ref), """ mutable struct WeakKeyDict{K,V} <: AbstractDict{K,V} ht::Dict{WeakRef,V} - lock::Threads.RecursiveSpinLock + lock::ReentrantLock finalizer::Function # Constructors mirror Dict's function WeakKeyDict{K,V}() where V where K - t = new(Dict{Any,V}(), Threads.RecursiveSpinLock(), identity) + t = new(Dict{Any,V}(), ReentrantLock(), identity) t.finalizer = function (k) # when a weak key is finalized, remove from dictionary if it is still there if islocked(t) diff --git a/test/threads.jl b/test/threads.jl index 5f571845f8536..b2063f7b244fa 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -2,6 +2,7 @@ using Test using Base.Threads +using Base.Threads: SpinLock, Mutex # threading constructs @@ -90,13 +91,13 @@ function threaded_add_locked(::Type{LockT}, x, n) where LockT end @test threaded_add_locked(SpinLock, 0, 10000) == 10000 -@test threaded_add_locked(RecursiveSpinLock, 0, 10000) == 10000 +@test threaded_add_locked(ReentrantLock, 0, 10000) == 10000 @test threaded_add_locked(Mutex, 0, 10000) == 10000 # Check if the recursive lock can be locked and unlocked correctly. -let critical = RecursiveSpinLock() +let critical = ReentrantLock() @test !islocked(critical) - @test_throws ErrorException("unlock from wrong thread") unlock(critical) + @test_throws ErrorException("unlock count must match lock count") unlock(critical) @test lock(critical) === nothing @test islocked(critical) @test lock(critical) === nothing @@ -108,12 +109,12 @@ let critical = RecursiveSpinLock() @test islocked(critical) @test unlock(critical) === nothing @test !islocked(critical) - @test_throws ErrorException("unlock from wrong thread") unlock(critical) + @test_throws ErrorException("unlock count must match lock count") unlock(critical) @test trylock(critical) == true @test islocked(critical) @test unlock(critical) === nothing @test !islocked(critical) - @test_throws ErrorException("unlock from wrong thread") unlock(critical) + @test_throws ErrorException("unlock count must match lock count") unlock(critical) @test !islocked(critical) end @@ -131,7 +132,7 @@ function threaded_gc_locked(::Type{LockT}) where LockT end threaded_gc_locked(SpinLock) -threaded_gc_locked(Threads.RecursiveSpinLock) +threaded_gc_locked(Threads.ReentrantLock) threaded_gc_locked(Mutex) # Issue 14726