From 0c731a96bb856a5ce9a628b699fe4351e2f72cc9 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Fri, 20 Dec 2024 12:29:34 +0100 Subject: [PATCH 1/3] fix(TKC-3004): kill nested processes on timeout properly (#6097) * fix: kill nested processes on timeout properly * fix: correctly set test workflow as failed on step timeout * fix: make the step timeout graceful (allow continuing, allow retries --- cmd/testworkflow-init/main.go | 27 ++++++++++++++++-- .../orchestration/executions.go | 8 +++++- .../orchestration/processes.go | 28 ++++++++----------- ...odel_test_workflow_step_status_extended.go | 2 +- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/cmd/testworkflow-init/main.go b/cmd/testworkflow-init/main.go index 7ed32ecb502..f89f5931144 100644 --- a/cmd/testworkflow-init/main.go +++ b/cmd/testworkflow-init/main.go @@ -3,10 +3,12 @@ package main import ( "encoding/json" "errors" + "fmt" "os" "os/signal" "slices" "strconv" + "sync/atomic" "syscall" "time" @@ -310,6 +312,7 @@ func main() { } // Configure timeout finalizer + var hasTimeout, hasOwnTimeout atomic.Bool finalizeTimeout := func() { // Check timed out steps in leaf timedOut := orchestration.GetTimedOut(leaf...) @@ -321,6 +324,10 @@ func main() { for _, r := range timedOut { r.SetStatus(constants.StepStatusTimeout) sub := state.GetSubSteps(r.Ref) + hasTimeout.Store(true) + if step.Ref == r.Ref { + hasOwnTimeout.Store(true) + } for i := range sub { if sub[i].IsFinished() { continue @@ -331,7 +338,6 @@ func main() { sub[i].SetStatus(constants.StepStatusSkipped) } } - stdoutUnsafe.Println("Timed out.") } _ = orchestration.Executions.Kill() @@ -358,6 +364,8 @@ func main() { } // Register timeouts + hasTimeout.Store(false) + hasOwnTimeout.Store(false) stopTimeoutWatcher := orchestration.WatchTimeout(finalizeTimeout, leaf...) // Run the command @@ -366,12 +374,17 @@ func main() { // Stop timer listener stopTimeoutWatcher() + // Handle timeout gracefully + if hasOwnTimeout.Load() { + orchestration.Executions.ClearAbortedStatus() + } + // Ensure there won't be any hanging processes after the command is executed _ = orchestration.Executions.Kill() // TODO: Handle retry policy in tree independently // Verify if there may be any other iteration - if step.Iteration >= step.Retry.Count { + if step.Iteration >= step.Retry.Count || (!hasOwnTimeout.Load() && hasTimeout.Load()) { break } @@ -393,7 +406,15 @@ func main() { // Continue with the next iteration step.Iteration++ stdout.HintDetails(step.Ref, constants.InstructionIteration, step.Iteration) - stdoutUnsafe.Printf("\nExit code: %d • Retrying: attempt #%d (of %d):\n", step.ExitCode, step.Iteration, step.Retry.Count) + message := fmt.Sprintf("Exit code: %d", step.ExitCode) + if hasOwnTimeout.Load() { + message = "Timed out" + } + stdoutUnsafe.Printf("\n%s • Retrying: attempt #%d (of %d):\n", message, step.Iteration, step.Retry.Count) + + // Restart start time for the next iteration to allow retries + now := time.Now() + step.StartedAt = &now } } diff --git a/cmd/testworkflow-init/orchestration/executions.go b/cmd/testworkflow-init/orchestration/executions.go index 5bf0eadd57a..f1911bfcf4d 100644 --- a/cmd/testworkflow-init/orchestration/executions.go +++ b/cmd/testworkflow-init/orchestration/executions.go @@ -34,6 +34,8 @@ type executionGroup struct { paused atomic.Bool pauseMu sync.Mutex + + softKillProgress atomic.Bool } func newExecutionGroup(outStream io.Writer, errStream io.Writer) *executionGroup { @@ -142,6 +144,10 @@ func (e *executionGroup) IsAborted() bool { return e.aborted.Load() } +func (e *executionGroup) ClearAbortedStatus() { + e.aborted.Store(false) +} + type execution struct { cmd *exec.Cmd cmdMu sync.Mutex @@ -203,7 +209,7 @@ func (e *execution) Run() (*executionResult, error) { // Mark the execution group as aborted when this process was aborted. // In Kubernetes, when that child process is killed, it may mean OOM Kill. - if aborted && !e.group.aborted.Load() { + if aborted && !e.group.aborted.Load() && !e.group.softKillProgress.Load() { e.group.Abort() } diff --git a/cmd/testworkflow-init/orchestration/processes.go b/cmd/testworkflow-init/orchestration/processes.go index 559d849f174..30040c5a9e7 100644 --- a/cmd/testworkflow-init/orchestration/processes.go +++ b/cmd/testworkflow-init/orchestration/processes.go @@ -35,21 +35,14 @@ func (p *processNode) Find(pid int32) []*processNode { } func (p *processNode) VirtualizePath(pid int32) { - path := p.Find(pid) - if path == nil { - return - } - - // Cannot virtualize itself - if len(path) == 1 { - return - } - - // Virtualize recursively - for i := 1; i < len(path); i++ { - delete(path[0].nodes, path[i]) - for node := range path[i].nodes { - path[0].nodes[node] = struct{}{} + for ps := range p.nodes { + if ps.pid == pid { + for sub := range ps.nodes { + p.nodes[sub] = struct{}{} + } + delete(p.nodes, ps) + } else { + ps.VirtualizePath(pid) } } } @@ -106,7 +99,10 @@ func (p *processNode) Resume() error { func (p *processNode) Kill() error { errs := make([]error, 0) if p.pid != -1 { - return errors.Wrap((&gopsutil.Process{Pid: p.pid}).Kill(), "killing processes") + err := errors.Wrap((&gopsutil.Process{Pid: p.pid}).Kill(), "killing processes") + if err != nil { + return err + } } for node := range p.nodes { err := node.Kill() diff --git a/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go b/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go index 8b9a7e5a090..a84ab030d63 100644 --- a/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_step_status_extended.go @@ -1,7 +1,7 @@ package testkube func (s *TestWorkflowStepStatus) Finished() bool { - return s != nil && *s != "" && *s != QUEUED_TestWorkflowStepStatus && *s != PAUSED_TestWorkflowStepStatus && *s != RUNNING_TestWorkflowStepStatus && *s != TIMEOUT_TestWorkflowStepStatus + return s != nil && *s != "" && *s != QUEUED_TestWorkflowStepStatus && *s != PAUSED_TestWorkflowStepStatus && *s != RUNNING_TestWorkflowStepStatus } func (s *TestWorkflowStepStatus) Aborted() bool { From 0d10bf24610b578a64ecccf3f82907c81967c2d2 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Fri, 20 Dec 2024 12:44:43 +0100 Subject: [PATCH 2/3] fix(TKC-2934): fix order of merging the templates (#6098) --- .../testworkflowresolver/apply.go | 12 +- .../testworkflowresolver/apply_test.go | 117 ++++++++++++++---- 2 files changed, 101 insertions(+), 28 deletions(-) diff --git a/pkg/testworkflows/testworkflowresolver/apply.go b/pkg/testworkflows/testworkflowresolver/apply.go index 07ee151165e..a5ab59b3463 100644 --- a/pkg/testworkflows/testworkflowresolver/apply.go +++ b/pkg/testworkflows/testworkflowresolver/apply.go @@ -133,7 +133,8 @@ func InjectServiceTemplate(svc *testworkflowsv1.ServiceSpec, template testworkfl func applyTemplatesToStep(step testworkflowsv1.Step, templates map[string]testworkflowsv1.TestWorkflowTemplate, externalize func(key, value string) (*corev1.EnvVarSource, error)) (testworkflowsv1.Step, error) { // Apply regular templates - for i, ref := range step.Use { + for i := len(step.Use) - 1; i >= 0; i-- { + ref := step.Use[i] tpl, err := getConfiguredTemplate(ref.Name, ref.Config, templates, externalize) if err != nil { return step, errors.Wrap(err, fmt.Sprintf(".use[%d]: resolving template", i)) @@ -170,7 +171,8 @@ func applyTemplatesToStep(step testworkflowsv1.Step, templates map[string]testwo // Apply templates to the services for name, svc := range step.Services { - for i, ref := range svc.Use { + for i := len(svc.Use) - 1; i >= 0; i-- { + ref := svc.Use[i] tpl, err := getConfiguredTemplate(ref.Name, ref.Config, templates, externalize) if err != nil { return step, errors.Wrap(err, fmt.Sprintf("services[%s].use[%d]: resolving template", name, i)) @@ -268,7 +270,8 @@ func applyTemplatesToSpec(spec *testworkflowsv1.TestWorkflowSpec, templates map[ defer expressions.Simplify(spec, expressions.ReplacePrefixMachine(random+".", "config.")) // Apply top-level templates - for i, ref := range spec.Use { + for i := len(spec.Use) - 1; i >= 0; i-- { + ref := spec.Use[i] tpl, err := getConfiguredTemplate(ref.Name, ref.Config, templates, externalize) if err != nil { return errors.Wrap(err, fmt.Sprintf("spec.use[%d]: resolving template", i)) @@ -282,7 +285,8 @@ func applyTemplatesToSpec(spec *testworkflowsv1.TestWorkflowSpec, templates map[ // Apply templates to the services for name, svc := range spec.Services { - for i, ref := range svc.Use { + for i := len(svc.Use) - 1; i >= 0; i-- { + ref := svc.Use[i] tpl, err := getConfiguredTemplate(ref.Name, ref.Config, templates, externalize) if err != nil { return errors.Wrap(err, fmt.Sprintf("services[%s].use[%d]: resolving template", name, i)) diff --git a/pkg/testworkflows/testworkflowresolver/apply_test.go b/pkg/testworkflows/testworkflowresolver/apply_test.go index 8e5137fd3d2..e0617373ae4 100644 --- a/pkg/testworkflows/testworkflowresolver/apply_test.go +++ b/pkg/testworkflows/testworkflowresolver/apply_test.go @@ -296,23 +296,23 @@ func TestApplyTemplatesMergeMultipleTopLevelSteps(t *testing.T) { want := workflowSteps.DeepCopy() want.Spec.Setup = []testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), ConvertIndependentStepToStep(tplSteps.Spec.Setup[0]), + ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), want.Spec.Setup[0], } - want.Spec.Setup[0].Name = "setup-tpl-test-20" + want.Spec.Setup[1].Name = "setup-tpl-test-20" want.Spec.Steps = []testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), ConvertIndependentStepToStep(tplSteps.Spec.Steps[0]), + ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), want.Spec.Steps[0], } - want.Spec.Steps[0].Name = "steps-tpl-test-20" + want.Spec.Steps[1].Name = "steps-tpl-test-20" want.Spec.After = []testworkflowsv1.Step{ want.Spec.After[0], - ConvertIndependentStepToStep(tplSteps.Spec.After[0]), ConvertIndependentStepToStep(tplStepsConfig.Spec.After[0]), + ConvertIndependentStepToStep(tplSteps.Spec.After[0]), } - want.Spec.After[2].Name = "after-tpl-test-20" + want.Spec.After[1].Name = "after-tpl-test-20" assert.NoError(t, err) assert.Equal(t, want, wf) @@ -329,22 +329,22 @@ func TestApplyTemplatesMergeMultipleConfigurable(t *testing.T) { ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), want.Spec.Setup[0], } - want.Spec.Setup[0].Name = "setup-tpl-test-20" - want.Spec.Setup[1].Name = "setup-tpl-test-10" + want.Spec.Setup[0].Name = "setup-tpl-test-10" + want.Spec.Setup[1].Name = "setup-tpl-test-20" want.Spec.Steps = []testworkflowsv1.Step{ ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), want.Spec.Steps[0], } - want.Spec.Steps[0].Name = "steps-tpl-test-20" - want.Spec.Steps[1].Name = "steps-tpl-test-10" + want.Spec.Steps[0].Name = "steps-tpl-test-10" + want.Spec.Steps[1].Name = "steps-tpl-test-20" want.Spec.After = []testworkflowsv1.Step{ want.Spec.After[0], ConvertIndependentStepToStep(tplStepsConfig.Spec.After[0]), ConvertIndependentStepToStep(tplStepsConfig.Spec.After[0]), } - want.Spec.After[1].Name = "after-tpl-test-10" - want.Spec.After[2].Name = "after-tpl-test-20" + want.Spec.After[1].Name = "after-tpl-test-20" + want.Spec.After[2].Name = "after-tpl-test-10" assert.NoError(t, err) assert.Equal(t, want, wf) @@ -449,19 +449,19 @@ func TestApplyTemplatesStepBasicMultipleSteps(t *testing.T) { want := *basicStep.DeepCopy() want.Setup = []testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), ConvertIndependentStepToStep(tplSteps.Spec.Setup[0]), + ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), } want.Steps = append([]testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), ConvertIndependentStepToStep(tplSteps.Spec.Steps[0]), + ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), }, append(want.Steps, []testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplSteps.Spec.After[0]), ConvertIndependentStepToStep(tplStepsConfig.Spec.After[0]), + ConvertIndependentStepToStep(tplSteps.Spec.After[0]), }...)...) - want.Setup[0].Name = "setup-tpl-test-20" - want.Steps[0].Name = "steps-tpl-test-20" - want.Steps[3].Name = "after-tpl-test-20" + want.Setup[1].Name = "setup-tpl-test-20" + want.Steps[1].Name = "steps-tpl-test-20" + want.Steps[2].Name = "after-tpl-test-20" assert.NoError(t, err) assert.Equal(t, want, s) @@ -569,19 +569,19 @@ func TestApplyTemplatesStepAdvancedMultipleSteps(t *testing.T) { want := *advancedStep.DeepCopy() want.Setup = []testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), ConvertIndependentStepToStep(tplSteps.Spec.Setup[0]), + ConvertIndependentStepToStep(tplStepsConfig.Spec.Setup[0]), } want.Steps = append([]testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), ConvertIndependentStepToStep(tplSteps.Spec.Steps[0]), + ConvertIndependentStepToStep(tplStepsConfig.Spec.Steps[0]), }, append(want.Steps, []testworkflowsv1.Step{ - ConvertIndependentStepToStep(tplSteps.Spec.After[0]), ConvertIndependentStepToStep(tplStepsConfig.Spec.After[0]), + ConvertIndependentStepToStep(tplSteps.Spec.After[0]), }...)...) - want.Setup[0].Name = "setup-tpl-test-20" - want.Steps[0].Name = "steps-tpl-test-20" - want.Steps[4].Name = "after-tpl-test-20" + want.Setup[1].Name = "setup-tpl-test-20" + want.Steps[1].Name = "steps-tpl-test-20" + want.Steps[3].Name = "after-tpl-test-20" assert.NoError(t, err) assert.Equal(t, want, s) @@ -673,3 +673,72 @@ func TestApplyTemplates_ConditionAlways(t *testing.T) { assert.NoError(t, err) assert.Equal(t, want, wf) } + +func TestApplyTemplates_MergePodValues(t *testing.T) { + tpls := map[string]testworkflowsv1.TestWorkflowTemplate{ + "top": { + Spec: testworkflowsv1.TestWorkflowTemplateSpec{ + TestWorkflowSpecBase: testworkflowsv1.TestWorkflowSpecBase{ + Pod: &testworkflowsv1.PodConfig{ + Labels: map[string]string{ + "label1": "topvalue", + "label2": "topvalue", + "label3": "topvalue", + }, + }, + }, + }, + }, + "middle": { + Spec: testworkflowsv1.TestWorkflowTemplateSpec{ + TestWorkflowSpecBase: testworkflowsv1.TestWorkflowSpecBase{ + Pod: &testworkflowsv1.PodConfig{ + Labels: map[string]string{ + "label1": "middlevalue", + "label2": "middlevalue", + }, + }, + }, + }, + }, + } + wf := &testworkflowsv1.TestWorkflow{ + Spec: testworkflowsv1.TestWorkflowSpec{ + TestWorkflowSpecBase: testworkflowsv1.TestWorkflowSpecBase{ + Pod: &testworkflowsv1.PodConfig{ + Labels: map[string]string{ + "label1": "workflowvalue", + }, + }, + }, + Use: []testworkflowsv1.TemplateRef{ + {Name: "top"}, + {Name: "middle"}, + }, + Steps: []testworkflowsv1.Step{ + {StepOperations: testworkflowsv1.StepOperations{Shell: "exit 0"}}, + }, + }, + } + err := ApplyTemplates(wf, tpls, nil) + + want := &testworkflowsv1.TestWorkflow{ + Spec: testworkflowsv1.TestWorkflowSpec{ + TestWorkflowSpecBase: testworkflowsv1.TestWorkflowSpecBase{ + Pod: &testworkflowsv1.PodConfig{ + Labels: map[string]string{ + "label1": "workflowvalue", + "label2": "middlevalue", + "label3": "topvalue", + }, + }, + }, + Steps: []testworkflowsv1.Step{ + {StepOperations: testworkflowsv1.StepOperations{Shell: "exit 0"}}, + }, + }, + } + + assert.NoError(t, err) + assert.Equal(t, want, wf) +} From 5bd1dd5bbdf4bf53038c06ab3107df77619d920a Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Mon, 23 Dec 2024 20:04:53 +0300 Subject: [PATCH 3/3] fix: add mapping for services for twt spec (#6102) Signed-off-by: Vladislav Sukhin --- pkg/mapper/testworkflows/kube_openapi.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/mapper/testworkflows/kube_openapi.go b/pkg/mapper/testworkflows/kube_openapi.go index f94000934d3..6d56ba2fd10 100644 --- a/pkg/mapper/testworkflows/kube_openapi.go +++ b/pkg/mapper/testworkflows/kube_openapi.go @@ -1148,6 +1148,7 @@ func MapTemplateSpecKubeToAPI(v testworkflowsv1.TestWorkflowTemplateSpec) testku Config: common.MapMap(v.Config, MapParameterSchemaKubeToAPI), System: common.MapPtr(v.System, MapSystemKubeToAPI), Content: common.MapPtr(v.Content, MapContentKubeToAPI), + Services: common.MapMap(v.Services, MapIndependentServiceSpecKubeToAPI), Container: common.MapPtr(v.Container, MapContainerConfigKubeToAPI), Job: common.MapPtr(v.Job, MapJobConfigKubeToAPI), Pod: common.MapPtr(v.Pod, MapPodConfigKubeToAPI),