Skip to content

Commit

Permalink
Condition/RecursiveLock: add ability to handle threads
Browse files Browse the repository at this point in the history
This extends Condition to assert that it may only be used
in the single-threaded case (co-operatively scheduled),
and then adds a thread-safe version of the same:
Threads.Condition.

Additionally, it also upgrades ReentrantLock, etc. to be thread-safe.

fixup: undo previous commit. remove ST versions of Lock, Event, Semaphore. introduce new Threads.Condition
  • Loading branch information
vtjnash committed Dec 12, 2018
1 parent d767fcf commit eaa8cf1
Show file tree
Hide file tree
Showing 23 changed files with 545 additions and 460 deletions.
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ New language features
and values ([#29733]).
* Binary `~` can now be dotted, as in `x .~ y` ([#30341]).

Multi-threading changes
-----------------------

* The `Condition` type now has a thread-safe replacement, accessed as `Threads.Condition`.
With that addition, task scheduling primitives such as `ReentrantLock` are now thread-safe ([#30061]).

Language changes
----------------

Expand Down
134 changes: 121 additions & 13 deletions base/event.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,85 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

## thread/task locking abstraction

"""
AbstractLock
Abstract supertype describing types that
implement the synchronization primitives:
[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref).
"""
abstract type AbstractLock end
function lock end
function unlock end
function trylock end
function islocked end
unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wait`
relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait`
assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid())
assert_havelock(l::AbstractLock, tid::Integer) =
(islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Task) =
(islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected")

"""
AlwaysLockedST
This struct does not implement a real lock, but instead
pretends to be always locked on the original thread it was allocated on,
and simply ignores all other interactions.
It also does not synchronize tasks; for that use a [`CooperativeLock`](@ref) or a
real lock such as [`RecursiveLock`](@ref).
This can be used in the place of a real lock to, instead, simply and cheaply assert
that the operation is only occurring on a single thread.
And is thus functionally equivalent to allocating a real, recursive, task-unaware lock
immediately calling `lock` on it, and then never calling a matching `unlock`,
except that calling `lock` from another thread will throw a concurrency violation exception.
"""
struct AlwaysLockedST <: AbstractLock
ownertid::Int16
AlwaysLockedST() = new(Threads.threadid())
end
assert_havelock(l::AlwaysLockedST) = assert_havelock(l, l.ownertid)
lock(l::AlwaysLockedST) = assert_havelock(l)
unlock(l::AlwaysLockedST) = assert_havelock(l)
trylock(l::AlwaysLockedST) = l.ownertid == Threads.threadid()
islocked(::AlwaysLockedST) = true

"""
CooperativeLock
An optimistic lock for cooperative tasks, which can be used cheaply to check for missing
lock/unlock guards around `wait`, in the trivial (conflict-free, yield-free, single-threaded, non-recursive) case,
without paying the cost for a full RecursiveLock.
"""
mutable struct CooperativeLock <: AbstractLock
owner::Union{Task, Nothing}
CooperativeLock() = new(nothing)
end
assert_havelock(l::CooperativeLock) = assert_havelock(l, l.owner)
function lock(l::CooperativeLock)
l.owner === nothing || error("concurrency violation detected")
l.owner = current_task()
nothing
end
function unlock(l::CooperativeLock)
assert_havelock(l)
l.owner = nothing
nothing
end
function trylock(l::CooperativeLock)
if l.owner === nothing
l.owner = current_task()
return true
else
return false
end
end
islocked(l::CooperativeLock) = l.owner !== nothing


## condition variables

"""
Expand All @@ -9,15 +89,26 @@ Create an edge-triggered event source that tasks can wait for. Tasks that call [
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
called can be woken up. For level-triggered notifications, you must keep extra state to keep
track of whether a notification has happened. The [`Channel`](@ref) type does
this, and so can be used for level-triggered events.
track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do
this, and can be used for level-triggered events.
This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version.
"""
mutable struct Condition
struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
lock::L

Condition() = new([])
GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
lock(c::GenericCondition) = lock(c.lock)
unlock(c::GenericCondition) = unlock(c.lock)
trylock(c::GenericCondition) = trylock(c.lock)
islocked(c::GenericCondition) = islocked(c.lock)

"""
wait([x])
Expand All @@ -37,16 +128,19 @@ restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).
Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
proceeding.
"""
function wait(c::Condition)
function wait(c::GenericCondition)
ct = current_task()

assert_havelock(c)
push!(c.waitq, ct)
token = unlockall(c.lock)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
rethrow()
finally
relockall(c.lock, token)
end
end

Expand All @@ -59,26 +153,40 @@ is raised as an exception in the woken tasks.
Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`.
"""
notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
function notify(c::GenericCondition, @nospecialize(arg), all, error)
assert_havelock(c)
cnt = 0
if all
cnt = length(c.waitq)
for t in c.waitq
error ? schedule(t, arg, error=error) : schedule(t, arg)
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
cnt = 1
t = popfirst!(c.waitq)
error ? schedule(t, arg, error=error) : schedule(t, arg)
schedule(t, arg, error=error)
end
cnt
return cnt
end

notify_error(c::Condition, err) = notify(c, err, true, true)
notify_error(c::GenericCondition, err) = notify(c, err, true, true)

n_waiters(c::GenericCondition) = length(c.waitq)

"""
isempty(condition)
Return `true` if no tasks are waiting on the condition, `false` otherwise.
"""
isempty(c::GenericCondition) = isempty(c.waitq)


# default (Julia v1.0) is currently single-threaded
# (although it uses MT-safe versions, when possible)
const Condition = GenericCondition{AlwaysLockedST}

n_waiters(c::Condition) = length(c.waitq)

## scheduler and work queue

Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ export

# tasks and conditions
Condition,
Event,
current_task,
islocked,
istaskdone,
Expand Down
Loading

0 comments on commit eaa8cf1

Please sign in to comment.