diff --git a/xds/internal/xdsclient/tests/lrs_stream_backoff_test.go b/xds/internal/xdsclient/tests/lrs_stream_backoff_test.go index aa5315819c3e..6a728bdcb06b 100644 --- a/xds/internal/xdsclient/tests/lrs_stream_backoff_test.go +++ b/xds/internal/xdsclient/tests/lrs_stream_backoff_test.go @@ -44,59 +44,59 @@ import ( // period, and that the previously requested resources are re-requested on the // new stream. func (s) TestLRS_BackoffAfterStreamFailure(t *testing.T) { - // Channels for test state. - streamCloseCh := make(chan struct{}, 1) - resourceRequestCh := make(chan []string, 1) - backoffCh := make(chan struct{}, 1) - // Context with timeout. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Simulate LRS stream error. - streamErr := errors.New("LRS stream error") - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - SupportLoadReportingService: true, - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - t.Logf("Simulated server: Received stream request: %+v\n", req) - if req.GetTypeUrl() == version.V3ListenerURL { - select { - case resourceRequestCh <- req.GetResourceNames(): - case <-ctx.Done(): - } - } - return streamErr - }, - OnStreamClosed: func(int64, *v3corepb.Node) { - t.Log("Simulated server: Stream closed") - select { - case streamCloseCh <- struct{}{}: - case <-ctx.Done(): - } - }, - }) - // Backoff behavior. - streamBackoff := func(v int) time.Duration { - t.Log("Backoff triggered") - select { - case backoffCh <- struct{}{}: - case <-ctx.Done(): - } - return 500 * time.Millisecond - } - // Create xDS client and bootstrap configuration. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - testutils.CreateBootstrapFileForTesting(t, bc) - client := createXDSClientWithBackoff(t, bc, streamBackoff) - - const listenerName = "listener" - lw := newListenerWatcher() - ldsCancel := xdsresource.WatchListener(client, listenerName, lw) - defer ldsCancel() - - // Verify resource request. - if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{listenerName}); err != nil { - t.Fatal(err) - } + // Channels for test state. + streamCloseCh := make(chan struct{}, 1) + resourceRequestCh := make(chan []string, 1) + backoffCh := make(chan struct{}, 1) + // Context with timeout. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Simulate LRS stream error. + streamErr := errors.New("LRS stream error") + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + SupportLoadReportingService: true, + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + t.Logf("Simulated server: Received stream request: %+v\n", req) + if req.GetTypeUrl() == version.V3ListenerURL { + select { + case resourceRequestCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + return streamErr + }, + OnStreamClosed: func(int64, *v3corepb.Node) { + t.Log("Simulated server: Stream closed") + select { + case streamCloseCh <- struct{}{}: + case <-ctx.Done(): + } + }, + }) + // Backoff behavior. + streamBackoff := func(v int) time.Duration { + t.Log("Backoff triggered") + select { + case backoffCh <- struct{}{}: + case <-ctx.Done(): + } + return 500 * time.Millisecond + } + // Create xDS client and bootstrap configuration. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClientWithBackoff(t, bc, streamBackoff) + + const listenerName = "listener" + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify resource request. + if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{listenerName}); err != nil { + t.Fatal(err) + } // Verify that the received stream error is reported to the watcher. u, err := lw.updateCh.Receive(ctx) @@ -108,108 +108,108 @@ func (s) TestLRS_BackoffAfterStreamFailure(t *testing.T) { t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr) } - // Verify stream closure. - select { - case <-streamCloseCh: - t.Log("Stream closure observed after error") - case <-ctx.Done(): - t.Fatal("Timeout waiting for LRS stream closure") - } - // Verify backoff signal. - select { - case <-backoffCh: - t.Log("Backoff observed before stream restart") - case <-ctx.Done(): - t.Fatal("Timeout waiting for backoff signal") - } - // Verify re-request. - if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{listenerName}); err != nil { - t.Fatal(err) - } + // Verify stream closure. + select { + case <-streamCloseCh: + t.Log("Stream closure observed after error") + case <-ctx.Done(): + t.Fatal("Timeout waiting for LRS stream closure") + } + // Verify backoff signal. + select { + case <-backoffCh: + t.Log("Backoff observed before stream restart") + case <-ctx.Done(): + t.Fatal("Timeout waiting for backoff signal") + } + // Verify re-request. + if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{listenerName}); err != nil { + t.Fatal(err) + } } // Tests the case where a stream breaks because the server goes down. Verifies // that when the server comes back up, the same resources are re-requested, // this time with the previously acked version and an empty nonce. func (s) TestLRS_BackoffAfterBrokenStream(t *testing.T) { - // Channels for verifying different events in the test. - streamCloseCh := make(chan struct{}, 1) // LRS stream is closed. - resourceRequestCh := make(chan []string, 1) // Resource names in the discovery request. - backoffCh := make(chan struct{}, 1) // Backoff after stream failure. - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Simulate LRS stream error. - // streamErr := errors.New("LRS stream error") - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - SupportLoadReportingService: true, - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - if req.GetTypeUrl() == version.V3ListenerURL { - t.Logf("Received LRS request for resources: %v", req.GetResourceNames()) - select { - case resourceRequestCh <- req.GetResourceNames(): - case <-ctx.Done(): - } - } - return errors.New("unsupported TypeURL") - }, - OnStreamClosed: func(int64, *v3corepb.Node) { - t.Log("Simulated server: Stream closed") - select { - case streamCloseCh <- struct{}{}: - case <-ctx.Done(): - } - }, - }) - - // Override the backoff implementation. - streamBackoff := func(v int) time.Duration { - t.Log("Backoff triggered") - select { - case backoffCh <- struct{}{}: - case <-ctx.Done(): - } - return 500 * time.Millisecond - } - - // Create an xDS client with bootstrap pointing to the above server. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - testutils.CreateBootstrapFileForTesting(t, bc) - client := createXDSClientWithBackoff(t, bc, streamBackoff) - - // Register a watch for load reporting resource. - const resourceName = "load-report" - lw := newListenerWatcher() // Replace this with the correct LRS watcher if available. - lrsCancel := xdsresource.WatchListener(client, resourceName, lw) - defer lrsCancel() - - // Verify the initial resource request. - if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{resourceName}); err != nil { - t.Fatal(err) - } - - // Verify stream closure after an error. - select { - case <-streamCloseCh: - t.Log("Stream closure observed after error") - case <-ctx.Done(): - t.Fatal("Timeout waiting for LRS stream closure") - } - - // Verify backoff signal before restarting the stream. - select { - case <-backoffCh: - t.Log("Backoff observed before stream restart") - case <-ctx.Done(): - t.Fatal("Timeout waiting for backoff signal") - } - - // Verify the resource request is re-sent after stream recovery. - if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{resourceName}); err != nil { - t.Fatal(err) - } + // Channels for verifying different events in the test. + streamCloseCh := make(chan struct{}, 1) // LRS stream is closed. + resourceRequestCh := make(chan []string, 1) // Resource names in the discovery request. + backoffCh := make(chan struct{}, 1) // Backoff after stream failure. + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Simulate LRS stream error. + // streamErr := errors.New("LRS stream error") + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + SupportLoadReportingService: true, + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ListenerURL { + t.Logf("Received LRS request for resources: %v", req.GetResourceNames()) + select { + case resourceRequestCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + return errors.New("unsupported TypeURL") + }, + OnStreamClosed: func(int64, *v3corepb.Node) { + t.Log("Simulated server: Stream closed") + select { + case streamCloseCh <- struct{}{}: + case <-ctx.Done(): + } + }, + }) + + // Override the backoff implementation. + streamBackoff := func(v int) time.Duration { + t.Log("Backoff triggered") + select { + case backoffCh <- struct{}{}: + case <-ctx.Done(): + } + return 500 * time.Millisecond + } + + // Create an xDS client with bootstrap pointing to the above server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClientWithBackoff(t, bc, streamBackoff) + + // Register a watch for load reporting resource. + const resourceName = "load-report" + lw := newListenerWatcher() // Replace this with the correct LRS watcher if available. + lrsCancel := xdsresource.WatchListener(client, resourceName, lw) + defer lrsCancel() + + // Verify the initial resource request. + if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{resourceName}); err != nil { + t.Fatal(err) + } + + // Verify stream closure after an error. + select { + case <-streamCloseCh: + t.Log("Stream closure observed after error") + case <-ctx.Done(): + t.Fatal("Timeout waiting for LRS stream closure") + } + + // Verify backoff signal before restarting the stream. + select { + case <-backoffCh: + t.Log("Backoff observed before stream restart") + case <-ctx.Done(): + t.Fatal("Timeout waiting for backoff signal") + } + + // Verify the resource request is re-sent after stream recovery. + if err := waitForResourceNames(ctx, t, resourceRequestCh, []string{resourceName}); err != nil { + t.Fatal(err) + } } // Tests the case where a stream breaks because the server goes down. Verifies @@ -230,8 +230,8 @@ func (s) TestLRS_RetriesAfterBrokenStream(t *testing.T) { } lis := testutils.NewRestartableListener(l) mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - SupportLoadReportingService: true, + Listener: lis, + SupportLoadReportingService: true, // Push the received request on to a channel for the test goroutine to // verify that it matches expectations. OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { @@ -390,102 +390,101 @@ func (s) TestLRS_RetriesAfterBrokenStream(t *testing.T) { // exists. Verifies that the a discovery request is sent out for the previously // requested resource once a valid stream is created. func (s) TestLRS_ResourceRequestedBeforeStreamCreation(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Channels for verifying different events in the test. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) - - // Create an xDS management server listening on a local port. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("Failed to create a local listener: %v", err) - } - defer l.Close() - - lis := testutils.NewRestartableListener(l) - defer lis.Stop() - - streamErr := errors.New("LRS stream error") - - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - SupportLoadReportingService: true, - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - // Capture only LoadStats requests. - if req.GetTypeUrl() == version.V3ListenerURL { - select { - case streamRequestCh <- req: - default: - } - } - return streamErr - }, - }) - // defer mgmtServer.Stop() - - // Bring down the management server before creating the transport. - lis.Stop() - - // Override backoff to minimize test time. - backoffCh := make(chan struct{}, 1) - unblockBackoffCh := make(chan struct{}) - streamBackoff := func(v int) time.Duration { - select { - case backoffCh <- struct{}{}: - default: - } - <-unblockBackoffCh - return 0 - } - - // Create an xDS client with bootstrap pointing to the above server. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - testutils.CreateBootstrapFileForTesting(t, bc) - client := createXDSClientWithBackoff(t, bc, streamBackoff) - if client == nil { - t.Fatalf("Failed to create xDS client") - } - - // Register a listener watch for the "load-report" resource. - const listenerName = "load-report" - lw := newListenerWatcher() - ldsCancel := xdsresource.WatchListener(client, listenerName, lw) - defer ldsCancel() - - // Wait for backoff to kick in. - select { - case <-backoffCh: - case <-ctx.Done(): - t.Fatal("Timeout waiting for stream backoff") - } - - // Bring up the connection to the management server and unblock the backoff. - lis.Restart() - close(unblockBackoffCh) - - // Verify that the initial discovery request matches expectations. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - VersionInfo: "", - Node: &v3corepb.Node{ - Id: nodeID, - UserAgentName: "gRPC Go", - UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, - ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, - }, - ResourceNames: []string{listenerName}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - ResponseNonce: "", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Channels for verifying different events in the test. + streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) + + // Create an xDS management server listening on a local port. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create a local listener: %v", err) + } + defer l.Close() + + lis := testutils.NewRestartableListener(l) + defer lis.Stop() + + streamErr := errors.New("LRS stream error") + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + SupportLoadReportingService: true, + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + // Capture only LoadStats requests. + if req.GetTypeUrl() == version.V3ListenerURL { + select { + case streamRequestCh <- req: + default: + } + } + return streamErr + }, + }) + // defer mgmtServer.Stop() + + // Bring down the management server before creating the transport. + lis.Stop() + + // Override backoff to minimize test time. + backoffCh := make(chan struct{}, 1) + unblockBackoffCh := make(chan struct{}) + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + default: + } + <-unblockBackoffCh + return 0 + } + + // Create an xDS client with bootstrap pointing to the above server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClientWithBackoff(t, bc, streamBackoff) + if client == nil { + t.Fatalf("Failed to create xDS client") + } + + // Register a listener watch for the "load-report" resource. + const listenerName = "load-report" + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Wait for backoff to kick in. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatal("Timeout waiting for stream backoff") + } + + // Bring up the connection to the management server and unblock the backoff. + lis.Restart() + close(unblockBackoffCh) + + // Verify that the initial discovery request matches expectations. + var gotReq *v3discoverypb.DiscoveryRequest + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery request on the stream") + } + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } +}