Skip to content

Commit

Permalink
fix: sensitive data in events (#5821)
Browse files Browse the repository at this point in the history
* fix: remove sensitive envs from events passed to websockets

* fix: remove envs from agent

* fix: moved envs to webhooks loader

* removed variable

* fix: passed envs to listener event

* fix: not needed map in tests

* Update pkg/event/kind/webhook/listener.go

Co-authored-by: Dawid Rusnak <[email protected]>

* fix: flaky parallel test

* fix: clean envs passed to the default payload

---------

Co-authored-by: Dawid Rusnak <[email protected]>
  • Loading branch information
exu and rangoo94 authored Sep 5, 2024
1 parent 9786aee commit c56796d
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 34 deletions.
3 changes: 2 additions & 1 deletion cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func main() {
eventBus.TraceEvents()
}

eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName, envs)
eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName)

var logsStream logsclient.Stream

Expand Down Expand Up @@ -642,6 +642,7 @@ func main() {
logGrpcClient,
subscriptionChecker,
serviceAccountNames,
envs,
)

if mode == common.ModeAgent {
Expand Down
5 changes: 4 additions & 1 deletion internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func NewTestkubeAPI(
logGrpcClient logsclient.StreamGetter,
subscriptionChecker checktcl.SubscriptionChecker,
serviceAccountNames map[string]string,
envs map[string]string,
) TestkubeAPI {

var httpConfig server.Config
Expand Down Expand Up @@ -163,6 +164,7 @@ func NewTestkubeAPI(
SubscriptionChecker: subscriptionChecker,
LabelSources: common.Ptr(make([]LabelSource, 0)),
ServiceAccountNames: serviceAccountNames,
Envs: envs,
}
}

Expand Down Expand Up @@ -211,6 +213,7 @@ type TestkubeAPI struct {
SubscriptionChecker checktcl.SubscriptionChecker
LabelSources *[]LabelSource
ServiceAccountNames map[string]string
Envs map[string]string
}

type storageParams struct {
Expand Down Expand Up @@ -612,7 +615,7 @@ func (s *TestkubeAPI) InitEventListeners(

s.Events.Loader.Register(webhook.NewWebhookLoader(
s.Log, webhookClient, templatesClient, testExecutionResults, testSuiteExecutionsResults,
testWorkflowResults, metrics, s.proContext))
testWorkflowResults, metrics, s.proContext, s.Envs))
s.Events.Loader.Register(s.WebsocketLoader)
s.Events.Loader.Register(s.slackLoader)

Expand Down
1 change: 0 additions & 1 deletion pkg/agent/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (ag *Agent) Metadata() map[string]string {

func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) {
event.ClusterName = ag.clusterName
event.Envs = ag.envs
// Non blocking send
select {
case ag.events <- event:
Expand Down
5 changes: 1 addition & 4 deletions pkg/event/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ const (
)

// NewEmitter returns new emitter instance
func NewEmitter(eventBus bus.Bus, clusterName string, envs map[string]string) *Emitter {
func NewEmitter(eventBus bus.Bus, clusterName string) *Emitter {
return &Emitter{
Log: log.DefaultLogger,
Loader: NewLoader(),
Bus: eventBus,
Listeners: make(common.Listeners, 0),
ClusterName: clusterName,
Envs: envs,
}
}

Expand All @@ -39,7 +38,6 @@ type Emitter struct {
mutex sync.RWMutex
Bus bus.Bus
ClusterName string
Envs map[string]string
}

// Register adds new listener
Expand Down Expand Up @@ -126,7 +124,6 @@ func listerersToMap(listeners []common.Listener) map[string]map[string]common.Li
// Notify notifies emitter with webhook
func (e *Emitter) Notify(event testkube.Event) {
event.ClusterName = e.ClusterName
event.Envs = e.Envs
err := e.Bus.PublishTopic(event.Topic(), event)
if err != nil {
e.Log.Errorw("error publishing event", append(event.Log(), "error", err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/event/emitter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func GetTestNATSEmitter() *Emitter {
if err != nil {
panic(err)
}
return NewEmitter(bus.NewNATSBus(nc), "", nil)
return NewEmitter(bus.NewNATSBus(nc), "")
}

func TestEmitter_NATS_Register_Integration(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/event/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestEmitter_Register(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus, "", nil)
emitter := NewEmitter(eventBus, "")
// when
emitter.Register(&dummy.DummyListener{Id: "l1"})

Expand All @@ -43,7 +43,7 @@ func TestEmitter_Listen(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus, "", nil)
emitter := NewEmitter(eventBus, "")
// given listener with matching selector
listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"}
// and listener with second matic selector
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestEmitter_Notify(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus, "", nil)
emitter := NewEmitter(eventBus, "")
// and 2 listeners subscribed to the same queue
// * first on pod1
listener1 := &dummy.DummyListener{Id: "l3"}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestEmitter_NotifyBecome(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus, "", nil)
emitter := NewEmitter(eventBus, "")
// and 2 listeners subscribed to the same queue
// * first on pod1
listener1 := &dummy.DummyListener{Id: "l5", Types: []testkube.EventType{
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestEmitter_Reconcile(t *testing.T) {
t.Parallel()
// given first reconciler loop was done
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus, "", nil)
emitter := NewEmitter(eventBus, "")
emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy1"})
emitter.Loader.Register(&dummy.DummyLoader{IdPrefix: "dummy2"})

Expand Down Expand Up @@ -229,7 +229,7 @@ func TestEmitter_UpdateListeners(t *testing.T) {
t.Parallel()
// given
eventBus := bus.NewEventBusMock()
emitter := NewEmitter(eventBus, "", nil)
emitter := NewEmitter(eventBus, "")
// given listener with matching selector
listener1 := &dummy.DummyListener{Id: "l1", SelectorString: "type=listener1"}
// and listener with second matching selector
Expand Down
13 changes: 11 additions & 2 deletions pkg/event/kind/webhook/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func NewWebhookListener(name, uri, selector string, events []testkube.EventType,
testSuiteExecutionResults testresult.Repository,
testWorkflowExecutionResults testworkflow.Repository,
metrics v1.Metrics,
proContext *config.ProContext) *WebhookListener {
proContext *config.ProContext,
envs map[string]string,
) *WebhookListener {
return &WebhookListener{
name: name,
Uri: uri,
Expand All @@ -50,6 +52,7 @@ func NewWebhookListener(name, uri, selector string, events []testkube.EventType,
testWorkflowExecutionResults: testWorkflowExecutionResults,
metrics: metrics,
proContext: proContext,
envs: envs,
}
}

Expand All @@ -69,6 +72,7 @@ type WebhookListener struct {
testWorkflowExecutionResults testworkflow.Repository
metrics v1.Metrics
proContext *config.ProContext
envs map[string]string
}

func (l *WebhookListener) Name() string {
Expand Down Expand Up @@ -112,6 +116,9 @@ func (l *WebhookListener) Disabled() bool {
}

func (l *WebhookListener) Notify(event testkube.Event) (result testkube.EventResult) {
// load global envs to be able to use them in templates
event.Envs = l.envs

defer func() {
var eventType, res string
if event.Type_ != nil {
Expand Down Expand Up @@ -169,9 +176,11 @@ func (l *WebhookListener) Notify(event testkube.Event) (result testkube.EventRes

_, err = body.Write(data)
} else {
// clean envs if not requested explicitly by payload template
event.Envs = nil
err = json.NewEncoder(body).Encode(event)
if err == nil && l.payloadObjectField != "" {
data := map[string]string{l.payloadObjectField: string(body.Bytes())}
data := map[string]string{l.payloadObjectField: body.String()}
body.Reset()
err = json.NewEncoder(body).Encode(data)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/event/kind/webhook/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestWebhookListener_Notify(t *testing.T) {
svr := httptest.NewServer(testHandler)
defer svr.Close()

l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil)
l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil)

// when
r := l.Notify(testkube.Event{
Expand All @@ -56,7 +56,7 @@ func TestWebhookListener_Notify(t *testing.T) {
svr := httptest.NewServer(testHandler)
defer svr.Close()

l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil)
l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil)

// when
r := l.Notify(testkube.Event{
Expand All @@ -73,7 +73,7 @@ func TestWebhookListener_Notify(t *testing.T) {
t.Parallel()
// given

s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil)
s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil)

// when
r := s.Notify(testkube.Event{
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestWebhookListener_Notify(t *testing.T) {
svr := httptest.NewServer(testHandler)
defer svr.Close()

l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "field", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil)
l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "field", "", nil, false, nil, nil, nil, v1.NewMetrics(), nil, nil)

// when
r := l.Notify(testkube.Event{
Expand All @@ -133,7 +133,7 @@ func TestWebhookListener_Notify(t *testing.T) {
defer svr.Close()

l := NewWebhookListener("l1", svr.URL, "", testEventTypes, "", "{\"id\": \"{{ .Id }}\"}",
map[string]string{"Content-Type": "application/json"}, false, nil, nil, nil, v1.NewMetrics(), nil)
map[string]string{"Content-Type": "application/json"}, false, nil, nil, nil, v1.NewMetrics(), nil, nil)

// when
r := l.Notify(testkube.Event{
Expand All @@ -150,7 +150,7 @@ func TestWebhookListener_Notify(t *testing.T) {
t.Parallel()
// given

s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, true, nil, nil, nil, v1.NewMetrics(), nil)
s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "", "", nil, true, nil, nil, nil, v1.NewMetrics(), nil, nil)

// when
r := s.Notify(testkube.Event{
Expand Down
17 changes: 12 additions & 5 deletions pkg/event/kind/webhook/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type WebhooksLister interface {

func NewWebhookLoader(log *zap.SugaredLogger, webhooksClient WebhooksLister, templatesClient templatesclientv1.Interface,
testExecutionResults result.Repository, testSuiteExecutionResults testresult.Repository, testWorkflowExecutionResults testworkflow.Repository,
metrics v1.Metrics, proContext *config.ProContext,
metrics v1.Metrics, proContext *config.ProContext, envs map[string]string,
) *WebhooksLoader {
return &WebhooksLoader{
log: log,
Expand All @@ -37,6 +37,7 @@ func NewWebhookLoader(log *zap.SugaredLogger, webhooksClient WebhooksLister, tem
testWorkflowExecutionResults: testWorkflowExecutionResults,
metrics: metrics,
proContext: proContext,
envs: envs,
}
}

Expand All @@ -49,6 +50,7 @@ type WebhooksLoader struct {
testWorkflowExecutionResults testworkflow.Repository
metrics v1.Metrics
proContext *config.ProContext
envs map[string]string
}

func (r WebhooksLoader) Kind() string {
Expand Down Expand Up @@ -84,10 +86,15 @@ func (r WebhooksLoader) Load() (listeners common.Listeners, err error) {

types := webhooks.MapEventArrayToCRDEvents(webhook.Spec.Events)
name := fmt.Sprintf("%s.%s", webhook.ObjectMeta.Namespace, webhook.ObjectMeta.Name)
listeners = append(listeners, NewWebhookListener(name, webhook.Spec.Uri, webhook.Spec.Selector, types,
webhook.Spec.PayloadObjectField, payloadTemplate, webhook.Spec.Headers, webhook.Spec.Disabled,
r.testExecutionResults, r.testSuiteExecutionResults, r.testWorkflowExecutionResults,
r.metrics, r.proContext))
listeners = append(
listeners,
NewWebhookListener(
name, webhook.Spec.Uri, webhook.Spec.Selector, types,
webhook.Spec.PayloadObjectField, payloadTemplate, webhook.Spec.Headers, webhook.Spec.Disabled,
r.testExecutionResults, r.testSuiteExecutionResults, r.testWorkflowExecutionResults,
r.metrics, r.proContext, r.envs,
),
)
}

return listeners, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/event/kind/webhook/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestWebhookLoader(t *testing.T) {
defer mockCtrl.Finish()

mockTemplatesClient := templatesclientv1.NewMockInterface(mockCtrl)
webhooksLoader := NewWebhookLoader(zap.NewNop().Sugar(), &DummyLoader{}, mockTemplatesClient, nil, nil, nil, v1.NewMetrics(), nil)
webhooksLoader := NewWebhookLoader(zap.NewNop().Sugar(), &DummyLoader{}, mockTemplatesClient, nil, nil, nil, v1.NewMetrics(), nil, nil)
listeners, err := webhooksLoader.Load()

assert.Equal(t, 1, len(listeners))
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestLogs_EventsFlow(t *testing.T) {
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
assert.NoError(t, err)
eventBus := bus.NewNATSBus(ec)
emitter := event.NewEmitter(eventBus, "test-cluster", map[string]string{})
emitter := event.NewEmitter(eventBus, "test-cluster")

// and stream client
stream, err := client.NewNatsLogStream(nc)
Expand Down
3 changes: 0 additions & 3 deletions pkg/repository/config/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func getRepository() (*MongoRepository, error) {

func TestStorage_Integration(t *testing.T) {
test.IntegrationTest(t)
t.Parallel()

assert := require.New(t)

Expand All @@ -41,7 +40,6 @@ func TestStorage_Integration(t *testing.T) {
assert.NoError(err)

t.Run("GetUniqueClusterId should return same id for each call", func(t *testing.T) {
t.Parallel()
// given/when
id1, err := repository.GetUniqueClusterId(context.Background())
assert.NoError(err)
Expand All @@ -59,7 +57,6 @@ func TestStorage_Integration(t *testing.T) {
})

t.Run("Upsert should insert new config entry", func(t *testing.T) {
t.Parallel()
// given,
clusterId := "uniq3"
_, err := repository.Upsert(context.Background(), testkube.Config{
Expand Down
2 changes: 1 addition & 1 deletion pkg/triggers/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestExecute(t *testing.T) {

mockExecutor := client.NewMockExecutor(mockCtrl)

mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "", nil)
mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "")

mockTest := testsv3.Test{
ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"},
Expand Down
2 changes: 1 addition & 1 deletion pkg/triggers/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestService_Run(t *testing.T) {

mockExecutor := client.NewMockExecutor(mockCtrl)

mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "", nil)
mockEventEmitter := event.NewEmitter(bus.NewEventBusMock(), "")

mockTest := testsv3.Test{
ObjectMeta: metav1.ObjectMeta{Namespace: "testkube", Name: "some-test"},
Expand Down

0 comments on commit c56796d

Please sign in to comment.