diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index f9fd410108f..909997b0873 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -7743,6 +7743,9 @@ components: description: log content, if it's just a log. note, that it includes 30 chars timestamp + space output: $ref: "#/components/schemas/TestWorkflowOutput" + temporary: + type: boolean + description: should it be considered temporary only for execution time TestWorkflowOutput: type: object diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index f160b77fe78..ab1a668cee5 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -5,13 +5,13 @@ import ( "os" "strings" "time" - "unicode" "github.com/spf13/cobra" "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common" "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common/render" "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/testworkflows/renderer" + "github.com/kubeshop/testkube/cmd/testworkflow-init/data" apiclientv1 "github.com/kubeshop/testkube/pkg/api/v1/client" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/ui" @@ -148,33 +148,35 @@ func flattenSignatures(sig []testkube.TestWorkflowSignature) []testkube.TestWork return res } +func printSingleResultDifference(r1 testkube.TestWorkflowStepResult, r2 testkube.TestWorkflowStepResult, signature testkube.TestWorkflowSignature, index int, steps int) bool { + r1Status := testkube.QUEUED_TestWorkflowStepStatus + r2Status := testkube.QUEUED_TestWorkflowStepStatus + if r1.Status != nil { + r1Status = *r1.Status + } + if r2.Status != nil { + r2Status = *r2.Status + } + if r1Status == r2Status { + return false + } + name := signature.Category + if signature.Name != "" { + name = signature.Name + } + took := r2.FinishedAt.Sub(r2.QueuedAt).Round(time.Millisecond) + + printStatus(signature, r2Status, took, index, steps, name) + return true +} + func printResultDifference(res1 *testkube.TestWorkflowResult, res2 *testkube.TestWorkflowResult, steps []testkube.TestWorkflowSignature) bool { if res1 == nil || res2 == nil { return false } - changed := false + changed := printSingleResultDifference(*res1.Initialization, *res2.Initialization, testkube.TestWorkflowSignature{Name: "Initializing"}, -1, len(steps)) for i, s := range steps { - r1 := res1.Steps[s.Ref] - r2 := res2.Steps[s.Ref] - r1Status := testkube.QUEUED_TestWorkflowStepStatus - r2Status := testkube.QUEUED_TestWorkflowStepStatus - if r1.Status != nil { - r1Status = *r1.Status - } - if r2.Status != nil { - r2Status = *r2.Status - } - if r1Status == r2Status { - continue - } - name := s.Category - if s.Name != "" { - name = s.Name - } - took := r2.FinishedAt.Sub(r2.QueuedAt).Round(time.Millisecond) - changed = true - - printStatus(s, r2Status, took, i, len(steps), name) + changed = changed || printSingleResultDifference(res1.Steps[s.Ref], res2.Steps[s.Ref], s, i, len(steps)) } return changed @@ -204,7 +206,9 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature continue } if l.Result != nil { - isLineBeginning = printResultDifference(result, l.Result, steps) + if printResultDifference(result, l.Result, steps) { + isLineBeginning = true + } result = l.Result continue } @@ -217,11 +221,19 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature return result, err } +func printStatusHeader(i, n int, name string) { + if i == -1 { + fmt.Print(ui.LightCyan(fmt.Sprintf("\n• %s\n", name))) + } else { + fmt.Print(ui.LightCyan(fmt.Sprintf("\n• (%d/%d) %s\n", i+1, n, name))) + } +} + func printStatus(s testkube.TestWorkflowSignature, rStatus testkube.TestWorkflowStepStatus, took time.Duration, i, n int, name string) { switch rStatus { case testkube.RUNNING_TestWorkflowStepStatus: - fmt.Print(ui.LightCyan(fmt.Sprintf("\n• (%d/%d) %s\n", i+1, n, name))) + printStatusHeader(i, n, name) case testkube.SKIPPED_TestWorkflowStepStatus: fmt.Print(ui.LightGray("• skipped\n")) case testkube.PASSED_TestWorkflowStepStatus: @@ -259,58 +271,46 @@ func printStructuredLogLines(logs string, isLineBeginning *bool) { func printRawLogLines(logs string, steps map[string]testkube.TestWorkflowSignature, results map[string]testkube.TestWorkflowStepResult) { - isLineBeginning := true previousStep := "" i := 0 + printStatusHeader(-1, len(steps), "Initializing") // Strip timestamp + space for all new lines in the log for len(logs) > 0 { - if isLineBeginning { - newLineIndex := strings.Index(logs, "\n") - if newLineIndex >= LogTimestampLength-1 { - logs = logs[getTimestampLength(logs)+1:] - isLineBeginning = false - } else { - if newLineIndex != -1 { - name := logs[:newLineIndex] - cleanName := strings.TrimFunc(name, func(r rune) bool { - return !unicode.IsGraphic(r) - }) - - cleanName = strings.TrimFunc(strings.TrimSuffix(cleanName, "start"), func(r rune) bool { - return !unicode.IsGraphic(r) - }) - - if step, ok := steps[cleanName]; ok { - stepName := step.Category - if step.Name != "" { - stepName = step.Name - } - - if ps, ok := results[previousStep]; ok && ps.Status != nil { - if step, ok := steps[previousStep]; ok { - took := ps.FinishedAt.Sub(ps.QueuedAt).Round(time.Millisecond) - printStatus(step, *ps.Status, took, i, len(steps), stepName) - } - } - - fmt.Print(ui.LightCyan(fmt.Sprintf("\n• %s\n", stepName))) - previousStep = cleanName - i++ - } - - logs = strings.TrimPrefix(logs, name) - } - } - } - newLineIndex := strings.Index(logs, "\n") if newLineIndex == -1 { fmt.Print(logs) break + } + + line := logs[0:newLineIndex] + logs = logs[newLineIndex+1:] + + if newLineIndex >= LogTimestampLength-1 { + line = line[getTimestampLength(line)+1:] + } + + start := data.StartHintRe.FindStringSubmatch(line) + if len(start) > 0 { + ref := start[1] + if step, ok := steps[ref]; ok { + stepName := step.Category + if step.Name != "" { + stepName = step.Name + } + + if ps, ok := results[previousStep]; ok && ps.Status != nil { + if step, ok := steps[previousStep]; ok { + took := ps.FinishedAt.Sub(ps.QueuedAt).Round(time.Millisecond) + printStatus(step, *ps.Status, took, i, len(steps), stepName) + } + } + + printStatusHeader(i, len(steps), stepName) + previousStep = ref + i++ + } } else { - fmt.Print(logs[0 : newLineIndex+1]) - logs = logs[newLineIndex+1:] - isLineBeginning = true + fmt.Println(line) } } diff --git a/cmd/testworkflow-init/data/emit.go b/cmd/testworkflow-init/data/emit.go index 94992684a66..911ed287b68 100644 --- a/cmd/testworkflow-init/data/emit.go +++ b/cmd/testworkflow-init/data/emit.go @@ -17,6 +17,9 @@ const ( var instructionRe = regexp.MustCompile(fmt.Sprintf(`^%s(%s)?([^%s]+)%s([a-zA-Z0-9-_.]+)(?:%s([^\n]+))?%s$`, InstructionPrefix, HintPrefix, InstructionSeparator, InstructionSeparator, InstructionValueSeparator, InstructionSeparator)) +var StartHintRe = regexp.MustCompile(fmt.Sprintf(`^%s%s([^%s]+)%sstart%s$`, + InstructionPrefix, HintPrefix, InstructionSeparator, InstructionSeparator, InstructionSeparator)) + type Instruction struct { Ref string Name string diff --git a/pkg/api/v1/testkube/model_test_workflow_execution_notification.go b/pkg/api/v1/testkube/model_test_workflow_execution_notification.go index 97d3ee69626..3da9914bcca 100644 --- a/pkg/api/v1/testkube/model_test_workflow_execution_notification.go +++ b/pkg/api/v1/testkube/model_test_workflow_execution_notification.go @@ -22,4 +22,6 @@ type TestWorkflowExecutionNotification struct { // log content, if it's just a log. note, that it includes 30 chars timestamp + space Log string `json:"log,omitempty"` Output *TestWorkflowOutput `json:"output,omitempty"` + // should it be considered temporary only for execution time + Temporary bool `json:"temporary,omitempty"` } diff --git a/pkg/testworkflows/testworkflowcontroller/controller.go b/pkg/testworkflows/testworkflowcontroller/controller.go index 6bbcec99da1..c60d1ac7ec5 100644 --- a/pkg/testworkflows/testworkflowcontroller/controller.go +++ b/pkg/testworkflows/testworkflowcontroller/controller.go @@ -288,8 +288,8 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader { return } for v := range w.Channel() { - if v.Error == nil && v.Value.Log != "" { - if ref != v.Value.Ref { + if v.Error == nil && v.Value.Log != "" && !v.Value.Temporary { + if ref != v.Value.Ref && v.Value.Ref != "" { ref = v.Value.Ref _, _ = writer.Write([]byte(data.SprintHint(ref, initconstants.InstructionStart))) } diff --git a/pkg/testworkflows/testworkflowcontroller/notification.go b/pkg/testworkflows/testworkflowcontroller/notification.go index 9b7f8fc66f9..03e838b45f1 100644 --- a/pkg/testworkflows/testworkflowcontroller/notification.go +++ b/pkg/testworkflows/testworkflowcontroller/notification.go @@ -15,15 +15,17 @@ type Notification struct { Ref string `json:"ref,omitempty"` Log string `json:"log,omitempty"` Output *data.Instruction `json:"output,omitempty"` + Temporary bool `json:"temporary,omitempty"` } func (n *Notification) ToInternal() testkube.TestWorkflowExecutionNotification { return testkube.TestWorkflowExecutionNotification{ - Ts: n.Timestamp, - Result: n.Result, - Ref: n.Ref, - Log: n.Log, - Output: InstructionToInternal(n.Output), + Ts: n.Timestamp, + Result: n.Result, + Ref: n.Ref, + Log: n.Log, + Output: InstructionToInternal(n.Output), + Temporary: n.Temporary, } } diff --git a/pkg/testworkflows/testworkflowcontroller/notifier.go b/pkg/testworkflows/testworkflowcontroller/notifier.go index f880f63d1a4..f5b6f145669 100644 --- a/pkg/testworkflows/testworkflowcontroller/notifier.go +++ b/pkg/testworkflows/testworkflowcontroller/notifier.go @@ -40,7 +40,7 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) { } } -func (n *notifier) Raw(ref string, ts time.Time, message string) { +func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) { if message != "" { if ref == InitContainerName { ref = "" @@ -50,6 +50,7 @@ func (n *notifier) Raw(ref string, ts time.Time, message string) { Timestamp: ts.UTC(), Log: message, Ref: ref, + Temporary: temporary, }) } } @@ -57,7 +58,7 @@ func (n *notifier) Raw(ref string, ts time.Time, message string) { func (n *notifier) Log(ref string, ts time.Time, message string) { if message != "" { n.RegisterTimestamp(ref, ts) - n.Raw(ref, ts, fmt.Sprintf("%s %s", ts.Format(KubernetesLogTimeFormat), message)) + n.Raw(ref, ts, fmt.Sprintf("%s %s", ts.Format(KubernetesLogTimeFormat), message), false) } } @@ -65,8 +66,14 @@ func (n *notifier) Error(err error) { n.watcher.Error(err) } -func (n *notifier) Warning(ref string, ts time.Time, reason, message string) { - n.Log(ref, ts, fmt.Sprintf("%s (%s) %s\n", ui.Yellow("warn:"), reason, message)) +func (n *notifier) Event(ref string, ts time.Time, level, reason, message string) { + n.RegisterTimestamp(ref, ts) + color := ui.LightGray + if level != "Normal" { + color = ui.Yellow + } + log := color(fmt.Sprintf("(%s) %s", reason, message)) + n.Raw(ref, ts, fmt.Sprintf("%s %s\n", ts.Format(KubernetesLogTimeFormat), log), level == "Normal") } func (n *notifier) recompute() { diff --git a/pkg/testworkflows/testworkflowcontroller/podstate.go b/pkg/testworkflows/testworkflowcontroller/podstate.go index 050fecc9bcc..d547c874266 100644 --- a/pkg/testworkflows/testworkflowcontroller/podstate.go +++ b/pkg/testworkflows/testworkflowcontroller/podstate.go @@ -3,6 +3,7 @@ package testworkflowcontroller import ( "context" "slices" + "strings" "sync" "time" @@ -11,6 +12,7 @@ import ( corev1 "k8s.io/api/core/v1" "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants" ) const ( @@ -28,6 +30,7 @@ type podState struct { started map[string]time.Time finished map[string]time.Time warnings map[string][]*corev1.Event + events map[string][]*corev1.Event prestart map[string]*channel[podStateUpdate] finishedCh map[string]chan struct{} mu sync.RWMutex @@ -39,6 +42,7 @@ type podStateUpdate struct { Queued *time.Time Started *time.Time Warning *corev1.Event + Event *corev1.Event } func newPodState(parentCtx context.Context) *podState { @@ -48,6 +52,7 @@ func newPodState(parentCtx context.Context) *podState { started: map[string]time.Time{}, finished: map[string]time.Time{}, warnings: map[string][]*corev1.Event{}, + events: map[string][]*corev1.Event{}, prestart: map[string]*channel[podStateUpdate]{}, finishedCh: map[string]chan struct{}{}, ctx: ctx, @@ -167,19 +172,19 @@ func (p *podState) setFinishedAt(name string, ts time.Time) { } } -func (p *podState) unsafeAddWarning(name string, event *corev1.Event) { - if !slices.ContainsFunc(p.warnings[name], common.DeepEqualCmp(event)) { - p.warnings[name] = append(p.warnings[name], event) - p.preStartWatcher(name).Send(podStateUpdate{Warning: event}) +func (p *podState) unsafeAddEvent(name string, event *corev1.Event) { + if !slices.ContainsFunc(p.events[name], common.DeepEqualCmp(event)) { + p.events[name] = append(p.events[name], event) + p.preStartWatcher(name).Send(podStateUpdate{Event: event}) } } -func (p *podState) addWarning(name string, event *corev1.Event) { +func (p *podState) addEvent(name string, event *corev1.Event) { p.mu.Lock() defer p.mu.Unlock() - p.unsafeAddWarning(name, event) + p.unsafeAddEvent(name, event) if name == "" { - p.unsafeAddWarning(InitContainerName, event) + p.unsafeAddEvent(InitContainerName, event) } } @@ -208,8 +213,10 @@ func (p *podState) RegisterEvent(event *corev1.Event) { case "Scheduled", "Started": p.setStartedAt(name, event.CreationTimestamp.Time) } - if event.Type != "Normal" { - p.addWarning(name, event) + if p.StartedAt(name).IsZero() && + event.Reason != "Created" && event.Reason != "SuccessfulCreate" && + (event.Reason != "Pulled" || (!strings.Contains(event.Message, constants.DefaultInitImage) && !strings.Contains(event.Message, constants.DefaultToolkitImage))) { + p.addEvent(name, event) } } diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index 564c3cd9688..7e7feafeda7 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -51,9 +51,9 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } else if v.Value.Started != nil { s.Queue("", state.QueuedAt("")) s.Start("", state.StartedAt("")) - } else if v.Value.Warning != nil { - ts := maxTime(v.Value.Warning.CreationTimestamp.Time, v.Value.Warning.FirstTimestamp.Time, v.Value.Warning.LastTimestamp.Time) - s.Warning("", ts, v.Value.Warning.Reason, v.Value.Warning.Message) + } else if v.Value.Event != nil { + ts := maxTime(v.Value.Event.CreationTimestamp.Time, v.Value.Event.FirstTimestamp.Time, v.Value.Event.LastTimestamp.Time) + s.Event("", ts, v.Value.Event.Type, v.Value.Event.Reason, v.Value.Event.Message) } } @@ -85,9 +85,9 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } else if v.Value.Started != nil { s.Queue(ref, state.QueuedAt(ref)) s.Start(ref, state.StartedAt(ref)) - } else if v.Value.Warning != nil { - ts := maxTime(v.Value.Warning.CreationTimestamp.Time, v.Value.Warning.FirstTimestamp.Time, v.Value.Warning.LastTimestamp.Time) - s.Warning("", ts, v.Value.Warning.Reason, v.Value.Warning.Message) + } else if v.Value.Event != nil { + ts := maxTime(v.Value.Event.CreationTimestamp.Time, v.Value.Event.FirstTimestamp.Time, v.Value.Event.LastTimestamp.Time) + s.Event(ref, ts, v.Value.Event.Type, v.Value.Event.Reason, v.Value.Event.Message) } } @@ -132,7 +132,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf s.Resume(ref, end) } } else { - s.Raw(ref, v.Value.Time, string(v.Value.Log)) + s.Raw(ref, v.Value.Time, string(v.Value.Log), false) } } @@ -164,7 +164,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf if status.Details == "" { status.Details = "Manual" } - s.Raw(ref, s.GetLastTimestamp(ref), fmt.Sprintf("\n%s Aborted (%s)", s.GetLastTimestamp(ref).Format(KubernetesLogTimeFormat), status.Details)) + s.Raw(ref, s.GetLastTimestamp(ref), fmt.Sprintf("\n%s Aborted (%s)", s.GetLastTimestamp(ref).Format(KubernetesLogTimeFormat), status.Details), false) break } } diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go index 9072f563a04..94c99d2bfec 100644 --- a/pkg/testworkflows/testworkflowexecutor/executor.go +++ b/pkg/testworkflows/testworkflowexecutor/executor.go @@ -210,7 +210,9 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te continue } if v.Value.Output != nil { - execution.Output = append(execution.Output, *testworkflowcontroller.InstructionToInternal(v.Value.Output)) + if !v.Value.Temporary { + execution.Output = append(execution.Output, *testworkflowcontroller.InstructionToInternal(v.Value.Output)) + } } else if v.Value.Result != nil { execution.Result = v.Value.Result if execution.Result.IsFinished() { @@ -230,8 +232,8 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te wg.Done() }() wg.Wait() - } else { - if ref != v.Value.Ref { + } else if !v.Value.Temporary { + if ref != v.Value.Ref && v.Value.Ref != "" { ref = v.Value.Ref _, err := writer.Write([]byte(data.SprintHint(ref, initconstants.InstructionStart))) if err != nil {