From a403c6b33558e4ff33ba4f09e4fea7ad1abc6f94 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 5 Dec 2024 17:21:15 +0530 Subject: [PATCH 01/13] Outlier detection using health listener --- .../pickfirst/pickfirstleaf/pickfirstleaf.go | 26 ++++- .../pickfirstleaf/pickfirstleaf_ext_test.go | 58 +++++----- .../balancer/outlierdetection/balancer.go | 73 ++++++++++--- .../e2e_test/outlierdetection_test.go | 101 ++++++++++++++++++ .../outlierdetection/subconn_wrapper.go | 55 +++++++++- 5 files changed, 268 insertions(+), 45 deletions(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 2fc0a71f9441..39c3ea83a5d0 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -54,9 +54,17 @@ func init() { balancer.Register(pickfirstBuilder{}) } -// enableHealthListenerKeyType is a unique key type used in resolver attributes -// to indicate whether the health listener usage is enabled. -type enableHealthListenerKeyType struct{} +type ( + // enableHealthListenerKeyType is a unique key type used in resolver attributes + // to indicate whether the health listener usage is enabled. + enableHealthListenerKeyType struct{} + // managedByPickfirstKeyType is an attribute key type to inform Outlier + // Detection that the generic health listener is being used. + // TODO(arjan-bal): Remove this when implementing the dualstack design. + // This is a hack. Once Dualstack is completed, outlier detection will stop + // sending ejection updates through the connectivity listener. + managedByPickfirstKeyType = struct{} +) var ( logger = grpclog.Component("pick-first-leaf-lb") @@ -140,6 +148,17 @@ func EnableHealthListener(state resolver.State) resolver.State { return state } +// IsManagedByPickfirst returns whether an address belongs to a SubConn +// managed by the pickfirst LB policy. +// TODO(arjan-bal): This is a hack to disable outlier_detection via the +// with connectivity listener when using pick_first. Once Dualstack changes +// are complete, all SubConns will be created by pick_first and outlier +// detection will only use the health listener for ejection. This hack can +// then be removed. +func IsManagedByPickfirst(addr resolver.Address) bool { + return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil +} + type pfConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` @@ -166,6 +185,7 @@ type scData struct { } func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true) sd := &scData{ rawConnectivityState: connectivity.Idle, effectiveState: connectivity.Idle, diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index 9667c2b3db6b..fee9d716bc3f 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -58,7 +58,13 @@ const ( stateStoringBalancerName = "state_storing" ) -var stateStoringServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateStoringBalancerName) +var ( + stateStoringServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateStoringBalancerName) + ignoreBalAttributesOpt = cmp.Transformer("IgnoreBalancerAttributes", func(a resolver.Address) resolver.Address { + a.BalancerAttributes = nil + return a + }) +) type s struct { grpctest.Tester @@ -177,7 +183,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_FirstServerReady(t *testing.T) { wantSCStates := []scState{ {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -219,7 +225,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_FirstServerUnReady(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Shutdown}, {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -264,7 +270,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_DuplicateAddrs(t *testing.T) { {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Shutdown}, {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -317,7 +323,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) { {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -334,7 +340,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) { {Addrs: []resolver.Address{addrs[3]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -378,7 +384,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -398,7 +404,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -440,7 +446,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -458,7 +464,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -502,7 +508,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) { {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -521,7 +527,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) { {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -576,7 +582,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -591,7 +597,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T) t.Fatal(err) } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -639,7 +645,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T) {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -660,7 +666,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Shutdown}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -708,7 +714,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T) {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -729,7 +735,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -776,7 +782,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -796,7 +802,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T) {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -1130,7 +1136,7 @@ func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" { + if diff := cmp.Diff(wantAddrs, gotAddrs, ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn creation order mismatch (-want +got):\n%s", diff) } } @@ -1174,7 +1180,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" { + if diff := cmp.Diff(wantAddrs, gotAddrs, ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn creation order mismatch (-want +got):\n%s", diff) } } @@ -1220,7 +1226,7 @@ func (s) TestPickFirstLeaf_InterleavingUnknownPreffered(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" { + if diff := cmp.Diff(wantAddrs, gotAddrs, ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn creation order mismatch (-want +got):\n%s", diff) } } @@ -1485,6 +1491,7 @@ func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, sub if len(sc.Addresses) != 1 { return nil, fmt.Errorf("new subchannel created with %d addresses, want 1", len(sc.Addresses)) } + sc.Addresses[0].Attributes = nil addresses = append(addresses, sc.Addresses[0]) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ @@ -1586,7 +1593,10 @@ func (b *backendManager) stopAllExcept(index int) { func (b *backendManager) resolverAddrs() []resolver.Address { addrs := make([]resolver.Address, len(b.backends)) for i, backend := range b.backends { - addrs[i] = resolver.Address{Addr: backend.Address} + addrs[i] = resolver.Address{ + Addr: backend.Address, + // Attributes: attributes.New(pfinternal.ManagedByPickfirstKeyType{}, true), + } } return addrs } diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index c9d496ce09b9..9e8588d55d8d 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -33,6 +33,7 @@ import ( "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" @@ -152,6 +153,11 @@ type lbCfgUpdate struct { done chan struct{} } +type scHealthUpdate struct { + scw *subConnWrapper + state balancer.SubConnState +} + type outlierDetectionBalancer struct { // These fields are safe to be accessed without holding any mutex because // they are synchronized in run(), which makes these field accesses happen @@ -355,6 +361,9 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state if state.ConnectivityState == connectivity.Shutdown { delete(b.scWrappers, scw.SubConn) } + scw.mu.Lock() + defer scw.mu.Unlock() + scw.latestReceivedConnectivityState = state.ConnectivityState b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, @@ -475,10 +484,11 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal return nil, err } scw := &subConnWrapper{ - SubConn: sc, - addresses: addrs, - scUpdateCh: b.scUpdateCh, - listener: oldListener, + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + listener: oldListener, + healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]), } b.mu.Lock() defer b.mu.Unlock() @@ -596,13 +606,35 @@ func (b *outlierDetectionBalancer) Target() string { // if the SubConn is not ejected. func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw - scw.latestState = u.state - if !scw.ejected { - if scw.listener != nil { - b.childMu.Lock() - scw.listener(u.state) - b.childMu.Unlock() - } + scw.mu.Lock() + scw.healthListener = nil + scw.mu.Unlock() + + if scw.listener == nil { + return + } + + if !scw.healthListenerEnabled { + scw.latestDeleveredState = u.state + } + + if scw.healthListenerEnabled || !scw.ejected { + b.childMu.Lock() + scw.listener(u.state) + b.childMu.Unlock() + } +} + +func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { + scw := u.scw + scw.mu.Lock() + defer scw.mu.Unlock() + + scw.latestDeleveredState = u.state + if !scw.ejected && scw.healthListener != nil { + b.childMu.Lock() + scw.healthListener(u.state) + b.childMu.Unlock() } } @@ -613,15 +645,26 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { scw.ejected = u.isEjected // If scw.latestState has never been written to will default to connectivity // IDLE, which is fine. - stateToUpdate := scw.latestState + stateToUpdate := scw.latestDeleveredState if u.isEjected { stateToUpdate = balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, } } - if scw.listener != nil { + + if !scw.healthListenerEnabled { + if scw.listener != nil { + b.childMu.Lock() + scw.listener(stateToUpdate) + b.childMu.Unlock() + } + return + } + scw.mu.Lock() + defer scw.mu.Unlock() + if scw.healthListener != nil { b.childMu.Lock() - scw.listener(stateToUpdate) + scw.healthListener(stateToUpdate) b.childMu.Unlock() } } @@ -696,6 +739,8 @@ func (b *outlierDetectionBalancer) run() { b.handleSubConnUpdate(u) case *ejectionUpdate: b.handleEjectedUpdate(u) + case *scHealthUpdate: + b.handleSubConnHealthUpdate(u) } case update, ok := <-b.pickerUpdateCh.Get(): if !ok { diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 35c4d75301b3..188d99ebe041 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -28,10 +28,14 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -367,3 +371,100 @@ func (s) TestNoopConfiguration(t *testing.T) { t.Fatalf("error in expected round robin: %v", err) } } + +// TestPickFirstIsNoop verifies that outlier detection is performed using the +// generic health producer when the pickfirstleaf LB policy is used. The test +// server returns error for consecutive requests and test verifies that the +// endpoint is not ejected. +func (s) TestPickFirst(t *testing.T) { + backend1 := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return nil, errors.New("some error") + }, + } + if err := backend1.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + defer backend1.Stop() + t.Logf("Started bad TestService backend at: %q", backend1.Address) + + countingODServiceConfigJSON := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.025s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 0, + "requestVolume": 2 + }, + "childPolicy": [{"%s": {}}] + } + } + ] +}`, healthCheckingPetiolePolicyName) + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) + + mr := manual.NewBuilderWithScheme("od-e2e") + // The full list of addresses. + mr.InitialState(resolver.State{ + Addresses: []resolver.Address{{Addr: backend1.Address}}, + ServiceConfig: sc, + }) + cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + + // The first request should not cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + // Wait for the failure rate algorithm to run once. + shortCtx, shortCancel := context.WithTimeout(ctx, 50*time.Millisecond) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) + + // 2 failing request should cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + // Wait for the failure rate algorithm to run once. + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) + + // The SubConn should be unejected after 100 millis. + testutils.AwaitState(ctx, t, cc, connectivity.Ready) +} + +type healthCheckingPetiolePolicyBuilder struct{} + +func (bb *healthCheckingPetiolePolicyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + b := &healthCheckingPetiolePolicy{ + Balancer: balancer.Get(pickfirstleaf.Name).Build(cc, opts), + } + return b +} + +func (bb *healthCheckingPetiolePolicyBuilder) Name() string { + return healthCheckingPetiolePolicyName +} + +func (b *healthCheckingPetiolePolicy) UpdateClientConnState(state balancer.ClientConnState) error { + state.ResolverState = pickfirstleaf.EnableHealthListener(state.ResolverState) + return b.Balancer.UpdateClientConnState(state) +} + +type healthCheckingPetiolePolicy struct { + balancer.Balancer +} + +const healthCheckingPetiolePolicyName = "hcp" + +func init() { + balancer.Register(&healthCheckingPetiolePolicyBuilder{}) +} diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 0fa422d8f262..058bd182dbac 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -19,9 +19,11 @@ package outlierdetection import ( "fmt" + "sync" "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/resolver" ) @@ -31,6 +33,9 @@ import ( // whether or not this SubConn is ejected. type subConnWrapper struct { balancer.SubConn + // The following fields are set during object creation and read-only after + // that. + listener func(balancer.SubConnState) // addressInfo is a pointer to the subConnWrapper's corresponding address @@ -38,16 +43,33 @@ type subConnWrapper struct { addressInfo unsafe.Pointer // *addressInfo // These two pieces of state will reach eventual consistency due to sync in // run(), and child will always have the correctly updated SubConnState. - // latestState is the latest state update from the underlying SubConn. This - // is used whenever a SubConn gets unejected. - latestState balancer.SubConnState - ejected bool + // latestDeleveredState is the latest state update from the underlying SubConn. This + // is used whenever a SubConn gets unejected. This will be the health state + // if a health listener is being used, otherwise it will be the connectivity + // state. + latestDeleveredState balancer.SubConnState + ejected bool scUpdateCh *buffer.Unbounded // addresses is the list of address(es) this SubConn was created with to // help support any change in address(es) addresses []resolver.Address + // healthListenerEnabled indicates whether the leaf LB policy is using a + // generic health listener. When enabled, ejection updates are sent via the + // health listener instead of the connectivity listener. Once Dualstack + // changes are complete, all SubConns will be created by pickfirst which + // uses the health listener. + healthListenerEnabled bool + + // Access to the following fields are protected by a mutex. + mu sync.Mutex + healthListener func(balancer.SubConnState) + // latestReceivedConnectivityState is the SubConn most recent connectivity + // state received from the subchannel. It may not be delivered to the child + // balancer yet. It is used to ensure a health listener is registered only + // when the subchannel is READY. + latestReceivedConnectivityState connectivity.State } // eject causes the wrapper to report a state update with the TRANSIENT_FAILURE @@ -72,3 +94,28 @@ func (scw *subConnWrapper) uneject() { func (scw *subConnWrapper) String() string { return fmt.Sprintf("%+v", scw.addresses) } + +func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { + scw.mu.Lock() + defer scw.mu.Unlock() + + if !scw.healthListenerEnabled { + logger.Errorf("Health listener unexpectedly registered on SubConn %v.", scw) + return + } + + if scw.latestReceivedConnectivityState != connectivity.Ready { + return + } + scw.healthListener = listener + if listener == nil { + scw.SubConn.RegisterHealthListener(nil) + } else { + scw.SubConn.RegisterHealthListener(func(scs balancer.SubConnState) { + scw.scUpdateCh.Put(&scHealthUpdate{ + scw: scw, + state: scs, + }) + }) + } +} From 693cee20b358394f6a24967e12fd708f5c3e9b50 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 10 Dec 2024 16:02:34 +0530 Subject: [PATCH 02/13] Address review comments --- .../pickfirst/pickfirstleaf/pickfirstleaf.go | 9 ++- .../pickfirstleaf/pickfirstleaf_ext_test.go | 1 - .../e2e_test/outlierdetection_test.go | 74 +++++++++++++++++-- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 39c3ea83a5d0..89b307f8ddb6 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -60,10 +60,11 @@ type ( enableHealthListenerKeyType struct{} // managedByPickfirstKeyType is an attribute key type to inform Outlier // Detection that the generic health listener is being used. - // TODO(arjan-bal): Remove this when implementing the dualstack design. - // This is a hack. Once Dualstack is completed, outlier detection will stop - // sending ejection updates through the connectivity listener. - managedByPickfirstKeyType = struct{} + // TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when + // implementing the dualstack design. This is a hack. Once Dualstack is + // completed, outlier detection will stop sending ejection updates through + // the connectivity listener. + managedByPickfirstKeyType struct{} ) var ( diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index fee9d716bc3f..d5a148c1155f 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -1595,7 +1595,6 @@ func (b *backendManager) resolverAddrs() []resolver.Address { for i, backend := range b.backends { addrs[i] = resolver.Address{ Addr: backend.Address, - // Attributes: attributes.New(pfinternal.ManagedByPickfirstKeyType{}, true), } } return addrs diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 188d99ebe041..b27edc1cdb05 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -372,21 +372,21 @@ func (s) TestNoopConfiguration(t *testing.T) { } } -// TestPickFirstIsNoop verifies that outlier detection is performed using the +// Test verifies that outlier detection is performed using the // generic health producer when the pickfirstleaf LB policy is used. The test // server returns error for consecutive requests and test verifies that the // endpoint is not ejected. -func (s) TestPickFirst(t *testing.T) { - backend1 := &stubserver.StubServer{ +func (s) TestPickFirstHealthListenerEnabled(t *testing.T) { + backend := &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return nil, errors.New("some error") }, } - if err := backend1.StartServer(); err != nil { + if err := backend.StartServer(); err != nil { t.Fatalf("Failed to start backend: %v", err) } - defer backend1.Stop() - t.Logf("Started bad TestService backend at: %q", backend1.Address) + defer backend.Stop() + t.Logf("Started bad TestService backend at: %q", backend.Address) countingODServiceConfigJSON := fmt.Sprintf(` { @@ -412,7 +412,7 @@ func (s) TestPickFirst(t *testing.T) { mr := manual.NewBuilderWithScheme("od-e2e") // The full list of addresses. mr.InitialState(resolver.State{ - Addresses: []resolver.Address{{Addr: backend1.Address}}, + Addresses: []resolver.Address{{Addr: backend.Address}}, ServiceConfig: sc, }) cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -441,6 +441,66 @@ func (s) TestPickFirst(t *testing.T) { testutils.AwaitState(ctx, t, cc, connectivity.Ready) } +// Test verifies that outlier detection doesn't eject subchannels created by +// the new pickfirst balancer when pickfirst is a top-level policy. +func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return nil, errors.New("some error") + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + defer backend.Stop() + t.Logf("Started bad TestService backend at: %q", backend.Address) + + countingODServiceConfigJSON := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.025s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 0, + "requestVolume": 2 + }, + "childPolicy": [{"%s": {}}] + } + } + ] +}`, pickfirstleaf.Name) + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) + + mr := manual.NewBuilderWithScheme("od-e2e") + + mr.InitialState(resolver.State{ + Addresses: []resolver.Address{{Addr: backend.Address}}, + ServiceConfig: sc, + }) + cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + + // Failing request should not cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + // Wait for the failure rate algorithm to run once. + shortCtx, shortCancel := context.WithTimeout(ctx, 50*time.Millisecond) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) +} + type healthCheckingPetiolePolicyBuilder struct{} func (bb *healthCheckingPetiolePolicyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { From f27c8779c0acb8267a53aae5694789a315e81d1e Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 11 Dec 2024 10:51:12 +0530 Subject: [PATCH 03/13] make OD work with wrr --- balancer/weightedroundrobin/balancer.go | 4 + .../e2e_test/outlierdetection_test.go | 299 +++++++----------- .../outlierdetection/subconn_wrapper.go | 1 + 3 files changed, 112 insertions(+), 192 deletions(-) diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index c9c5b576bb0c..fb1393688863 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/endpointsharding" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/balancer/weightedroundrobin/internal" "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/connectivity" @@ -232,6 +233,9 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error b.updateEndpointsLocked(ccs.ResolverState.Endpoints) b.mu.Unlock() + // Make children pickfirst use health listeners for outlier detection to + // work. + ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState) // This causes child to update picker inline and will thus cause inline // picker update. return b.child.UpdateClientConnState(balancer.ClientConnState{ diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index b27edc1cdb05..28da7582969b 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -28,11 +28,12 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" + "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" @@ -47,7 +48,18 @@ import ( _ "google.golang.org/grpc/xds/internal/balancer/outlierdetection" // To register helper functions which register/unregister Outlier Detection LB Policy. ) -var defaultTestTimeout = 5 * time.Second +var ( + defaultTestTimeout = 5 * time.Second + leafPolicyName = "round_robin" +) + +func init() { + // Test the health listener code path for ejection when the experimental + // pickfirst is enabled. + if envconfig.NewPickFirstEnabled { + leafPolicyName = weightedroundrobin.Name + } +} type s struct { grpctest.Tester @@ -159,50 +171,50 @@ func (s) TestOutlierDetectionAlgorithmsE2E(t *testing.T) { }{ { name: "Success Rate Algorithm", - odscJSON: ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "successRateEjection": { - "stdevFactor": 50, - "enforcementPercentage": 100, - "minimumHosts": 3, - "requestVolume": 5 - }, - "childPolicy": [{"round_robin": {}}] - } - } - ] -}`, + odscJSON: fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "successRateEjection": { + "stdevFactor": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, leafPolicyName), }, { name: "Failure Percentage Algorithm", - odscJSON: ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 3, - "requestVolume": 5 - }, - "childPolicy": [{"round_robin": {}} - ] - } - } - ] -}`, + odscJSON: fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"%s": {}} + ] + } + } + ] + }`, leafPolicyName), }, } for _, test := range tests { @@ -277,20 +289,20 @@ func (s) TestNoopConfiguration(t *testing.T) { mr := manual.NewBuilderWithScheme("od-e2e") defer mr.Close() - noopODServiceConfigJSON := ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "childPolicy": [{"round_robin": {}}] - } - } - ] -}` + noopODServiceConfigJSON := fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, leafPolicyName) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(noopODServiceConfigJSON) // The full list of addresses. fullAddresses := []resolver.Address{ @@ -325,26 +337,26 @@ func (s) TestNoopConfiguration(t *testing.T) { // specifies to count RPC's and eject upstreams. Due to the balancer no // longer being a noop, it should eject any unhealthy addresses as specified // by the failure percentage portion of the configuration. - countingODServiceConfigJSON := ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 3, - "requestVolume": 5 - }, - "childPolicy": [{"round_robin": {}}] - } - } - ] -}` + countingODServiceConfigJSON := fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, leafPolicyName) sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) mr.UpdateState(resolver.State{ @@ -372,75 +384,6 @@ func (s) TestNoopConfiguration(t *testing.T) { } } -// Test verifies that outlier detection is performed using the -// generic health producer when the pickfirstleaf LB policy is used. The test -// server returns error for consecutive requests and test verifies that the -// endpoint is not ejected. -func (s) TestPickFirstHealthListenerEnabled(t *testing.T) { - backend := &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - return nil, errors.New("some error") - }, - } - if err := backend.StartServer(); err != nil { - t.Fatalf("Failed to start backend: %v", err) - } - defer backend.Stop() - t.Logf("Started bad TestService backend at: %q", backend.Address) - - countingODServiceConfigJSON := fmt.Sprintf(` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.025s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 0, - "requestVolume": 2 - }, - "childPolicy": [{"%s": {}}] - } - } - ] -}`, healthCheckingPetiolePolicyName) - sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) - - mr := manual.NewBuilderWithScheme("od-e2e") - // The full list of addresses. - mr.InitialState(resolver.State{ - Addresses: []resolver.Address{{Addr: backend.Address}}, - ServiceConfig: sc, - }) - cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient() failed: %v", err) - } - defer cc.Close() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - testServiceClient := testgrpc.NewTestServiceClient(cc) - - // The first request should not cause ejection. - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) - // Wait for the failure rate algorithm to run once. - shortCtx, shortCancel := context.WithTimeout(ctx, 50*time.Millisecond) - defer shortCancel() - testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) - - // 2 failing request should cause ejection. - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) - // Wait for the failure rate algorithm to run once. - testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) - - // The SubConn should be unejected after 100 millis. - testutils.AwaitState(ctx, t, cc, connectivity.Ready) -} - // Test verifies that outlier detection doesn't eject subchannels created by // the new pickfirst balancer when pickfirst is a top-level policy. func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { @@ -456,24 +399,24 @@ func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { t.Logf("Started bad TestService backend at: %q", backend.Address) countingODServiceConfigJSON := fmt.Sprintf(` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.025s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 0, - "requestVolume": 2 - }, - "childPolicy": [{"%s": {}}] - } - } - ] -}`, pickfirstleaf.Name) + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.025s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 0, + "requestVolume": 2 + }, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, pickfirstleaf.Name) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) mr := manual.NewBuilderWithScheme("od-e2e") @@ -500,31 +443,3 @@ func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { defer shortCancel() testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) } - -type healthCheckingPetiolePolicyBuilder struct{} - -func (bb *healthCheckingPetiolePolicyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - b := &healthCheckingPetiolePolicy{ - Balancer: balancer.Get(pickfirstleaf.Name).Build(cc, opts), - } - return b -} - -func (bb *healthCheckingPetiolePolicyBuilder) Name() string { - return healthCheckingPetiolePolicyName -} - -func (b *healthCheckingPetiolePolicy) UpdateClientConnState(state balancer.ClientConnState) error { - state.ResolverState = pickfirstleaf.EnableHealthListener(state.ResolverState) - return b.Balancer.UpdateClientConnState(state) -} - -type healthCheckingPetiolePolicy struct { - balancer.Balancer -} - -const healthCheckingPetiolePolicyName = "hcp" - -func init() { - balancer.Register(&healthCheckingPetiolePolicyBuilder{}) -} diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 058bd182dbac..dfa404483a7e 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -96,6 +96,7 @@ func (scw *subConnWrapper) String() string { } func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { + fmt.Println("Health listener registered") scw.mu.Lock() defer scw.mu.Unlock() From d91af318759e1ddf58bbc8b7ec14b2273fa6e3f2 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 11 Dec 2024 10:59:48 +0530 Subject: [PATCH 04/13] Less logging --- balancer/weightedroundrobin/balancer.go | 4 +++- xds/internal/balancer/outlierdetection/subconn_wrapper.go | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index fb1393688863..ad3ec98b7806 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -219,7 +219,9 @@ type wrrBalancer struct { } func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { - b.logger.Infof("UpdateCCS: %v", ccs) + if b.logger.V(2) { + b.logger.Infof("UpdateCCS: %v", ccs) + } cfg, ok := ccs.BalancerConfig.(*lbConfig) if !ok { return fmt.Errorf("wrr: received nil or illegal BalancerConfig (type %T): %v", ccs.BalancerConfig, ccs.BalancerConfig) diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index dfa404483a7e..058bd182dbac 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -96,7 +96,6 @@ func (scw *subConnWrapper) String() string { } func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { - fmt.Println("Health listener registered") scw.mu.Lock() defer scw.mu.Unlock() From 44ae533e92941164efcfb639b6eadcec06a4f93e Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 11 Dec 2024 11:14:36 +0530 Subject: [PATCH 05/13] avoid conditional logic in endpointsharding --- balancer/endpointsharding/endpointsharding.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/balancer/endpointsharding/endpointsharding.go b/balancer/endpointsharding/endpointsharding.go index 263c024a84c7..9b59bfc1d979 100644 --- a/balancer/endpointsharding/endpointsharding.go +++ b/balancer/endpointsharding/endpointsharding.go @@ -35,11 +35,9 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/balancer/pickfirst" "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" - "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -48,11 +46,7 @@ import ( var PickFirstConfig string func init() { - name := pickfirst.Name - if !envconfig.NewPickFirstEnabled { - name = pickfirstleaf.Name - } - PickFirstConfig = fmt.Sprintf("[{%q: {}}]", name) + PickFirstConfig = fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name) } // ChildState is the balancer state of a child along with the endpoint which From 8f63de9fcc6b04389c45ca2cda34a1c906497731 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 16 Dec 2024 12:56:57 +0530 Subject: [PATCH 06/13] Address review comments --- .../pickfirst/pickfirstleaf/pickfirstleaf.go | 10 ++-- .../pickfirstleaf/pickfirstleaf_ext_test.go | 5 +- .../balancer/outlierdetection/balancer.go | 23 ++++++--- .../e2e_test/outlierdetection_test.go | 7 ++- .../outlierdetection/subconn_wrapper.go | 50 ++++++++++++------- 5 files changed, 60 insertions(+), 35 deletions(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 89b307f8ddb6..2699bff6caac 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -151,11 +151,11 @@ func EnableHealthListener(state resolver.State) resolver.State { // IsManagedByPickfirst returns whether an address belongs to a SubConn // managed by the pickfirst LB policy. -// TODO(arjan-bal): This is a hack to disable outlier_detection via the -// with connectivity listener when using pick_first. Once Dualstack changes -// are complete, all SubConns will be created by pick_first and outlier -// detection will only use the health listener for ejection. This hack can -// then be removed. +// TODO: https://github.com/grpc/grpc-go/issues/7915 -This is a hack to disable +// outlier_detection via the with connectivity listener when using pick_first. +// Once Dualstack changes are complete, all SubConns will be created by +// pick_first and outlier detection will only use the health listener for +// ejection. This hack can then be removed. func IsManagedByPickfirst(addr resolver.Address) bool { return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil } diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index d5a148c1155f..2a87d6690f4c 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -1491,7 +1491,6 @@ func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, sub if len(sc.Addresses) != 1 { return nil, fmt.Errorf("new subchannel created with %d addresses, want 1", len(sc.Addresses)) } - sc.Addresses[0].Attributes = nil addresses = append(addresses, sc.Addresses[0]) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ @@ -1593,9 +1592,7 @@ func (b *backendManager) stopAllExcept(index int) { func (b *backendManager) resolverAddrs() []resolver.Address { addrs := make([]resolver.Address, len(b.backends)) for i, backend := range b.backends { - addrs[i] = resolver.Address{ - Addr: backend.Address, - } + addrs[i] = resolver.Address{Addr: backend.Address} } return addrs } diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 9e8588d55d8d..cfb7155848f2 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -614,15 +614,24 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { return } - if !scw.healthListenerEnabled { - scw.latestDeleveredState = u.state - } - - if scw.healthListenerEnabled || !scw.ejected { + // If the health listener is being used for ejection, forward the + // connectivity updates unconditionally. + if scw.healthListenerEnabled { b.childMu.Lock() scw.listener(u.state) b.childMu.Unlock() + return + } + + // Raw connectivity listener is being used for ejection. + scw.stateForUnjection = u.state + if scw.ejected { + return } + b.childMu.Lock() + scw.listener(u.state) + b.childMu.Unlock() + } func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { @@ -630,7 +639,7 @@ func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) scw.mu.Lock() defer scw.mu.Unlock() - scw.latestDeleveredState = u.state + scw.stateForUnjection = u.state if !scw.ejected && scw.healthListener != nil { b.childMu.Lock() scw.healthListener(u.state) @@ -645,7 +654,7 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { scw.ejected = u.isEjected // If scw.latestState has never been written to will default to connectivity // IDLE, which is fine. - stateToUpdate := scw.latestDeleveredState + stateToUpdate := scw.stateForUnjection if u.isEjected { stateToUpdate = balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 28da7582969b..428de85fdfb8 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -385,7 +385,12 @@ func (s) TestNoopConfiguration(t *testing.T) { } // Test verifies that outlier detection doesn't eject subchannels created by -// the new pickfirst balancer when pickfirst is a top-level policy. +// the new pickfirst balancer when pickfirst is a petiole policy itself. When +// pickfirst is not under a petiole policy, it will not register a health +// listener. pickfirst will still set the address attribute to disable ejection +// through the raw connectivity listener. When Outlier Detection processes a +// health update and sees the health listener is enabled but a health listener +// is not registered, it will drop the ejection update. func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { backend := &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 058bd182dbac..dbcbb4dd5fc6 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -37,38 +37,44 @@ type subConnWrapper struct { // that. listener func(balancer.SubConnState) + // healthListenerEnabled indicates whether the leaf LB policy is using a + // generic health listener. When enabled, ejection updates are sent via the + // health listener instead of the connectivity listener. Once Dualstack + // changes are complete, all SubConns will be created by pickfirst which + // uses the health listener. + healthListenerEnabled bool // addressInfo is a pointer to the subConnWrapper's corresponding address // map entry, if the map entry exists. addressInfo unsafe.Pointer // *addressInfo + + // The following fields are only referenced in the context of a work + // serializing buffer and don't need to be protected by a mutex. + // These two pieces of state will reach eventual consistency due to sync in // run(), and child will always have the correctly updated SubConnState. - // latestDeleveredState is the latest state update from the underlying SubConn. This - // is used whenever a SubConn gets unejected. This will be the health state - // if a health listener is being used, otherwise it will be the connectivity + + // stateForUnjection is the latest state update from the underlying + // SubConn that was processed through the serializer. This is used whenever + // a SubConn gets unejected. This will be the health state if a health + // listener is being used, otherwise it will be the connectivity // state. - latestDeleveredState balancer.SubConnState - ejected bool + stateForUnjection balancer.SubConnState + ejected bool scUpdateCh *buffer.Unbounded // addresses is the list of address(es) this SubConn was created with to // help support any change in address(es) addresses []resolver.Address - // healthListenerEnabled indicates whether the leaf LB policy is using a - // generic health listener. When enabled, ejection updates are sent via the - // health listener instead of the connectivity listener. Once Dualstack - // changes are complete, all SubConns will be created by pickfirst which - // uses the health listener. - healthListenerEnabled bool // Access to the following fields are protected by a mutex. mu sync.Mutex healthListener func(balancer.SubConnState) - // latestReceivedConnectivityState is the SubConn most recent connectivity - // state received from the subchannel. It may not be delivered to the child - // balancer yet. It is used to ensure a health listener is registered only - // when the subchannel is READY. + // latestReceivedConnectivityState is the SubConn's most recent connectivity + // state received. It may not be delivered to the child balancer yet. It is + // used to ensure a health listener is registered with the SubConn only when + // the SubConn is READY. latestReceivedConnectivityState connectivity.State } @@ -96,14 +102,22 @@ func (scw *subConnWrapper) String() string { } func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { - scw.mu.Lock() - defer scw.mu.Unlock() - + // gRPC currently supports two mechanisms that provide a health signal for + // a connection: client-side health checking and outlier detection. Earlier + // both these mechanisms signaled unhealthiness by setting the subchannel + // state to TRANSIENT_FAILURE. As part of the dualstack changes to make + // pick_first the universal leaf policy (see A61), both these mechanisms + // started using the new health listener to make health signal visible to + // the petiole policies without affecting the underlying connectivity + // management of the pick_first policy if !scw.healthListenerEnabled { logger.Errorf("Health listener unexpectedly registered on SubConn %v.", scw) return } + scw.mu.Lock() + defer scw.mu.Unlock() + if scw.latestReceivedConnectivityState != connectivity.Ready { return } From f8eda82aaad5073bbfcec5220634bed578ea9dfb Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 16 Dec 2024 15:08:00 +0530 Subject: [PATCH 07/13] Refactor to simplify mutex locking --- .../balancer/outlierdetection/balancer.go | 128 ++++++++++-------- .../outlierdetection/subconn_wrapper.go | 30 +++- 2 files changed, 98 insertions(+), 60 deletions(-) diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index cfb7155848f2..4ac1be6f01c8 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -73,7 +73,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba } b.logger = prefixLogger(b) b.logger.Infof("Created") - b.child = gracefulswitch.NewBalancer(b, bOpts) + b.child = synchronizingBalancerWrapper{lb: gracefulswitch.NewBalancer(b, bOpts)} go b.run() return b } @@ -176,10 +176,7 @@ type outlierDetectionBalancer struct { logger *grpclog.PrefixLogger channelzParent channelz.Identifier - // childMu guards calls into child (to uphold the balancer.Balancer API - // guarantee of synchronous calls). - childMu sync.Mutex - child *gracefulswitch.Balancer + child synchronizingBalancerWrapper // mu guards access to the following fields. It also helps to synchronize // behaviors of the following events: config updates, firing of the interval @@ -282,13 +279,10 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt // the balancer.Balancer API, so it is guaranteed to be called in a // synchronous manner, so it cannot race with this read. if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { - b.childMu.Lock() - err := b.child.SwitchTo(bb) - if err != nil { - b.childMu.Unlock() + + if err := b.child.switchTo(bb); err != nil { return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) } - b.childMu.Unlock() } b.mu.Lock() @@ -325,12 +319,10 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } b.mu.Unlock() - b.childMu.Lock() - err := b.child.UpdateClientConnState(balancer.ClientConnState{ + err := b.child.updateClientConnState(balancer.ClientConnState{ ResolverState: s.ResolverState, BalancerConfig: b.cfg.ChildPolicy.Config, }) - b.childMu.Unlock() done := make(chan struct{}) b.pickerUpdateCh.Put(lbCfgUpdate{ @@ -343,9 +335,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } func (b *outlierDetectionBalancer) ResolverError(err error) { - b.childMu.Lock() - defer b.childMu.Unlock() - b.child.ResolverError(err) + b.child.resolverError(err) } func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { @@ -361,9 +351,7 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state if state.ConnectivityState == connectivity.Shutdown { delete(b.scWrappers, scw.SubConn) } - scw.mu.Lock() - defer scw.mu.Unlock() - scw.latestReceivedConnectivityState = state.ConnectivityState + scw.setLatestConnectivityState(state.ConnectivityState) b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, @@ -377,9 +365,7 @@ func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state func (b *outlierDetectionBalancer) Close() { b.closed.Fire() <-b.done.Done() - b.childMu.Lock() - b.child.Close() - b.childMu.Unlock() + b.child.closeLB() b.scUpdateCh.Close() b.pickerUpdateCh.Close() @@ -392,9 +378,7 @@ func (b *outlierDetectionBalancer) Close() { } func (b *outlierDetectionBalancer) ExitIdle() { - b.childMu.Lock() - defer b.childMu.Unlock() - b.child.ExitIdle() + b.child.exitIdle() } // wrappedPicker delegates to the child policy's picker, and when the request @@ -606,9 +590,7 @@ func (b *outlierDetectionBalancer) Target() string { // if the SubConn is not ejected. func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw - scw.mu.Lock() - scw.healthListener = nil - scw.mu.Unlock() + scw.clearHealthListener() if scw.listener == nil { return @@ -617,9 +599,7 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { // If the health listener is being used for ejection, forward the // connectivity updates unconditionally. if scw.healthListenerEnabled { - b.childMu.Lock() - scw.listener(u.state) - b.childMu.Unlock() + b.child.updateSubConnState(scw, u.state) return } @@ -628,22 +608,13 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { if scw.ejected { return } - b.childMu.Lock() - scw.listener(u.state) - b.childMu.Unlock() - + b.child.updateSubConnState(scw, u.state) } func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { - scw := u.scw - scw.mu.Lock() - defer scw.mu.Unlock() - - scw.stateForUnjection = u.state - if !scw.ejected && scw.healthListener != nil { - b.childMu.Lock() - scw.healthListener(u.state) - b.childMu.Unlock() + u.scw.stateForUnjection = u.state + if !u.scw.ejected { + b.child.updateSubConnHealthState(u.scw, u.state) } } @@ -662,20 +633,10 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { } if !scw.healthListenerEnabled { - if scw.listener != nil { - b.childMu.Lock() - scw.listener(stateToUpdate) - b.childMu.Unlock() - } + b.child.updateSubConnState(scw, stateToUpdate) return } - scw.mu.Lock() - defer scw.mu.Unlock() - if scw.healthListener != nil { - b.childMu.Lock() - scw.healthListener(stateToUpdate) - b.childMu.Unlock() - } + b.child.updateSubConnHealthState(scw, stateToUpdate) } // handleChildStateUpdate forwards the picker update wrapped in a wrapped picker @@ -934,6 +895,61 @@ func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) { } } +// synchronizingBalancerWrapper serialized calls into balancer (to uphold the +// balancer.Balancer API guarantee of synchronous calls). It also ensures a +// consistent order of locking mutexes when using SubConn listeners to avoid +// deadlocks. +type synchronizingBalancerWrapper struct { + // mu should not be used directly from outside this struct, instead use + // methods defined on the struct. + mu sync.Mutex + lb *gracefulswitch.Balancer +} + +func (sbw *synchronizingBalancerWrapper) switchTo(builder balancer.Builder) error { + sbw.mu.Lock() + defer sbw.mu.Unlock() + return sbw.lb.SwitchTo(builder) +} + +func (sbw *synchronizingBalancerWrapper) updateClientConnState(state balancer.ClientConnState) error { + sbw.mu.Lock() + defer sbw.mu.Unlock() + return sbw.lb.UpdateClientConnState(state) +} + +func (sbw *synchronizingBalancerWrapper) resolverError(err error) { + sbw.mu.Lock() + sbw.lb.ResolverError(err) + sbw.mu.Unlock() +} + +func (sbw *synchronizingBalancerWrapper) closeLB() { + sbw.mu.Lock() + sbw.lb.Close() + sbw.mu.Unlock() +} + +func (sbw *synchronizingBalancerWrapper) exitIdle() { + sbw.mu.Lock() + sbw.lb.ExitIdle() + sbw.mu.Unlock() +} + +func (sbw *synchronizingBalancerWrapper) updateSubConnHealthState(scw *subConnWrapper, scs balancer.SubConnState) { + sbw.mu.Lock() + scw.updateHealthState(scs) + sbw.mu.Unlock() +} + +func (sbw *synchronizingBalancerWrapper) updateSubConnState(scw *subConnWrapper, scs balancer.SubConnState) { + sbw.mu.Lock() + if scw.listener != nil { + scw.listener(scs) + } + sbw.mu.Unlock() +} + // addressInfo contains the runtime information about an address that pertains // to Outlier Detection. This struct and all of its fields is protected by // outlierDetectionBalancer.mu in the case where it is accessed through the diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index dbcbb4dd5fc6..09aadd031b87 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -68,14 +68,16 @@ type subConnWrapper struct { // help support any change in address(es) addresses []resolver.Address - // Access to the following fields are protected by a mutex. + // Access to the following fields are protected by a mutex. These fields + // should not be accessed from outside this file, instead use methods + // defined on the struct. mu sync.Mutex healthListener func(balancer.SubConnState) - // latestReceivedConnectivityState is the SubConn's most recent connectivity + // latestConnectivityState is the SubConn's most recent connectivity // state received. It may not be delivered to the child balancer yet. It is // used to ensure a health listener is registered with the SubConn only when // the SubConn is READY. - latestReceivedConnectivityState connectivity.State + latestConnectivityState connectivity.State } // eject causes the wrapper to report a state update with the TRANSIENT_FAILURE @@ -118,7 +120,7 @@ func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConn scw.mu.Lock() defer scw.mu.Unlock() - if scw.latestReceivedConnectivityState != connectivity.Ready { + if scw.latestConnectivityState != connectivity.Ready { return } scw.healthListener = listener @@ -133,3 +135,23 @@ func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConn }) } } + +func (scw *subConnWrapper) updateHealthState(scs balancer.SubConnState) { + scw.mu.Lock() + if scw.healthListener != nil { + scw.healthListener(scs) + } + scw.mu.Unlock() +} + +func (scw *subConnWrapper) clearHealthListener() { + scw.mu.Lock() + scw.healthListener = nil + scw.mu.Unlock() +} + +func (scw *subConnWrapper) setLatestConnectivityState(state connectivity.State) { + scw.mu.Lock() + scw.latestConnectivityState = state + scw.mu.Unlock() +} From 6a6aa7a247a23ed4fee8445325ffc4a39ade6476 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 17 Dec 2024 12:48:48 +0530 Subject: [PATCH 08/13] Address review comments --- .../balancer/outlierdetection/balancer.go | 61 ++++++------- .../outlierdetection/subconn_wrapper.go | 91 ++++++++++++++----- 2 files changed, 94 insertions(+), 58 deletions(-) diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 4ac1be6f01c8..d2e08db637c9 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -193,8 +193,8 @@ type outlierDetectionBalancer struct { // which uses addrs. This balancer waits for the interval timer algorithm to // finish before making the update to the addrs map. // - // This mutex is never held at the same time as childMu (within the context - // of a single goroutine). + // This mutex is never held when calling methods on the child policy + // (within the context of a single goroutine). mu sync.Mutex addrs map[string]*addressInfo cfg *LBConfig @@ -468,11 +468,13 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal return nil, err } scw := &subConnWrapper{ - SubConn: sc, - addresses: addrs, - scUpdateCh: b.scUpdateCh, - listener: oldListener, - healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]), + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + listener: oldListener, + latestRawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Idle}, + latestHealthState: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]), } b.mu.Lock() defer b.mu.Unlock() @@ -604,7 +606,6 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { } // Raw connectivity listener is being used for ejection. - scw.stateForUnjection = u.state if scw.ejected { return } @@ -612,7 +613,6 @@ func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { } func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { - u.scw.stateForUnjection = u.state if !u.scw.ejected { b.child.updateSubConnHealthState(u.scw, u.state) } @@ -621,22 +621,7 @@ func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) // handleEjectedUpdate handles any SubConns that get ejected/unejected, and // forwards the appropriate corresponding subConnState to the child policy. func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { - scw := u.scw - scw.ejected = u.isEjected - // If scw.latestState has never been written to will default to connectivity - // IDLE, which is fine. - stateToUpdate := scw.stateForUnjection - if u.isEjected { - stateToUpdate = balancer.SubConnState{ - ConnectivityState: connectivity.TransientFailure, - } - } - - if !scw.healthListenerEnabled { - b.child.updateSubConnState(scw, stateToUpdate) - return - } - b.child.updateSubConnHealthState(scw, stateToUpdate) + b.child.handleEjectionUpdate(u) } // handleChildStateUpdate forwards the picker update wrapped in a wrapped picker @@ -895,7 +880,7 @@ func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) { } } -// synchronizingBalancerWrapper serialized calls into balancer (to uphold the +// synchronizingBalancerWrapper serializes calls into balancer (to uphold the // balancer.Balancer API guarantee of synchronous calls). It also ensures a // consistent order of locking mutexes when using SubConn listeners to avoid // deadlocks. @@ -920,34 +905,42 @@ func (sbw *synchronizingBalancerWrapper) updateClientConnState(state balancer.Cl func (sbw *synchronizingBalancerWrapper) resolverError(err error) { sbw.mu.Lock() + defer sbw.mu.Unlock() sbw.lb.ResolverError(err) - sbw.mu.Unlock() } func (sbw *synchronizingBalancerWrapper) closeLB() { sbw.mu.Lock() + defer sbw.mu.Unlock() sbw.lb.Close() - sbw.mu.Unlock() } func (sbw *synchronizingBalancerWrapper) exitIdle() { sbw.mu.Lock() + defer sbw.mu.Unlock() sbw.lb.ExitIdle() - sbw.mu.Unlock() } func (sbw *synchronizingBalancerWrapper) updateSubConnHealthState(scw *subConnWrapper, scs balancer.SubConnState) { sbw.mu.Lock() - scw.updateHealthState(scs) - sbw.mu.Unlock() + defer sbw.mu.Unlock() + scw.updateSubConnHealthState(scs) } func (sbw *synchronizingBalancerWrapper) updateSubConnState(scw *subConnWrapper, scs balancer.SubConnState) { sbw.mu.Lock() - if scw.listener != nil { - scw.listener(scs) + defer sbw.mu.Unlock() + scw.updateSubConnConnectivityState(scs) +} + +func (sbw *synchronizingBalancerWrapper) handleEjectionUpdate(u *ejectionUpdate) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + if u.isEjected { + u.scw.handleEjection() + } else { + u.scw.handleUnejection() } - sbw.mu.Unlock() } // addressInfo contains the runtime information about an address that pertains diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 09aadd031b87..9f1ec959a80b 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -47,6 +47,7 @@ type subConnWrapper struct { // addressInfo is a pointer to the subConnWrapper's corresponding address // map entry, if the map entry exists. addressInfo unsafe.Pointer // *addressInfo + scUpdateCh *buffer.Unbounded // The following fields are only referenced in the context of a work // serializing buffer and don't need to be protected by a mutex. @@ -54,30 +55,28 @@ type subConnWrapper struct { // These two pieces of state will reach eventual consistency due to sync in // run(), and child will always have the correctly updated SubConnState. - // stateForUnjection is the latest state update from the underlying - // SubConn that was processed through the serializer. This is used whenever - // a SubConn gets unejected. This will be the health state if a health - // listener is being used, otherwise it will be the connectivity - // state. - stateForUnjection balancer.SubConnState - ejected bool - - scUpdateCh *buffer.Unbounded + ejected bool // addresses is the list of address(es) this SubConn was created with to // help support any change in address(es) addresses []resolver.Address + // latestHealthState is tracked to update the child policy during + // unejection. + latestHealthState balancer.SubConnState + // latestRawConnectivityState is tracked to update the child policy during + // unejection. + latestRawConnectivityState balancer.SubConnState // Access to the following fields are protected by a mutex. These fields // should not be accessed from outside this file, instead use methods // defined on the struct. mu sync.Mutex healthListener func(balancer.SubConnState) - // latestConnectivityState is the SubConn's most recent connectivity + // latestReceivedConnectivityState is the SubConn's most recent connectivity // state received. It may not be delivered to the child balancer yet. It is // used to ensure a health listener is registered with the SubConn only when // the SubConn is READY. - latestConnectivityState connectivity.State + latestReceivedConnectivityState connectivity.State } // eject causes the wrapper to report a state update with the TRANSIENT_FAILURE @@ -120,38 +119,82 @@ func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConn scw.mu.Lock() defer scw.mu.Unlock() - if scw.latestConnectivityState != connectivity.Ready { + if scw.latestReceivedConnectivityState != connectivity.Ready { return } scw.healthListener = listener if listener == nil { scw.SubConn.RegisterHealthListener(nil) - } else { - scw.SubConn.RegisterHealthListener(func(scs balancer.SubConnState) { - scw.scUpdateCh.Put(&scHealthUpdate{ - scw: scw, - state: scs, - }) - }) + return } + + scw.SubConn.RegisterHealthListener(func(scs balancer.SubConnState) { + scw.scUpdateCh.Put(&scHealthUpdate{ + scw: scw, + state: scs, + }) + }) } -func (scw *subConnWrapper) updateHealthState(scs balancer.SubConnState) { +// updateSubConnHealthState stores the latest health state for unejection and +// sends updates the health listener. +func (scw *subConnWrapper) updateSubConnHealthState(scs balancer.SubConnState) { + scw.latestHealthState = scs scw.mu.Lock() + defer scw.mu.Unlock() if scw.healthListener != nil { scw.healthListener(scs) } - scw.mu.Unlock() +} + +// updateSubConnConnectivityState stores the latest connectivity state for +// unejection and updates the raw connectivity listener. +func (scw *subConnWrapper) updateSubConnConnectivityState(scs balancer.SubConnState) { + scw.latestRawConnectivityState = scs + if scw.listener != nil { + scw.listener(scs) + } } func (scw *subConnWrapper) clearHealthListener() { scw.mu.Lock() + defer scw.mu.Unlock() scw.healthListener = nil - scw.mu.Unlock() +} + +func (scw *subConnWrapper) handleUnejection() { + scw.ejected = false + if !scw.healthListenerEnabled { + // If scw.latestRawConnectivityState has never been written to will + // default to connectivity IDLE, which is fine. + scw.updateSubConnConnectivityState(scw.latestRawConnectivityState) + return + } + // If scw.latestHealthState has never been written to will use the health + // state CONNECTING set during object creation. + scw.updateSubConnHealthState(scw.latestHealthState) +} + +func (scw *subConnWrapper) handleEjection() { + scw.ejected = true + stateToUpdate := balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + } + if !scw.healthListenerEnabled { + if scw.listener != nil { + scw.listener(stateToUpdate) + } + return + } + scw.mu.Lock() + defer scw.mu.Unlock() + if scw.healthListener != nil { + scw.healthListener(stateToUpdate) + } } func (scw *subConnWrapper) setLatestConnectivityState(state connectivity.State) { scw.mu.Lock() - scw.latestConnectivityState = state - scw.mu.Unlock() + defer scw.mu.Unlock() + scw.latestReceivedConnectivityState = state } From 1af2206bb4233558032e41b2361ac9de4ecc05b5 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 17 Dec 2024 15:09:36 +0530 Subject: [PATCH 09/13] Move ejection tracking to sc wrapper --- .../balancer/outlierdetection/balancer.go | 20 +------------------ .../outlierdetection/subconn_wrapper.go | 8 ++++++++ 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index d2e08db637c9..e607f0785a6d 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -593,29 +593,11 @@ func (b *outlierDetectionBalancer) Target() string { func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw scw.clearHealthListener() - - if scw.listener == nil { - return - } - - // If the health listener is being used for ejection, forward the - // connectivity updates unconditionally. - if scw.healthListenerEnabled { - b.child.updateSubConnState(scw, u.state) - return - } - - // Raw connectivity listener is being used for ejection. - if scw.ejected { - return - } b.child.updateSubConnState(scw, u.state) } func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { - if !u.scw.ejected { - b.child.updateSubConnHealthState(u.scw, u.state) - } + b.child.updateSubConnHealthState(u.scw, u.state) } // handleEjectedUpdate handles any SubConns that get ejected/unejected, and diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 9f1ec959a80b..7c0a2c117bdf 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -140,6 +140,9 @@ func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConn // sends updates the health listener. func (scw *subConnWrapper) updateSubConnHealthState(scs balancer.SubConnState) { scw.latestHealthState = scs + if scw.ejected { + return + } scw.mu.Lock() defer scw.mu.Unlock() if scw.healthListener != nil { @@ -151,6 +154,11 @@ func (scw *subConnWrapper) updateSubConnHealthState(scs balancer.SubConnState) { // unejection and updates the raw connectivity listener. func (scw *subConnWrapper) updateSubConnConnectivityState(scs balancer.SubConnState) { scw.latestRawConnectivityState = scs + // If the raw connectivity listener is used for ejection, and the SubConn is + // ejected, don't send the update. + if scw.ejected && !scw.healthListenerEnabled { + return + } if scw.listener != nil { scw.listener(scs) } From d84d0dede75d3bbb02791a42db5f7b99fced01eb Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 17 Dec 2024 15:40:46 +0530 Subject: [PATCH 10/13] Fix test comment --- .../e2e_test/outlierdetection_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 428de85fdfb8..4be141eb58b0 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -385,12 +385,13 @@ func (s) TestNoopConfiguration(t *testing.T) { } // Test verifies that outlier detection doesn't eject subchannels created by -// the new pickfirst balancer when pickfirst is a petiole policy itself. When -// pickfirst is not under a petiole policy, it will not register a health -// listener. pickfirst will still set the address attribute to disable ejection -// through the raw connectivity listener. When Outlier Detection processes a -// health update and sees the health listener is enabled but a health listener -// is not registered, it will drop the ejection update. +// the new pickfirst balancer when pickfirst is a non-leaf policy, i.e. not +// under a petiole policy. When pickfirst is not under a petiole policy, it will +// not register a health listener. pickfirst will still set the address +// attribute to disable ejection through the raw connectivity listener. When +// Outlier Detection processes a health update and sees the health listener is +// enabled but a health listener is not registered, it will drop the ejection +// update. func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { backend := &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { From e3879a5bff9d88484bb9a325bf21dcf2662aa15d Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 20 Dec 2024 00:38:29 +0530 Subject: [PATCH 11/13] Address review comments --- balancer/pickfirst/pickfirstleaf/pickfirstleaf.go | 2 +- balancer/weightedroundrobin/balancer.go | 2 +- xds/internal/balancer/outlierdetection/subconn_wrapper.go | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 2699bff6caac..41f820297c00 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -151,7 +151,7 @@ func EnableHealthListener(state resolver.State) resolver.State { // IsManagedByPickfirst returns whether an address belongs to a SubConn // managed by the pickfirst LB policy. -// TODO: https://github.com/grpc/grpc-go/issues/7915 -This is a hack to disable +// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable // outlier_detection via the with connectivity listener when using pick_first. // Once Dualstack changes are complete, all SubConns will be created by // pick_first and outlier detection will only use the health listener for diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index ad3ec98b7806..d7b9dc4666ee 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -235,7 +235,7 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error b.updateEndpointsLocked(ccs.ResolverState.Endpoints) b.mu.Unlock() - // Make children pickfirst use health listeners for outlier detection to + // Make pickfirst children use health listeners for outlier detection to // work. ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState) // This causes child to update picker inline and will thus cause inline diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 7c0a2c117bdf..a0f9dcab093e 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -42,6 +42,10 @@ type subConnWrapper struct { // health listener instead of the connectivity listener. Once Dualstack // changes are complete, all SubConns will be created by pickfirst which // uses the health listener. + // TODO: https://github.com/grpc/grpc-go/issues/7915 - Once Dualstack + // changes are complete, all SubConns will be created by pick_first and + // outlier detection will only use the health listener for ejection and + // this field can be removed. healthListenerEnabled bool // addressInfo is a pointer to the subConnWrapper's corresponding address From 056e0ae8c1bb841c17c8cddcc64dc35492c2a4bb Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 20 Dec 2024 14:22:41 +0530 Subject: [PATCH 12/13] Move test to internal and trigger internal algo directly --- .../balancer/outlierdetection/balancer.go | 1 - .../outlierdetection/balancer_test.go | 96 +++++++++++++++++++ .../e2e_test/outlierdetection_test.go | 69 ------------- 3 files changed, 96 insertions(+), 70 deletions(-) diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index e607f0785a6d..8f58c0030321 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -279,7 +279,6 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt // the balancer.Balancer API, so it is guaranteed to be called in a // synchronous manner, so it cannot race with this read. if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { - if err := b.child.switchTo(bb); err != nil { return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) } diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index a80d4d9dee4f..ca07dcda0c88 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -31,17 +31,24 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" ) var ( @@ -1568,3 +1575,92 @@ func (s) TestConcurrentOperations(t *testing.T) { close(finished) wg.Wait() } + +// Test verifies that outlier detection doesn't eject subchannels created by +// the new pickfirst balancer when pickfirst is a non-leaf policy, i.e. not +// under a petiole policy. When pickfirst is not under a petiole policy, it will +// not register a health listener. pickfirst will still set the address +// attribute to disable ejection through the raw connectivity listener. When +// Outlier Detection processes a health update and sees the health listener is +// enabled but a health listener is not registered, it will drop the ejection +// update. +func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { + backend := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return nil, errors.New("some error") + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + defer backend.Stop() + t.Logf("Started bad TestService backend at: %q", backend.Address) + + // The interval is intentionally kept very large, the interval algorithm + // will be triggered manually. + odCfg := &LBConfig{ + Interval: iserviceconfig.Duration(300 * time.Second), + BaseEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(500 * time.Second), + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 0, + RequestVolume: 2, + }, + MaxEjectionPercent: 100, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: pickfirstleaf.Name, + }, + } + + lbChan := make(chan *outlierDetectionBalancer, 1) + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(Name).Build(bd.ClientConn, bd.BuildOptions) + lbChan <- bd.Data.(*outlierDetectionBalancer) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + ccs.BalancerConfig = odCfg + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + } + + stub.Register(t.Name(), bf) + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())), + } + cc, err := grpc.NewClient(backend.Address, opts...) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testutils.AwaitState(ctx, t, cc, connectivity.Ready) + + // Failing request should not cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + + // Run the interval algorithm. + select { + case <-ctx.Done(): + t.Fatal("Timed out waiting for the outlier detection LB policy to be built.") + case od := <-lbChan: + od.intervalTimerAlgorithm() + } + + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) +} diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 4be141eb58b0..ec087eb08b53 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -28,15 +28,12 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" - "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/balancer/weightedroundrobin" - "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" - "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -383,69 +380,3 @@ func (s) TestNoopConfiguration(t *testing.T) { t.Fatalf("error in expected round robin: %v", err) } } - -// Test verifies that outlier detection doesn't eject subchannels created by -// the new pickfirst balancer when pickfirst is a non-leaf policy, i.e. not -// under a petiole policy. When pickfirst is not under a petiole policy, it will -// not register a health listener. pickfirst will still set the address -// attribute to disable ejection through the raw connectivity listener. When -// Outlier Detection processes a health update and sees the health listener is -// enabled but a health listener is not registered, it will drop the ejection -// update. -func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { - backend := &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - return nil, errors.New("some error") - }, - } - if err := backend.StartServer(); err != nil { - t.Fatalf("Failed to start backend: %v", err) - } - defer backend.Stop() - t.Logf("Started bad TestService backend at: %q", backend.Address) - - countingODServiceConfigJSON := fmt.Sprintf(` - { - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.025s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 0, - "requestVolume": 2 - }, - "childPolicy": [{"%s": {}}] - } - } - ] - }`, pickfirstleaf.Name) - sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) - - mr := manual.NewBuilderWithScheme("od-e2e") - - mr.InitialState(resolver.State{ - Addresses: []resolver.Address{{Addr: backend.Address}}, - ServiceConfig: sc, - }) - cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient() failed: %v", err) - } - defer cc.Close() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - testServiceClient := testgrpc.NewTestServiceClient(cc) - - // Failing request should not cause ejection. - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) - // Wait for the failure rate algorithm to run once. - shortCtx, shortCancel := context.WithTimeout(ctx, 50*time.Millisecond) - defer shortCancel() - testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) -} From b20e0189cfb0f991dd2923e4244daa4217b110f7 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 23 Dec 2024 10:24:50 +0530 Subject: [PATCH 13/13] Fix comments --- balancer/pickfirst/pickfirstleaf/pickfirstleaf.go | 4 ++-- xds/internal/balancer/outlierdetection/subconn_wrapper.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 41f820297c00..76fa5fea95f2 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -55,8 +55,8 @@ func init() { } type ( - // enableHealthListenerKeyType is a unique key type used in resolver attributes - // to indicate whether the health listener usage is enabled. + // enableHealthListenerKeyType is a unique key type used in resolver + // attributes to indicate whether the health listener usage is enabled. enableHealthListenerKeyType struct{} // managedByPickfirstKeyType is an attribute key type to inform Outlier // Detection that the generic health listener is being used. diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index a0f9dcab093e..7d710fde1b2a 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -33,6 +33,9 @@ import ( // whether or not this SubConn is ejected. type subConnWrapper struct { balancer.SubConn + // addressInfo is a pointer to the subConnWrapper's corresponding address + // map entry, if the map entry exists. It is accessed atomically. + addressInfo unsafe.Pointer // *addressInfo // The following fields are set during object creation and read-only after // that. @@ -48,10 +51,7 @@ type subConnWrapper struct { // this field can be removed. healthListenerEnabled bool - // addressInfo is a pointer to the subConnWrapper's corresponding address - // map entry, if the map entry exists. - addressInfo unsafe.Pointer // *addressInfo - scUpdateCh *buffer.Unbounded + scUpdateCh *buffer.Unbounded // The following fields are only referenced in the context of a work // serializing buffer and don't need to be protected by a mutex.