diff --git a/balancer/endpointsharding/endpointsharding.go b/balancer/endpointsharding/endpointsharding.go index 263c024a84c7..9b59bfc1d979 100644 --- a/balancer/endpointsharding/endpointsharding.go +++ b/balancer/endpointsharding/endpointsharding.go @@ -35,11 +35,9 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/balancer/pickfirst" "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" - "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -48,11 +46,7 @@ import ( var PickFirstConfig string func init() { - name := pickfirst.Name - if !envconfig.NewPickFirstEnabled { - name = pickfirstleaf.Name - } - PickFirstConfig = fmt.Sprintf("[{%q: {}}]", name) + PickFirstConfig = fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name) } // ChildState is the balancer state of a child along with the endpoint which diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 2fc0a71f9441..76fa5fea95f2 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -54,9 +54,18 @@ func init() { balancer.Register(pickfirstBuilder{}) } -// enableHealthListenerKeyType is a unique key type used in resolver attributes -// to indicate whether the health listener usage is enabled. -type enableHealthListenerKeyType struct{} +type ( + // enableHealthListenerKeyType is a unique key type used in resolver + // attributes to indicate whether the health listener usage is enabled. + enableHealthListenerKeyType struct{} + // managedByPickfirstKeyType is an attribute key type to inform Outlier + // Detection that the generic health listener is being used. 
+ // TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when + // implementing the dualstack design. This is a hack. Once Dualstack is + // completed, outlier detection will stop sending ejection updates through + // the connectivity listener. + managedByPickfirstKeyType struct{} +) var ( logger = grpclog.Component("pick-first-leaf-lb") @@ -140,6 +149,17 @@ func EnableHealthListener(state resolver.State) resolver.State { return state } +// IsManagedByPickfirst returns whether an address belongs to a SubConn +// managed by the pickfirst LB policy. +// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable +// outlier_detection via the connectivity listener when using pick_first. +// Once Dualstack changes are complete, all SubConns will be created by +// pick_first and outlier detection will only use the health listener for +// ejection. This hack can then be removed. +func IsManagedByPickfirst(addr resolver.Address) bool { + return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil +} + type pfConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` @@ -166,6 +186,7 @@ type scData struct { } func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true) sd := &scData{ rawConnectivityState: connectivity.Idle, effectiveState: connectivity.Idle, diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index 9667c2b3db6b..2a87d6690f4c 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -58,7 +58,13 @@ const ( stateStoringBalancerName = "state_storing" ) -var stateStoringServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateStoringBalancerName) +var ( + stateStoringServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": 
[{"%s":{}}]}`, stateStoringBalancerName) + ignoreBalAttributesOpt = cmp.Transformer("IgnoreBalancerAttributes", func(a resolver.Address) resolver.Address { + a.BalancerAttributes = nil + return a + }) +) type s struct { grpctest.Tester @@ -177,7 +183,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_FirstServerReady(t *testing.T) { wantSCStates := []scState{ {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -219,7 +225,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_FirstServerUnReady(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Shutdown}, {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -264,7 +270,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_DuplicateAddrs(t *testing.T) { {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Shutdown}, {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -317,7 +323,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) { {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -334,7 +340,7 @@ 
func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) { {Addrs: []resolver.Address{addrs[3]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -378,7 +384,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -398,7 +404,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -440,7 +446,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -458,7 +464,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -502,7 +508,7 @@ func (s) 
TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) { {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -521,7 +527,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) { {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -576,7 +582,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -591,7 +597,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T) t.Fatal(err) } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -639,7 +645,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T) {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -660,7 +666,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T) {Addrs: 
[]resolver.Address{addrs[0]}, State: connectivity.Shutdown}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -708,7 +714,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T) {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -729,7 +735,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -776,7 +782,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T) {Addrs: []resolver.Address{addrs[0]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -796,7 +802,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T) {Addrs: []resolver.Address{addrs[1]}, State: connectivity.Ready}, } - if diff := cmp.Diff(wantSCStates, bal.subConnStates()); diff != "" { + if diff := cmp.Diff(wantSCStates, bal.subConnStates(), ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } @@ -1130,7 +1136,7 @@ func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) { if err != nil { 
t.Fatalf("%v", err) } - if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" { + if diff := cmp.Diff(wantAddrs, gotAddrs, ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn creation order mismatch (-want +got):\n%s", diff) } } @@ -1174,7 +1180,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" { + if diff := cmp.Diff(wantAddrs, gotAddrs, ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn creation order mismatch (-want +got):\n%s", diff) } } @@ -1220,7 +1226,7 @@ func (s) TestPickFirstLeaf_InterleavingUnknownPreffered(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" { + if diff := cmp.Diff(wantAddrs, gotAddrs, ignoreBalAttributesOpt); diff != "" { t.Errorf("SubConn creation order mismatch (-want +got):\n%s", diff) } } diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index c9c5b576bb0c..d7b9dc4666ee 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/endpointsharding" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/balancer/weightedroundrobin/internal" "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/connectivity" @@ -218,7 +219,9 @@ type wrrBalancer struct { } func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { - b.logger.Infof("UpdateCCS: %v", ccs) + if b.logger.V(2) { + b.logger.Infof("UpdateCCS: %v", ccs) + } cfg, ok := ccs.BalancerConfig.(*lbConfig) if !ok { return fmt.Errorf("wrr: received nil or illegal BalancerConfig (type %T): %v", ccs.BalancerConfig, ccs.BalancerConfig) @@ -232,6 +235,9 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error 
b.updateEndpointsLocked(ccs.ResolverState.Endpoints) b.mu.Unlock() + // Make pickfirst children use health listeners for outlier detection to + // work. + ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState) // This causes child to update picker inline and will thus cause inline // picker update. return b.child.UpdateClientConnState(balancer.ClientConnState{ diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index c9d496ce09b9..8f58c0030321 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -33,6 +33,7 @@ import ( "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" @@ -72,7 +73,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba } b.logger = prefixLogger(b) b.logger.Infof("Created") - b.child = gracefulswitch.NewBalancer(b, bOpts) + b.child = synchronizingBalancerWrapper{lb: gracefulswitch.NewBalancer(b, bOpts)} go b.run() return b } @@ -152,6 +153,11 @@ type lbCfgUpdate struct { done chan struct{} } +type scHealthUpdate struct { + scw *subConnWrapper + state balancer.SubConnState +} + type outlierDetectionBalancer struct { // These fields are safe to be accessed without holding any mutex because // they are synchronized in run(), which makes these field accesses happen @@ -170,10 +176,7 @@ type outlierDetectionBalancer struct { logger *grpclog.PrefixLogger channelzParent channelz.Identifier - // childMu guards calls into child (to uphold the balancer.Balancer API - // guarantee of synchronous calls). - childMu sync.Mutex - child *gracefulswitch.Balancer + child synchronizingBalancerWrapper // mu guards access to the following fields. 
It also helps to synchronize // behaviors of the following events: config updates, firing of the interval @@ -190,8 +193,8 @@ type outlierDetectionBalancer struct { // which uses addrs. This balancer waits for the interval timer algorithm to // finish before making the update to the addrs map. // - // This mutex is never held at the same time as childMu (within the context - // of a single goroutine). + // This mutex is never held when calling methods on the child policy + // (within the context of a single goroutine). mu sync.Mutex addrs map[string]*addressInfo cfg *LBConfig @@ -276,13 +279,9 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt // the balancer.Balancer API, so it is guaranteed to be called in a // synchronous manner, so it cannot race with this read. if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { - b.childMu.Lock() - err := b.child.SwitchTo(bb) - if err != nil { - b.childMu.Unlock() + if err := b.child.switchTo(bb); err != nil { return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) } - b.childMu.Unlock() } b.mu.Lock() @@ -319,12 +318,10 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } b.mu.Unlock() - b.childMu.Lock() - err := b.child.UpdateClientConnState(balancer.ClientConnState{ + err := b.child.updateClientConnState(balancer.ClientConnState{ ResolverState: s.ResolverState, BalancerConfig: b.cfg.ChildPolicy.Config, }) - b.childMu.Unlock() done := make(chan struct{}) b.pickerUpdateCh.Put(lbCfgUpdate{ @@ -337,9 +334,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } func (b *outlierDetectionBalancer) ResolverError(err error) { - b.childMu.Lock() - defer b.childMu.Unlock() - b.child.ResolverError(err) + b.child.resolverError(err) } func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { @@ -355,6 +350,7 @@ func (b 
*outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state if state.ConnectivityState == connectivity.Shutdown { delete(b.scWrappers, scw.SubConn) } + scw.setLatestConnectivityState(state.ConnectivityState) b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, @@ -368,9 +364,7 @@ func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state func (b *outlierDetectionBalancer) Close() { b.closed.Fire() <-b.done.Done() - b.childMu.Lock() - b.child.Close() - b.childMu.Unlock() + b.child.closeLB() b.scUpdateCh.Close() b.pickerUpdateCh.Close() @@ -383,9 +377,7 @@ func (b *outlierDetectionBalancer) Close() { } func (b *outlierDetectionBalancer) ExitIdle() { - b.childMu.Lock() - defer b.childMu.Unlock() - b.child.ExitIdle() + b.child.exitIdle() } // wrappedPicker delegates to the child policy's picker, and when the request @@ -475,10 +467,13 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal return nil, err } scw := &subConnWrapper{ - SubConn: sc, - addresses: addrs, - scUpdateCh: b.scUpdateCh, - listener: oldListener, + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + listener: oldListener, + latestRawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Idle}, + latestHealthState: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]), } b.mu.Lock() defer b.mu.Unlock() @@ -596,34 +591,18 @@ func (b *outlierDetectionBalancer) Target() string { // if the SubConn is not ejected. 
func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw - scw.latestState = u.state - if !scw.ejected { - if scw.listener != nil { - b.childMu.Lock() - scw.listener(u.state) - b.childMu.Unlock() - } - } + scw.clearHealthListener() + b.child.updateSubConnState(scw, u.state) +} + +func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) { + b.child.updateSubConnHealthState(u.scw, u.state) } // handleEjectedUpdate handles any SubConns that get ejected/unejected, and // forwards the appropriate corresponding subConnState to the child policy. func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { - scw := u.scw - scw.ejected = u.isEjected - // If scw.latestState has never been written to will default to connectivity - // IDLE, which is fine. - stateToUpdate := scw.latestState - if u.isEjected { - stateToUpdate = balancer.SubConnState{ - ConnectivityState: connectivity.TransientFailure, - } - } - if scw.listener != nil { - b.childMu.Lock() - scw.listener(stateToUpdate) - b.childMu.Unlock() - } + b.child.handleEjectionUpdate(u) } // handleChildStateUpdate forwards the picker update wrapped in a wrapped picker @@ -696,6 +675,8 @@ func (b *outlierDetectionBalancer) run() { b.handleSubConnUpdate(u) case *ejectionUpdate: b.handleEjectedUpdate(u) + case *scHealthUpdate: + b.handleSubConnHealthUpdate(u) } case update, ok := <-b.pickerUpdateCh.Get(): if !ok { @@ -880,6 +861,69 @@ func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) { } } +// synchronizingBalancerWrapper serializes calls into balancer (to uphold the +// balancer.Balancer API guarantee of synchronous calls). It also ensures a +// consistent order of locking mutexes when using SubConn listeners to avoid +// deadlocks. +type synchronizingBalancerWrapper struct { + // mu should not be used directly from outside this struct, instead use + // methods defined on the struct. 
+ mu sync.Mutex + lb *gracefulswitch.Balancer +} + +func (sbw *synchronizingBalancerWrapper) switchTo(builder balancer.Builder) error { + sbw.mu.Lock() + defer sbw.mu.Unlock() + return sbw.lb.SwitchTo(builder) +} + +func (sbw *synchronizingBalancerWrapper) updateClientConnState(state balancer.ClientConnState) error { + sbw.mu.Lock() + defer sbw.mu.Unlock() + return sbw.lb.UpdateClientConnState(state) +} + +func (sbw *synchronizingBalancerWrapper) resolverError(err error) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + sbw.lb.ResolverError(err) +} + +func (sbw *synchronizingBalancerWrapper) closeLB() { + sbw.mu.Lock() + defer sbw.mu.Unlock() + sbw.lb.Close() +} + +func (sbw *synchronizingBalancerWrapper) exitIdle() { + sbw.mu.Lock() + defer sbw.mu.Unlock() + sbw.lb.ExitIdle() +} + +func (sbw *synchronizingBalancerWrapper) updateSubConnHealthState(scw *subConnWrapper, scs balancer.SubConnState) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + scw.updateSubConnHealthState(scs) +} + +func (sbw *synchronizingBalancerWrapper) updateSubConnState(scw *subConnWrapper, scs balancer.SubConnState) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + scw.updateSubConnConnectivityState(scs) +} + +func (sbw *synchronizingBalancerWrapper) handleEjectionUpdate(u *ejectionUpdate) { + sbw.mu.Lock() + defer sbw.mu.Unlock() + if u.isEjected { + u.scw.handleEjection() + } else { + u.scw.handleUnejection() + } +} + // addressInfo contains the runtime information about an address that pertains // to Outlier Detection. 
This struct and all of its fields is protected by // outlierDetectionBalancer.mu in the case where it is accessed through the diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index a80d4d9dee4f..ca07dcda0c88 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -31,17 +31,24 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" ) var ( @@ -1568,3 +1575,92 @@ func (s) TestConcurrentOperations(t *testing.T) { close(finished) wg.Wait() } + +// Test verifies that outlier detection doesn't eject subchannels created by +// the new pickfirst balancer when pickfirst is a non-leaf policy, i.e. not +// under a petiole policy. When pickfirst is not under a petiole policy, it will +// not register a health listener. pickfirst will still set the address +// attribute to disable ejection through the raw connectivity listener. When +// Outlier Detection processes a health update and sees the health listener is +// enabled but a health listener is not registered, it will drop the ejection +// update. 
+func (s) TestPickFirstHealthListenerDisabled(t *testing.T) { + backend := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return nil, errors.New("some error") + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + defer backend.Stop() + t.Logf("Started bad TestService backend at: %q", backend.Address) + + // The interval is intentionally kept very large, the interval algorithm + // will be triggered manually. + odCfg := &LBConfig{ + Interval: iserviceconfig.Duration(300 * time.Second), + BaseEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(500 * time.Second), + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 0, + RequestVolume: 2, + }, + MaxEjectionPercent: 100, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: pickfirstleaf.Name, + }, + } + + lbChan := make(chan *outlierDetectionBalancer, 1) + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(Name).Build(bd.ClientConn, bd.BuildOptions) + lbChan <- bd.Data.(*outlierDetectionBalancer) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + ccs.BalancerConfig = odCfg + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + } + + stub.Register(t.Name(), bf) + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())), + } + cc, err := grpc.NewClient(backend.Address, opts...) 
+ if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testutils.AwaitState(ctx, t, cc, connectivity.Ready) + + // Failing request should not cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + + // Run the interval algorithm. + select { + case <-ctx.Done(): + t.Fatal("Timed out waiting for the outlier detection LB policy to be built.") + case od := <-lbChan: + od.intervalTimerAlgorithm() + } + + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready) +} diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 35c4d75301b3..ec087eb08b53 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -28,8 +28,10 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/peer" @@ -43,7 +45,18 @@ import ( _ "google.golang.org/grpc/xds/internal/balancer/outlierdetection" // To register helper functions which register/unregister Outlier Detection LB Policy. 
) -var defaultTestTimeout = 5 * time.Second +var ( + defaultTestTimeout = 5 * time.Second + leafPolicyName = "round_robin" +) + +func init() { + // Test the health listener code path for ejection when the experimental + // pickfirst is enabled. + if envconfig.NewPickFirstEnabled { + leafPolicyName = weightedroundrobin.Name + } +} type s struct { grpctest.Tester @@ -155,50 +168,50 @@ func (s) TestOutlierDetectionAlgorithmsE2E(t *testing.T) { }{ { name: "Success Rate Algorithm", - odscJSON: ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "successRateEjection": { - "stdevFactor": 50, - "enforcementPercentage": 100, - "minimumHosts": 3, - "requestVolume": 5 - }, - "childPolicy": [{"round_robin": {}}] - } - } - ] -}`, + odscJSON: fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "successRateEjection": { + "stdevFactor": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, leafPolicyName), }, { name: "Failure Percentage Algorithm", - odscJSON: ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 3, - "requestVolume": 5 - }, - "childPolicy": [{"round_robin": {}} - ] - } - } - ] -}`, + odscJSON: fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + 
"minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"%s": {}} + ] + } + } + ] + }`, leafPolicyName), }, } for _, test := range tests { @@ -273,20 +286,20 @@ func (s) TestNoopConfiguration(t *testing.T) { mr := manual.NewBuilderWithScheme("od-e2e") defer mr.Close() - noopODServiceConfigJSON := ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "childPolicy": [{"round_robin": {}}] - } - } - ] -}` + noopODServiceConfigJSON := fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, leafPolicyName) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(noopODServiceConfigJSON) // The full list of addresses. fullAddresses := []resolver.Address{ @@ -321,26 +334,26 @@ func (s) TestNoopConfiguration(t *testing.T) { // specifies to count RPC's and eject upstreams. Due to the balancer no // longer being a noop, it should eject any unhealthy addresses as specified // by the failure percentage portion of the configuration. 
- countingODServiceConfigJSON := ` -{ - "loadBalancingConfig": [ - { - "outlier_detection_experimental": { - "interval": "0.050s", - "baseEjectionTime": "0.100s", - "maxEjectionTime": "300s", - "maxEjectionPercent": 33, - "failurePercentageEjection": { - "threshold": 50, - "enforcementPercentage": 100, - "minimumHosts": 3, - "requestVolume": 5 - }, - "childPolicy": [{"round_robin": {}}] - } - } - ] -}` + countingODServiceConfigJSON := fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.050s", + "baseEjectionTime": "0.100s", + "maxEjectionTime": "300s", + "maxEjectionPercent": 33, + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"%s": {}}] + } + } + ] + }`, leafPolicyName) sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) mr.UpdateState(resolver.State{ diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 0fa422d8f262..7d710fde1b2a 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -19,9 +19,11 @@ package outlierdetection import ( "fmt" + "sync" "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/resolver" ) @@ -31,23 +33,54 @@ import ( // whether or not this SubConn is ejected. type subConnWrapper struct { balancer.SubConn - listener func(balancer.SubConnState) - // addressInfo is a pointer to the subConnWrapper's corresponding address - // map entry, if the map entry exists. + // map entry, if the map entry exists. It is accessed atomically. addressInfo unsafe.Pointer // *addressInfo + // The following fields are set during object creation and read-only after + // that. 
+ + listener func(balancer.SubConnState) + // healthListenerEnabled indicates whether the leaf LB policy is using a + // generic health listener. When enabled, ejection updates are sent via the + // health listener instead of the connectivity listener. Once Dualstack + // changes are complete, all SubConns will be created by pickfirst which + // uses the health listener. + // TODO: https://github.com/grpc/grpc-go/issues/7915 - Once Dualstack + // changes are complete, all SubConns will be created by pick_first and + // outlier detection will only use the health listener for ejection and + // this field can be removed. + healthListenerEnabled bool + + scUpdateCh *buffer.Unbounded + + // The following fields are only referenced in the context of a work + // serializing buffer and don't need to be protected by a mutex. + // These two pieces of state will reach eventual consistency due to sync in // run(), and child will always have the correctly updated SubConnState. - // latestState is the latest state update from the underlying SubConn. This - // is used whenever a SubConn gets unejected. - latestState balancer.SubConnState - ejected bool - scUpdateCh *buffer.Unbounded + ejected bool // addresses is the list of address(es) this SubConn was created with to // help support any change in address(es) addresses []resolver.Address + // latestHealthState is tracked to update the child policy during + // unejection. + latestHealthState balancer.SubConnState + // latestRawConnectivityState is tracked to update the child policy during + // unejection. + latestRawConnectivityState balancer.SubConnState + + // Access to the following fields is protected by a mutex. These fields + // should not be accessed from outside this file; instead, use methods + // defined on the struct. + mu sync.Mutex + healthListener func(balancer.SubConnState) + // latestReceivedConnectivityState is the SubConn's most recent connectivity + // state received.
It may not be delivered to the child balancer yet. It is + // used to ensure a health listener is registered with the SubConn only when + // the SubConn is READY. + latestReceivedConnectivityState connectivity.State } // eject causes the wrapper to report a state update with the TRANSIENT_FAILURE @@ -72,3 +105,108 @@ func (scw *subConnWrapper) uneject() { func (scw *subConnWrapper) String() string { return fmt.Sprintf("%+v", scw.addresses) } + +func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { + // gRPC currently supports two mechanisms that provide a health signal for + // a connection: client-side health checking and outlier detection. Earlier + // both these mechanisms signaled unhealthiness by setting the subchannel + // state to TRANSIENT_FAILURE. As part of the dualstack changes to make + // pick_first the universal leaf policy (see A61), both these mechanisms + // started using the new health listener to make health signal visible to + // the petiole policies without affecting the underlying connectivity + // management of the pick_first policy. + if !scw.healthListenerEnabled { + logger.Errorf("Health listener unexpectedly registered on SubConn %v.", scw) + return + } + + scw.mu.Lock() + defer scw.mu.Unlock() + + if scw.latestReceivedConnectivityState != connectivity.Ready { + return + } + scw.healthListener = listener + if listener == nil { + scw.SubConn.RegisterHealthListener(nil) + return + } + + scw.SubConn.RegisterHealthListener(func(scs balancer.SubConnState) { + scw.scUpdateCh.Put(&scHealthUpdate{ + scw: scw, + state: scs, + }) + }) +} + +// updateSubConnHealthState stores the latest health state for unejection and +// sends updates to the health listener.
+func (scw *subConnWrapper) updateSubConnHealthState(scs balancer.SubConnState) { + scw.latestHealthState = scs + if scw.ejected { + return + } + scw.mu.Lock() + defer scw.mu.Unlock() + if scw.healthListener != nil { + scw.healthListener(scs) + } +} + +// updateSubConnConnectivityState stores the latest connectivity state for +// unejection and updates the raw connectivity listener. +func (scw *subConnWrapper) updateSubConnConnectivityState(scs balancer.SubConnState) { + scw.latestRawConnectivityState = scs + // If the raw connectivity listener is used for ejection, and the SubConn is + // ejected, don't send the update. + if scw.ejected && !scw.healthListenerEnabled { + return + } + if scw.listener != nil { + scw.listener(scs) + } +} + +func (scw *subConnWrapper) clearHealthListener() { + scw.mu.Lock() + defer scw.mu.Unlock() + scw.healthListener = nil +} + +func (scw *subConnWrapper) handleUnejection() { + scw.ejected = false + if !scw.healthListenerEnabled { + // If scw.latestRawConnectivityState has never been written to, it + // will default to connectivity IDLE, which is fine. + scw.updateSubConnConnectivityState(scw.latestRawConnectivityState) + return + } + // If scw.latestHealthState has never been written to, the health state + // CONNECTING set during object creation is used. + scw.updateSubConnHealthState(scw.latestHealthState) +} + +func (scw *subConnWrapper) handleEjection() { + scw.ejected = true + stateToUpdate := balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + } + if !scw.healthListenerEnabled { + if scw.listener != nil { + scw.listener(stateToUpdate) + } + return + } + scw.mu.Lock() + defer scw.mu.Unlock() + if scw.healthListener != nil { + scw.healthListener(stateToUpdate) + } +} + +func (scw *subConnWrapper) setLatestConnectivityState(state connectivity.State) { + scw.mu.Lock() + defer scw.mu.Unlock() + scw.latestReceivedConnectivityState = state +}