Skip to content

Commit

Permalink
feat(testworkflows): expose temporary log lines about the container s…
Browse files Browse the repository at this point in the history
…tatus (#5595)

* feat(testworkflows): expose temporary log lines about the container status
* chore(testworkflows): avoid displaying creation events
* chore(testworkflows): avoid displaying unnecessary events
* fix(testworkflows): improve display logs in the CLI
  • Loading branch information
rangoo94 authored Jun 19, 2024
1 parent 4ee4c49 commit 0601447
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 99 deletions.
3 changes: 3 additions & 0 deletions api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7743,6 +7743,9 @@ components:
description: log content, if it's just a log. note, that it includes 30 chars timestamp + space
output:
$ref: "#/components/schemas/TestWorkflowOutput"
temporary:
type: boolean
description: should it be considered temporary only for execution time

TestWorkflowOutput:
type: object
Expand Down
136 changes: 68 additions & 68 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"os"
"strings"
"time"
"unicode"

"github.com/spf13/cobra"

"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common"
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common/render"
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/testworkflows/renderer"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
apiclientv1 "github.com/kubeshop/testkube/pkg/api/v1/client"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/ui"
Expand Down Expand Up @@ -148,33 +148,35 @@ func flattenSignatures(sig []testkube.TestWorkflowSignature) []testkube.TestWork
return res
}

func printSingleResultDifference(r1 testkube.TestWorkflowStepResult, r2 testkube.TestWorkflowStepResult, signature testkube.TestWorkflowSignature, index int, steps int) bool {
r1Status := testkube.QUEUED_TestWorkflowStepStatus
r2Status := testkube.QUEUED_TestWorkflowStepStatus
if r1.Status != nil {
r1Status = *r1.Status
}
if r2.Status != nil {
r2Status = *r2.Status
}
if r1Status == r2Status {
return false
}
name := signature.Category
if signature.Name != "" {
name = signature.Name
}
took := r2.FinishedAt.Sub(r2.QueuedAt).Round(time.Millisecond)

printStatus(signature, r2Status, took, index, steps, name)
return true
}

func printResultDifference(res1 *testkube.TestWorkflowResult, res2 *testkube.TestWorkflowResult, steps []testkube.TestWorkflowSignature) bool {
if res1 == nil || res2 == nil {
return false
}
changed := false
changed := printSingleResultDifference(*res1.Initialization, *res2.Initialization, testkube.TestWorkflowSignature{Name: "Initializing"}, -1, len(steps))
for i, s := range steps {
r1 := res1.Steps[s.Ref]
r2 := res2.Steps[s.Ref]
r1Status := testkube.QUEUED_TestWorkflowStepStatus
r2Status := testkube.QUEUED_TestWorkflowStepStatus
if r1.Status != nil {
r1Status = *r1.Status
}
if r2.Status != nil {
r2Status = *r2.Status
}
if r1Status == r2Status {
continue
}
name := s.Category
if s.Name != "" {
name = s.Name
}
took := r2.FinishedAt.Sub(r2.QueuedAt).Round(time.Millisecond)
changed = true

printStatus(s, r2Status, took, i, len(steps), name)
changed = changed || printSingleResultDifference(res1.Steps[s.Ref], res2.Steps[s.Ref], s, i, len(steps))
}

return changed
Expand Down Expand Up @@ -204,7 +206,9 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature
continue
}
if l.Result != nil {
isLineBeginning = printResultDifference(result, l.Result, steps)
if printResultDifference(result, l.Result, steps) {
isLineBeginning = true
}
result = l.Result
continue
}
Expand All @@ -217,11 +221,19 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature
return result, err
}

func printStatusHeader(i, n int, name string) {
if i == -1 {
fmt.Print(ui.LightCyan(fmt.Sprintf("\n• %s\n", name)))
} else {
fmt.Print(ui.LightCyan(fmt.Sprintf("\n• (%d/%d) %s\n", i+1, n, name)))
}
}

