Skip to content

Commit

Permalink
Condition/RecursiveLock: add ability to handle threads
Browse files Browse the repository at this point in the history
This extends Condition and RecursiveLock to assert that they may only be used
in the single-threaded case (co-operatively scheduled),
and then adds thread-safe versions of the same
(ConditionMT and RecursiveLockMT).

Unlike the existing thread-safe primitives (Threads.AsyncCondition,
Threads.SpinLock, Threads.Mutex), these new types integrate
with the Task system also, and thus should typically be preferred.
  • Loading branch information
vtjnash committed Nov 19, 2018
1 parent 02d8da2 commit 023bf71
Show file tree
Hide file tree
Showing 19 changed files with 416 additions and 223 deletions.
5 changes: 5 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ New language features

* An *exception stack* is maintained on each task to make exception handling more robust and enable root cause analysis using `catch_stack` ([#28878]).

Multi-threading changes
-----------------------

* The `Condition` and `ReentrantLock` types now come in thread-safe variants formed by adding `MT` to their name.
Single-threaded variants now exist additionally with the `ST` suffix — these are v1.0 defaults ([#TBD]).

Language changes
----------------
Expand Down
156 changes: 146 additions & 10 deletions base/event.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,67 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

## thread/task locking abstraction

"""
AbstractLock
Abstract supertype describing types that
implement the thread-safe synchronization primitives:
[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref).
"""
abstract type AbstractLock end
function lock end
function unlock end
function trylock end
function islocked end
unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wait`
relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait`
assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid())
assert_havelock(l::AbstractLock, tid::Integer) =
(islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Task) =
(islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected")

"""
NotALock
A struct that pretends to be always locked on the original thread it was allocated on,
and simply ignores all other interactions.
This can be used in the place of a real lock to, instead, simply and cheaply assert
that the operation is only occurring on a single thread.
And is thus functionally equivalent to allocating a real, recursive lock,
immediately calling `lock` on it, and then never calling a matching `unlock`,
except that calling `lock` from another thread will throw a concurrency violation exception.
"""
struct NotALock <: AbstractLock
ownertid::Int16
NotALock() = new(Threads.threadid())
end
assert_havelock(l::NotALock) = assert_havelock(l, l.ownertid)
lock(l::NotALock) = assert_havelock(l)
unlock(l::NotALock) = assert_havelock(l)
trylock(l::NotALock) = l.ownertid == Threads.threadid()
islocked(::NotALock) = true

"""
CooperativeLock
An optimistic lock, which can be used cheaply to check for missing
lock/unlock guards around `wait`, in the trivial (conflict-free, yield-free, single-threaded, non-recursive) case,
without paying the cost for a full RecursiveLock.
"""
mutable struct CooperativeLock <: AbstractLock
owner::Union{Task, Nothing}
CooperativeLock() = new(nothing)
end
assert_havelock(l::CooperativeLock) = assert_havelock(l, l.owner)
lock(l::CooperativeLock) = (l.owner === nothing || error("concurrency violation detected"); l.owner = current_task(); nothing)
unlock(l::CooperativeLock) = (assert_havelock(l); l.owner = nothing; nothing)
trylock(l::CooperativeLock) = (l.owner === nothing ? (l.owner = current_task(); true) : false)
islocked(l::CooperativeLock) = l.owner !== nothing


## condition variables

"""
Expand All @@ -9,15 +71,26 @@ Create an edge-triggered event source that tasks can wait for. Tasks that call [
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
called can be woken up. For level-triggered notifications, you must keep extra state to keep
track of whether a notification has happened. The [`Channel`](@ref) type does
this, and so can be used for level-triggered events.
track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do
this, and can be used for level-triggered events.
This object is NOT thread-safe. See [`Threads.ConditionMT`](@ref) for a thread-safe version.
"""
mutable struct Condition
mutable struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
lock::L

Condition() = new([])
GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
lock(c::GenericCondition) = lock(c.lock)
unlock(c::GenericCondition) = unlock(c.lock)
trylock(c::GenericCondition) = trylock(c.lock)
islocked(c::GenericCondition) = islocked(c.lock)

"""
wait([x])
Expand All @@ -37,16 +110,19 @@ restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).
Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
proceeding.
"""
function wait(c::Condition)
function wait(c::GenericCondition)
ct = current_task()

assert_havelock(c)
push!(c.waitq, ct)
token = unlockall(c.lock)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
rethrow()
finally
relockall(c.lock, token)
end
end

Expand All @@ -59,8 +135,9 @@ is raised as an exception in the woken tasks.
Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`.
"""
notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
function notify(c::GenericCondition, @nospecialize(arg), all, error)
assert_havelock(c)
cnt = 0
if all
cnt = length(c.waitq)
Expand All @@ -76,9 +153,68 @@ function notify(c::Condition, arg, all, error)
cnt
end

notify_error(c::Condition, err) = notify(c, err, true, true)
notify_error(c::GenericCondition, err) = notify(c, err, true, true)

n_waiters(c::GenericCondition) = length(c.waitq)

"""
isempty(condition)
Return `true` if no tasks are waiting on the condition, `false` otherwise.
"""
isempty(c::GenericCondition) = isempty(c.waitq)


"""
Event()
Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
`Event` are suspended and queued until `notify` is called on the `Event`.
After `notify` is called, the `Event` remains in a signaled state and
tasks will no longer block when waiting for it.
This object is NOT thread-safe. See [`Threads.EventMT`](@ref) for a thread-safe version.
"""
mutable struct GenericEvent{L<:AbstractLock}
notify::GenericCondition{L}
set::Bool
GenericEvent{L}() where {L<:AbstractLock} = new{L}(GenericCondition{L}(), false)
end

function wait(e::GenericEvent)
e.set && return
lock(e.notify)
try
while !e.set
wait(e.notify)
end
finally
unlock(e.notify)
end
nothing
end

function notify(e::GenericEvent)
lock(e.notify)
try
if !e.set
e.set = true
notify(e.notify)
end
finally
unlock(e.notify)
end
nothing
end


const ConditionST = GenericCondition{CooperativeLock}
const EventST = GenericEvent{CooperativeLock}

# default (Julia v1.0) is currently single-threaded
const Condition = GenericCondition{NotALock}
const Event = EventST

n_waiters(c::Condition) = length(c.waitq)

## scheduler and work queue

Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ export

# tasks and conditions
Condition,
Event,
current_task,
islocked,
istaskdone,
Expand Down
110 changes: 78 additions & 32 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,32 @@
"""
ReentrantLock()
Creates a reentrant lock for synchronizing [`Task`](@ref)s.
Creates a re-entrant lock for synchronizing [`Task`](@ref)s.
The same task can acquire the lock as many times as required.
Each [`lock`](@ref) must be matched with an [`unlock`](@ref).
This lock is NOT threadsafe. See [`Threads.Mutex`](@ref) for a threadsafe lock.
This lock is NOT thread-safe. See [`Threads.ReentrantLockMT`](@ref) for a thread-safe version.
"""
mutable struct ReentrantLock
mutable struct GenericReentrantLock{ThreadLock<:AbstractLock} <: AbstractLock
locked_by::Union{Task, Nothing}
cond_wait::Condition
cond_wait::GenericCondition{ThreadLock}
reentrancy_cnt::Int

ReentrantLock() = new(nothing, Condition(), 0)
GenericReentrantLock{ThreadLock}() where {ThreadLock<:AbstractLock} = new(nothing, GenericCondition{ThreadLock}(), 0)
end

# A basic single-threaded, Julia-aware lock:
const ReentrantLockST = GenericReentrantLock{CooperativeLock}
const ReentrantLock = ReentrantLockST # default (Julia v1.0) is currently single-threaded


"""
islocked(lock) -> Status (Boolean)
Check whether the `lock` is held by any task/thread.
This should not be used for synchronization (see instead [`trylock`](@ref)).
"""
function islocked(rl::ReentrantLock)
function islocked(rl::GenericReentrantLock)
return rl.reentrancy_cnt != 0
end

Expand All @@ -38,17 +43,22 @@ return `false`.
Each successful `trylock` must be matched by an [`unlock`](@ref).
"""
function trylock(rl::ReentrantLock)
function trylock(rl::GenericReentrantLock)
t = current_task()
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return true
elseif t == notnothing(rl.locked_by)
rl.reentrancy_cnt += 1
return true
lock(rl.cond_wait)
try
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return true
elseif t == notnothing(rl.locked_by)
rl.reentrancy_cnt += 1
return true
end
return false
finally
unlock(rl.cond_wait)
end
return false
end

"""
Expand All @@ -60,18 +70,23 @@ wait for it to become available.
Each `lock` must be matched by an [`unlock`](@ref).
"""
function lock(rl::ReentrantLock)
function lock(rl::GenericReentrantLock)
t = current_task()
while true
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return
elseif t == notnothing(rl.locked_by)
rl.reentrancy_cnt += 1
return
lock(rl.cond_wait)
try
while true
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return
elseif t == notnothing(rl.locked_by)
rl.reentrancy_cnt += 1
return
end
wait(rl.cond_wait)
end
wait(rl.cond_wait)
finally
unlock(rl.cond_wait)
end
end

Expand All @@ -83,19 +98,49 @@ Releases ownership of the `lock`.
If this is a recursive lock which has been acquired before, decrement an
internal counter and return immediately.
"""
function unlock(rl::ReentrantLock)
if rl.reentrancy_cnt == 0
error("unlock count must match lock count")
function unlock(rl::GenericReentrantLock)
t = current_task()
rl.reentrancy_cnt == 0 && error("unlock count must match lock count")
rl.locked_by == t || error("unlock from wrong thread")
lock(rl.cond_wait)
try
rl.reentrancy_cnt -= 1
if rl.reentrancy_cnt == 0
rl.locked_by = nothing
notify(rl.cond_wait)
end
finally
unlock(rl.cond_wait)
end
rl.reentrancy_cnt -= 1
if rl.reentrancy_cnt == 0
return
end

function unlockall(rl::GenericReentrantLock)
t = current_task()
n = rl.reentrancy_cnt
rl.locked_by == t || error("unlock from wrong thread")
n == 0 && error("unlock count must match lock count")
lock(rl.cond_wait)
try
rl.reentrancy_cnt == 0
rl.locked_by = nothing
notify(rl.cond_wait)
finally
unlock(rl.cond_wait)
end
return n
end

function relockall(rl::GenericReentrantLock, n::Int)
t = current_task()
lock(rl)
n1 = rl.reentrancy_cnt
rl.reentrancy_cnt = n
n1 == 1 || error("concurrency violation detected")
return
end

function lock(f, l)
function lock(f, l::AbstractLock)
lock(l)
try
return f()
Expand All @@ -104,7 +149,7 @@ function lock(f, l)
end
end

function trylock(f, l)
function trylock(f, l::AbstractLock)
if trylock(l)
try
return f()
Expand Down Expand Up @@ -159,4 +204,5 @@ function release(s::Semaphore)
@assert s.curr_cnt > 0 "release count must match acquire count"
s.curr_cnt -= 1
notify(s.cond_wait; all=false)
return
end
Loading

0 comments on commit 023bf71

Please sign in to comment.