diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index b350e2e156d..a0db963f2fa 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -403,7 +403,7 @@ func main() { eventBus.TraceEvents() } - eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName, envs) + eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName) var logsStream logsclient.Stream @@ -642,6 +642,7 @@ func main() { logGrpcClient, subscriptionChecker, serviceAccountNames, + envs, ) if mode == common.ModeAgent { diff --git a/internal/app/api/v1/server.go b/internal/app/api/v1/server.go index 288633dcfe3..688a6a0ab78 100644 --- a/internal/app/api/v1/server.go +++ b/internal/app/api/v1/server.go @@ -107,6 +107,7 @@ func NewTestkubeAPI( logGrpcClient logsclient.StreamGetter, subscriptionChecker checktcl.SubscriptionChecker, serviceAccountNames map[string]string, + envs map[string]string, ) TestkubeAPI { var httpConfig server.Config @@ -163,6 +164,7 @@ func NewTestkubeAPI( SubscriptionChecker: subscriptionChecker, LabelSources: common.Ptr(make([]LabelSource, 0)), ServiceAccountNames: serviceAccountNames, + Envs: envs, } } @@ -211,6 +213,7 @@ type TestkubeAPI struct { SubscriptionChecker checktcl.SubscriptionChecker LabelSources *[]LabelSource ServiceAccountNames map[string]string + Envs map[string]string } type storageParams struct { @@ -612,7 +615,7 @@ func (s *TestkubeAPI) InitEventListeners( s.Events.Loader.Register(webhook.NewWebhookLoader( s.Log, webhookClient, templatesClient, testExecutionResults, testSuiteExecutionsResults, - testWorkflowResults, metrics, s.proContext)) + testWorkflowResults, metrics, s.proContext, s.Envs)) s.Events.Loader.Register(s.WebsocketLoader) s.Events.Loader.Register(s.slackLoader) diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 42cd201d5c0..2af87a793d2 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -51,7 +51,6 @@ func (ag *Agent) Metadata() map[string]string { func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) { event.ClusterName = ag.clusterName - event.Envs = ag.envs // Non blocking send select { case ag.events <- event: diff --git a/pkg/event/emitter.go b/pkg/event/emitter.go index c2445aef6f8..b814b0f22a3 100644 --- a/pkg/event/emitter.go +++ b/pkg/event/emitter.go @@ -20,14 +20,13 @@ const ( ) // NewEmitter returns new emitter instance -func NewEmitter(eventBus bus.Bus, clusterName string, envs map[string]string) *Emitter { +func NewEmitter(eventBus bus.Bus, clusterName string) *Emitter { return &Emitter{ Log: log.DefaultLogger, Loader: NewLoader(), Bus: eventBus, Listeners: make(common.Listeners, 0), ClusterName: clusterName, - Envs: envs, } } @@ -39,7 +38,6 @@ type Emitter struct { mutex sync.RWMutex Bus bus.Bus ClusterName string - Envs map[string]string } // Register adds new listener @@ -126,7 +124,6 @@ func listerersToMap(listeners []common.Listener) map[string]map[string]common.Li // Notify notifies emitter with webhook func (e *Emitter) Notify(event testkube.Event) { event.ClusterName = e.ClusterName - event.Envs = e.Envs err := e.Bus.PublishTopic(event.Topic(), event) if err != nil { e.Log.Errorw("error publishing event", append(event.Log(), "error", err)) diff --git a/pkg/event/emitter_integration_test.go b/pkg/event/emitter_integration_test.go index 138e5e88556..02832f511cc 100644 --- a/pkg/event/emitter_integration_test.go +++ b/pkg/event/emitter_integration_test.go @@ -32,7 +32,7 @@ func GetTestNATSEmitter() *Emitter { if err != nil { panic(err) } - return NewEmitter(bus.NewNATSBus(nc), "", nil) + return NewEmitter(bus.NewNATSBus(nc), "") } func TestEmitter_NATS_Register_Integration(t *testing.T) { diff --git a/pkg/event/emitter_test.go b/pkg/event/emitter_test.go index 8d4ba6126fc..ded7216d96e 100644 --- a/pkg/event/emitter_test.go +++ b/pkg/event/emitter_test.go @@ -25,7 +25,7 @@ func TestEmitter_Register(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus, "", nil) + emitter := NewEmitter(eventBus, "") // when emitter.Register(&dummy.DummyListener{Id: "l1"}) @@ -43,7 +43,7 @@ func TestEmitter_Listen(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus, "", nil) + emitter := NewEmitter(eventBus, "") // given listener with matching selector listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"} // and listener with second matic selector @@ -97,7 +97,7 @@ func TestEmitter_Notify(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus, "", nil) + emitter := NewEmitter(eventBus, "") // and 2 listeners subscribed to the same queue // * first on pod1 listener1 := &dummy.DummyListener{Id: "l3"} @@ -131,7 +131,7 @@ func TestEmitter_NotifyBecome(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus, "", nil) + emitter := NewEmitter(eventBus, "") // and 2 listeners subscribed to the same queue // * first on pod1 listener1 := &dummy.DummyListener{Id: "l5", Types: []testkube.EventType{ @@ -167,7 +167,7 @@ func TestEmitter_Reconcile(t *testing.T) { t.Parallel() // given first reconciler loop was done eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus, "", nil) + emitter := NewEmitter(eventBus, "") emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy1"}) emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy2"}) @@ -229,7 +229,7 @@ func TestEmitter_UpdateListeners(t *testing.T) { t.Parallel() // given eventBus := bus.NewEventBusMock() - emitter := NewEmitter(eventBus, "", nil) + emitter := NewEmitter(eventBus, "") // given listener with matching selector listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"} // and listener with second matching selector diff --git a/pkg/event/kind/webhook/listener.go b/pkg/event/kind/webhook/listener.go index d2ad5379bdb..b155b86d4b7 100644 --- a/pkg/event/kind/webhook/listener.go +++ b/pkg/event/kind/webhook/listener.go @@ -33,7 +33,9 @@ func NewWebhookListener(name, uri, selector string, events []testkube.EventType, testSuiteExecutionResults testresult.Repository, testWorkflowExecutionResults testworkflow.Repository, metrics v1.Metrics, - proContext *config.ProContext) *WebhookListener { + proContext *config.ProContext, + envs map[string]string, +) *WebhookListener { return &WebhookListener{ name: name, Uri: uri, @@ -50,6 +52,7 @@ func NewWebhookListener(name, uri, selector string, events []testkube.EventType, testWorkflowExecutionResults: testWorkflowExecutionResults, metrics: metrics, proContext: proContext, + envs: envs, } } @@ -69,6 +72,7 @@ type WebhookListener struct { testWorkflowExecutionResults testworkflow.Repository metrics v1.Metrics proContext *config.ProContext + envs map[string]string } func (l *WebhookListener) Name() string { @@ -112,6 +116,9 @@ func (l *WebhookListener) Disabled() bool { } func (l *WebhookListener) Notify(event testkube.Event) (result testkube.EventResult) { + // load global envs to be able to use them in templates + event.Envs = l.envs + defer func() { var eventType, res string if event.Type_ != nil { @@ -169,9 +176,11 @@ func (l *WebhookListener) Notify(event testkube.Event) (result testkube.EventRes _, err = body.Write(data) } else { + // clean envs if not requested explicitly by payload template + event.Envs = nil err = json.NewEncoder(body).Encode(event) if err == nil && l.payloadObjectField != "" { - data := map[string]string{l.payloadObjectField: string(body.Bytes())} + data := map[string]string{l.payloadObjectField: body.String()} body.Reset() err = json.NewEncoder(body).Encode(data) } diff --git a/pkg/event/kind/webhook/listener_test.go b/pkg/event/kind/webhook/listener_test.go index 4d34276523a..07e078006f3 100644 --- a/pkg/event/kind/webhook/listener_test.go +++ b/pkg/event/kind/webhook/listener_test.go @@ -34,7 +34,7 @@ func TestWebhookListener_Notify(t *testing.T) { svr := httptest.NewServer(testHandler) defer svr.Close() - l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil) + l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil) // when r := l.Notify(testkube.Event{ @@ -56,7 +56,7 @@ func TestWebhookListener_Notify(t *testing.T) { svr := httptest.NewServer(testHandler) defer svr.Close() - l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil) + l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil) // when r := l.Notify(testkube.Event{ @@ -73,7 +73,7 @@ func TestWebhookListener_Notify(t *testing.T) { t.Parallel() // given - s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil) + s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil) // when r := s.Notify(testkube.Event{ @@ -106,7 +106,7 @@ func TestWebhookListener_Notify(t *testing.T) { svr := httptest.NewServer(testHandler) defer svr.Close() - l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "field", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil) + l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "field", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil) // when r := l.Notify(testkube.Event{ @@ -133,7 +133,7 @@ func TestWebhookListener_Notify(t *testing.T) { defer svr.Close() l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "{\"id\": \"{{ .Id }}\"}", - map[string]string{"Content-Type": "application/json"}, false, nil, nil, nil, v1.NewMetrics(), nil) + map[string]string{"Content-Type": "application/json"}, false, nil, nil, nil, v1.NewMetrics(), nil, nil) // when r := l.Notify(testkube.Event{ @@ -150,7 +150,7 @@ func TestWebhookListener_Notify(t *testing.T) { t.Parallel() // given - s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, true, nil, nil, nil, v1.NewMetrics(), nil) + s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, true, nil, nil, nil, v1.NewMetrics(), nil, nil) // when r := s.Notify(testkube.Event{ diff --git a/pkg/event/kind/webhook/loader.go b/pkg/event/kind/webhook/loader.go index b5a3c28d8a6..6e2d490fc90 100644 --- a/pkg/event/kind/webhook/loader.go +++ b/pkg/event/kind/webhook/loader.go @@ -26,7 +26,7 @@ type WebhooksLister interface { func NewWebhookLoader(log *zap.SugaredLogger, webhooksClient WebhooksLister, templatesClient templatesclientv1.Interface, testExecutionResults result.Repository, testSuiteExecutionResults testresult.Repository, testWorkflowExecutionResults testworkflow.Repository, - metrics v1.Metrics, proContext *config.ProContext, + metrics v1.Metrics, proContext *config.ProContext, envs map[string]string, ) *WebhooksLoader { return &WebhooksLoader{ log: log, @@ -37,6 +37,7 @@ func NewWebhookLoader(log *zap.SugaredLogger, webhooksClient WebhooksLister, tem testWorkflowExecutionResults: testWorkflowExecutionResults, metrics: metrics, proContext: proContext, + envs: envs, } } @@ -49,6 +50,7 @@ type WebhooksLoader struct { testWorkflowExecutionResults testworkflow.Repository metrics v1.Metrics proContext *config.ProContext + envs map[string]string } func (r WebhooksLoader) Kind() string { @@ -84,10 +86,15 @@ func (r WebhooksLoader) Load() (listeners common.Listeners, err error) { types := webhooks.MapEventArrayToCRDEvents(webhook.Spec.Events) name := fmt.Sprintf("%s.%s", webhook.ObjectMeta.Namespace, webhook.ObjectMeta.Name) - listeners = append(listeners, NewWebhookListener(name, webhook.Spec.Uri, webhook.Spec.Selector, types, - webhook.Spec.PayloadObjectField, payloadTemplate, webhook.Spec.Headers, webhook.Spec.Disabled, - r.testExecutionResults, r.testSuiteExecutionResults, r.testWorkflowExecutionResults, - r.metrics, r.proContext)) + listeners = append( + listeners, + NewWebhookListener( + name, webhook.Spec.Uri, webhook.Spec.Selector, types, + webhook.Spec.PayloadObjectField, payloadTemplate, webhook.Spec.Headers, webhook.Spec.Disabled, + r.testExecutionResults, r.testSuiteExecutionResults, r.testWorkflowExecutionResults, + r.metrics, r.proContext, r.envs, + ), + ) } return listeners, nil diff --git a/pkg/event/kind/webhook/loader_test.go b/pkg/event/kind/webhook/loader_test.go index d4709c899d3..55dbc3e444f 100644 --- a/pkg/event/kind/webhook/loader_test.go +++ b/pkg/event/kind/webhook/loader_test.go @@ -30,7 +30,7 @@ func TestWebhookLoader(t *testing.T) { defer mockCtrl.Finish() mockTemplatesClient := templatesclientv1.NewMockInterface(mockCtrl) - webhooksLoader := NewWebhookLoader(zap.NewNop().Sugar(), &DummyLoader{}, mockTemplatesClient, nil, nil, nil, v1.NewMetrics(), nil) + webhooksLoader := NewWebhookLoader(zap.NewNop().Sugar(), &DummyLoader{}, mockTemplatesClient, nil, nil, nil, v1.NewMetrics(), nil, nil) listeners, err := webhooksLoader.Load() assert.Equal(t, 1, len(listeners)) diff --git a/pkg/logs/events_test.go b/pkg/logs/events_test.go index 27cd80a3d45..cce5b3efcbb 100644 --- a/pkg/logs/events_test.go +++ b/pkg/logs/events_test.go @@ -146,7 +146,7 @@ func TestLogs_EventsFlow(t *testing.T) { ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) assert.NoError(t, err) eventBus := bus.NewNATSBus(ec) - emitter := event.NewEmitter(eventBus, "test-cluster", map[string]string{}) + emitter := event.NewEmitter(eventBus, "test-cluster") // and stream client stream, err := client.NewNatsLogStream(nc) diff --git a/pkg/repository/config/mongo_test.go b/pkg/repository/config/mongo_test.go index dcc37362c39..ae289d082ff 100644 --- a/pkg/repository/config/mongo_test.go +++ b/pkg/repository/config/mongo_test.go @@ -30,7 +30,6 @@ func getRepository() (*MongoRepository, error) { func TestStorage_Integration(t *testing.T) { test.IntegrationTest(t) - t.Parallel() assert := require.New(t) @@ -41,7 +40,6 @@ func TestStorage_Integration(t *testing.T) { assert.NoError(err) t.Run("GetUniqueClusterId should return same id for each call", func(t *testing.T) { - t.Parallel() // given/when id1, err := repository.GetUniqueClusterId(context.Background()) assert.NoError(err) @@ -59,7 +57,6 @@ func TestStorage_Integration(t *testing.T) { }) t.Run("Upsert should insert new config entry", func(t *testing.T) { - t.Parallel() // given, clusterId := "uniq3" _, err := repository.Upsert(context.Background(), testkube.Config{ diff --git a/pkg/triggers/executor_test.go b/pkg/triggers/executor_test.go index f8574173bd9..1f62dcb61c4 100644 --- a/pkg/triggers/executor_test.go +++ b/pkg/triggers/executor_test.go @@ -59,7 +59,7 @@ func TestExecute(t *testing.T) { mockExecutor := client.NewMockExecutor(mockCtrl) - mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "", nil) + mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "") mockTest := testsv3.Test{ ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"}, diff --git a/pkg/triggers/service_test.go b/pkg/triggers/service_test.go index 823aae25e56..039ce435720 100644 --- a/pkg/triggers/service_test.go +++ b/pkg/triggers/service_test.go @@ -67,7 +67,7 @@ func TestService_Run(t *testing.T) { mockExecutor := client.NewMockExecutor(mockCtrl) - mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "", nil) + mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "") mockTest := testsv3.Test{ ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"},