From 63d0a03f1bf8beb9419622a738e48dffb9136dbe Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Fri, 20 Dec 2024 11:09:02 +0100 Subject: [PATCH 1/3] fix(TKC-3004): kill nested processes on timeout properly --- .../orchestration/processes.go | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/cmd/testworkflow-init/orchestration/processes.go b/cmd/testworkflow-init/orchestration/processes.go index 559d849f17..30040c5a9e 100644 --- a/cmd/testworkflow-init/orchestration/processes.go +++ b/cmd/testworkflow-init/orchestration/processes.go @@ -35,21 +35,14 @@ func (p *processNode) Find(pid int32) []*processNode { } func (p *processNode) VirtualizePath(pid int32) { - path := p.Find(pid) - if path == nil { - return - } - - // Cannot virtualize itself - if len(path) == 1 { - return - } - - // Virtualize recursively - for i := 1; i < len(path); i++ { - delete(path[0].nodes, path[i]) - for node := range path[i].nodes { - path[0].nodes[node] = struct{}{} + for ps := range p.nodes { + if ps.pid == pid { + for sub := range ps.nodes { + p.nodes[sub] = struct{}{} + } + delete(p.nodes, ps) + } else { + ps.VirtualizePath(pid) } } } @@ -106,7 +99,10 @@ func (p *processNode) Resume() error { func (p *processNode) Kill() error { errs := make([]error, 0) if p.pid != -1 { - return errors.Wrap((&gopsutil.Process{Pid: p.pid}).Kill(), "killing processes") + err := errors.Wrap((&gopsutil.Process{Pid: p.pid}).Kill(), "killing processes") + if err != nil { + return err + } } for node := range p.nodes { err := node.Kill() From 12cbb2cb06876206c7cff2cbfd098a64bd0cf1fc Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Fri, 20 Dec 2024 11:36:31 +0100 Subject: [PATCH 2/3] fix: correctly set test workflow as failed on step timeout --- pkg/api/v1/testkube/model_test_workflow_step_status_extended.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go 
b/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go index 8b9a7e5a09..a84ab030d6 100644 --- a/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go @@ -1,7 +1,7 @@ package testkube func (s *TestWorkflowStepStatus) Finished() bool { - return s != nil && *s != "" && *s != QUEUED_TestWorkflowStepStatus && *s != PAUSED_TestWorkflowStepStatus && *s != RUNNING_TestWorkflowStepStatus && *s != TIMEOUT_TestWorkflowStepStatus + return s != nil && *s != "" && *s != QUEUED_TestWorkflowStepStatus && *s != PAUSED_TestWorkflowStepStatus && *s != RUNNING_TestWorkflowStepStatus } func (s *TestWorkflowStepStatus) Aborted() bool { From 8221d309aa8973350cd77ddffddf10fb7151485a Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Fri, 20 Dec 2024 12:01:53 +0100 Subject: [PATCH 3/3] fix: make the step timeout graceful (allow continuing, allow retries) --- cmd/testworkflow-init/main.go | 27 ++++++++++++++++--- .../orchestration/executions.go | 8 +++++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/cmd/testworkflow-init/main.go b/cmd/testworkflow-init/main.go index 7ed32ecb50..f89f593114 100644 --- a/cmd/testworkflow-init/main.go +++ b/cmd/testworkflow-init/main.go @@ -3,10 +3,12 @@ package main import ( "encoding/json" "errors" + "fmt" "os" "os/signal" "slices" "strconv" + "sync/atomic" "syscall" "time" @@ -310,6 +312,7 @@ func main() { } // Configure timeout finalizer + var hasTimeout, hasOwnTimeout atomic.Bool finalizeTimeout := func() { // Check timed out steps in leaf timedOut := orchestration.GetTimedOut(leaf...) 
@@ -321,6 +324,10 @@ func main() { for _, r := range timedOut { r.SetStatus(constants.StepStatusTimeout) sub := state.GetSubSteps(r.Ref) + hasTimeout.Store(true) + if step.Ref == r.Ref { + hasOwnTimeout.Store(true) + } for i := range sub { if sub[i].IsFinished() { continue @@ -331,7 +338,6 @@ func main() { sub[i].SetStatus(constants.StepStatusSkipped) } } - stdoutUnsafe.Println("Timed out.") } _ = orchestration.Executions.Kill() @@ -358,6 +364,8 @@ func main() { } // Register timeouts + hasTimeout.Store(false) + hasOwnTimeout.Store(false) stopTimeoutWatcher := orchestration.WatchTimeout(finalizeTimeout, leaf...) // Run the command @@ -366,12 +374,17 @@ func main() { // Stop timer listener stopTimeoutWatcher() + // Handle timeout gracefully + if hasOwnTimeout.Load() { + orchestration.Executions.ClearAbortedStatus() + } + // Ensure there won't be any hanging processes after the command is executed _ = orchestration.Executions.Kill() // TODO: Handle retry policy in tree independently // Verify if there may be any other iteration - if step.Iteration >= step.Retry.Count { + if step.Iteration >= step.Retry.Count || (!hasOwnTimeout.Load() && hasTimeout.Load()) { break } @@ -393,7 +406,15 @@ func main() { // Continue with the next iteration step.Iteration++ stdout.HintDetails(step.Ref, constants.InstructionIteration, step.Iteration) - stdoutUnsafe.Printf("\nExit code: %d • Retrying: attempt #%d (of %d):\n", step.ExitCode, step.Iteration, step.Retry.Count) + message := fmt.Sprintf("Exit code: %d", step.ExitCode) + if hasOwnTimeout.Load() { + message = "Timed out" + } + stdoutUnsafe.Printf("\n%s • Retrying: attempt #%d (of %d):\n", message, step.Iteration, step.Retry.Count) + + // Restart start time for the next iteration to allow retries + now := time.Now() + step.StartedAt = &now } } diff --git a/cmd/testworkflow-init/orchestration/executions.go b/cmd/testworkflow-init/orchestration/executions.go index 5bf0eadd57..f1911bfcf4 100644 --- 
a/cmd/testworkflow-init/orchestration/executions.go +++ b/cmd/testworkflow-init/orchestration/executions.go @@ -34,6 +34,8 @@ type executionGroup struct { paused atomic.Bool pauseMu sync.Mutex + + softKillProgress atomic.Bool } func newExecutionGroup(outStream io.Writer, errStream io.Writer) *executionGroup { @@ -142,6 +144,10 @@ func (e *executionGroup) IsAborted() bool { return e.aborted.Load() } +func (e *executionGroup) ClearAbortedStatus() { + e.aborted.Store(false) +} + type execution struct { cmd *exec.Cmd cmdMu sync.Mutex @@ -203,7 +209,7 @@ func (e *execution) Run() (*executionResult, error) { // Mark the execution group as aborted when this process was aborted. // In Kubernetes, when that child process is killed, it may mean OOM Kill. - if aborted && !e.group.aborted.Load() { + if aborted && !e.group.aborted.Load() && !e.group.softKillProgress.Load() { e.group.Abort() }