From 12cb8a7d60f50f353465730a303d02b1f339490f Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 3 Dec 2024 09:40:39 +0530 Subject: [PATCH] use a pool to isolate xds clients with different bootstrap contents for same server --- examples/features/gracefulstop/server/main.go | 87 ----------- xds/csds/csds_e2e_test.go | 8 +- .../cdsbalancer/cdsbalancer_security_test.go | 9 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 18 ++- .../e2e_test/aggregate_cluster_test.go | 7 +- .../clusterresolver/e2e_test/balancer_test.go | 9 +- .../clusterresolver/e2e_test/eds_impl_test.go | 61 +++++--- xds/internal/resolver/xds_resolver.go | 5 +- xds/internal/resolver/xds_resolver_test.go | 7 +- xds/internal/server/rds_handler_test.go | 9 +- xds/internal/xdsclient/client_new.go | 64 +------- xds/internal/xdsclient/client_refcounted.go | 24 +-- .../xdsclient/client_refcounted_test.go | 2 +- xds/internal/xdsclient/clientimpl_dump.go | 6 +- xds/internal/xdsclient/pool.go | 142 ++++++++++++++++++ .../tests/ads_stream_ack_nack_test.go | 9 +- .../tests/ads_stream_backoff_test.go | 7 +- .../tests/ads_stream_flow_control_test.go | 9 +- .../tests/ads_stream_restart_test.go | 9 +- .../xdsclient/tests/ads_stream_watch_test.go | 7 +- .../xdsclient/tests/authority_test.go | 7 +- .../xdsclient/tests/cds_watchers_test.go | 86 +++++++---- xds/internal/xdsclient/tests/dump_test.go | 8 +- .../xdsclient/tests/eds_watchers_test.go | 62 +++++--- xds/internal/xdsclient/tests/fallback_test.go | 21 ++- .../tests/federation_watchers_test.go | 9 +- .../xdsclient/tests/lds_watchers_test.go | 101 +++++++++---- .../xdsclient/tests/misc_watchers_test.go | 18 ++- .../xdsclient/tests/rds_watchers_test.go | 65 +++++--- .../xdsclient/tests/resource_update_test.go | 18 +-- xds/server.go | 7 +- xds/server_ext_test.go | 16 +- xds/server_options.go | 36 ++++- xds/server_test.go | 12 +- 34 files changed, 600 insertions(+), 365 deletions(-) delete mode 100644 examples/features/gracefulstop/server/main.go create mode 100644 xds/internal/xdsclient/pool.go diff --git a/examples/features/gracefulstop/server/main.go b/examples/features/gracefulstop/server/main.go deleted file mode 100644 index 57e6dd7da109..000000000000 --- a/examples/features/gracefulstop/server/main.go +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Copyright 2024 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Binary server demonstrates how to gracefully stop a gRPC server. -package main - -import ( - "flag" - "fmt" - "log" - "net" - "sync" - "sync/atomic" - - "google.golang.org/grpc" - pb "google.golang.org/grpc/examples/features/proto/echo" -) - -var ( - port = flag.Int("port", 50052, "port number") - streamMessages int32 - mu sync.Mutex - streamCh chan struct{} // to signal if server streaming started -) - -type server struct { - pb.UnimplementedEchoServer -} - -// ServerStreamingEcho implements the EchoService.ServerStreamingEcho method. -// It receives an EchoRequest and sends back a stream of EchoResponses until an -// error occurs or the stream is closed. -func (s *server) ServerStreamingEcho(_ *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error { - // Signal streaming start to initiate graceful stop which should wait until - // server streaming finishes. - streamCh <- struct{}{} - - for { - atomic.AddInt32(&streamMessages, 1) - - mu.Lock() - if err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("Messages Sent: %d", streamMessages)}); err != nil { - log.Printf("Stream is sending data: %v. Stop Streaming", err) - return err - } - mu.Unlock() - } -} - -func main() { - streamCh = make(chan struct{}) - flag.Parse() - - address := fmt.Sprintf(":%v", *port) - lis, err := net.Listen("tcp", address) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - - s := grpc.NewServer() - pb.RegisterEchoServer(s, &server{}) - - go func() { - <-streamCh // wait until server streaming starts - log.Println("Initiating graceful shutdown...") - s.GracefulStop() // gracefully stop server after in-flight server streaming rpc finishes - log.Println("Server stopped gracefully.") - }() - - if err := s.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) - } -} diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 3c838afb67fc..cfbb85d94936 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -224,7 +224,7 @@ func (s) TestCSDS(t *testing.T) { // Create two xDS clients, with different names. These should end up // creating two different xDS clients. const xdsClient1Name = "xds-csds-client-1" - xdsClient1, xdsClose1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + xdsClient1, xdsClose1, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: xdsClient1Name, Contents: bootstrapContents, }) @@ -233,7 +233,7 @@ func (s) TestCSDS(t *testing.T) { } defer xdsClose1() const xdsClient2Name = "xds-csds-client-2" - xdsClient2, xdsClose2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + xdsClient2, xdsClose2, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: xdsClient2Name, Contents: bootstrapContents, }) @@ -421,7 +421,7 @@ func (s) TestCSDS_NACK(t *testing.T) { // Create two xDS clients, with different names. These should end up // creating two different xDS clients. const xdsClient1Name = "xds-csds-client-1" - xdsClient1, xdsClose1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + xdsClient1, xdsClose1, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: xdsClient1Name, Contents: bootstrapContents, }) @@ -430,7 +430,7 @@ func (s) TestCSDS_NACK(t *testing.T) { } defer xdsClose1() const xdsClient2Name = "xds-csds-client-2" - xdsClient2, xdsClose2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + xdsClient2, xdsClose2, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: xdsClient2Name, Contents: bootstrapContents, }) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index cb6eff0cd1c8..83f8d274b819 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -138,9 +138,12 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) { t.Helper() - xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsClient, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index bd9cd5573805..10ed076555af 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -233,9 +233,12 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsC, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -356,9 +359,12 @@ func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) { nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsClient, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index 89f3d1bcac71..d23c5280fd99 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -1180,9 +1180,12 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bootstrapContents, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { diff --git a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go index 2d934ef3ae10..aa15f204743b 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go @@ -74,9 +74,12 @@ func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, fun t.Helper() // Create an xDS client for use by the cluster_resolver LB policy. - xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsC, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index af6224585aca..33de7493d1dd 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -150,9 +150,12 @@ func (s) TestEDS_OneLocality(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -285,9 +288,12 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -449,9 +455,12 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -522,9 +531,12 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -921,9 +933,12 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -993,9 +1008,12 @@ func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -1065,9 +1083,12 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) { // with a short watch expiry timeout. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsClient, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 1ba6c001d93d..51ed79fd9adb 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -46,11 +46,12 @@ const Scheme = "xds" // newBuilderWithConfigForTesting creates a new xds resolver builder using a // specific xds bootstrap config, so tests can use multiple xds clients in -// different ClientConns at the same time. +// different ClientConns at the same time. It creates the new xds client int +// the default pool with provided config. func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) { - return xdsclient.NewForTesting(xdsclient.OptionsForTesting{Name: name, Contents: config}) + return xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name, Contents: config}) }, }, nil } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 77e8c47e6cd5..c7965b206860 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -254,9 +254,12 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { closeCh := make(chan struct{}) rinternal.NewXDSClient = func(string) (xdsclient.XDSClient, func(), error) { bc := e2e.DefaultBootstrapContents(t, uuid.New().String(), "dummy-management-server-address") - c, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestTimeout, }) return c, grpcsync.OnceFunc(func() { diff --git a/xds/internal/server/rds_handler_test.go b/xds/internal/server/rds_handler_test.go index 915fedadbbab..b6a18c61b835 100644 --- a/xds/internal/server/rds_handler_test.go +++ b/xds/internal/server/rds_handler_test.go @@ -111,9 +111,12 @@ func xdsSetupForTests(t *testing.T) (*e2e.ManagementServer, string, chan []strin nodeID := uuid.New().String() bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - xdsC, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsC, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatal(err) diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go index d8f9d6c9417b..37cad6198881 100644 --- a/xds/internal/xdsclient/client_new.go +++ b/xds/internal/xdsclient/client_new.go @@ -25,7 +25,6 @@ import ( "time" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/cache" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" @@ -40,6 +39,8 @@ import ( // value, and is defined in gRFC A71. const NameForServer = "#server" +var clientsMu sync.Mutex + // New returns an xDS client configured with bootstrap configuration specified // by the ordered list: // - file name containing the configuration specified by GRPC_XDS_BOOTSTRAP @@ -61,7 +62,10 @@ func New(name string) (XDSClient, func(), error) { if err != nil { return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err) } - return newRefCounted(name, config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff) + clientsMu.Lock() + DefaultPool.config = config + clientsMu.Unlock() + return DefaultPool.NewClient(name) } // newClientImpl returns a new xdsClient with the given config. @@ -131,57 +135,6 @@ type OptionsForTesting struct { StreamBackoffAfterFailure func(int) time.Duration } -// NewForTesting returns an xDS client configured with the provided options. -// -// The second return value represents a close function which the caller is -// expected to invoke once they are done using the client. It is safe for the -// caller to invoke this close function multiple times. -// -// # Testing Only -// -// This function should ONLY be used for testing purposes. -func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { - if opts.Name == "" { - return nil, nil, fmt.Errorf("opts.Name field must be non-empty") - } - if opts.WatchExpiryTimeout == 0 { - opts.WatchExpiryTimeout = defaultWatchExpiryTimeout - } - if opts.IdleChannelExpiryTimeout == 0 { - opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout - } - if opts.StreamBackoffAfterFailure == nil { - opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc - } - - config, err := bootstrap.NewConfigForTesting(opts.Contents) - if err != nil { - return nil, nil, err - } - return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure) -} - -// GetForTesting returns an xDS client created earlier using the given name. -// -// The second return value represents a close function which the caller is -// expected to invoke once they are done using the client. It is safe for the -// caller to invoke this close function multiple times. -// -// # Testing Only -// -// This function should ONLY be used for testing purposes. -func GetForTesting(name string) (XDSClient, func(), error) { - clientsMu.Lock() - defer clientsMu.Unlock() - - c, ok := clients[name] - if !ok { - return nil, nil, fmt.Errorf("xDS client with name %q not found", name) - } - c.incrRef() - return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil -} - func init() { internal.TriggerXDSResourceNotFoundForTesting = triggerXDSResourceNotFoundForTesting xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting @@ -202,8 +155,3 @@ func resourceWatchStateForTesting(client XDSClient, typ xdsresource.Type, name s } return crc.clientImpl.resourceWatchStateForTesting(typ, name) } - -var ( - clients = map[string]*clientRefCounted{} - clientsMu sync.Mutex -) diff --git a/xds/internal/xdsclient/client_refcounted.go b/xds/internal/xdsclient/client_refcounted.go index 1c105ac4e061..c6afbc951f8e 100644 --- a/xds/internal/xdsclient/client_refcounted.go +++ b/xds/internal/xdsclient/client_refcounted.go @@ -41,11 +41,11 @@ var ( defaultStreamBackoffFunc = backoff.DefaultExponential.Backoff ) -func clientRefCountedClose(name string) { - clientsMu.Lock() - defer clientsMu.Unlock() +func clientRefCountedClose(name string, pool *Pool) { + pool.mu.Lock() + defer pool.mu.Unlock() - client, ok := clients[name] + client, ok := pool.clients[name] if !ok { logger.Errorf("Attempt to close a non-existent xDS client with name %s", name) return @@ -55,20 +55,20 @@ func clientRefCountedClose(name string) { } client.clientImpl.close() xdsClientImplCloseHook(name) - delete(clients, name) + delete(pool.clients, name) } // newRefCounted creates a new reference counted xDS client implementation for // name, if one does not exist already. If an xDS client for the given name // exists, it gets a reference to it and returns it. -func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) { - clientsMu.Lock() - defer clientsMu.Unlock() +func newRefCounted(name string, pool *Pool, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) { + pool.mu.Lock() + defer pool.mu.Unlock() - if c := clients[name]; c != nil { + if c := pool.clients[name]; c != nil { c.incrRef() - return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil + return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name, pool) }), nil } // Create the new client implementation. @@ -78,11 +78,11 @@ func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, id } c.logger.Infof("Created client with name %q and bootstrap configuration:\n %s", name, config) client := &clientRefCounted{clientImpl: c, refCount: 1} - clients[name] = client + pool.clients[name] = client xdsClientImplCreateHook(name) logger.Infof("xDS node ID: %s", config.Node().GetId()) - return client, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil + return client, grpcsync.OnceFunc(func() { clientRefCountedClose(name, pool) }), nil } // clientRefCounted is ref-counted, and to be shared by the xds resolver and diff --git a/xds/internal/xdsclient/client_refcounted_test.go b/xds/internal/xdsclient/client_refcounted_test.go index 0824ad385b26..08c67c16403d 100644 --- a/xds/internal/xdsclient/client_refcounted_test.go +++ b/xds/internal/xdsclient/client_refcounted_test.go @@ -122,7 +122,7 @@ func (s) TestClientNew_Single(t *testing.T) { // Tests the scenario where there are multiple calls to New() with different // names. Verifies that reference counts are tracked correctly for each client // and that only when all references are released for a client, it is closed. -func (s) TestClientNew_Multiple(t *testing.T) { +func TestClientNew_Multiple(t *testing.T) { // Create a bootstrap configuration, place it in a file in the temp // directory, and set the bootstrap env vars to point to it. nodeID := uuid.New().String() diff --git a/xds/internal/xdsclient/clientimpl_dump.go b/xds/internal/xdsclient/clientimpl_dump.go index 9d7586773046..559ac388d6da 100644 --- a/xds/internal/xdsclient/clientimpl_dump.go +++ b/xds/internal/xdsclient/clientimpl_dump.go @@ -37,11 +37,11 @@ func (c *clientImpl) dumpResources() *v3statuspb.ClientConfig { // DumpResources returns the status and contents of all xDS resources. func DumpResources() *v3statuspb.ClientStatusResponse { - clientsMu.Lock() - defer clientsMu.Unlock() + DefaultPool.mu.Lock() + defer DefaultPool.mu.Unlock() resp := &v3statuspb.ClientStatusResponse{} - for key, client := range clients { + for key, client := range DefaultPool.clients { cfg := client.dumpResources() cfg.ClientScope = key resp.Config = append(resp.Config, cfg) diff --git a/xds/internal/xdsclient/pool.go b/xds/internal/xdsclient/pool.go new file mode 100644 index 000000000000..8347b9dfbfb0 --- /dev/null +++ b/xds/internal/xdsclient/pool.go @@ -0,0 +1,142 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient + +import ( + "fmt" + "sync" + + "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/xds/bootstrap" +) + +var ( + // DefaultPool is the default pool for xds clients. It is created at the + // init time. + DefaultPool *Pool +) + +// Pool represents pool of xds clients. +type Pool struct { + mu sync.Mutex + clients map[string]*clientRefCounted + config *bootstrap.Config +} + +func init() { + DefaultPool = &Pool{clients: make(map[string]*clientRefCounted)} +} + +// NewPool creates a new xds client pool with the given bootstrap contents. +func NewPool(bootstrapContents []byte) (*Pool, error) { + config, err := bootstrap.NewConfigForTesting(bootstrapContents) + if err != nil { + return nil, err + } + return &Pool{ + clients: make(map[string]*clientRefCounted), + config: config, + }, nil +} + +// NewClient returns an xds client with the given name from the pool. If the +// client doesn't already exist, it creates a new xds client and adds it to the +// pool. +// +// The second return value represents a close function which the caller is +// expected to invoke once they are done using the client. It is safe for the +// caller to invoke this close function multiple times. +func (p *Pool) NewClient(name string) (XDSClient, func(), error) { + return newRefCounted(name, p, p.config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff) +} + +// NewClientForTesting returns an xds client configured with the provided +// options from the pool. If the client doesn't already exist, it creates a new +// xds client and adds it to the pool. +// +// The second return value represents a close function which the caller is +// expected to invoke once they are done using the client. It is safe for the +// caller to invoke this close function multiple times. +// +// # Testing Only +// +// This function should ONLY be used for testing purposes. +func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), error) { + if opts.Name == "" { + return nil, nil, fmt.Errorf("opts.Name field must be non-empty") + } + if opts.WatchExpiryTimeout == 0 { + opts.WatchExpiryTimeout = defaultWatchExpiryTimeout + } + if opts.IdleChannelExpiryTimeout == 0 { + opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout + } + if opts.StreamBackoffAfterFailure == nil { + opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc + } + if opts.Contents != nil { + p.mu.Lock() + config, err := bootstrap.NewConfigForTesting(opts.Contents) + if err != nil { + p.mu.Unlock() + return nil, nil, err + } + p.config = config + p.mu.Unlock() + } + + return newRefCounted(opts.Name, p, p.config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure) +} + +// GetClientForTesting returns an xds client created earlier using the given +// name from the pool. +// +// The second return value represents a close function which the caller is +// expected to invoke once they are done using the client. It is safe for the +// caller to invoke this close function multiple times. +// +// # Testing Only +// +// This function should ONLY be used for testing purposes. +func (p *Pool) GetClientForTesting(name string) (XDSClient, func(), error) { + p.mu.Lock() + defer p.mu.Unlock() + + c, ok := p.clients[name] + if !ok { + return nil, nil, fmt.Errorf("xDS client with name %q not found", name) + } + c.incrRef() + return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name, p) }), nil +} + +// SetFallbackBootstrapConfig to specify a bootstrap configuration to use a +// fallback when the bootstrap env vars are not specified. +func (p *Pool) SetFallbackBootstrapConfig(bootstrapContents []byte) error { + p.mu.Lock() + defer p.mu.Unlock() + + config, err := bootstrap.NewConfigForTesting(bootstrapContents) + if err != nil { + return err + } + p.config = config + return nil +} diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go index feebc1adc915..8c01e7d5387a 100644 --- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go @@ -376,9 +376,12 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { // Create an xDS client with bootstrap pointing to the above server. bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go index fe8125048207..659e22558869 100644 --- a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go @@ -46,10 +46,13 @@ import ( func createXDSClientWithBackoff(t *testing.T, bootstrapContents []byte, streamBackoff func(int) time.Duration) xdsclient.XDSClient { t.Helper() - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), StreamBackoffAfterFailure: streamBackoff, - Contents: bootstrapContents, }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go index ff0243f3d462..e6df37995634 100644 --- a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -150,9 +150,12 @@ func overrideADSStreamCreation(t *testing.T) chan *wrappedADSStream { func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient { t.Helper() - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/ads_stream_restart_test.go b/xds/internal/xdsclient/tests/ads_stream_restart_test.go index f0da932f5fd8..af94556186da 100644 --- a/xds/internal/xdsclient/tests/ads_stream_restart_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_restart_test.go @@ -112,9 +112,12 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) // Create an xDS client with the above bootstrap configuration. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/ads_stream_watch_test.go b/xds/internal/xdsclient/tests/ads_stream_watch_test.go index 285ba99cf5ab..44ecc169c976 100644 --- a/xds/internal/xdsclient/tests/ads_stream_watch_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_watch_test.go @@ -147,9 +147,12 @@ func (s) TestADS_WatchState_TimerFires(t *testing.T) { nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go index 1947daf6dffe..3afe95c62396 100644 --- a/xds/internal/xdsclient/tests/authority_test.go +++ b/xds/internal/xdsclient/tests/authority_test.go @@ -101,9 +101,12 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time. if err != nil { t.Fatalf("Failed to create bootstrap configuration: %v", err) } - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bootstrapContents, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, IdleChannelExpiryTimeout: idleTimeout, }) diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index f8cd6dac7691..f68d40055ed1 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -210,9 +210,12 @@ func (s) TestCDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -360,9 +363,12 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -463,9 +469,12 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -560,9 +569,12 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -633,9 +645,12 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -673,9 +688,12 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -756,9 +774,12 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -868,9 +889,12 @@ func (s) TestCDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -937,9 +961,12 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -1029,9 +1056,12 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/dump_test.go b/xds/internal/xdsclient/tests/dump_test.go index bb4b2719113e..f2d29ef77035 100644 --- a/xds/internal/xdsclient/tests/dump_test.go +++ b/xds/internal/xdsclient/tests/dump_test.go @@ -140,7 +140,7 @@ func (s) TestDumpResources_ManyToOne(t *testing.T) { // Create two xDS clients with the above bootstrap contents. client1Name := t.Name() + "-1" - client1, close1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client1, close1, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: client1Name, Contents: bc, }) @@ -149,7 +149,7 @@ func (s) TestDumpResources_ManyToOne(t *testing.T) { } defer close1() client2Name := t.Name() + "-2" - client2, close2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client2, close2, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: client2Name, Contents: bc, }) @@ -409,7 +409,7 @@ func (s) TestDumpResources_ManyToMany(t *testing.T) { // Create two xDS clients with the above bootstrap contents. client1Name := t.Name() + "-1" - client1, close1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client1, close1, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: client1Name, Contents: bc, }) @@ -418,7 +418,7 @@ func (s) TestDumpResources_ManyToMany(t *testing.T) { } defer close1() client2Name := t.Name() + "-2" - client2, close2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client2, close2, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: client2Name, Contents: bc, }) diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index ef27178fac22..c4872c3c4748 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -240,9 +240,12 @@ func (s) TestEDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -430,9 +433,12 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -534,9 +540,12 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -636,7 +645,11 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), Contents: bc, }) @@ -720,9 +733,12 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) testutils.CreateBootstrapFileForTesting(t, bc) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -761,9 +777,12 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -830,7 +849,11 @@ func (s) TestEDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), Contents: bc, }) @@ -899,9 +922,12 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/fallback_test.go b/xds/internal/xdsclient/tests/fallback_test.go index 514945f833d0..ba8f3f14bf4f 100644 --- a/xds/internal/xdsclient/tests/fallback_test.go +++ b/xds/internal/xdsclient/tests/fallback_test.go @@ -163,9 +163,12 @@ func (s) TestFallback_OnStartup(t *testing.T) { // Create an xDS client with the above bootstrap configuration and a short // idle channel expiry timeout. This ensures that connections to lower // priority servers get closed quickly, for the test to verify. - xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsC, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bootstrapContents, IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout, }) if err != nil { @@ -366,9 +369,12 @@ func (s) TestFallback_MidUpdate(t *testing.T) { // Create an xDS client with the above bootstrap configuration and a short // idle channel expiry timeout. This ensures that connections to lower // priority servers get closed quickly, for the test to verify. - xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsC, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bootstrapContents, IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout, }) if err != nil { @@ -559,9 +565,12 @@ func (s) TestFallback_MidStartup(t *testing.T) { // Create an xDS client with the above bootstrap configuration and a short // idle channel expiry timeout. This ensures that connections to lower // priority servers get closed quickly, for the test to verify. - xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + xdsC, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bootstrapContents, IdleChannelExpiryTimeout: defaultTestIdleChannelExpiryTimeout, }) if err != nil { diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index 78f69518cd6a..16f370da6b05 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -69,9 +69,12 @@ func setupForFederationWatchersTest(t *testing.T) (*e2e.ManagementServer, string t.Fatalf("Failed to create bootstrap configuration: %v", err) } // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 7b49b9b17b74..a1d38ce34ce0 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -266,9 +266,12 @@ func (s) TestLDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -416,9 +419,12 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -520,9 +526,12 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -612,9 +621,12 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -686,9 +698,12 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -726,9 +741,12 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -810,9 +828,12 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -924,9 +945,12 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -1001,9 +1025,12 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -1083,9 +1110,12 @@ func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -1192,9 +1222,12 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -1281,7 +1314,11 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), Contents: bc, }) diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 6b8152620231..87834c67edbb 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -126,9 +126,12 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -236,9 +239,12 @@ func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index b8dd1c72f465..619dfc4e3b88 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -242,9 +242,12 @@ func (s) TestRDSWatch(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -432,9 +435,12 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -536,9 +542,12 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -638,9 +647,12 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) @@ -722,9 +734,12 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -762,9 +777,12 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client talking to the above management server. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, }) if err != nil { @@ -831,7 +849,11 @@ func (s) TestRDSWatch_NACKError(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), Contents: bc, }) @@ -900,9 +922,12 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { testutils.CreateBootstrapFileForTesting(t, bc) // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bc, + pool, err := xdsclient.NewPool(bc) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), }) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index b493c820c774..c13752c33631 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -283,10 +283,10 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client, close, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + Contents: bc, }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) @@ -360,7 +360,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { // involving receipt of an RDS response from the management server. The test // verifies that the internal state of the xDS client (parsed resource and // metadata) matches expectations. -func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { +func TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { const ( resourceName1 = "resource-name-1" resourceName2 = "resource-name-2" @@ -559,10 +559,10 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client, close, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + Contents: bc, }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) @@ -796,10 +796,10 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client, close, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + Contents: bc, }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) @@ -1145,10 +1145,10 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + client, close, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{ Name: t.Name(), - Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + Contents: bc, }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) diff --git a/xds/server.go b/xds/server.go index 1fea8c830936..a1b2e1d055f9 100644 --- a/xds/server.go +++ b/xds/server.go @@ -93,12 +93,11 @@ func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) { // simplifies the code by eliminating the need for a mutex to protect the // xdsC and xdsClientClose fields. newXDSClient := newXDSClient - if s.opts.bootstrapContentsForTesting != nil { + if s.opts.clientPoolForTesting != nil { // Bootstrap file contents may be specified as a server option for tests. newXDSClient = func(name string) (xdsclient.XDSClient, func(), error) { - return xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: name, - Contents: s.opts.bootstrapContentsForTesting, + return s.opts.clientPoolForTesting.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: name, }) } } diff --git a/xds/server_ext_test.go b/xds/server_ext_test.go index fe349ddc5560..1b2d7d778d98 100644 --- a/xds/server_ext_test.go +++ b/xds/server_ext_test.go @@ -142,7 +142,11 @@ func (s) TestServingModeChanges(t *testing.T) { } }, } - sopts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)} + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + sopts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.ClientPoolForTesting(pool)} if stub.S, err = xds.NewGRPCServer(sopts...); err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } @@ -188,7 +192,7 @@ func (s) TestServingModeChanges(t *testing.T) { // Lookup the xDS client in use based on the dedicated well-known key, as // defined in A71, used by the xDS enabled gRPC server. - xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer) + xdsC, close, err := pool.GetClientForTesting(xdsclient.NameForServer) if err != nil { t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents)) } @@ -280,7 +284,11 @@ func (s) TestResourceNotFoundRDS(t *testing.T) { } }, } - sopts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)} + pool, err := xdsclient.NewPool(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + sopts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.ClientPoolForTesting(pool)} if stub.S, err = xds.NewGRPCServer(sopts...); err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } @@ -297,7 +305,7 @@ func (s) TestResourceNotFoundRDS(t *testing.T) { // Lookup the xDS client in use based on the dedicated well-known key, as // defined in A71, used by the xDS enabled gRPC server. - xdsC, close, err := xdsclient.GetForTesting(xdsclient.NameForServer) + xdsC, close, err := pool.GetClientForTesting(xdsclient.NameForServer) if err != nil { t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents)) } diff --git a/xds/server_options.go b/xds/server_options.go index 9b9700cf3b33..601af7df0213 100644 --- a/xds/server_options.go +++ b/xds/server_options.go @@ -23,11 +23,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/xds/internal/xdsclient" ) type serverOptions struct { - modeCallback ServingModeCallbackFunc - bootstrapContentsForTesting []byte + modeCallback ServingModeCallbackFunc + clientPoolForTesting *xdsclient.Pool } type serverOption struct { @@ -58,9 +59,10 @@ type ServingModeChangeArgs struct { Err error } -// BootstrapContentsForTesting returns a grpc.ServerOption which allows users -// to inject a bootstrap configuration used by only this server, instead of the -// global configuration from the environment variables. +// BootstrapContentsForTesting returns a grpc.ServerOption with the default +// pool set with the provided bootstrap congiguration. It allows users to +// inject a bootstrap configuration to be used by only this server, instead of +// the global configuration from the environment variables. // // # Testing Only // @@ -69,8 +71,28 @@ type ServingModeChangeArgs struct { // // # Experimental // +// Notice: This API is EXPERIMENTAL but kept for backward compatibility. Use +// `ClientPoolForTesting` to set the pool directly. It will be removed in +// later release. +func BootstrapContentsForTesting(bootstrapContents []byte) grpc.ServerOption { + err := xdsclient.DefaultPool.SetFallbackBootstrapConfig(bootstrapContents) + if err != nil { + return &serverOption{apply: func(o *serverOptions) { o.clientPoolForTesting = nil }} + } + return &serverOption{apply: func(o *serverOptions) { o.clientPoolForTesting = xdsclient.DefaultPool }} +} + +// ClientPoolForTesting returns a grpc.ServerOption with the pool for xds +// clients. It allows users to set a pool for xds clients to be used by only this server, instead of +// the global configuration from the environment variables. +// +// # Testing Only +// +// This function should ONLY be used for testing and may not work with some +// other features, including the CSDS service. +// // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func BootstrapContentsForTesting(contents []byte) grpc.ServerOption { - return &serverOption{apply: func(o *serverOptions) { o.bootstrapContentsForTesting = contents }} +func ClientPoolForTesting(pool *xdsclient.Pool) grpc.ServerOption { + return &serverOption{apply: func(o *serverOptions) { o.clientPoolForTesting = pool }} } diff --git a/xds/server_test.go b/xds/server_test.go index fe1ba383112a..7576263a091e 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -121,14 +121,18 @@ func (s) TestNewServer_Success(t *testing.T) { desc: "without_xds_creds", serverOpts: []grpc.ServerOption{ grpc.Creds(insecure.NewCredentials()), - BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)), + func() grpc.ServerOption { + return BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)) + }(), }, }, { desc: "with_xds_creds", serverOpts: []grpc.ServerOption{ grpc.Creds(xdsCreds), - BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)), + func() grpc.ServerOption { + return BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)) + }(), }, wantXDSCredsInUse: true, }, @@ -184,7 +188,9 @@ func (s) TestNewServer_Failure(t *testing.T) { desc: "empty bootstrap config", serverOpts: []grpc.ServerOption{ grpc.Creds(xdsCreds), - BootstrapContentsForTesting([]byte(`{}`)), + func() grpc.ServerOption { + return BootstrapContentsForTesting([]byte(`{}`)) + }(), }, wantErr: "xDS client creation failed", },