Skip to content

Commit

Permalink
Restarts logic review
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Dec 5, 2024
1 parent a952844 commit ca3e6b7
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 50 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 0.7.4 (-- December, 2024)

- Improved the logic of processes restart.

## 0.7.3 (17 November, 2024)

- Added from_name() API method. It may be useful if the process name contains dots.
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Visor"
uuid = "cf786855-3531-4b86-ba6e-3e33dce7dcdb"
authors = ["Attilio Donà"]
version = "0.7.3"
version = "0.7.4"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
88 changes: 46 additions & 42 deletions src/Visor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ mutable struct Supervisor <: Supervised
strategy::Symbol
terminateif::Symbol
evhandler::Union{Nothing,Function}
restarts::Vector{Supervised}
supervisor::Supervisor
inbox::Channel
task::Task
Expand All @@ -101,16 +100,7 @@ mutable struct Supervisor <: Supervised
evhandler=nothing,
)
return new(
id,
idle,
:undef,
processes,
intensity,
period,
strategy,
terminateif,
evhandler,
[],
id, idle, :undef, processes, intensity, period, strategy, terminateif, evhandler
)
end
function Supervisor(
Expand All @@ -133,7 +123,6 @@ mutable struct Supervisor <: Supervised
strategy,
terminateif,
evhandler,
[],
parent,
)
end
Expand Down Expand Up @@ -466,7 +455,11 @@ struct ProcessInterrupt <: Exception
end

"Trigger a supervisor resync"
struct SupervisorResync end
struct SupervisorResync
restarts::Vector{Supervised}
end

struct SupervisorShuttable end

# Returns the list of running nodes supervised by `supervisor`:
# processes and supervisors direct children.
Expand Down Expand Up @@ -553,14 +546,19 @@ function restart_policy(supervisor, process)
for p in stopped
delete!(supervisor.processes, p.id)
end
put!(supervisor.inbox, ProcessFatal(process))
msg = ProcessFatal(process)
handle_event(supervisor, msg)
put!(supervisor.inbox, msg)
else
process.isrestart = true
if !isnan(process.debounce_time)
sleep(process.debounce_time)
end

if process.status === idle
if supervisor.status === idle
# in the meantime supervisor shutted down, skip restart event
@debug "supervisor [$supervisor] terminated, skipping [$process] restart "
elseif process.status === idle
# a shutdown was issued, terminate the restarts
@debug "[$process]: honore the shutdown request"
delete!(supervisor.processes, process.id)
Expand All @@ -571,12 +569,14 @@ function restart_policy(supervisor, process)
# and then all child processes, including the terminated one, are restarted.
@debug "[$supervisor] restart strategy: $(supervisor.strategy)"
stopped = supervisor_shutdown(supervisor)
supervisor.restarts = stopped
put!(supervisor.inbox, SupervisorResync(stopped))
elseif supervisor.strategy === :rest_for_one
stopped = supervisor_shutdown(supervisor, process)
supervisor.restarts = stopped
put!(supervisor.inbox, SupervisorResync(stopped))
end
end

return nothing
end

function restart_processes(supervisor, procs)
Expand All @@ -591,7 +591,8 @@ function restart_processes(supervisor, procs)
break
end
end
return supervisor.restarts = []

return nothing
end

