Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move execution scheduling to Control Plane #6089

Open
wants to merge 84 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
211dc88
fix: avoid processing workflow's managedFields
rangoo94 Nov 21, 2024
be53b6a
feat: prepare basic runner
rangoo94 Nov 7, 2024
8bbf24a
chore: simplify one repository a bit
rangoo94 Nov 7, 2024
43fcb0b
feat: make the sensitive data extraction more abstract
rangoo94 Nov 21, 2024
2e2c344
chore: decouple execution scheduling into multiple phases
rangoo94 Nov 25, 2024
6776cde
fix: configure default service account in Execution Worker
rangoo94 Nov 26, 2024
7341a26
fixup
rangoo94 Nov 26, 2024
8279697
chore: delete unused method from TestWorkflowExecutor
rangoo94 Nov 26, 2024
59763a4
chore: make ExecutionScheduler public
rangoo94 Nov 26, 2024
efaf6e9
feat: prepare initial gRPC command for scheduling executions group
rangoo94 Nov 27, 2024
821438d
chore: split a bit preparing executions again
rangoo94 Nov 27, 2024
84f8d7b
chore: add note about used resources in scheduler
rangoo94 Nov 27, 2024
326a245
chore: reorder execution preparation
rangoo94 Nov 27, 2024
26fa905
feat(performance): delay unregistering controllers for fast re-use
rangoo94 Dec 2, 2024
2cc6f26
feat: add option to estimate the TestWorkflow result
rangoo94 Dec 2, 2024
4eeff55
chore: streamline a bit scheduling execution
rangoo94 Dec 2, 2024
8f29813
feat: update TestWorkflowExecution resource in Kubernetes on executio…
rangoo94 Dec 3, 2024
7be1537
chore: add Init() function for the execution creation, save signature…
rangoo94 Dec 3, 2024
4bdf56f
feat: add StartTestWorkflow event
rangoo94 Dec 3, 2024
df186b8
feat: translate new running context to old format
rangoo94 Dec 3, 2024
dc0bf16
fixup delete obsolete comment
rangoo94 Dec 3, 2024
c4867f0
chore: move the running context conversion to separate file
rangoo94 Dec 3, 2024
2ab6549
chore: move schedule request validation to utility function
rangoo94 Dec 3, 2024
bcb3b61
chore: reorganize execution scheduler a bit
rangoo94 Dec 4, 2024
2695d96
fixup delete unused code
rangoo94 Dec 4, 2024
bef0ce7
fix: cloning intermediate execution
rangoo94 Dec 4, 2024
1684ced
chore: avoid using schedule request data when not needed
rangoo94 Dec 4, 2024
7b7a060
chore: extract direct/remote executor & prepare lazy emitter/runner
rangoo94 Dec 5, 2024
5b31557
chore: include executor from external place
rangoo94 Dec 5, 2024
7363231
chore: rename a bit
rangoo94 Dec 5, 2024
39cd979
chore: rename a bit
rangoo94 Dec 5, 2024
a16335f
feat: move executor, so it's properly distributed and supports legacy…
rangoo94 Dec 5, 2024
2901163
chore: delete legacy TestWorkflowExecutor.LegacyExecute
rangoo94 Dec 5, 2024
69fab4b
chore: replace todo comment
rangoo94 Dec 6, 2024
1935b6e
fix: correctly display logs for immediately failed executions
rangoo94 Dec 6, 2024
8f6d75c
fix: correctly display logs for aborted executions
rangoo94 Dec 6, 2024
b39bfb3
fix: displaying logs in the CLI
rangoo94 Dec 6, 2024
9ef9ce8
feat: use gRPC connection for executing Test Workflows from inside of…
rangoo94 Dec 6, 2024
caf8e18
chore: move Test Workflow Execution telemetry and metrics to be based…
rangoo94 Dec 9, 2024
676c9b5
feat: add information about support of the new runner methods in the …
rangoo94 Dec 9, 2024
0cb2871
chore: update comment
rangoo94 Dec 9, 2024
d96c0e2
feat: retry multiple times getting credentials from the Control Plane
rangoo94 Dec 9, 2024
53d2b8d
fix: avoid get credentials retry if it's not recoverable error
rangoo94 Dec 9, 2024
11db316
chore: adjust Test Workflow Execution metrics only after it's finished
rangoo94 Dec 9, 2024
d19ea26
fix: telemetry/metrics condition
rangoo94 Dec 9, 2024
ae94fc4
feat: add method to federate events from the Control Plane back to th…
rangoo94 Dec 9, 2024
2242172
fixup unit tests
rangoo94 Dec 10, 2024
97824c8
fix after rebase
rangoo94 Dec 11, 2024
f3a7bc3
fix: read ScheduleExecution error properly
rangoo94 Dec 11, 2024
04ee48f
fix: read ScheduleExecution error properly
rangoo94 Dec 11, 2024
ce11d1a
chore: expose TestWorkflow's Scheduler as public interface
rangoo94 Dec 11, 2024
4cafc7f
chore: simplify fetcher interface
rangoo94 Dec 11, 2024
7cce850
feat: add layer to send runner request and to lock execution to runner
rangoo94 Dec 13, 2024
829d869
feat: support runner requests
rangoo94 Dec 13, 2024
b073a28
feat: unify Cloud/Kubernetes client for Test Workflows
rangoo94 Dec 13, 2024
3fd0a3c
feat: add cloud/kubernetes client for Test Workflow Templates
rangoo94 Dec 16, 2024
e96db8c
fix: building configuration
rangoo94 Dec 16, 2024
148743f
fix: obfuscating dynamic credentials
rangoo94 Dec 16, 2024
9bf73f9
feat: add feature flag for new executions scheduling
rangoo94 Dec 16, 2024
714587a
fix: wait for runner requests only when it's supported
rangoo94 Dec 16, 2024
fd57887
feat: add method to list queued executions without runner
rangoo94 Dec 16, 2024
69d91d5
fix: pass down information about untrusted user signature too
rangoo94 Dec 16, 2024
48caee1
fixup
rangoo94 Dec 16, 2024
875c324
chore: different fixes
rangoo94 Dec 16, 2024
3885955
fixup
rangoo94 Dec 16, 2024
537d508
chore: add todo comment
rangoo94 Dec 16, 2024
deabc37
chore: add todo comment
rangoo94 Dec 16, 2024
027b870
fix: unit tests
rangoo94 Dec 17, 2024
b6ac632
chore: merge
rangoo94 Dec 17, 2024
01628d9
fix: execute step
rangoo94 Dec 17, 2024
2e8cd45
feat: add Control Plane functions for TestWorkflow CRD management
rangoo94 Dec 18, 2024
817e1ca
fixup add todos
rangoo94 Dec 18, 2024
26686bf
feat: add support for Test Workflow Storage in the Control Plane
rangoo94 Dec 18, 2024
a737688
fixup lint
rangoo94 Dec 18, 2024
94bc34d
feat: add client endpoint to read labels from Agent
rangoo94 Dec 18, 2024
55b60fb
chore: delete unused repositories
rangoo94 Dec 18, 2024
fa2c568
Merge branch 'main' into dawid/feat/move-scheduling
rangoo94 Dec 18, 2024
8f71eea
chore: change DeepCopy in testkube models
rangoo94 Dec 19, 2024
b39ab7e
chore: review fixes
rangoo94 Dec 19, 2024
9337141
Merge branch 'main' into dawid/feat/move-scheduling
rangoo94 Dec 20, 2024
bb1b729
chore: restructure scheduling API a bit
rangoo94 Dec 20, 2024
6dd1c00
fixup unit tests
rangoo94 Dec 20, 2024
173b115
Merge branch 'refs/heads/main' into dawid/feat/move-scheduling
rangoo94 Dec 20, 2024
506cec9
fixup merge
rangoo94 Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7385,6 +7385,8 @@ components:
type: string
example:
WEBHOOK_PARAMETER: "any value"
external:
type: boolean

