From 492446e82d2b2b2e1cb1d8461a095f4b1f40d945 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Wed, 10 Jul 2024 01:47:30 -0700
Subject: [PATCH 1/6] feat: [TKC-2194] improve workflow execution telemetry
(#5648)
---
pkg/telemetry/payload.go | 25 +-
.../testworkflowexecutor/executor.go | 43 +---
.../testworkflowmetrics.go | 234 ++++++++++++++++++
3 files changed, 259 insertions(+), 43 deletions(-)
create mode 100644 pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
diff --git a/pkg/telemetry/payload.go b/pkg/telemetry/payload.go
index ece02f9a3a..84b113212c 100644
--- a/pkg/telemetry/payload.go
+++ b/pkg/telemetry/payload.go
@@ -34,9 +34,16 @@ type Params struct {
ErrorType string `json:"error_type,omitempty"`
ErrorStackTrace string `json:"error_stacktrace,omitempty"`
TestWorkflowSteps int32 `json:"test_workflow_steps,omitempty"`
+ TestWorkflowExecuteCount int32 `json:"test_workflow_execute_count,omitempty"`
+ TestWorkflowParallelUsed bool `json:"test_workflow_parallel_used,omitempty"`
+ TestWorkflowMatrixUsed bool `json:"test_workflow_matrix_used,omitempty"`
+ TestWorkflowServicesUsed bool `json:"test_workflow_services_used,omitempty"`
+ TestWorkflowIsSample bool `json:"test_workflow_is_sample,omitempty"`
+ TestWorkflowTemplates []string `json:"testWorkflowTemplates"`
+ TestWorkflowImages []string `json:"testWorkflowImages"`
TestWorkflowTemplateUsed bool `json:"test_workflow_template_used,omitempty"`
- TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowArtifactUsed bool `json:"test_workflow_artifact_used,omitempty"`
+ TestWorkflowImage string `json:"test_workflow_image,omitempty"`
TestWorkflowKubeshopGitURI bool `json:"test_workflow_kubeshop_git_uri,omitempty"`
License string `json:"license,omitempty"`
Step string `json:"step,omitempty"`
@@ -84,9 +91,16 @@ type RunContext struct {
type WorkflowParams struct {
TestWorkflowSteps int32
- TestWorkflowTemplateUsed bool
+ TestWorkflowExecuteCount int32
TestWorkflowImage string
TestWorkflowArtifactUsed bool
+ TestWorkflowParallelUsed bool
+ TestWorkflowMatrixUsed bool
+ TestWorkflowServicesUsed bool
+ TestWorkflowTemplateUsed bool
+ TestWorkflowIsSample bool
+ TestWorkflowTemplates []string
+ TestWorkflowImages []string
TestWorkflowKubeshopGitURI bool
}
@@ -290,7 +304,14 @@ func NewRunWorkflowPayload(name, clusterType string, params RunWorkflowParams) P
ClusterType: clusterType,
Context: getAgentContext(),
TestWorkflowSteps: params.TestWorkflowSteps,
+ TestWorkflowExecuteCount: params.TestWorkflowExecuteCount,
+ TestWorkflowParallelUsed: params.TestWorkflowParallelUsed,
TestWorkflowTemplateUsed: params.TestWorkflowTemplateUsed,
+ TestWorkflowMatrixUsed: params.TestWorkflowMatrixUsed,
+ TestWorkflowServicesUsed: params.TestWorkflowServicesUsed,
+ TestWorkflowIsSample: params.TestWorkflowIsSample,
+ TestWorkflowTemplates: params.TestWorkflowTemplates,
+ TestWorkflowImages: params.TestWorkflowImages,
TestWorkflowImage: params.TestWorkflowImage,
TestWorkflowArtifactUsed: params.TestWorkflowArtifactUsed,
TestWorkflowKubeshopGitURI: params.TestWorkflowKubeshopGitURI,
diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go
index 3dea3b6d0f..d3f00502c1 100644
--- a/pkg/testworkflows/testworkflowexecutor/executor.go
+++ b/pkg/testworkflows/testworkflowexecutor/executor.go
@@ -29,13 +29,10 @@ import (
configRepo "github.com/kubeshop/testkube/pkg/repository/config"
"github.com/kubeshop/testkube/pkg/repository/result"
"github.com/kubeshop/testkube/pkg/repository/testworkflow"
- "github.com/kubeshop/testkube/pkg/telemetry"
- "github.com/kubeshop/testkube/pkg/testworkflows"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver"
- "github.com/kubeshop/testkube/pkg/version"
)
//go:generate mockgen -destination=./mock_executor.go -package=testworkflowexecutor "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowexecutor" TestWorkflowExecutor
@@ -294,6 +291,8 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te
// TODO: Consider AppendOutput ($push) instead
_ = e.repository.UpdateOutput(ctx, execution.Id, execution.Output)
if execution.Result.IsFinished() {
+ e.sendRunWorkflowTelemetry(ctx, testWorkflow, execution)
+
if execution.Result.IsPassed() {
e.emitter.Notify(testkube.NewEventEndTestWorkflowSuccess(execution))
} else if execution.Result.IsAborted() {
@@ -537,8 +536,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, errors.Wrap(err, "deploying required resources")
}
- e.sendRunWorkflowTelemetry(ctx, &workflow)
-
// Start to control the results
go func() {
err = e.Control(context.Background(), initialWorkflow, &execution)
@@ -550,39 +547,3 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor
return execution, nil
}
-
-func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow) {
- if workflow == nil {
- log.DefaultLogger.Debug("empty workflow passed to telemetry event")
- return
- }
- telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
- if err != nil {
- log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
- }
- if !telemetryEnabled {
- return
- }
-
- out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
- RunParams: telemetry.RunParams{
- AppVersion: version.Version,
- DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
- Host: testworkflows.GetHostname(),
- ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
- },
- WorkflowParams: telemetry.WorkflowParams{
- TestWorkflowSteps: int32(len(workflow.Spec.Setup) + len(workflow.Spec.Steps) + len(workflow.Spec.After)),
- TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
- TestWorkflowArtifactUsed: testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasArtifacts),
- TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
- testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
- },
- })
-
- if err != nil {
- log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
- } else {
- log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
- }
-}
diff --git a/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go b/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
new file mode 100644
index 0000000000..dae361f581
--- /dev/null
+++ b/pkg/testworkflows/testworkflowexecutor/testworkflowmetrics.go
@@ -0,0 +1,234 @@
+package testworkflowexecutor
+
+import (
+ "context"
+ "strings"
+
+ testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"
+
+ "github.com/kubeshop/testkube/pkg/api/v1/testkube"
+ "github.com/kubeshop/testkube/pkg/log"
+ "github.com/kubeshop/testkube/pkg/telemetry"
+ "github.com/kubeshop/testkube/pkg/testworkflows"
+ "github.com/kubeshop/testkube/pkg/version"
+)
+
+type stepStats struct {
+ numSteps int
+ numExecute int
+ hasArtifacts bool
+ hasMatrix bool
+ hasParallel bool
+ hasTemplate bool
+ hasServices bool
+ imagesUsed map[string]struct{}
+ templatesUsed map[string]struct{}
+}
+
+func (ss *stepStats) Merge(stats *stepStats) {
+ ss.numSteps += stats.numSteps
+ ss.numExecute += stats.numExecute
+
+ if stats.hasArtifacts {
+ ss.hasArtifacts = true
+ }
+ if stats.hasMatrix {
+ ss.hasMatrix = true
+ }
+ if stats.hasParallel {
+ ss.hasParallel = true
+ }
+ if stats.hasServices {
+ ss.hasServices = true
+ }
+ if stats.hasTemplate {
+ ss.hasTemplate = true
+ }
+ for image := range stats.imagesUsed {
+ ss.imagesUsed[image] = struct{}{}
+ }
+ for tmpl := range stats.templatesUsed {
+ ss.templatesUsed[tmpl] = struct{}{}
+ }
+}
+
+func getStepInfo(step testworkflowsv1.Step) *stepStats {
+ res := &stepStats{
+ imagesUsed: make(map[string]struct{}),
+ templatesUsed: make(map[string]struct{}),
+ }
+ if step.Execute != nil {
+ res.numExecute++
+ }
+ if step.Artifacts != nil {
+ res.hasArtifacts = true
+ }
+ if len(step.Use) > 0 {
+ res.hasTemplate = true
+ for _, tmpl := range step.Use {
+ res.templatesUsed[tmpl.Name] = struct{}{}
+ }
+ }
+ if step.Template != nil {
+ res.hasTemplate = true
+ res.templatesUsed[step.Template.Name] = struct{}{}
+ }
+ if len(step.Services) > 0 {
+ res.hasServices = true
+ }
+
+ if step.Run != nil && step.Run.Image != "" {
+ res.imagesUsed[step.Run.Image] = struct{}{}
+ }
+ if step.Container != nil && step.Container.Image != "" {
+ res.imagesUsed[step.Container.Image] = struct{}{}
+ }
+
+ for _, step := range step.Steps {
+ res.Merge(getStepInfo(step))
+ }
+
+ if step.Parallel != nil {
+ res.hasParallel = true
+
+ if len(step.Parallel.Matrix) != 0 {
+ res.hasMatrix = true
+ }
+ if step.Parallel.Artifacts != nil {
+ res.hasArtifacts = true
+ }
+ if step.Parallel.Execute != nil {
+ res.numExecute++
+ }
+ if len(step.Parallel.Use) > 0 {
+ res.hasTemplate = true
+ for _, tmpl := range step.Parallel.Use {
+ res.templatesUsed[tmpl.Name] = struct{}{}
+ }
+ }
+ if step.Parallel.Template != nil {
+ res.hasTemplate = true
+ res.templatesUsed[step.Parallel.Template.Name] = struct{}{}
+ }
+
+ if len(step.Parallel.Services) > 0 {
+ res.hasServices = true
+ }
+
+ if step.Parallel.Run != nil && step.Parallel.Run.Image != "" {
+ res.imagesUsed[step.Parallel.Run.Image] = struct{}{}
+ }
+ if step.Parallel.Container != nil && step.Parallel.Container.Image != "" {
+ res.imagesUsed[step.Parallel.Container.Image] = struct{}{}
+ }
+
+ for _, step := range step.Parallel.Steps {
+ res.Merge(getStepInfo(step))
+ }
+ }
+
+ return res
+}
+
+func (e *executor) sendRunWorkflowTelemetry(ctx context.Context, workflow *testworkflowsv1.TestWorkflow, execution *testkube.TestWorkflowExecution) {
+ if workflow == nil {
+ log.DefaultLogger.Debug("empty workflow passed to telemetry event")
+ return
+ }
+ telemetryEnabled, err := e.configMap.GetTelemetryEnabled(ctx)
+ if err != nil {
+ log.DefaultLogger.Debugf("getting telemetry enabled error", "error", err)
+ }
+ if !telemetryEnabled {
+ return
+ }
+
+ properties := make(map[string]any)
+ properties["name"] = workflow.Name
+ stats := stepStats{
+ imagesUsed: make(map[string]struct{}),
+ templatesUsed: make(map[string]struct{}),
+ }
+
+ var isSample bool
+ if workflow.Labels != nil && workflow.Labels["docs"] == "example" && strings.HasSuffix(workflow.Name, "-sample") {
+ isSample = true
+ } else {
+ isSample = false
+ }
+
+ spec := workflow.Spec
+ for _, step := range spec.Steps {
+ stats.Merge(getStepInfo(step))
+ }
+ if spec.Container != nil {
+ stats.imagesUsed[spec.Container.Image] = struct{}{}
+ }
+ if len(spec.Services) != 0 {
+ stats.hasServices = true
+ }
+ if len(spec.Use) > 0 {
+ stats.hasTemplate = true
+ for _, tmpl := range spec.Use {
+ stats.templatesUsed[tmpl.Name] = struct{}{}
+ }
+ }
+
+ var images []string
+ for image := range stats.imagesUsed {
+ if image == "" {
+ continue
+ }
+ images = append(images, image)
+ }
+
+ var templates []string
+ for t := range stats.templatesUsed {
+ if t == "" {
+ continue
+ }
+ templates = append(templates, t)
+ }
+ var (
+ status string
+ durationMs int32
+ )
+ if execution.Result != nil {
+ if execution.Result.Status != nil {
+ status = string(*execution.Result.Status)
+ }
+ durationMs = execution.Result.DurationMs
+ }
+
+ out, err := telemetry.SendRunWorkflowEvent("testkube_api_run_test_workflow", telemetry.RunWorkflowParams{
+ RunParams: telemetry.RunParams{
+ AppVersion: version.Version,
+ DataSource: testworkflows.GetDataSource(workflow.Spec.Content),
+ Host: testworkflows.GetHostname(),
+ ClusterID: testworkflows.GetClusterID(ctx, e.configMap),
+ DurationMs: durationMs,
+ Status: status,
+ },
+ WorkflowParams: telemetry.WorkflowParams{
+ TestWorkflowSteps: int32(stats.numSteps),
+ TestWorkflowExecuteCount: int32(stats.numExecute),
+ TestWorkflowImage: testworkflows.GetImage(workflow.Spec.Container),
+ TestWorkflowArtifactUsed: stats.hasArtifacts,
+ TestWorkflowParallelUsed: stats.hasParallel,
+ TestWorkflowMatrixUsed: stats.hasMatrix,
+ TestWorkflowServicesUsed: stats.hasServices,
+ TestWorkflowTemplateUsed: stats.hasTemplate,
+ TestWorkflowIsSample: isSample,
+ TestWorkflowTemplates: templates,
+ TestWorkflowImages: images,
+ TestWorkflowKubeshopGitURI: testworkflows.IsKubeshopGitURI(workflow.Spec.Content) ||
+ testworkflows.HasWorkflowStepLike(workflow.Spec, testworkflows.HasKubeshopGitURI),
+ },
+ })
+
+ if err != nil {
+ log.DefaultLogger.Debugf("sending run test workflow telemetry event error", "error", err)
+ } else {
+ log.DefaultLogger.Debugf("sending run test workflow telemetry event", "output", out)
+ }
+}
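
The new testworkflowmetrics.go walks the workflow spec recursively and unions per-step usage into set-valued maps before turning them into the telemetry payload. A minimal, standalone sketch of that aggregation pattern, assuming a simplified step type rather than the real testworkflowsv1.Step and stepStats fields:

package main

import "fmt"

// step is a simplified, hypothetical stand-in for a workflow step, kept only
// to illustrate the recursive merge pattern used by getStepInfo/stepStats.
type step struct {
	Image    string
	Template string
	Steps    []step
}

type stats struct {
	numSteps  int
	images    map[string]struct{}
	templates map[string]struct{}
}

func newStats() *stats {
	return &stats{images: map[string]struct{}{}, templates: map[string]struct{}{}}
}

// merge unions another stats value into the receiver.
func (s *stats) merge(o *stats) {
	s.numSteps += o.numSteps
	for k := range o.images {
		s.images[k] = struct{}{}
	}
	for k := range o.templates {
		s.templates[k] = struct{}{}
	}
}

// collect walks the step tree depth-first and aggregates usage.
func collect(st step) *stats {
	res := newStats()
	res.numSteps = 1
	if st.Image != "" {
		res.images[st.Image] = struct{}{}
	}
	if st.Template != "" {
		res.templates[st.Template] = struct{}{}
	}
	for _, child := range st.Steps {
		res.merge(collect(child))
	}
	return res
}

func main() {
	root := step{Steps: []step{
		{Image: "curl:latest"},
		{Template: "smoke", Steps: []step{{Image: "curl:latest"}}},
	}}
	agg := collect(root)
	fmt.Println(agg.numSteps, len(agg.images), len(agg.templates)) // 4 1 1
}

Because the maps behave as sets, duplicate images and templates collapse automatically, which is why the payload can report TestWorkflowImages and TestWorkflowTemplates as unique lists.
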
From 40e4ee96658f8cbe5057526800c734b3edb1d3ff Mon Sep 17 00:00:00 2001
From: Dejan Zele Pejchev
Date: Wed, 10 Jul 2024 11:02:34 +0200
Subject: [PATCH 2/6] TKC-2161: skip tls verification when saving logs if
STORAGE_SKIP_VERIFY is true (#5627)
---
cmd/api-server/main.go | 6 +++++-
pkg/cloud/data/testworkflow/output.go | 25 ++++++++++++++++++++-----
2 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go
index 69eb4a57c8..73bbf58d5a 100644
--- a/cmd/api-server/main.go
+++ b/cmd/api-server/main.go
@@ -273,7 +273,11 @@ func main() {
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
// Pro edition only (tcl protected code)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
- testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
+ var opts []cloudtestworkflow.Option
+ if cfg.StorageSkipVerify {
+ opts = append(opts, cloudtestworkflow.WithSkipVerify())
+ }
+ testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, opts...)
triggerLeaseBackend = triggers.NewAcquireAlwaysLeaseBackend()
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
} else {
diff --git a/pkg/cloud/data/testworkflow/output.go b/pkg/cloud/data/testworkflow/output.go
index 55102d42ea..5848b3e0e0 100644
--- a/pkg/cloud/data/testworkflow/output.go
+++ b/pkg/cloud/data/testworkflow/output.go
@@ -3,6 +3,7 @@ package testworkflow
import (
"bytes"
"context"
+ "crypto/tls"
"io"
"net/http"
@@ -18,11 +19,25 @@ import (
var _ testworkflow.OutputRepository = (*CloudOutputRepository)(nil)
type CloudOutputRepository struct {
- executor executor.Executor
+ executor executor.Executor
+ httpClient *http.Client
}
-func NewCloudOutputRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudOutputRepository {
- return &CloudOutputRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey)}
+type Option func(*CloudOutputRepository)
+
+func WithSkipVerify() Option {
+ return func(r *CloudOutputRepository) {
+ transport := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
+ r.httpClient.Transport = transport
+ }
+}
+
+func NewCloudOutputRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string, opts ...Option) *CloudOutputRepository {
+ r := &CloudOutputRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey), httpClient: http.DefaultClient}
+ for _, opt := range opts {
+ opt(r)
+ }
+ return r
}
// PresignSaveLog builds presigned storage URL to save the output in Cloud
@@ -59,7 +74,7 @@ func (r *CloudOutputRepository) SaveLog(ctx context.Context, id, workflowName st
if err != nil {
return err
}
- res, err := http.DefaultClient.Do(req)
+ res, err := r.httpClient.Do(req)
if err != nil {
return errors.Wrap(err, "failed to save file in cloud storage")
}
@@ -79,7 +94,7 @@ func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName st
if err != nil {
return nil, err
}
- res, err := http.DefaultClient.Do(req)
+ res, err := r.httpClient.Do(req)
if err != nil {
return nil, errors.Wrap(err, "failed to get file from cloud storage")
}
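
Patch 2 wires STORAGE_SKIP_VERIFY into a functional option on the output repository, so presigned-URL uploads and downloads go through a client whose transport skips TLS verification. A standalone sketch of the same option pattern, using hypothetical names (uploader instead of CloudOutputRepository) and allocating a fresh client instead of reusing http.DefaultClient:

package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
)

// uploader is a hypothetical stand-in for the repository: it only carries
// the HTTP client that presigned-URL requests go through.
type uploader struct {
	httpClient *http.Client
}

type option func(*uploader)

// withSkipVerify installs a transport that accepts any TLS certificate.
// Use it only for trusted, self-signed storage endpoints.
func withSkipVerify() option {
	return func(u *uploader) {
		u.httpClient = &http.Client{
			Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
		}
	}
}

func newUploader(opts ...option) *uploader {
	u := &uploader{httpClient: http.DefaultClient}
	for _, opt := range opts {
		opt(u)
	}
	return u
}

func main() {
	plain := newUploader()
	insecure := newUploader(withSkipVerify())
	fmt.Println(plain.httpClient == http.DefaultClient, insecure.httpClient != http.DefaultClient) // true true
}

Allocating a separate *http.Client in the sketch keeps the insecure transport scoped to one repository instance rather than touching the shared default client.
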
From b7f81186c5fe98b49dde56807c68ae0a2f8ea3db Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Wed, 10 Jul 2024 14:11:51 +0300
Subject: [PATCH 3/6] build(deps): bump anchore/sbom-action from 0.16.0 to
0.16.1 (#5649)
Bumps [anchore/sbom-action](https://github.com/anchore/sbom-action) from 0.16.0 to 0.16.1.
- [Release notes](https://github.com/anchore/sbom-action/releases)
- [Commits](https://github.com/anchore/sbom-action/compare/v0.16.0...v0.16.1)
---
updated-dependencies:
- dependency-name: anchore/sbom-action
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
.../docker-build-api-executors-tag.yaml | 22 +++++++++----------
.github/workflows/release-dev.yaml | 2 +-
.github/workflows/release.yaml | 2 +-
3 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/.github/workflows/docker-build-api-executors-tag.yaml b/.github/workflows/docker-build-api-executors-tag.yaml
index f933db83cb..863162ab79 100644
--- a/.github/workflows/docker-build-api-executors-tag.yaml
+++ b/.github/workflows/docker-build-api-executors-tag.yaml
@@ -19,7 +19,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up Docker Buildx
id: buildx
@@ -94,7 +94,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up Docker Buildx
id: buildx
@@ -178,7 +178,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -249,7 +249,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -317,7 +317,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -442,7 +442,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -510,7 +510,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -669,7 +669,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -741,7 +741,7 @@ jobs:
uses: docker/setup-qemu-action@v3
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up Docker Buildx
id: buildx
@@ -790,7 +790,7 @@ jobs:
uses: actions/checkout@v4
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -863,7 +863,7 @@ jobs:
fetch-depth: 0
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
diff --git a/.github/workflows/release-dev.yaml b/.github/workflows/release-dev.yaml
index 596b21e2ea..f51de24cec 100644
--- a/.github/workflows/release-dev.yaml
+++ b/.github/workflows/release-dev.yaml
@@ -131,7 +131,7 @@ jobs:
with:
fetch-depth: 0
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Download Artifacts for Linux
uses: actions/download-artifact@master
with:
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 0e539be951..1682a69ee2 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -121,7 +121,7 @@ jobs:
with:
fetch-depth: 0
- uses: sigstore/cosign-installer@v3.5.0
- - uses: anchore/sbom-action/download-syft@v0.16.0
+ - uses: anchore/sbom-action/download-syft@v0.16.1
- name: Download Artifacts for Linux
uses: actions/download-artifact@master
with:
From 14437375c69d328bfbc37bb5791cd688e09c5868 Mon Sep 17 00:00:00 2001
From: Dawid Rusnak
Date: Wed, 10 Jul 2024 15:24:20 +0200
Subject: [PATCH 4/6] feat(performance): improve Log Processing performance
(#5647)
* feat(performance): buffer the logs sent from the container, to avoid sending a message for each line
* feat(performance): batch Test Workflow's result updates
* fix(testworkflows): handle getting long container logs after the log rotation happens
@see {@link https://stackoverflow.com/a/68673451}
* feat(testworkflows): optimize reading timestamp from Kubernetes logs
* feat(testworkflows): optimize buffering logs
* feat(testworkflows): use native channel instead of heavier Channels for WatchInstrumentedPod
* feat(testworkflows): increase buffer size for logs buffering
---
.../testworkflowcontroller/channel.go | 5 +
.../testworkflowcontroller/controller.go | 8 +-
.../testworkflowcontroller/logs.go | 361 +++++++++++++++---
.../testworkflowcontroller/logs_test.go | 48 ++-
.../testworkflowcontroller/notifier.go | 106 ++++-
.../testworkflowcontroller/utils.go | 2 +-
.../watchinstrumentedpod.go | 11 +-
.../testworkflowexecutor/executor.go | 2 +-
8 files changed, 463 insertions(+), 80 deletions(-)
diff --git a/pkg/testworkflows/testworkflowcontroller/channel.go b/pkg/testworkflows/testworkflowcontroller/channel.go
index e9da67db0f..00702e80c0 100644
--- a/pkg/testworkflows/testworkflowcontroller/channel.go
+++ b/pkg/testworkflows/testworkflowcontroller/channel.go
@@ -20,6 +20,7 @@ type Channel[T any] interface {
Channel() <-chan ChannelMessage[T]
Close()
Done() <-chan struct{}
+ CtxErr() error
}
type channel[T any] struct {
@@ -168,3 +169,7 @@ func (c *channel[T]) Close() {
func (c *channel[T]) Done() <-chan struct{} {
return c.ctx.Done()
}
+
+func (c *channel[T]) CtxErr() error {
+ return c.ctx.Err()
+}
diff --git a/pkg/testworkflows/testworkflowcontroller/controller.go b/pkg/testworkflows/testworkflowcontroller/controller.go
index bc074e216e..ef7b2cbc52 100644
--- a/pkg/testworkflows/testworkflowcontroller/controller.go
+++ b/pkg/testworkflows/testworkflowcontroller/controller.go
@@ -212,7 +212,7 @@ func (c *controller) StopController() {
}
func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Notification] {
- w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
+ ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
})
@@ -222,7 +222,7 @@ func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Noti
v.Close()
return v.Channel()
}
- return w.Channel()
+ return ch
}
// TODO: Make it actually light
@@ -281,7 +281,7 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
case <-c.podEvents.Peek(parentCtx):
case <-alignTimeoutCh:
}
- w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
+ ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
Follow: common.Ptr(follow),
@@ -289,7 +289,7 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
if err != nil {
return
}
- for v := range w.Channel() {
+ for v := range ch {
if v.Error == nil && v.Value.Log != "" && !v.Value.Temporary {
if ref != v.Value.Ref && v.Value.Ref != "" {
ref = v.Value.Ref
diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go
index 15d9072d19..7fd97ef4bd 100644
--- a/pkg/testworkflows/testworkflowcontroller/logs.go
+++ b/pkg/testworkflows/testworkflowcontroller/logs.go
@@ -2,20 +2,31 @@ package testworkflowcontroller
import (
"bufio"
+ "bytes"
"context"
- "errors"
"io"
"strings"
+ "sync"
"time"
+ "unsafe"
- errors2 "github.com/pkg/errors"
+ "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
+ "github.com/kubeshop/testkube/internal/common"
+ "github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/utils"
)
+const (
+ FlushLogMaxSize = 100_000
+ FlushBufferSize = 65_536
+ FlushLogTime = 100 * time.Millisecond
+)
+
type Comment struct {
Time time.Time
Hint *data.Instruction
@@ -29,63 +40,220 @@ type ContainerLog struct {
Output *data.Instruction
}
-func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, follow bool, pod Channel[*corev1.Pod]) Channel[ContainerLog] {
+// getContainerLogsStream gets the logs stream and tries to reinitialize it on EOF.
+// EOF may happen not only on the actual container end, but also in case of the log rotation.
+// @see {@link https://stackoverflow.com/a/68673451}
+func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, pod Channel[*corev1.Pod], since *time.Time) (io.Reader, error) {
+ // Fail immediately if the context is finished
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
+
+ // Build Kubernetes structure for time
+ var sinceTime *metav1.Time
+ if since != nil {
+ sinceTime = &metav1.Time{Time: *since}
+ }
+
+ // Create logs stream request
+ req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
+ Container: containerName,
+ Follow: true,
+ Timestamps: true,
+ SinceTime: sinceTime,
+ })
+ var err error
+ var stream io.ReadCloser
+ for {
+ stream, err = req.Stream(ctx)
+ if err != nil {
+ // The container is not necessarily already started when Started event is received
+ if !strings.Contains(err.Error(), "is waiting to start") {
+ return nil, err
+ }
+ p := <-pod.Peek(ctx)
+ if p == nil {
+ return bytes.NewReader(nil), io.EOF
+ }
+ containerDone := IsPodDone(p)
+ for i := range p.Status.InitContainerStatuses {
+ if p.Status.InitContainerStatuses[i].Name == containerName {
+ if p.Status.InitContainerStatuses[i].State.Terminated != nil {
+ containerDone = true
+ break
+ }
+ }
+ }
+ for i := range p.Status.ContainerStatuses {
+ if p.Status.ContainerStatuses[i].Name == containerName {
+ if p.Status.ContainerStatuses[i].State.Terminated != nil {
+ containerDone = true
+ break
+ }
+ }
+ }
+
+ if containerDone {
+ return bytes.NewReader(nil), io.EOF
+ }
+ continue
+ }
+ break
+ }
+ return stream, nil
+}
+
+func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, pod Channel[*corev1.Pod]) Channel[ContainerLog] {
+ ctx, ctxCancel := context.WithCancel(parentCtx)
w := newChannel[ContainerLog](ctx, bufferSize)
go func() {
- defer w.Close()
+ <-w.Done()
+ ctxCancel()
+ }()
+
+ go func() {
+ defer ctxCancel()
var err error
+ var since *time.Time
+
// Create logs stream request
- req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
- Follow: follow,
- Timestamps: true,
- Container: containerName,
- })
- var stream io.ReadCloser
- for {
- stream, err = req.Stream(ctx)
+ stream, err := getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, pod, since)
+ hadAnyContent := false
+ if err == io.EOF {
+ return
+ } else if err != nil {
+ w.Error(err)
+ return
+ }
+
+ // Build a buffer for logs to avoid scheduling Log notification for each write
+ var logBufferLog bytes.Buffer
+ var logBufferTs time.Time
+ var logBufferMu sync.Mutex
+ var logBufferCh = make(chan struct{}, 1)
+ unsafeFlushLogBuffer := func() {
+ if logBufferLog.Len() == 0 || w.CtxErr() != nil {
+ return
+ }
+ message := make([]byte, logBufferLog.Len())
+ _, err := logBufferLog.Read(message)
if err != nil {
- // The container is not necessarily already started when Started event is received
- if !strings.Contains(err.Error(), "is waiting to start") {
- w.Error(err)
- return
- }
- p := <-pod.Peek(ctx)
- if p != nil && IsPodDone(p) {
- w.Error(errors.New("pod is finished and there are no logs for this container"))
+ log.DefaultLogger.Errorf("failed to read log buffer: %s/%s", podName, containerName)
+ return
+ }
+ w.Send(ContainerLog{Time: logBufferTs, Log: message})
+ }
+ flushLogBuffer := func() {
+ logBufferMu.Lock()
+ defer logBufferMu.Unlock()
+ unsafeFlushLogBuffer()
+ }
+ appendLog := func(ts time.Time, log ...[]byte) {
+ logBufferMu.Lock()
+ defer logBufferMu.Unlock()
+
+ initialLogLen := logBufferLog.Len()
+ if initialLogLen == 0 {
+ logBufferTs = ts
+ }
+ for i := range log {
+ logBufferLog.Write(log[i])
+ }
+
+ finalLogLen := logBufferLog.Len()
+ flushable := finalLogLen > FlushLogMaxSize
+ if flushable {
+ unsafeFlushLogBuffer()
+ }
+
+ // Inform the flushing worker about a new log to flush.
+ // Do it only when it's not scheduled
+ if initialLogLen == 0 || flushable {
+ select {
+ case logBufferCh <- struct{}{}:
+ default:
}
- continue
}
- break
}
+ // Flush the log automatically after 100ms
+ bufferCtx, bufferCtxCancel := context.WithCancel(ctx)
+ defer bufferCtxCancel()
go func() {
- <-w.Done()
- _ = stream.Close()
+ t := time.NewTimer(FlushLogTime)
+ for {
+ t.Stop()
+
+ if bufferCtx.Err() != nil {
+ return
+ }
+
+ logLen := logBufferLog.Len()
+ if logLen == 0 {
+ select {
+ case <-bufferCtx.Done():
+ return
+ case <-logBufferCh:
+ continue
+ }
+ }
+
+ t.Reset(FlushLogTime)
+ select {
+ case <-bufferCtx.Done():
+ if !t.Stop() {
+ <-t.C
+ }
+ return
+ case <-t.C:
+ flushLogBuffer()
+ case <-logBufferCh:
+ continue
+ }
+ }
}()
+ // Flush the rest of logs if it is closed
+ defer flushLogBuffer()
+
// Parse and return the logs
- reader := bufio.NewReader(stream)
- var tsPrefix, tmpTsPrefix []byte
+ reader := bufio.NewReaderSize(stream, FlushBufferSize)
+ tsReader := newTimestampReader()
isNewLine := false
isStarted := false
- var ts, tmpTs time.Time
for {
var prepend []byte
// Read next timestamp
- tmpTs, tmpTsPrefix, err = ReadTimestamp(reader)
+ err = tsReader.Read(reader)
if err == nil {
- ts = tmpTs
- tsPrefix = tmpTsPrefix
+ // Strip older logs - SinceTime in Kubernetes logs is ignoring milliseconds precision
+ if since != nil && since.After(tsReader.ts) {
+ _, _ = utils.ReadLongLine(reader)
+ continue
+ }
+ hadAnyContent = true
} else if err == io.EOF {
- return
+ if !hadAnyContent {
+ return
+ }
+ // Reinitialize logs stream
+ since = common.Ptr(tsReader.ts.Add(1))
+ stream, err = getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, pod, since)
+ if err != nil {
+ return
+ }
+ reader.Reset(stream)
+ hadAnyContent = false
+ continue
} else {
// Edge case: Kubernetes may send critical errors without timestamp (like ionotify)
- if len(tmpTsPrefix) > 0 {
- prepend = tmpTsPrefix
+ if len(tsReader.Prefix()) > 0 {
+ prepend = bytes.Clone(tsReader.Prefix())
}
+ flushLogBuffer()
w.Error(err)
}
@@ -104,33 +272,34 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
if err == nil && instruction != nil {
isNewLine = false
hadComment = true
- log := ContainerLog{Time: ts}
+ log := ContainerLog{Time: tsReader.ts}
if isHint {
log.Hint = instruction
} else {
log.Output = instruction
}
+ flushLogBuffer()
w.Send(log)
}
// Append as regular log if expected
if !hadComment {
if !isStarted {
- line = append(tsPrefix, line...)
+ appendLog(tsReader.ts, tsReader.Prefix(), line)
isStarted = true
} else if isNewLine {
- line = append(append([]byte("\n"), tsPrefix...), line...)
+ appendLog(tsReader.ts, []byte("\n"), tsReader.Prefix(), line)
}
- w.Send(ContainerLog{Time: ts, Log: line})
isNewLine = true
}
} else if isStarted {
- w.Send(ContainerLog{Time: ts, Log: append([]byte("\n"), tsPrefix...)})
+ appendLog(tsReader.ts, []byte("\n"), tsReader.Prefix())
}
// Handle the error
if err != nil {
if err != io.EOF {
+ flushLogBuffer()
w.Error(err)
}
return
@@ -141,31 +310,111 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
return w
}
-func ReadTimestamp(reader *bufio.Reader) (time.Time, []byte, error) {
- tsPrefix := make([]byte, 31, 35) // 30 bytes for timestamp + 1 byte for space + 4 additional bytes for non-UTC timezone
- count, err := io.ReadFull(reader, tsPrefix)
+var (
+ ErrInvalidTimestamp = errors.New("invalid timestamp")
+)
+
+type timestampReader struct {
+ buffer []byte
+ bytes int
+ ts time.Time
+ utc *bool
+}
+
+func newTimestampReader() *timestampReader {
+ return &timestampReader{
+ buffer: make([]byte, 31, 36), // 30 bytes for timestamp + 1 byte for space + 5 additional bytes for non-UTC timezone
+ }
+}
+
+func (t *timestampReader) Prefix() []byte {
+ return t.buffer[:t.bytes]
+}
+
+// read is initial operation for reading the timestamp,
+// that is the slowest one, but also detects the timestamp format.
+// It's meant to be executed just once, for performance reasons.
+func (t *timestampReader) read(reader *bufio.Reader) error {
+ // Read the possible timestamp slice
+ read, err := io.ReadFull(reader, t.buffer[:31])
+ t.bytes = read
if err != nil {
- return time.Time{}, nil, err
+ return err
}
- if count < 31 {
- return time.Time{}, nil, io.EOF
+
+ // Detect the timezone format and adjust the reader if needed
+ utc := t.buffer[29] == 'Z'
+ t.utc = &utc
+ if !utc && len(t.buffer) < 35 {
+ // Increase capacity to store the +00:00 time
+ t.buffer = append(t.buffer, make([]byte, 5)...)
+
+ // Read the missing part
+ read, err = io.ReadFull(reader, t.buffer[31:])
+ t.bytes += read
+ if err != nil {
+ return err
+ }
}
- var ts time.Time
- // Handle non-UTC timezones
- if tsPrefix[29] == '+' {
- tsSuffix := make([]byte, 5)
- count, err = io.ReadFull(reader, tsSuffix)
+
+ // Compute the timestamp
+ if utc {
+ ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 30))
if err != nil {
- return time.Time{}, nil, err
+ return ErrInvalidTimestamp
}
- if count < 5 {
- return time.Time{}, nil, io.EOF
+ t.ts = ts
+ } else {
+ ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 35))
+ if err != nil {
+ return ErrInvalidTimestamp
}
- tsPrefix = append(tsPrefix, tsSuffix...)
+ t.ts = ts.UTC()
}
- ts, err = time.Parse(KubernetesTimezoneLogTimeFormat, string(tsPrefix[0:len(tsPrefix)-1]))
+ return nil
+}
+
+// readUTC is optimized operation for reading the UTC timestamp (Z).
+func (t *timestampReader) readUTC(reader *bufio.Reader) error {
+ // Read the possible timestamp slice
+ read, err := io.ReadFull(reader, t.buffer)
+ t.bytes = read
if err != nil {
- return time.Time{}, tsPrefix, errors2.Wrap(err, "parsing timestamp")
+ return err
+ }
+
+ // Compute the timestamp
+ ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 30))
+ if err != nil {
+ return ErrInvalidTimestamp
+ }
+ t.ts = ts
+ return nil
+}
+
+// readNonUTC is optimized operation for reading the non-UTC timestamp (+00:00).
+func (t *timestampReader) readNonUTC(reader *bufio.Reader) error {
+ // Read the possible timestamp slice
+ read, err := io.ReadFull(reader, t.buffer)
+ t.bytes = read
+ if err != nil {
+ return err
+ }
+
+ // Compute the timestamp
+ ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 35))
+ if err != nil {
+ return ErrInvalidTimestamp
+ }
+ t.ts = ts.UTC()
+ return nil
+}
+
+func (t *timestampReader) Read(reader *bufio.Reader) error {
+ if t.utc == nil {
+ return t.read(reader)
+ } else if *t.utc {
+ return t.readUTC(reader)
}
- return ts.UTC(), tsPrefix, nil
+ return t.readNonUTC(reader)
}
diff --git a/pkg/testworkflows/testworkflowcontroller/logs_test.go b/pkg/testworkflows/testworkflowcontroller/logs_test.go
index f181034773..f9e4f6c64e 100644
--- a/pkg/testworkflows/testworkflowcontroller/logs_test.go
+++ b/pkg/testworkflows/testworkflowcontroller/logs_test.go
@@ -10,26 +10,58 @@ import (
"github.com/stretchr/testify/assert"
)
-func Test_ReadTimestamp_UTC(t *testing.T) {
+func Test_ReadTimestamp_UTC_Initial(t *testing.T) {
+ reader := newTimestampReader()
prefix := "2024-06-07T12:41:49.037275300Z "
message := "some-message"
buf := bufio.NewReader(bytes.NewBufferString(prefix + message))
- ts, byt, err := ReadTimestamp(buf)
+ err := reader.Read(buf)
rest, _ := io.ReadAll(buf)
assert.NoError(t, err)
- assert.Equal(t, []byte(prefix), byt)
+ assert.Equal(t, []byte(prefix), reader.Prefix())
assert.Equal(t, []byte(message), rest)
- assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), ts)
+ assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts)
}
-func Test_ReadTimestamp_NonUTC(t *testing.T) {
+func Test_ReadTimestamp_NonUTC_Initial(t *testing.T) {
+ reader := newTimestampReader()
prefix := "2024-06-07T15:41:49.037275300+03:00 "
message := "some-message"
buf := bufio.NewReader(bytes.NewBufferString(prefix + message))
- ts, byt, err := ReadTimestamp(buf)
+ err := reader.Read(buf)
rest, _ := io.ReadAll(buf)
assert.NoError(t, err)
- assert.Equal(t, []byte(prefix), byt)
+ assert.Equal(t, []byte(prefix), reader.Prefix())
assert.Equal(t, []byte(message), rest)
- assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), ts)
+ assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts)
+}
+
+func Test_ReadTimestamp_UTC_Recurring(t *testing.T) {
+ reader := newTimestampReader()
+ prefix := "2024-06-07T12:41:49.037275300Z "
+ message := "some-message"
+ buf := bufio.NewReader(bytes.NewBufferString(prefix + prefix + message))
+ err1 := reader.Read(buf)
+ err2 := reader.Read(buf)
+ rest, _ := io.ReadAll(buf)
+ assert.NoError(t, err1)
+ assert.NoError(t, err2)
+ assert.Equal(t, []byte(prefix), reader.Prefix())
+ assert.Equal(t, []byte(message), rest)
+ assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts)
+}
+
+func Test_ReadTimestamp_NonUTC_Recurring(t *testing.T) {
+ reader := newTimestampReader()
+ prefix := "2024-06-07T15:41:49.037275300+03:00 "
+ message := "some-message"
+ buf := bufio.NewReader(bytes.NewBufferString(prefix + prefix + message))
+ err1 := reader.Read(buf)
+ err2 := reader.Read(buf)
+ rest, _ := io.ReadAll(buf)
+ assert.NoError(t, err1)
+ assert.NoError(t, err2)
+ assert.Equal(t, []byte(prefix), reader.Prefix())
+ assert.Equal(t, []byte(message), rest)
+ assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts)
}
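
The timestampReader above reads a fixed-width RFC3339Nano prefix from each log line and detects once whether the stream uses the UTC form ("...Z ") or an offset form ("...+03:00 "), then sticks to the faster dedicated path. A minimal sketch of the same parsing idea without the format caching or the unsafe string conversion; readPrefix is a hypothetical helper:

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"time"
)

// readPrefix reads one fixed-width Kubernetes log timestamp prefix
// ("<RFC3339Nano> ") from the reader: width 31 for UTC timestamps and
// width 36 for timestamps carrying a +hh:mm offset.
func readPrefix(r *bufio.Reader, width int) (time.Time, error) {
	buf := make([]byte, width)
	if _, err := io.ReadFull(r, buf); err != nil {
		return time.Time{}, err
	}
	// Drop the trailing space before parsing.
	ts, err := time.Parse(time.RFC3339Nano, string(buf[:width-1]))
	if err != nil {
		return time.Time{}, err
	}
	return ts.UTC(), nil
}

func main() {
	line := "2024-06-07T15:41:49.037275300+03:00 some-message"
	r := bufio.NewReader(strings.NewReader(line))
	ts, err := readPrefix(r, 36)
	rest, _ := io.ReadAll(r)
	fmt.Println(ts, err, string(rest)) // 2024-06-07 12:41:49.0372753 +0000 UTC <nil> some-message
}
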
diff --git a/pkg/testworkflows/testworkflowcontroller/notifier.go b/pkg/testworkflows/testworkflowcontroller/notifier.go
index f5b6f14566..bb86d9cdb2 100644
--- a/pkg/testworkflows/testworkflowcontroller/notifier.go
+++ b/pkg/testworkflows/testworkflowcontroller/notifier.go
@@ -3,6 +3,7 @@ package testworkflowcontroller
import (
"context"
"fmt"
+ "sync"
"time"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
@@ -12,12 +13,38 @@ import (
"github.com/kubeshop/testkube/pkg/ui"
)
+const (
+ FlushResultTime = 50 * time.Millisecond
+ FlushResultMaxTime = 100 * time.Millisecond
+)
+
type notifier struct {
- watcher *channel[Notification]
+ ctx context.Context
+ ch chan ChannelMessage[Notification]
result testkube.TestWorkflowResult
sig []testkube.TestWorkflowSignature
scheduledAt time.Time
lastTs map[string]time.Time
+
+ resultMu sync.Mutex
+ resultCh chan struct{}
+ resultScheduled bool
+}
+
+func (n *notifier) send(value Notification) {
+ // Ignore when the channel is already closed
+ defer func() {
+ recover()
+ }()
+ n.ch <- ChannelMessage[Notification]{Value: value}
+}
+
+func (n *notifier) error(err error) {
+ // Ignore when the channel is already closed
+ defer func() {
+ recover()
+ }()
+ n.ch <- ChannelMessage[Notification]{Error: err}
}
func (n *notifier) GetLastTimestamp(ref string) time.Time {
@@ -40,13 +67,69 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) {
}
}
+func (n *notifier) Flush() {
+ n.resultMu.Lock()
+ defer n.resultMu.Unlock()
+ if !n.resultScheduled {
+ return
+ }
+ n.send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
+ n.resultScheduled = false
+}
+
+func (n *notifier) scheduleFlush() {
+ n.resultMu.Lock()
+ defer n.resultMu.Unlock()
+
+ // Inform existing scheduler about the next result
+ if n.resultScheduled {
+ select {
+ case n.resultCh <- struct{}{}:
+ default:
+ }
+ return
+ }
+
+ // Run the scheduler
+ n.resultScheduled = true
+ go func() {
+ flushTimer := time.NewTimer(FlushResultMaxTime)
+ flushTimerEnabled := false
+
+ for {
+ if n.ctx.Err() != nil {
+ return
+ }
+
+ select {
+ case <-n.ctx.Done():
+ n.Flush()
+ return
+ case <-flushTimer.C:
+ n.Flush()
+ flushTimerEnabled = false
+ case <-time.After(FlushResultTime):
+ n.Flush()
+ flushTimerEnabled = false
+ case <-n.resultCh:
+ if !flushTimerEnabled {
+ flushTimerEnabled = true
+ flushTimer.Reset(FlushResultMaxTime)
+ }
+ continue
+ }
+ }
+ }()
+}
+
func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) {
if message != "" {
if ref == InitContainerName {
ref = ""
}
// TODO: use timestamp from the message too for lastTs?
- n.watcher.Send(Notification{
+ n.Flush()
+ n.send(Notification{
Timestamp: ts.UTC(),
Log: message,
Ref: ref,
@@ -63,7 +146,7 @@ func (n *notifier) Log(ref string, ts time.Time, message string) {
}
func (n *notifier) Error(err error) {
- n.watcher.Error(err)
+ n.error(err)
}
func (n *notifier) Event(ref string, ts time.Time, level, reason, message string) {
@@ -92,7 +175,7 @@ func (n *notifier) recompute() {
func (n *notifier) emit() {
n.recompute()
- n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
+ n.scheduleFlush()
}
func (n *notifier) queue(ts time.Time) {
@@ -184,7 +267,8 @@ func (n *notifier) Output(ref string, ts time.Time, output *data.Instruction) {
return
}
n.RegisterTimestamp(ref, ts)
- n.watcher.Send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output})
+ n.Flush()
+ n.send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output})
}
func (n *notifier) Finish(ts time.Time) {
@@ -270,11 +354,21 @@ func newNotifier(ctx context.Context, signature []testworkflowprocessor.Signatur
}
result.Recompute(sig, scheduledAt)
+ ch := make(chan ChannelMessage[Notification])
+
+ go func() {
+ <-ctx.Done()
+ close(ch)
+ }()
+
return ¬ifier{
- watcher: newChannel[Notification](ctx, 0),
+ ch: ch,
+ ctx: ctx,
sig: sig,
scheduledAt: scheduledAt,
result: result,
lastTs: make(map[string]time.Time),
+
+ resultCh: make(chan struct{}, 1),
}
}
diff --git a/pkg/testworkflows/testworkflowcontroller/utils.go b/pkg/testworkflows/testworkflowcontroller/utils.go
index 77bb22bc4e..ad54220b0c 100644
--- a/pkg/testworkflows/testworkflowcontroller/utils.go
+++ b/pkg/testworkflows/testworkflowcontroller/utils.go
@@ -12,7 +12,7 @@ import (
)
const (
- KubernetesLogTimeFormat = "2006-01-02T15:04:05.000000000Z"
+ KubernetesLogTimeFormat = "2006-01-02T15:04:05.999999999Z"
KubernetesTimezoneLogTimeFormat = KubernetesLogTimeFormat + "07:00"
)
diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
index 7e7feafeda..c98dd7b067 100644
--- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
+++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
@@ -27,7 +27,7 @@ type WatchInstrumentedPodOptions struct {
Follow *bool
}
-func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []testworkflowprocessor.Signature, scheduledAt time.Time, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], opts WatchInstrumentedPodOptions) (Channel[Notification], error) {
+func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []testworkflowprocessor.Signature, scheduledAt time.Time, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], opts WatchInstrumentedPodOptions) (<-chan ChannelMessage[Notification], error) {
// Avoid missing data
if pod == nil {
return nil, errors.New("pod watcher is required")
@@ -42,7 +42,10 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
// Start watching
go func() {
- defer ctxCancel()
+ defer func() {
+ s.Flush()
+ ctxCancel()
+ }()
// Watch for the basic initialization warnings
for v := range state.PreStart("") {
@@ -99,7 +102,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
// Watch the container logs
follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(ref)
- for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, follow, pod).Channel() {
+ for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, pod).Channel() {
if v.Error != nil {
s.Error(v.Error)
} else if v.Value.Output != nil {
@@ -177,7 +180,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
}
}()
- return s.watcher, nil
+ return s.ch, nil
}
func maxTime(times ...time.Time) time.Time {
diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go
index d3f00502c1..f7c02dff68 100644
--- a/pkg/testworkflows/testworkflowexecutor/executor.go
+++ b/pkg/testworkflows/testworkflowexecutor/executor.go
@@ -313,7 +313,7 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te
e.metrics.IncAndObserveExecuteTestWorkflow(*execution, e.dashboardURI)
- e.updateStatus(testWorkflow, execution, testWorkflowExecution)
+ e.updateStatus(testWorkflow, execution, testWorkflowExecution) // TODO: Consider if it is needed
err = testworkflowcontroller.Cleanup(ctx, e.clientSet, execution.GetNamespace(e.namespace), execution.Id)
if err != nil {
log.DefaultLogger.Errorw("failed to cleanup TestWorkflow resources", "id", execution.Id, "error", err)
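
The core of this patch is batching: container-log writes accumulate in a buffer that is flushed on a short timer or when it exceeds a size threshold, and result notifications are coalesced the same way in the notifier. A standalone sketch of that flush pattern, with illustrative names and thresholds rather than the real FlushLogTime, FlushLogMaxSize, or FlushResultTime constants:

package main

import (
	"bytes"
	"context"
	"fmt"
	"sync"
	"time"
)

// batcher coalesces many small writes into fewer flushes: a background
// goroutine flushes on a ticker, and Write flushes eagerly when the buffer
// grows past maxSize.
type batcher struct {
	mu      sync.Mutex
	buf     bytes.Buffer
	maxSize int
	flush   func([]byte)
}

func newBatcher(ctx context.Context, maxSize int, interval time.Duration, flush func([]byte)) *batcher {
	b := &batcher{maxSize: maxSize, flush: flush}
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				b.Flush() // emit whatever is left on shutdown
				return
			case <-t.C:
				b.Flush()
			}
		}
	}()
	return b
}

// Write appends a chunk and flushes immediately if the buffer got too large.
func (b *batcher) Write(p []byte) {
	b.mu.Lock()
	b.buf.Write(p)
	overflow := b.buf.Len() > b.maxSize
	b.mu.Unlock()
	if overflow {
		b.Flush()
	}
}

// Flush emits the buffered bytes as a single message, if there are any.
func (b *batcher) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.buf.Len() == 0 {
		return
	}
	out := make([]byte, b.buf.Len())
	_, _ = b.buf.Read(out)
	b.flush(out)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	b := newBatcher(ctx, 1024, 100*time.Millisecond, func(p []byte) {
		fmt.Printf("flushed %d bytes\n", len(p))
	})
	for i := 0; i < 50; i++ {
		b.Write([]byte("log line\n")) // 50 writes end up as a single flush here
	}
	time.Sleep(150 * time.Millisecond)
}
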
From f44d150a29ae6439c7943bfe3480cc40696e54b4 Mon Sep 17 00:00:00 2001
From: Dawid Rusnak
Date: Thu, 11 Jul 2024 10:59:18 +0200
Subject: [PATCH 5/6] fix(testworkflows): displaying events (#5652)
---
pkg/testworkflows/testworkflowcontroller/utils.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pkg/testworkflows/testworkflowcontroller/utils.go b/pkg/testworkflows/testworkflowcontroller/utils.go
index ad54220b0c..77bb22bc4e 100644
--- a/pkg/testworkflows/testworkflowcontroller/utils.go
+++ b/pkg/testworkflows/testworkflowcontroller/utils.go
@@ -12,7 +12,7 @@ import (
)
const (
- KubernetesLogTimeFormat = "2006-01-02T15:04:05.999999999Z"
+ KubernetesLogTimeFormat = "2006-01-02T15:04:05.000000000Z"
KubernetesTimezoneLogTimeFormat = KubernetesLogTimeFormat + "07:00"
)
From 1db9d5652eed2558424d4f1491e4353f78da3655 Mon Sep 17 00:00:00 2001
From: Dawid Rusnak
Date: Thu, 11 Jul 2024 14:21:17 +0200
Subject: [PATCH 6/6] fix(testworkflows): storing logs for the services (#5656)
---
pkg/testworkflows/testworkflowcontroller/logs.go | 10 +++++-----
.../testworkflowcontroller/watchinstrumentedpod.go | 2 +-
pkg/testworkflows/testworkflowprocessor/container.go | 2 +-
3 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go
index 7fd97ef4bd..b72b8b3680 100644
--- a/pkg/testworkflows/testworkflowcontroller/logs.go
+++ b/pkg/testworkflows/testworkflowcontroller/logs.go
@@ -43,7 +43,7 @@ type ContainerLog struct {
// getContainerLogsStream gets the logs stream and tries to reinitialize it on EOF.
// EOF may happen not only on the actual container end, but also in case of the log rotation.
// @see {@link https://stackoverflow.com/a/68673451}
-func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, pod Channel[*corev1.Pod], since *time.Time) (io.Reader, error) {
+func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, follow bool, pod Channel[*corev1.Pod], since *time.Time) (io.Reader, error) {
// Fail immediately if the context is finished
if ctx.Err() != nil {
return nil, ctx.Err()
@@ -58,7 +58,7 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface,
// Create logs stream request
req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Container: containerName,
- Follow: true,
+ Follow: follow,
Timestamps: true,
SinceTime: sinceTime,
})
@@ -103,7 +103,7 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface,
return stream, nil
}
-func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, pod Channel[*corev1.Pod]) Channel[ContainerLog] {
+func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, follow bool, bufferSize int, pod Channel[*corev1.Pod]) Channel[ContainerLog] {
ctx, ctxCancel := context.WithCancel(parentCtx)
w := newChannel[ContainerLog](ctx, bufferSize)
@@ -119,7 +119,7 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
var since *time.Time
// Create logs stream request
- stream, err := getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, pod, since)
+ stream, err := getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, follow, pod, since)
hadAnyContent := false
if err == io.EOF {
return
@@ -241,7 +241,7 @@ func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interfac
}
// Reinitialize logs stream
since = common.Ptr(tsReader.ts.Add(1))
- stream, err = getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, pod, since)
+ stream, err = getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, follow, pod, since)
if err != nil {
return
}
diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
index c98dd7b067..1ae190e2f0 100644
--- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
+++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
@@ -102,7 +102,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
// Watch the container logs
follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(ref)
- for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, pod).Channel() {
+ for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, follow, 10, pod).Channel() {
if v.Error != nil {
s.Error(v.Error)
} else if v.Value.Output != nil {
diff --git a/pkg/testworkflows/testworkflowprocessor/container.go b/pkg/testworkflows/testworkflowprocessor/container.go
index 7566a3a106..1546965341 100644
--- a/pkg/testworkflows/testworkflowprocessor/container.go
+++ b/pkg/testworkflows/testworkflowprocessor/container.go
@@ -468,7 +468,7 @@ func (c *container) Resolve(m ...expressions.Machine) error {
}
env := c.Env()
name = name[4:]
- for i := range env {
+ for i := len(env) - 1; i >= 0; i-- {
if env[i].Name == name && env[i].ValueFrom == nil {
value, err := expressions.EvalTemplate(env[i].Value)
if err == nil {