Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Condition/RecursiveLock: add ability to handle threads #30061

Merged
merged 1 commit into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ New language features
* The `extrema` function now accepts a function argument in the same manner as `minimum` and
`maximum` ([#30323]).

Multi-threading changes
-----------------------

* The `Condition` type now has a thread-safe replacement, accessed as `Threads.Condition`.
With that addition, task scheduling primitives such as `ReentrantLock` are now thread-safe ([#30061]).

Language changes
----------------

Expand Down
117 changes: 99 additions & 18 deletions base/event.jl
Original file line number Diff line number Diff line change
@@ -1,23 +1,75 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

## thread/task locking abstraction

"""
AbstractLock

Abstract supertype describing types that
implement the synchronization primitives:
[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref).
"""
abstract type AbstractLock end
function lock end
function unlock end
function trylock end
function islocked end
unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wait`
relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait`
assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid())
assert_havelock(l::AbstractLock, tid::Integer) =
(islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Task) =
(islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected")

"""
AlwaysLockedST
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the only case where the original *ST and *MT naming has remained in the merged PR. Was that intentional or was this just overlooked?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, there aren't any MT and this type is only meant to be used internally. We could rename it though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that's what I was thinking—just calling it AlwaysLocked would be good.


This struct does not implement a real lock, but instead
pretends to be always locked on the original thread it was allocated on,
and simply ignores all other interactions.
It also does not synchronize tasks; for that use a real lock such as [`RecursiveLock`](@ref).
This can be used in the place of a real lock to, instead, simply and cheaply assert
that the operation is only occurring on a single cooperatively-scheduled thread.
It is thus functionally equivalent to allocating a real, recursive, task-unaware lock
immediately calling `lock` on it, and then never calling a matching `unlock`,
except that calling `lock` from another thread will throw a concurrency violation exception.
"""
struct AlwaysLockedST <: AbstractLock
ownertid::Int16
AlwaysLockedST() = new(Threads.threadid())
end
assert_havelock(l::AlwaysLockedST) = assert_havelock(l, l.ownertid)
lock(l::AlwaysLockedST) = assert_havelock(l)
unlock(l::AlwaysLockedST) = assert_havelock(l)
trylock(l::AlwaysLockedST) = l.ownertid == Threads.threadid()
islocked(::AlwaysLockedST) = true


## condition variables

"""
Condition()
GenericCondition

Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
called can be woken up. For level-triggered notifications, you must keep extra state to keep
track of whether a notification has happened. The [`Channel`](@ref) type does
this, and so can be used for level-triggered events.
Abstract implementation of a condition object
for synchonizing tasks objects with a given lock.
"""
vtjnash marked this conversation as resolved.
Show resolved Hide resolved
mutable struct Condition
struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
lock::L

Condition() = new([])
GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
lock(c::GenericCondition) = lock(c.lock)
unlock(c::GenericCondition) = unlock(c.lock)
trylock(c::GenericCondition) = trylock(c.lock)
islocked(c::GenericCondition) = islocked(c.lock)

"""
wait([x])

Expand All @@ -37,16 +89,19 @@ restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).
Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
proceeding.
"""
function wait(c::Condition)
function wait(c::GenericCondition)
ct = current_task()

assert_havelock(c)
push!(c.waitq, ct)
token = unlockall(c.lock)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
rethrow()
finally
relockall(c.lock, token)
end
end

Expand All @@ -59,26 +114,52 @@ is raised as an exception in the woken tasks.

Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`.
"""
notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
function notify(c::GenericCondition, @nospecialize(arg), all, error)
assert_havelock(c)
cnt = 0
if all
cnt = length(c.waitq)
for t in c.waitq
error ? schedule(t, arg, error=error) : schedule(t, arg)
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
cnt = 1
t = popfirst!(c.waitq)
error ? schedule(t, arg, error=error) : schedule(t, arg)
schedule(t, arg, error=error)
end
cnt
return cnt
end

notify_error(c::Condition, err) = notify(c, err, true, true)
notify_error(c::GenericCondition, err) = notify(c, err, true, true)

n_waiters(c::GenericCondition) = length(c.waitq)

"""
isempty(condition)

Return `true` if no tasks are waiting on the condition, `false` otherwise.
"""
isempty(c::GenericCondition) = isempty(c.waitq)


# default (Julia v1.0) is currently single-threaded
# (although it uses MT-safe versions, when possible)
"""
Condition()

Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
called can be woken up. For level-triggered notifications, you must keep extra state to keep
track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do
this, and can be used for level-triggered events.

This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version.
"""
const Condition = GenericCondition{AlwaysLockedST}

n_waiters(c::Condition) = length(c.waitq)

## scheduler and work queue

Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ export

# tasks and conditions
Condition,
Event,
current_task,
islocked,
istaskdone,
Expand Down
Loading