EventResource:
type: string
Expand Down Expand Up @@ -8188,6 +8190,9 @@ components:
description: identifier for group of correlated executions
format: bson objectId
example: "62f395e004109209b50edfc1"
runnerId:
type: string
description: identifier of the runner where it has been executed
name:
type: string
description: execution name
Expand Down Expand Up @@ -8258,6 +8263,11 @@ components:
description: unique execution identifier
format: bson objectId
example: "62f395e004109209b50edfc1"
groupId:
type: string
description: identifier for group of correlated executions
format: bson objectId
example: "62f395e004109209b50edfc1"
name:
type: string
description: execution name
Expand Down
23 changes: 17 additions & 6 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
parser "github.com/kubeshop/testkube/internal/template"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/cache"
"github.com/kubeshop/testkube/pkg/capabilities"
"github.com/kubeshop/testkube/pkg/cloud"
"github.com/kubeshop/testkube/pkg/configmap"
"github.com/kubeshop/testkube/pkg/dbmigrator"
Expand Down Expand Up @@ -290,12 +291,14 @@ func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.Te
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
WorkflowServiceNotificationsWorkerCount: cfg.TestkubeProWorkflowServiceNotificationsWorkerCount,
WorkflowParallelStepNotificationsWorkerCount: cfg.TestkubeProWorkflowParallelStepNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
NewExecutions: grpcClient == nil,
TestWorkflowStorage: grpcClient == nil,
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
Expand All @@ -320,6 +323,14 @@ func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.Te
proContext.OrgID = foundProContext.OrgId
}