"""
Expand Down Expand Up @@ -739,15 +740,11 @@ function root_supervisor(process::Supervised)
end

function terminate_others(proc)
try
for (name, p) in collect(proc.supervisor.processes)
if p !== proc
p.status = idle
@async shutdown(p)
end
for (name, p) in collect(proc.supervisor.processes)
if p !== proc
p.status = idle
@async shutdown(p)
end
catch e
@info e
end
end

Expand All @@ -757,7 +754,9 @@ function normal_return(supervisor::Supervisor, child::Process)
if supervisor.strategy === :one_terminate_all
terminate_others(child)
elseif child.restart === :permanent
!child.onhold && restart_policy(supervisor, child)
if !child.onhold
restart_policy(supervisor, child)
end
else
@debug "[$supervisor] normal_return: [$child] done, onhold:$(child.onhold)"
if !child.onhold
Expand All @@ -766,6 +765,8 @@ function normal_return(supervisor::Supervisor, child::Process)
child.status = done
end
end

return nothing
end

function exitby_exception(supervisor::Supervisor, child::Process)
Expand All @@ -778,6 +779,8 @@ function exitby_exception(supervisor::Supervisor, child::Process)
@debug "[$supervisor] exitby_exception: delete [$child]"
end
end

return nothing
end

function exitby_forced_shutdown(supervisor::Supervisor, child::Process)
Expand Down Expand Up @@ -812,21 +815,25 @@ Restart processes previously stopped by supervisor policies.
Return true if all supervised processes terminated.
"""
function resync(supervisor)
if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
function resync(supervisor, restarts::Vector{Supervised})
if !isempty(restarts)
@debug "[$supervisor] to be restarted: $(format4print(restarts))"
# check all required processes are terminated
if all(proc -> proc.status !== running, supervisor.restarts)
if all(proc -> proc.status !== running, restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
restart_processes(supervisor, restarts)
end
end
return nothing
end

function shuttable(supervisor)
@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
if supervisor.terminateif === :empty && isalldone(supervisor)
return true
put!(supervisor.inbox, SupervisorShuttable())
end
return false

return nothing
end

# Supervisor main loop.
Expand All @@ -839,9 +846,10 @@ function manage(supervisor)
supervisor_shutdown(supervisor, nothing, msg.reset)
break
elseif isa(msg, SupervisorResync)
# do nothing here, just a resync() is needed
resync(supervisor, msg.restarts)
elseif isa(msg, SupervisorShuttable)
break
elseif isa(msg, ProcessFatal)
@async handle_event(supervisor, msg)
@debug "[$supervisor] manage process fatal: process done [$(msg.process)]"
msg.process.status = done
elseif isa(msg, Supervised)
Expand All @@ -866,10 +874,6 @@ function manage(supervisor)
else
unknown_message(supervisor, msg)
end

if resync(supervisor)
break
end
end
catch e
@error "[$supervisor]: error: $e"
Expand Down Expand Up @@ -1449,14 +1453,14 @@ function wait_child(supervisor::Supervisor, process::Process)
@debug "removing temporary process $process"
delete!(supervisor.processes, process.id)
end
put!(supervisor.inbox, SupervisorResync())
shuttable(supervisor)
end
end

function wait_child(supervisor::Supervisor, process::Supervisor)
wait(process.task)
normal_return(supervisor, process)
put!(supervisor.inbox, SupervisorResync())
shuttable(supervisor)
return nothing
end

Expand Down
15 changes: 15 additions & 0 deletions test/coverage.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Pkg
using Coverage

try
Pkg.test("Visor"; coverage=true)
catch e
@error "coverage: $e"
finally
coverage = process_folder()
LCOV.writefile("lcov.info", coverage)
end

for dir in ["src", "test"]
foreach(rm, filter(endswith(".cov"), readdir(dir; join=true)))
end
9 changes: 9 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ logging(; debug=DEBUG == "0" ? [] : [Visor])
@time @safetestset "process" begin
include("test_process.jl")
end
@time @safetestset "setname" begin
include("test_setname.jl")
end
@time @safetestset "phase" begin
include("test_phase.jl")
end
@time @safetestset "one_terminate_all" begin
include("test_one_terminate_all.jl")
end
@time @safetestset "supervisor" begin
include("test_supervisor.jl")
end
Expand Down
7 changes: 6 additions & 1 deletion test/test_hierarchy.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ node = Visor.from_path(sv, ".sv1")
@test Visor.from_path(from("sv2.w3"), "w3") === from("sv2.w3")

t1 = Timer((tim) -> shutdown_request(handle, "sv1.sv1-3.w1"), 2)
t2 = Timer((tim) -> getrunning(handle), 3)
t2 = Timer((tim) -> getrunning(handle), 3.5)

n = Visor.nproc(sv)
@test n == 3
Expand All @@ -73,4 +73,9 @@ for (tst, result) in ttrace
@test result
end

process_tree = procs()
@test issetequal(keys(process_tree["root"]), ["sv1", "sv2"])
@test issetequal(keys(process_tree["root"]["sv1"]), ["sv1-3"])
@test isempty(process_tree["root"]["sv2"])

shutdown()
42 changes: 42 additions & 0 deletions test/test_one_terminate_all.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Visor
using Test

terminated = true

function terminate(::Timer)
global terminated
terminated = false
@error "[tast_one_terminate_all] failed to stop all processes"
shutdown()

return nothing
end

function task_one(pd)
return sleep(0.5)
end

function task_all(pd)
for msg in pd.inbox
if isshutdown(msg)
break
end
end
end

Timer(terminate, 5)

try
#spec = [process(task_all), process(task_one), process(task_all)]
spec = [process("p1", task_all), process("p2", task_one), process("p3", task_all)]

sv = supervise(spec; strategy=:one_terminate_all, terminateif=:empty)

Visor.dump()
info = procs()
@info "processes: $info"

@test terminated
finally
shutdown()
end
28 changes: 28 additions & 0 deletions test/test_phase.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Visor
using Test

function terminate(::Timer)
return shutdown()
end

function mytask(pd)
for msg in pd.inbox
if isshutdown(msg)
phase = getphase(pd)
@info "[test_phase] phase=$phase"
break
end
end
end

proc = process(mytask)
spec = [proc]

Timer(terminate, 2)
sv = supervise(spec; wait=false)

setphase(proc, :ground)

wait(sv)

@test getphase(proc) === :ground
12 changes: 6 additions & 6 deletions test/test_restart_all.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ using Visor
using Test

#
#
#
# root
# /
# /
# s1
# /
# s1
# / \
# s11 w3
# / \
# w1 w2
#
# startup order: w1, w2, w3
# check :one_for_all restart strategy
# startup order: w1, w2, w3
# check :one_for_all restart strategy
#

#ENV["JULIA_DEBUG"] = Visor
Expand Down Expand Up @@ -55,7 +55,7 @@ s1_specs = [
process("w3", myworker; force_interrupt_after=1),
]

specs = [supervisor("s1", s1_specs; strategy=:one_for_all)]
specs = [supervisor("s1", s1_specs; strategy=:one_for_all, terminateif=:shutdown)]

handle = Visor.supervise(specs; wait=false)
Timer((tim) -> shutdown(handle), 10)
Expand Down
Loading

0 comments on commit ca3e6b7

Please sign in to comment.