From 7662fd8f26d56d37bd0c67d162b28ff0c9aded98 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 17 Dec 2024 16:16:30 +0530 Subject: [PATCH 1/6] Avoid blocking for subsequent resolver updates --- .../balancer/clusterresolver/e2e_test/eds_impl_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 89760f6fd23e..c1cf06ddf31e 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -1232,7 +1232,11 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { bd.Data.(balancer.Balancer).Close() }, UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - resolverUpdateCh <- ccs.ResolverState + select { + case resolverUpdateCh <- ccs.ResolverState: + default: + // Don't block forever in case of multiple updates. + } return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) }, }) From 26e0bccae44c6c56e21e931f9152cb948d637b81 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 23 Dec 2024 13:47:35 +0530 Subject: [PATCH 2/6] Separate the creation of the EDS watch from the Endpoint update --- .../clusterresolver/e2e_test/eds_impl_test.go | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index c1cf06ddf31e..b07031f3c54d 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "net" "strings" "testing" "time" @@ -1160,17 +1159,17 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { return &testpb.SimpleResponse{}, nil }, } - lis1, err := net.Listen("tcp", "localhost:0") + lis1, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("Failed to create listener: %v", err) } defer lis1.Close() - lis2, err := net.Listen("tcp", "localhost:0") + lis2, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("Failed to create listener: %v", err) } defer lis2.Close() - lis3, err := net.Listen("tcp", "localhost:0") + lis3, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("Failed to create listener: %v", err) } @@ -1241,26 +1240,25 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { }, }) + edsWatchCreated := make(chan struct{}) // Spin up a management server to receive xDS resources from. - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() != "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" { + return nil + } + if req.ResponseNonce == "" { + t.Logf("Got initial ClusterLoadAssignment request: %+v", req) + close(edsWatchCreated) + } + return nil + }, + }) // Create bootstrap configuration pointing to the above management server. nodeID := uuid.New().String() bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - // Create xDS resources for consumption by the test. We start off with a - // single backend in a single EDS locality. - resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ - Name: localityName1, - Weight: 1, - Backends: []e2e.BackendOptions{{ - Ports: ports, - }}, - }}) - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ @@ -1296,6 +1294,25 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { t.Fatalf("failed to create new client for local test server: %v", err) } defer cc.Close() + cc.Connect() + + select { + case <-ctx.Done(): + t.Fatal("Timed out waiting for EDS watch to be created.") + case <-edsWatchCreated: + } + // Create xDS resources for consumption by the test. + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{ + Ports: ports, + }}, + }}) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + client := testgrpc.NewTestServiceClient(cc) if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: lis1.Addr().String()}}); err != nil { t.Fatal(err) From e6dede4da2433c481dd2b79690f0ca76abd0b95d Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 23 Dec 2024 14:39:49 +0530 Subject: [PATCH 3/6] Revert "Separate the creation of the EDS watch from the Endpoint update" This reverts commit 26e0bccae44c6c56e21e931f9152cb948d637b81. --- .../clusterresolver/e2e_test/eds_impl_test.go | 53 +++++++------------ 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index b07031f3c54d..c1cf06ddf31e 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "net" "strings" "testing" "time" @@ -1159,17 +1160,17 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { return &testpb.SimpleResponse{}, nil }, } - lis1, err := testutils.LocalTCPListener() + lis1, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to create listener: %v", err) } defer lis1.Close() - lis2, err := testutils.LocalTCPListener() + lis2, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to create listener: %v", err) } defer lis2.Close() - lis3, err := testutils.LocalTCPListener() + lis3, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to create listener: %v", err) } @@ -1240,25 +1241,26 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { }, }) - edsWatchCreated := make(chan struct{}) // Spin up a management server to receive xDS resources from. - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - if req.GetTypeUrl() != "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" { - return nil - } - if req.ResponseNonce == "" { - t.Logf("Got initial ClusterLoadAssignment request: %+v", req) - close(edsWatchCreated) - } - return nil - }, - }) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) // Create bootstrap configuration pointing to the above management server. nodeID := uuid.New().String() bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + // Create xDS resources for consumption by the test. We start off with a + // single backend in a single EDS locality. + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{ + Ports: ports, + }}, + }}) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ @@ -1294,25 +1296,6 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { t.Fatalf("failed to create new client for local test server: %v", err) } defer cc.Close() - cc.Connect() - - select { - case <-ctx.Done(): - t.Fatal("Timed out waiting for EDS watch to be created.") - case <-edsWatchCreated: - } - // Create xDS resources for consumption by the test. - resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ - Name: localityName1, - Weight: 1, - Backends: []e2e.BackendOptions{{ - Ports: ports, - }}, - }}) - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - client := testgrpc.NewTestServiceClient(cc) if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: lis1.Addr().String()}}); err != nil { t.Fatal(err) From e8dc2ed8553db95a86056a82cf83c628494e07c6 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 23 Dec 2024 14:40:06 +0530 Subject: [PATCH 4/6] Revert "Avoid blocking for subsequent resolver updates" This reverts commit 7662fd8f26d56d37bd0c67d162b28ff0c9aded98. --- .../balancer/clusterresolver/e2e_test/eds_impl_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index c1cf06ddf31e..89760f6fd23e 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -1232,11 +1232,7 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { bd.Data.(balancer.Balancer).Close() }, UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - select { - case resolverUpdateCh <- ccs.ResolverState: - default: - // Don't block forever in case of multiple updates. - } + resolverUpdateCh <- ccs.ResolverState return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) }, }) From bb960bfa609d02b0ce5872b1c33963d41050100f Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 23 Dec 2024 14:40:45 +0530 Subject: [PATCH 5/6] Use mutex instead of channel for sync --- .../clusterresolver/e2e_test/eds_impl_test.go | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 89760f6fd23e..cd9ca11aaec3 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -20,8 +20,8 @@ import ( "context" "errors" "fmt" - "net" "strings" + "sync" "testing" "time" @@ -1160,17 +1160,17 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { return &testpb.SimpleResponse{}, nil }, } - lis1, err := net.Listen("tcp", "localhost:0") + lis1, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("Failed to create listener: %v", err) } defer lis1.Close() - lis2, err := net.Listen("tcp", "localhost:0") + lis2, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("Failed to create listener: %v", err) } defer lis2.Close() - lis3, err := net.Listen("tcp", "localhost:0") + lis3, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("Failed to create listener: %v", err) } @@ -1223,7 +1223,8 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { defer func() { balancer.Register(originalRRBuilder) }() - resolverUpdateCh := make(chan resolver.State, 1) + resolverStateMu := sync.Mutex{} + resolverState := resolver.State{} stub.Register(roundrobin.Name, stub.BalancerFuncs{ Init: func(bd *stub.BalancerData) { bd.Data = originalRRBuilder.Build(bd.ClientConn, bd.BuildOptions) @@ -1232,7 +1233,9 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { bd.Data.(balancer.Balancer).Close() }, UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - resolverUpdateCh <- ccs.ResolverState + resolverStateMu.Lock() + defer resolverStateMu.Unlock() + resolverState = ccs.ResolverState return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) }, }) @@ -1297,15 +1300,12 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { t.Fatal(err) } - var rs resolver.State - select { - case rs = <-resolverUpdateCh: - case <-ctx.Done(): - t.Fatalf("Context timed out waiting for resolver update.") - } + resolverStateMu.Lock() + gotState := resolverState + resolverStateMu.Unlock() gotEndpointPorts := []uint32{} - for _, a := range rs.Endpoints[0].Addresses { + for _, a := range gotState.Endpoints[0].Addresses { gotEndpointPorts = append(gotEndpointPorts, testutils.ParsePort(t, a.Addr)) } if diff := cmp.Diff(gotEndpointPorts, tc.wantEndpointPorts); diff != "" { @@ -1313,7 +1313,7 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { } gotAddrPorts := []uint32{} - for _, a := range rs.Addresses { + for _, a := range gotState.Addresses { gotAddrPorts = append(gotAddrPorts, testutils.ParsePort(t, a.Addr)) } if diff := cmp.Diff(gotAddrPorts, tc.wantAddrPorts); diff != "" { From 8d114e633a64b0736eaa3be7583138c56d27740a Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 24 Dec 2024 15:57:30 +0530 Subject: [PATCH 6/6] Use atomic pointer --- .../clusterresolver/e2e_test/eds_impl_test.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index cd9ca11aaec3..e75cf1970de2 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -21,7 +21,7 @@ import ( "errors" "fmt" "strings" - "sync" + "sync/atomic" "testing" "time" @@ -1223,8 +1223,8 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { defer func() { balancer.Register(originalRRBuilder) }() - resolverStateMu := sync.Mutex{} - resolverState := resolver.State{} + resolverState := atomic.Pointer[resolver.State]{} + resolverState.Store(&resolver.State{}) stub.Register(roundrobin.Name, stub.BalancerFuncs{ Init: func(bd *stub.BalancerData) { bd.Data = originalRRBuilder.Build(bd.ClientConn, bd.BuildOptions) @@ -1233,9 +1233,7 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { bd.Data.(balancer.Balancer).Close() }, UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - resolverStateMu.Lock() - defer resolverStateMu.Unlock() - resolverState = ccs.ResolverState + resolverState.Store(&ccs.ResolverState) return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) }, }) @@ -1300,9 +1298,7 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { t.Fatal(err) } - resolverStateMu.Lock() - gotState := resolverState - resolverStateMu.Unlock() + gotState := resolverState.Load() gotEndpointPorts := []uint32{} for _, a := range gotState.Endpoints[0].Addresses {