if capabilities.Enabled(foundProContext.Capabilities, capabilities.CapabilityNewExecutions) {
proContext.NewExecutions = true
}

if capabilities.Enabled(foundProContext.Capabilities, capabilities.CapabilityTestWorkflowStorage) {
proContext.TestWorkflowStorage = true
}

return proContext
}

Expand Down
173 changes: 124 additions & 49 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"strings"
"time"

"github.com/gofiber/fiber/v2/middleware/cors"
"google.golang.org/grpc"
Expand All @@ -19,12 +20,19 @@ import (
cloudtestworkflow "github.com/kubeshop/testkube/pkg/cloud/data/testworkflow"
"github.com/kubeshop/testkube/pkg/event/kind/cdevent"
"github.com/kubeshop/testkube/pkg/event/kind/k8sevent"
"github.com/kubeshop/testkube/pkg/event/kind/testworkflowexecutionmetrics"
"github.com/kubeshop/testkube/pkg/event/kind/testworkflowexecutions"
"github.com/kubeshop/testkube/pkg/event/kind/testworkflowexecutiontelemetry"
"github.com/kubeshop/testkube/pkg/event/kind/webhook"
ws "github.com/kubeshop/testkube/pkg/event/kind/websocket"
"github.com/kubeshop/testkube/pkg/newclients/testworkflowclient"
"github.com/kubeshop/testkube/pkg/newclients/testworkflowtemplateclient"
runner2 "github.com/kubeshop/testkube/pkg/runner"
"github.com/kubeshop/testkube/pkg/secretmanager"
"github.com/kubeshop/testkube/pkg/server"
"github.com/kubeshop/testkube/pkg/tcl/checktcl"
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

"github.com/kubeshop/testkube/internal/common"
Expand Down Expand Up @@ -78,9 +86,35 @@ func main() {

configMapConfig := commons.MustGetConfigMapConfig(ctx, cfg.APIServerConfig, cfg.TestkubeNamespace, cfg.TestkubeAnalyticsEnabled)

// k8s
kubeClient, err := kubeclient.GetClient()
commons.ExitOnError("Getting kubernetes client", err)
clientset, err := k8sclient.ConnectToK8s()
commons.ExitOnError("Creating k8s clientset", err)

var runner runner2.Runner
lazyRunner := runner2.Lazy(&runner)

var eventsEmitter *event.Emitter
lazyEmitter := event.Lazy(&eventsEmitter)

// TODO: Make granular environment variables, yet backwards compatible
secretConfig := testkube.SecretConfig{
Prefix: cfg.SecretCreationPrefix,
List: cfg.EnableSecretsEndpoint,
ListAll: cfg.EnableSecretsEndpoint && cfg.EnableListingAllSecrets,
Create: cfg.EnableSecretsEndpoint && !cfg.DisableSecretCreation,
Modify: cfg.EnableSecretsEndpoint && !cfg.DisableSecretCreation,
Delete: cfg.EnableSecretsEndpoint && !cfg.DisableSecretCreation,
AutoCreate: !cfg.DisableSecretCreation,
}
secretManager := secretmanager.New(clientset, secretConfig)

metrics := metrics.NewMetrics()

// Start local Control Plane
if mode == common.ModeStandalone {
controlPlane := services.CreateControlPlane(ctx, cfg, features, configMapConfig)
controlPlane := services.CreateControlPlane(ctx, cfg, features, configMapConfig, secretManager, metrics, lazyRunner, lazyEmitter)
g.Go(func() error {
return controlPlane.Run(ctx)
})
Expand All @@ -93,37 +127,18 @@ func main() {
clusterId, _ := configMapConfig.GetUniqueClusterId(ctx)
telemetryEnabled, _ := configMapConfig.GetTelemetryEnabled(ctx)

// k8s
kubeClient, err := kubeclient.GetClient()
commons.ExitOnError("Getting kubernetes client", err)
clientset, err := k8sclient.ConnectToK8s()
commons.ExitOnError("Creating k8s clientset", err)

// k8s clients
secretClient := secret.NewClientFor(clientset, cfg.TestkubeNamespace)
configMapClient := configmap.NewClientFor(clientset, cfg.TestkubeNamespace)
webhooksClient := executorsclientv1.NewWebhooksClient(kubeClient, cfg.TestkubeNamespace)
testTriggersClient := testtriggersclientv1.NewClient(kubeClient, cfg.TestkubeNamespace)
testWorkflowExecutionsClient := testworkflowsclientv1.NewTestWorkflowExecutionsClient(kubeClient, cfg.TestkubeNamespace)

// TODO: Make granular environment variables, yet backwards compatible
secretConfig := testkube.SecretConfig{
Prefix: cfg.SecretCreationPrefix,
List: cfg.EnableSecretsEndpoint,
ListAll: cfg.EnableSecretsEndpoint && cfg.EnableListingAllSecrets,
Create: cfg.EnableSecretsEndpoint && !cfg.DisableSecretCreation,
Modify: cfg.EnableSecretsEndpoint && !cfg.DisableSecretCreation,
Delete: cfg.EnableSecretsEndpoint && !cfg.DisableSecretCreation,
AutoCreate: !cfg.DisableSecretCreation,
}
secretManager := secretmanager.New(clientset, secretConfig)

envs := commons.GetEnvironmentVariables()

inspector := commons.CreateImageInspector(cfg, configMapClient, secretClient)

var testWorkflowsClient testworkflowsclientv1.Interface
var testWorkflowTemplatesClient testworkflowsclientv1.TestWorkflowTemplatesInterface
var testWorkflowsClient testworkflowclient.TestWorkflowClient
var testWorkflowTemplatesClient testworkflowtemplateclient.TestWorkflowTemplateClient

var grpcClient cloud.TestKubeCloudAPIClient
var grpcConn *grpc.ClientConn
Expand All @@ -146,20 +161,8 @@ func main() {

grpcClient = cloud.NewTestKubeCloudAPIClient(grpcConn)

if mode == common.ModeAgent && cfg.WorkflowStorage == "control-plane" {
testWorkflowsClient = cloudtestworkflow.NewCloudTestWorkflowRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testWorkflowTemplatesClient = cloudtestworkflow.NewCloudTestWorkflowTemplateRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
} else {
testWorkflowsClient = testworkflowsclientv1.NewClient(kubeClient, cfg.TestkubeNamespace)
testWorkflowTemplatesClient = testworkflowsclientv1.NewTestWorkflowTemplatesClient(kubeClient, cfg.TestkubeNamespace)
}

testWorkflowResultsRepository := cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
var opts []cloudtestworkflow.Option
if cfg.StorageSkipVerify {
opts = append(opts, cloudtestworkflow.WithSkipVerify())
}
testWorkflowOutputRepository := cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, opts...)
testWorkflowOutputRepository := cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.StorageSkipVerify)
triggerLeaseBackend := triggers.NewAcquireAlwaysLeaseBackend()
artifactStorage := cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey)

Expand All @@ -168,13 +171,21 @@ func main() {
if cfg.Trace {
eventBus.TraceEvents()
}
eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName)
eventsEmitter = event.NewEmitter(eventBus, cfg.TestkubeClusterName)

// Check Pro/Enterprise subscription
proContext := commons.ReadProContext(ctx, cfg, grpcClient)
subscriptionChecker, err := checktcl.NewSubscriptionChecker(ctx, proContext, grpcClient, grpcConn)
commons.ExitOnError("Failed creating subscription checker", err)

if proContext.TestWorkflowStorage && cfg.FeatureTestWorkflowCloudStorage {
testWorkflowsClient = testworkflowclient.NewCloudTestWorkflowClient(grpcConn, cfg.TestkubeProAPIKey)
testWorkflowTemplatesClient = testworkflowtemplateclient.NewCloudTestWorkflowTemplateClient(grpcConn, cfg.TestkubeProAPIKey)
} else {
testWorkflowsClient = testworkflowclient.NewKubernetesTestWorkflowClient(kubeClient, cfg.TestkubeNamespace)
testWorkflowTemplatesClient = testworkflowtemplateclient.NewKubernetesTestWorkflowTemplateClient(kubeClient, cfg.TestkubeNamespace)
}

serviceAccountNames := map[string]string{
cfg.TestkubeNamespace: cfg.JobServiceAccountName,
}
Expand All @@ -185,8 +196,6 @@ func main() {
serviceAccountNames = schedulertcl.GetServiceAccountNamesFromConfig(serviceAccountNames, cfg.TestkubeExecutionNamespaces)
}

metrics := metrics.NewMetrics()

var deprecatedSystem *services.DeprecatedSystem
if !cfg.DisableDeprecatedTests {
deprecatedSystem = services.CreateDeprecatedSystem(
Expand Down Expand Up @@ -214,28 +223,71 @@ func main() {
if mode == common.ModeAgent {
testWorkflowProcessor = presets.NewPro(inspector)
}
executionWorker := services.CreateExecutionWorker(clientset, cfg, clusterId, serviceAccountNames, testWorkflowProcessor)
executionWorker := services.CreateExecutionWorker(clientset, cfg, clusterId, serviceAccountNames, testWorkflowProcessor, map[string]string{
testworkflowconfig.FeatureFlagNewExecutions: fmt.Sprintf("%v", cfg.FeatureNewExecutions),
testworkflowconfig.FeatureFlagTestWorkflowCloudStorage: fmt.Sprintf("%v", cfg.FeatureTestWorkflowCloudStorage),
})

// Build the runner
runner = runner2.New(
executionWorker,
testWorkflowOutputRepository,
testWorkflowResultsRepository,
configMapConfig,
eventsEmitter,
metrics,
cfg.TestkubeDashboardURI,
cfg.StorageSkipVerify,
)

// Recover control
func() {
var list []testkube.TestWorkflowExecution
for {
// TODO: it should get running only in the context of current runner (worker.List?)
list, err = testWorkflowResultsRepository.GetRunning(ctx)
if err != nil {
log.DefaultLogger.Errorw("failed to fetch running executions to recover", "error", err)
<-time.After(time.Second)
continue
}
break
}

for i := range list {
if (list[i].RunnerId == "" && len(list[i].Signature) == 0) || (list[i].RunnerId != "" && list[i].RunnerId != proContext.EnvID) {
continue
}

// TODO: Should it throw error at all?
// TODO: Pass hints (namespace, signature, scheduledAt)
go func(e *testkube.TestWorkflowExecution) {
err := runner.Monitor(ctx, e.Id)
if err != nil {
log.DefaultLogger.Errorw("failed to monitor execution", "id", e.Id, "error", err)
}
}(&list[i])
}
}()

testWorkflowExecutor := testworkflowexecutor.New(
grpcClient,
cfg.TestkubeProAPIKey,
cfg.CDEventsTarget,
eventsEmitter,
executionWorker,
clientset,
runner,
testWorkflowResultsRepository,
testWorkflowOutputRepository,
configMapConfig,
testWorkflowTemplatesClient,
testWorkflowExecutionsClient,
testWorkflowsClient,
metrics,
secretManager,
cfg.GlobalWorkflowTemplateName,
cfg.TestkubeDashboardURI,
&proContext,
proContext.OrgID,
proContext.EnvID,
cfg.FeatureNewExecutions,
)
g.Go(func() error {
testWorkflowExecutor.Recover(ctx)
return nil
})

var deprecatedClients commons.DeprecatedClients
var deprecatedRepositories commons.DeprecatedRepositories
Expand All @@ -262,6 +314,17 @@ func main() {
if cfg.EnableK8sEvents {
eventsEmitter.Loader.Register(k8sevent.NewK8sEventLoader(clientset, cfg.TestkubeNamespace, testkube.AllEventTypes))
}

// Update TestWorkflowExecution Kubernetes resource objects on status change
eventsEmitter.Loader.Register(testworkflowexecutions.NewLoader(ctx, cfg.TestkubeNamespace, kubeClient))

// Update the Prometheus metrics regarding the Test Workflow Execution
eventsEmitter.Loader.Register(testworkflowexecutionmetrics.NewLoader(ctx, metrics, cfg.TestkubeDashboardURI))

// Send the telemetry data regarding the Test Workflow Execution
// TODO: Disable it if Control Plane does that
eventsEmitter.Loader.Register(testworkflowexecutiontelemetry.NewLoader(ctx, configMapConfig))

eventsEmitter.Listen(ctx)
g.Go(func() error {
eventsEmitter.Reconcile(ctx)
Expand All @@ -286,11 +349,12 @@ func main() {
webhooksClient,
testTriggersClient,
testWorkflowsClient,
testworkflowsclientv1.NewClient(kubeClient, cfg.TestkubeNamespace),
testWorkflowTemplatesClient,
testworkflowsclientv1.NewTestWorkflowTemplatesClient(kubeClient, cfg.TestkubeNamespace),
configMapConfig,
secretManager,
secretConfig,
testWorkflowExecutor,
executionWorker,
eventsEmitter,
websocketLoader,
Expand All @@ -301,6 +365,7 @@ func main() {
cfg.TestkubeHelmchartVersion,
serviceAccountNames,
cfg.TestkubeDockerImageVersion,
testWorkflowExecutor,
)
api.Init(httpServer)

Expand All @@ -323,6 +388,16 @@ func main() {
features,
&proContext,
cfg.TestkubeDockerImageVersion,
eventsEmitter,
func(environmentId, executionId string) error {
execution, err := testWorkflowResultsRepository.Get(context.Background(), executionId)
if err != nil {
return err
}
return testWorkflowExecutor.Start(environmentId, &execution, nil)
},
cfg.FeatureNewExecutions,
cfg.FeatureTestWorkflowCloudStorage,
)
commons.ExitOnError("Starting agent", err)
g.Go(func() error {
Expand Down
Loading
Loading