Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reboot: Update Select.jl for multithreaded, modern Julia #1

Merged
merged 22 commits into from
Sep 4, 2019
Merged

Conversation

NHDaly
Copy link
Owner

@NHDaly NHDaly commented Sep 4, 2019

This PR updates the code from https://github.com/durcan/Select.jl, originally based on @malmaud's julep (JuliaLang/julia#13763) for julia 1.3+, for a working, multithreaded @select macro.

I kept the syntax unchanged from the original repo, but updated to modern julia and fixed some concurrency synchronization bugs I encountered along the way.

  • Update to Julia 1.0 (this was written in 2015, during (i think) v0.4)
  • Update to be aware of multithreading, and still perform correctly (e.g. locking when selecting one of the tasks)
  • Fix deadlock when self-synchronizing between sibling clauses (e.g first clause puts, next clause takes).
  • Test the working behavior for both Channels and Tasks
    • (Leaving other waitable types for future work: Conditions, Futures, etc)

The hardest fix was fixing a deadlock that occurred in the old code (even when single-threaded) when synchronization was possible between sibling clauses, as demonstrated in this testset I added:

ch = Channel()
@test @select(begin
ch <| "hi" => "put"
# This take!(ch) should not be triggered by the put
ch |> x => "take! |> $x"
# This wait(ch) should also not be triggered by the put
ch => "waiting on ch"
@async(sleep(0.3)) => "timeout"
end) == "timeout"

NHDaly and others added 18 commits September 3, 2019 13:06
… to do it for channel if i can assume value var is _always_ a name
We need to hold the lock around the ENTIRE OPERATION:

 - lock
 - while !isready_put(c) wait(c) end
 - _kill siblings!_ # we aren't doing this inside the lock right now.
 - mutate_channel(c)  # either take!(c) or put!(c)
 - unlock

So to do this, i'm going to try passing the mutate_channel operation
as a callback
… for select blocking: unite entire algorithm inside the lock
…tithreading exception

```julia
julia> Select.@select begin
           #ch |> x             => @info ch, x
           ch <| "hi"          => @info ch, take!(ch)
           ch <| "hey"         => @info ch, take!(ch)
       end
[ Info: Task 1 about to lock
[ Info: Task 2 about to lock
[ Info: Task 1 about to wait
[ Info: Task 1 woke: killing rivals
[ Info: 1
[ Info: (2, Task (runnable) @0x0000000127d19d50)
[ Info: done killing
[ Info: CAUGHT SelectInterrupt: Main.Select.SelectInterrupt(Task (runnable) @0x0000000127d19b10)
[ Info: Got event_val: hi
┌ Error: Exception while generating log record in module Main at /Users/nathan.daly/.julia/dev/Select/src/Select.jl:291
│   exception =
│    TypeError: in typeassert, expected Int32, got Nothing
│    Stacktrace:
│     [1] uv_write(::Base.TTY, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:935
│     [2] unsafe_write(::Base.TTY, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:1007
│     [3] unsafe_write at ./io.jl:593 [inlined]
│     [4] macro expansion at ./gcutils.jl:91 [inlined]
│     [5] write(::Base.TTY, ::Array{UInt8,1}) at ./io.jl:616
│     [6] #handle_message#2(::Nothing, ::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(Base.CoreLogging.handle_message), ::Logging.ConsoleLogger, ::Base.CoreLogging.LogLevel, ::String, ::Module, ::String, ::Symbol, ::String, ::Int64) at /Users/nathan.daly/builds/julia-1.4/usr/share/julia/stdlib/v1.4/Logging/src/ConsoleLogger.jl:161
│     [7] handle_message at /Users/nathan.daly/builds/julia-1.4/usr/share/julia/stdlib/v1.4/Logging/src/ConsoleLogger.jl:100 [inlined]
│     [8] macro expansion at ./logging.jl:320 [inlined]
│     [9] macro expansion at /Users/nathan.daly/.julia/dev/Select/src/Select.jl:291 [inlined]
│     [10] (::var"##159#161")() at ./task.jl:333
└ @ Main ~/.julia/dev/Select/src/Select.jl:291
[ Info: (Channel{Any}(sz_max:1,sz_curr:0), "hi")
```
…w uv multithreading exception"

This reverts commit e4d3ae85b6b2859b667cfb0f8d4e1301141a5fc7.
The problem is that there's a deadlock in this situation:
```julia
julia> ch = Channel()
Channel{Any}(sz_max:0,sz_curr:0)

julia> @async @info Select.@select begin
           ch <| "hi"          => "put"
           ch |> x             => "take! |> $x"
           @async(sleep(1))    => "timeout"
       end
Task (runnable) @0x0000000111fd06d0
```

Because Task 1 waits on "put", it allows Task 2 to immediately succeed
on its wait.
So then Task 2 gets the Lock, but no one is _actually_ ready to put!
into the channel.

The problem partly comes from this "fake-out" _wait_ without actually
take! or put!.
@NHDaly NHDaly merged commit 7fcd217 into master Sep 4, 2019
@NHDaly NHDaly deleted the reboot branch September 4, 2019 13:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant