Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce connections_lock to protect Server.connections (backport 1.9) #1162

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions src/Servers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ struct Server{L <: Listener}
connections::Set{Connection}
# server listenandserve loop task
task::Task
# Protects the connections Set which is mutated in the listenloop
# while potentially being accessed by the close method at the same time
connections_lock::ReentrantLock
end

port(s::Server) = Int(s.listener.addr.port)
Expand All @@ -127,8 +130,10 @@ Base.wait(s::Server) = wait(s.task)
function forceclose(s::Server)
shutdown(s.on_shutdown)
close(s.listener)
for c in s.connections
close(c)
Base.@lock s.connections_lock begin
for c in s.connections
close(c)
end
end
return wait(s.task)
end
Expand Down Expand Up @@ -166,14 +171,19 @@ function Base.close(s::Server)
shutdown(s.on_shutdown)
close(s.listener)
# first pass to mark or request connections to close
for c in s.connections
requestclose!(c)
Base.@lock s.connections_lock begin
for c in s.connections
requestclose!(c)
end
end
# second pass to wait for connections to close
# we wait for connections to empty because as
# connections close themselves, they are removed
# from our connections Set
while !isempty(s.connections)
while true
Base.@lock s.connections_lock begin
isempty(s.connections) && break
end
sleep(0.5 + rand() * 0.1)
end
return wait(s.task)
Expand Down Expand Up @@ -346,25 +356,28 @@ function listen!(f, listener::Listener;
access_log::Union{Function,Nothing}=nothing,
verbose=false, kw...)
conns = Set{Connection}()
conns_lock = ReentrantLock()
ready_to_accept = Threads.Event()
if verbose > 0
tsk = @_spawn_interactive LoggingExtras.withlevel(Logging.Debug; verbosity=verbose) do
listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose)
listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose)
end
else
tsk = @_spawn_interactive listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose)
tsk = @_spawn_interactive listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose)
end
# wait until the listenloop enters the loop
wait(ready_to_accept)
return Server(listener, on_shutdown, conns, tsk)
return Server(listener, on_shutdown, conns, tsk, conns_lock)
end

""""
Main server loop.
Accepts new tcp connections and spawns async tasks to handle them."
"""
function listenloop(f, listener, conns, tcpisvalid,
max_connections, readtimeout, access_log, ready_to_accept, verbose)
function listenloop(
f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept,
conns_lock, verbose
)
sem = Base.Semaphore(max_connections)
verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())"
notify(ready_to_accept)
Expand All @@ -382,13 +395,13 @@ function listenloop(f, listener, conns, tcpisvalid,
end
conn = Connection(io)
conn.state = IDLE
push!(conns, conn)
Base.@lock conns_lock push!(conns, conn)
conn.host, conn.port = listener.hostname, listener.hostport
@async try
handle_connection(f, conn, listener, readtimeout, access_log)
finally
# handle_connection is in charge of closing the underlying io
delete!(conns, conn)
Base.@lock conns_lock delete!(conns, conn)
Base.release(sem)
end
catch e
Expand Down
Loading