func printStatus(s testkube.TestWorkflowSignature, rStatus testkube.TestWorkflowStepStatus, took time.Duration,
i, n int, name string) {
switch rStatus {
case testkube.RUNNING_TestWorkflowStepStatus:
fmt.Print(ui.LightCyan(fmt.Sprintf("\n• (%d/%d) %s\n", i+1, n, name)))
printStatusHeader(i, n, name)
case testkube.SKIPPED_TestWorkflowStepStatus:
fmt.Print(ui.LightGray("• skipped\n"))
case testkube.PASSED_TestWorkflowStepStatus:
Expand Down Expand Up @@ -259,58 +271,46 @@ func printStructuredLogLines(logs string, isLineBeginning *bool) {

func printRawLogLines(logs string,
steps map[string]testkube.TestWorkflowSignature, results map[string]testkube.TestWorkflowStepResult) {
isLineBeginning := true
previousStep := ""
i := 0
printStatusHeader(-1, len(steps), "Initializing")
// Strip timestamp + space for all new lines in the log
for len(logs) > 0 {
if isLineBeginning {
newLineIndex := strings.Index(logs, "\n")
if newLineIndex >= LogTimestampLength-1 {
logs = logs[getTimestampLength(logs)+1:]
isLineBeginning = false
} else {
if newLineIndex != -1 {
name := logs[:newLineIndex]
cleanName := strings.TrimFunc(name, func(r rune) bool {
return !unicode.IsGraphic(r)
})

cleanName = strings.TrimFunc(strings.TrimSuffix(cleanName, "start"), func(r rune) bool {
return !unicode.IsGraphic(r)
})

if step, ok := steps[cleanName]; ok {
stepName := step.Category
if step.Name != "" {
stepName = step.Name
}

if ps, ok := results[previousStep]; ok && ps.Status != nil {
if step, ok := steps[previousStep]; ok {
took := ps.FinishedAt.Sub(ps.QueuedAt).Round(time.Millisecond)
printStatus(step, *ps.Status, took, i, len(steps), stepName)
}
}

fmt.Print(ui.LightCyan(fmt.Sprintf("\n• %s\n", stepName)))
previousStep = cleanName
i++
}

logs = strings.TrimPrefix(logs, name)
}
}
}

newLineIndex := strings.Index(logs, "\n")
if newLineIndex == -1 {
fmt.Print(logs)
break
}

line := logs[0:newLineIndex]
logs = logs[newLineIndex+1:]

if newLineIndex >= LogTimestampLength-1 {
line = line[getTimestampLength(line)+1:]
}

start := data.StartHintRe.FindStringSubmatch(line)
if len(start) > 0 {
ref := start[1]
if step, ok := steps[ref]; ok {
stepName := step.Category
if step.Name != "" {
stepName = step.Name
}

if ps, ok := results[previousStep]; ok && ps.Status != nil {
if step, ok := steps[previousStep]; ok {
took := ps.FinishedAt.Sub(ps.QueuedAt).Round(time.Millisecond)
printStatus(step, *ps.Status, took, i, len(steps), stepName)
}
}

printStatusHeader(i, len(steps), stepName)
previousStep = ref
i++
}
} else {
fmt.Print(logs[0 : newLineIndex+1])
logs = logs[newLineIndex+1:]
isLineBeginning = true
fmt.Println(line)
}
}

Expand Down
3 changes: 3 additions & 0 deletions cmd/testworkflow-init/data/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const (
var instructionRe = regexp.MustCompile(fmt.Sprintf(`^%s(%s)?([^%s]+)%s([a-zA-Z0-9-_.]+)(?:%s([^\n]+))?%s$`,
InstructionPrefix, HintPrefix, InstructionSeparator, InstructionSeparator, InstructionValueSeparator, InstructionSeparator))

var StartHintRe = regexp.MustCompile(fmt.Sprintf(`^%s%s([^%s]+)%sstart%s$`,
InstructionPrefix, HintPrefix, InstructionSeparator, InstructionSeparator, InstructionSeparator))

type Instruction struct {
Ref string
Name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ type TestWorkflowExecutionNotification struct {
// log content, if it's just a log. note, that it includes 30 chars timestamp + space
Log string `json:"log,omitempty"`
Output *TestWorkflowOutput `json:"output,omitempty"`
// should it be considered temporary only for execution time
Temporary bool `json:"temporary,omitempty"`
}
4 changes: 2 additions & 2 deletions pkg/testworkflows/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
return
}
for v := range w.Channel() {
if v.Error == nil && v.Value.Log != "" {
if ref != v.Value.Ref {
if v.Error == nil && v.Value.Log != "" && !v.Value.Temporary {
if ref != v.Value.Ref && v.Value.Ref != "" {
ref = v.Value.Ref
_, _ = writer.Write([]byte(data.SprintHint(ref, initconstants.InstructionStart)))
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/testworkflows/testworkflowcontroller/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ type Notification struct {
Ref string `json:"ref,omitempty"`
Log string `json:"log,omitempty"`
Output *data.Instruction `json:"output,omitempty"`
Temporary bool `json:"temporary,omitempty"`
}

func (n *Notification) ToInternal() testkube.TestWorkflowExecutionNotification {
return testkube.TestWorkflowExecutionNotification{
Ts: n.Timestamp,
Result: n.Result,
Ref: n.Ref,
Log: n.Log,
Output: InstructionToInternal(n.Output),
Ts: n.Timestamp,
Result: n.Result,
Ref: n.Ref,
Log: n.Log,
Output: InstructionToInternal(n.Output),
Temporary: n.Temporary,
}
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/testworkflows/testworkflowcontroller/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) {
}
}

func (n *notifier) Raw(ref string, ts time.Time, message string) {
func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) {
if message != "" {
if ref == InitContainerName {
ref = ""
Expand All @@ -50,23 +50,30 @@ func (n *notifier) Raw(ref string, ts time.Time, message string) {
Timestamp: ts.UTC(),
Log: message,
Ref: ref,
Temporary: temporary,
})
}
}

func (n *notifier) Log(ref string, ts time.Time, message string) {
if message != "" {
n.RegisterTimestamp(ref, ts)
n.Raw(ref, ts, fmt.Sprintf("%s %s", ts.Format(KubernetesLogTimeFormat), message))
n.Raw(ref, ts, fmt.Sprintf("%s %s", ts.Format(KubernetesLogTimeFormat), message), false)
}
}

func (n *notifier) Error(err error) {
n.watcher.Error(err)
}

func (n *notifier) Warning(ref string, ts time.Time, reason, message string) {
n.Log(ref, ts, fmt.Sprintf("%s (%s) %s\n", ui.Yellow("warn:"), reason, message))
func (n *notifier) Event(ref string, ts time.Time, level, reason, message string) {
n.RegisterTimestamp(ref, ts)
color := ui.LightGray
if level != "Normal" {
color = ui.Yellow
}
log := color(fmt.Sprintf("(%s) %s", reason, message))
n.Raw(ref, ts, fmt.Sprintf("%s %s\n", ts.Format(KubernetesLogTimeFormat), log), level == "Normal")
}

func (n *notifier) recompute() {
Expand Down
25 changes: 16 additions & 9 deletions pkg/testworkflows/testworkflowcontroller/podstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testworkflowcontroller
import (
"context"
"slices"
"strings"
"sync"
"time"

Expand All @@ -11,6 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"

"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
)

const (
Expand All @@ -28,6 +30,7 @@ type podState struct {
started map[string]time.Time
finished map[string]time.Time
warnings map[string][]*corev1.Event
events map[string][]*corev1.Event
prestart map[string]*channel[podStateUpdate]
finishedCh map[string]chan struct{}
mu sync.RWMutex
Expand All @@ -39,6 +42,7 @@ type podStateUpdate struct {
Queued *time.Time
Started *time.Time
Warning *corev1.Event
Event *corev1.Event
}

func newPodState(parentCtx context.Context) *podState {
Expand All @@ -48,6 +52,7 @@ func newPodState(parentCtx context.Context) *podState {
started: map[string]time.Time{},
finished: map[string]time.Time{},
warnings: map[string][]*corev1.Event{},
events: map[string][]*corev1.Event{},
prestart: map[string]*channel[podStateUpdate]{},
finishedCh: map[string]chan struct{}{},
ctx: ctx,
Expand Down Expand Up @@ -167,19 +172,19 @@ func (p *podState) setFinishedAt(name string, ts time.Time) {
}
}

func (p *podState) unsafeAddWarning(name string, event *corev1.Event) {
if !slices.ContainsFunc(p.warnings[name], common.DeepEqualCmp(event)) {
p.warnings[name] = append(p.warnings[name], event)
p.preStartWatcher(name).Send(podStateUpdate{Warning: event})
func (p *podState) unsafeAddEvent(name string, event *corev1.Event) {
if !slices.ContainsFunc(p.events[name], common.DeepEqualCmp(event)) {
p.events[name] = append(p.events[name], event)
p.preStartWatcher(name).Send(podStateUpdate{Event: event})
}
}

func (p *podState) addWarning(name string, event *corev1.Event) {
func (p *podState) addEvent(name string, event *corev1.Event) {
p.mu.Lock()
defer p.mu.Unlock()
p.unsafeAddWarning(name, event)
p.unsafeAddEvent(name, event)
if name == "" {
p.unsafeAddWarning(InitContainerName, event)
p.unsafeAddEvent(InitContainerName, event)
}
}

Expand Down Expand Up @@ -208,8 +213,10 @@ func (p *podState) RegisterEvent(event *corev1.Event) {
case "Scheduled", "Started":
p.setStartedAt(name, event.CreationTimestamp.Time)
}
if event.Type != "Normal" {
p.addWarning(name, event)
if p.StartedAt(name).IsZero() &&
event.Reason != "Created" && event.Reason != "SuccessfulCreate" &&
(event.Reason != "Pulled" || (!strings.Contains(event.Message, constants.DefaultInitImage) && !strings.Contains(event.Message, constants.DefaultToolkitImage))) {
p.addEvent(name, event)
}
}

Expand Down
Loading

0 comments on commit 0601447

Please sign in to comment.