diff --git a/pkg/controlplane/server.go b/pkg/controlplane/server.go index a88a744c98..5703ac400b 100644 --- a/pkg/controlplane/server.go +++ b/pkg/controlplane/server.go @@ -143,6 +143,82 @@ func (s *Server) GetTestWorkflowNotificationsStream(srv cloud.TestKubeCloudAPI_G return g.Wait() } +func (s *Server) GetTestWorkflowServiceNotificationsStream(srv cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamServer) error { + ctx, cancel := context.WithCancel(srv.Context()) + defer cancel() + g, _ := errgroup.WithContext(ctx) + + ticker := time.NewTicker(SendPingInterval) + defer ticker.Stop() + + g.Go(func() error { + for { + select { + case <-ticker.C: + srv.Send(&cloud.TestWorkflowServiceNotificationsRequest{ + StreamId: "ping", + ExecutionId: "ping", + RequestType: cloud.TestWorkflowNotificationsRequestType_WORKFLOW_STREAM_HEALTH_CHECK, + }) + case <-ctx.Done(): + return nil + } + } + }) + + // Ignore all the messages + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return nil + default: + srv.Recv() + } + } + }) + + return g.Wait() +} + +func (s *Server) GetTestWorkflowParallelStepNotificationsStream(srv cloud.TestKubeCloudAPI_GetTestWorkflowParallelStepNotificationsStreamServer) error { + ctx, cancel := context.WithCancel(srv.Context()) + defer cancel() + g, _ := errgroup.WithContext(ctx) + + ticker := time.NewTicker(SendPingInterval) + defer ticker.Stop() + + g.Go(func() error { + for { + select { + case <-ticker.C: + srv.Send(&cloud.TestWorkflowParallelStepNotificationsRequest{ + StreamId: "ping", + ExecutionId: "ping", + RequestType: cloud.TestWorkflowNotificationsRequestType_WORKFLOW_STREAM_HEALTH_CHECK, + }) + case <-ctx.Done(): + return nil + } + } + }) + + // Ignore all the messages + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return nil + default: + srv.Recv() + } + } + }) + + return g.Wait() +} + // Send is called on agent client, returning from this method closes the connection func (s *Server) Send(srv cloud.TestKubeCloudAPI_SendServer) error { for {