Skip to content

Commit

Permalink
multiple servers with unique bootstrap conig test
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Dec 6, 2024
1 parent 6e822a3 commit 43b394d
Showing 1 changed file with 238 additions and 0 deletions.
238 changes: 238 additions & 0 deletions xds/server_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
xdsinternal "google.golang.org/grpc/xds/internal"
Expand All @@ -47,6 +48,7 @@ import (

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
Expand Down Expand Up @@ -221,6 +223,242 @@ func (s) TestServingModeChanges(t *testing.T) {
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}

// TestServingModeChanges_MultipleServers tests two servers with unique
// bootstrap configuration are able to serve two different clients with same
// name if they are in different xds client pools.
//
// It tests the Server's logic as it transitions from Not Ready to Ready, then
// then to Not Ready. Before it goes Ready, connections should be accepted and
// closed. After it goes ready, RPC's should proceed as normal according to
// matched route configuration. After it transitions back into not ready
// (through an explicit LDS Resource Not Found), previously running RPC's
// should be gracefully closed and still work, and new RPC's should fail.
func (s) TestServingModeChanges_MultipleServers(t *testing.T) {
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})

// Create two bootstrap configurations pointing to the above management server.
nodeID1 := uuid.New().String()
bootstrapContents1 := e2e.DefaultBootstrapContents(t, nodeID1, managementServer.Address)
nodeID2 := uuid.New().String()
bootstrapContents2 := e2e.DefaultBootstrapContents(t, nodeID2, managementServer.Address)

lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch. Due to not having received the full
// configuration, this should cause the server to be in mode Serving.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources1 := e2e.UpdateOptions{
NodeID: nodeID1,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}
resources2 := e2e.UpdateOptions{
NodeID: nodeID2,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources1); err != nil {
t.Fatal(err)
}
if err := managementServer.Update(ctx, resources2); err != nil {
t.Fatal(err)
}

serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

stub1 := &stubserver.StubServer{
Listener: lis,
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
},
}
stub2 := &stubserver.StubServer{
Listener: lis,
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
},
}

// Create two xds client pools with different bootstrap contents.
pool1, err := xdsclient.NewPool(bootstrapContents1)
if err != nil {
t.Fatalf("Failed to create an xDS client pool: %v", err)
}
sopts1 := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.ClientPoolForTesting(pool1)}
if stub1.S, err = xds.NewGRPCServer(sopts1...); err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
pool2, err := xdsclient.NewPool(bootstrapContents2)
if err != nil {
t.Fatalf("Failed to create an xDS client pool: %v", err)
}
sopts2 := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.ClientPoolForTesting(pool2)}
if stub2.S, err = xds.NewGRPCServer(sopts2...); err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}

// Start the servers and make calls to them.
stubserver.StartTestService(t, stub1)
defer stub1.S.Stop()
cc1, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc1.Close()

stubserver.StartTestService(t, stub2)
defer stub2.S.Stop()
cc2, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc2.Close()

waitForFailedRPCWithStatus(ctx, t, cc1, errAcceptAndClose)
waitForFailedRPCWithStatus(ctx, t, cc2, errAcceptAndClose)

routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources1 = e2e.UpdateOptions{
NodeID: nodeID1,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
resources2 = e2e.UpdateOptions{
NodeID: nodeID2,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
defer cancel()
if err := managementServer.Update(ctx, resources1); err != nil {
t.Fatal(err)
}
if err := managementServer.Update(ctx, resources2); err != nil {
t.Fatal(err)
}

select {
case <-ctx.Done():
t.Fatal("timeout waiting for the xDS Server to go Serving")
case <-serving.Done():
}

// A unary RPC should work once it transitions into serving. (need this same
// assertion from LDS resource not found triggering it).
waitForSuccessfulRPC(ctx, t, cc1)
waitForSuccessfulRPC(ctx, t, cc2)

// Start a stream before switching the server to not serving. Due to the
// stream being created before the graceful stop of the underlying
// connection, it should be able to continue even after the server switches
// to not serving.
c1 := testgrpc.NewTestServiceClient(cc1)
stream1, err := c1.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}
c2 := testgrpc.NewTestServiceClient(cc2)
stream2, err := c2.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}

// Lookup the xDS client in use based on the dedicated well-known key, as
// defined in A71, used by the xDS enabled gRPC server.
xdsC1, close1, err := pool1.GetClientForTesting(xdsclient.NameForServer)
if err != nil {
t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents1))
}
defer close1()
xdsC2, close2, err := pool2.GetClientForTesting(xdsclient.NameForServer)
if err != nil {
t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents1))
}
defer close2()

// Compare the bootstrap configurations for both xds clients.
if want, _ := bootstrap.NewConfigForTesting(bootstrapContents1); !cmp.Equal(want, xdsC1.BootstrapConfig()) {
t.Fatalf("want %v bootstrap config from xdsC1, got %v", want, xdsC1.BootstrapConfig())
}
if want, _ := bootstrap.NewConfigForTesting(bootstrapContents2); !cmp.Equal(want, xdsC2.BootstrapConfig()) {
t.Fatalf("want %v bootstrap config from xdsC2, got %v", want, xdsC2.BootstrapConfig())
}

// Invoke LDS Resource not found here (tests graceful close).
triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error)
listenerResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type)
if err := triggerResourceNotFound(xdsC1, listenerResourceType, listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
if err := triggerResourceNotFound(xdsC2, listenerResourceType, listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}

// New RPCs on that connection should eventually start failing. Due to
// Graceful Stop any started streams continue to work.
if err = stream1.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
if err = stream1.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream1.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}

if err = stream2.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
if err = stream2.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream2.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}

// New RPCs on that connection should eventually start failing.
waitForFailedRPCWithStatus(ctx, t, cc1, errAcceptAndClose)
waitForFailedRPCWithStatus(ctx, t, cc2, errAcceptAndClose)
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns resource not found. Before getting the resource not found, the xDS
// Server has not received all configuration needed, so it should Accept and
Expand Down

0 comments on commit 43b394d

Please sign in to comment.