diff --git a/balancer/balancer.go b/balancer/balancer.go index b78401eb4cf2..668e1f1e38a5 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -115,6 +115,13 @@ type SubConn interface { // creates a new one and returns it. Returns a close function which must // be called when the Producer is no longer needed. GetOrBuildProducer(ProducerBuilder) (p Producer, close func()) + // Shutdown shuts down the SubConn gracefully. Any started RPCs will be + // allowed to complete. No future calls should be made on the SubConn. + // One final state update will be delivered to the StateListener (or + // UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to + // indicate the shutdown operation. This may be delivered before + // in-progress RPCs are complete and the actual connection is closed. + Shutdown() } // NewSubConnOptions contains options to create new SubConn. @@ -161,6 +168,8 @@ type ClientConn interface { NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error) // RemoveSubConn removes the SubConn from ClientConn. // The SubConn will be shutdown. + // + // Deprecated: use SubConn.Shutdown instead. RemoveSubConn(SubConn) // UpdateAddresses updates the addresses used in the passed in SubConn. // gRPC checks if the currently connected address is still in the new list. diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index b50abf8526e6..7bf4d92f8f0a 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} func (sc *testSubConn) Connect() {} +func (sc *testSubConn) Shutdown() {} + func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { return nil, nil } diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 261ddb5c8b45..4d616a98b0c3 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -319,7 +319,13 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { return connectivity.TransientFailure } +// UpdateSubConnState is unused; NewSubConn's options always specifies +// updateSubConnState as the listener. func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { + logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs) +} + +func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { s := scs.ConnectivityState if logger.V(2) { logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) @@ -373,8 +379,13 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop if forceRegeneratePicker || (lb.state != oldAggrState) { lb.regeneratePicker(resetDrop) } + var cc balancer.ClientConn = lb.cc + if lb.usePickFirst { + // Bypass the caching layer that would wrap the picker. + cc = lb.cc.ClientConn + } - lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker}) + cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker}) } // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index e56006d7131a..6c99a6d788f6 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -126,7 +126,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback if oldUsePickFirst { // If old SubConn were created for pickfirst, bypass cache and // remove directly. - lb.cc.cc.RemoveSubConn(sc) + lb.cc.ClientConn.RemoveSubConn(sc) } else { lb.cc.RemoveSubConn(sc) } @@ -144,16 +144,17 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback } if sc != nil { if len(backendAddrs) == 0 { - lb.cc.cc.RemoveSubConn(sc) + lb.cc.ClientConn.RemoveSubConn(sc) delete(lb.subConns, scKey) return } - lb.cc.cc.UpdateAddresses(sc, backendAddrs) + lb.cc.ClientConn.UpdateAddresses(sc, backendAddrs) sc.Connect() return } + opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) } // This bypasses the cc wrapper with SubConn cache. - sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts) + sc, err := lb.cc.ClientConn.NewSubConn(backendAddrs, opts) if err != nil { logger.Warningf("grpclb: failed to create new SubConn: %v", err) return @@ -176,6 +177,8 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback if _, ok := lb.subConns[addrWithoutAttrs]; !ok { // Use addrWithMD to create the SubConn. + var sc balancer.SubConn + opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) } sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts) if err != nil { logger.Warningf("grpclb: failed to create new SubConn: %v", err) @@ -419,7 +422,7 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() { } } // Trigger a re-resolve when the stream errors. - ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOptions{}) + ccw.lb.cc.ClientConn.ResolveNow(resolver.ResolveNowOptions{}) ccw.lb.mu.Lock() ccw.lb.remoteBalancerConnected = false diff --git a/balancer/grpclb/grpclb_util.go b/balancer/grpclb/grpclb_util.go index 373f04b98d37..6e01bf803d6a 100644 --- a/balancer/grpclb/grpclb_util.go +++ b/balancer/grpclb/grpclb_util.go @@ -93,9 +93,10 @@ const subConnCacheTime = time.Second * 10 // lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache. // SubConns will be kept in cache for subConnCacheTime before being removed. // -// Its new and remove methods are updated to do cache first. +// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first. type lbCacheClientConn struct { - cc balancer.ClientConn + balancer.ClientConn + timeout time.Duration mu sync.Mutex @@ -113,7 +114,7 @@ type subConnCacheEntry struct { func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn { return &lbCacheClientConn{ - cc: cc, + ClientConn: cc, timeout: subConnCacheTime, subConnCache: make(map[resolver.Address]*subConnCacheEntry), subConnToAddr: make(map[balancer.SubConn]resolver.Address), @@ -137,16 +138,27 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer return entry.sc, nil } - scNew, err := ccc.cc.NewSubConn(addrs, opts) + scNew, err := ccc.ClientConn.NewSubConn(addrs, opts) if err != nil { return nil, err } + scNew = &lbCacheSubConn{SubConn: scNew, ccc: ccc} ccc.subConnToAddr[scNew] = addrWithoutAttrs return scNew, nil } func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { + sc.Shutdown() +} + +type lbCacheSubConn struct { + balancer.SubConn + ccc *lbCacheClientConn +} + +func (sc *lbCacheSubConn) Shutdown() { + ccc := sc.ccc ccc.mu.Lock() defer ccc.mu.Unlock() addr, ok := ccc.subConnToAddr[sc] @@ -160,7 +172,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { // same address, and those SubConns are all removed. We remove sc // immediately here. delete(ccc.subConnToAddr, sc) - ccc.cc.RemoveSubConn(sc) + sc.SubConn.Shutdown() } return } @@ -176,7 +188,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { if entry.abortDeleting { return } - ccc.cc.RemoveSubConn(sc) + sc.SubConn.Shutdown() delete(ccc.subConnToAddr, sc) delete(ccc.subConnCache, addr) }) @@ -195,14 +207,28 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { } func (ccc *lbCacheClientConn) UpdateState(s balancer.State) { - ccc.cc.UpdateState(s) + s.Picker = &lbCachePicker{Picker: s.Picker} + ccc.ClientConn.UpdateState(s) } func (ccc *lbCacheClientConn) close() { ccc.mu.Lock() + defer ccc.mu.Unlock() // Only cancel all existing timers. There's no need to remove SubConns. for _, entry := range ccc.subConnCache { entry.cancel() } - ccc.mu.Unlock() +} + +type lbCachePicker struct { + balancer.Picker +} + +func (cp *lbCachePicker) Pick(i balancer.PickInfo) (balancer.PickResult, error) { + res, err := cp.Picker.Pick(i) + if err != nil { + return res, err + } + res.SubConn = res.SubConn.(*lbCacheSubConn).SubConn + return res, nil } diff --git a/balancer/grpclb/grpclb_util_test.go b/balancer/grpclb/grpclb_util_test.go index 463d7ba23c75..31d6eaa55db5 100644 --- a/balancer/grpclb/grpclb_util_test.go +++ b/balancer/grpclb/grpclb_util_test.go @@ -30,6 +30,13 @@ import ( type mockSubConn struct { balancer.SubConn + mcc *mockClientConn +} + +func (msc *mockSubConn) Shutdown() { + msc.mcc.mu.Lock() + defer msc.mcc.mu.Unlock() + delete(msc.mcc.subConns, msc) } type mockClientConn struct { @@ -46,7 +53,7 @@ func newMockClientConn() *mockClientConn { } func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { - sc := &mockSubConn{} + sc := &mockSubConn{mcc: mcc} mcc.mu.Lock() defer mcc.mu.Unlock() mcc.subConns[sc] = addrs[0] @@ -54,9 +61,7 @@ func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.Ne } func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) { - mcc.mu.Lock() - defer mcc.mu.Unlock() - delete(mcc.subConns, sc) + sc.Shutdown() } const testCacheTimeout = 100 * time.Millisecond diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index b1c910f70b36..8b3e819d4351 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -301,6 +301,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer return nil, err } acbw := &acBalancerWrapper{ + ccb: ccb, ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer), stateListener: opts.StateListener, @@ -370,7 +371,8 @@ func (ccb *ccBalancerWrapper) Target() string { // acBalancerWrapper is a wrapper on top of ac for balancers. // It implements balancer.SubConn interface. type acBalancerWrapper struct { - ac *addrConn // read-only + ac *addrConn // read-only + ccb *ccBalancerWrapper // read-only stateListener func(balancer.SubConnState) mu sync.Mutex @@ -389,6 +391,10 @@ func (acbw *acBalancerWrapper) Connect() { go acbw.ac.connect() } +func (acbw *acBalancerWrapper) Shutdown() { + acbw.ccb.RemoveSubConn(acbw) +} + // NewStream begins a streaming RPC on the addrConn. If the addrConn is not // ready, blocks until it is or ctx expires. Returns an error when the context // expires or the addrConn is shut down. diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index 65a59201c23a..dad905cffdbf 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -364,13 +364,7 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) { } func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) { - bw.gsb.mu.Lock() - if !bw.gsb.balancerCurrentOrPending(bw) { - bw.gsb.mu.Unlock() - return - } - bw.gsb.mu.Unlock() - bw.gsb.cc.RemoveSubConn(sc) + sc.Shutdown() } func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index 0301c4b38462..73aef813f3eb 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -38,6 +38,7 @@ type testingLogger interface { // TestSubConn implements the SubConn interface, to be used in tests. type TestSubConn struct { + tcc *TestClientConn // the CC that owns this SubConn id string ConnectCh chan struct{} stateListener func(balancer.SubConnState) @@ -81,6 +82,16 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) { } } +// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent +// TestClientConn. +func (tsc *TestSubConn) Shutdown() { + tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc) + select { + case tsc.tcc.RemoveSubConnCh <- tsc: + default: + } +} + // String implements stringer to print human friendly error message. func (tsc *TestSubConn) String() string { return tsc.id @@ -121,6 +132,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn { // NewSubConn creates a new SubConn. func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) { sc := &TestSubConn{ + tcc: tcc, id: fmt.Sprintf("sc%d", tcc.subConnIdx), ConnectCh: make(chan struct{}, 1), stateListener: o.StateListener, @@ -143,11 +155,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon // RemoveSubConn removes the SubConn. func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) { - tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc) - select { - case tcc.RemoveSubConnCh <- sc.(*TestSubConn): - default: - } + sc.(*TestSubConn).Shutdown() } // UpdateAddresses updates the addresses on the SubConn. diff --git a/test/balancer_test.go b/test/balancer_test.go index 7109b20d2e1e..e769d304df2a 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -1061,3 +1061,59 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) { t.Fatalf("RPC error: %v; want status.Code(err)=%v", err, codes.Canceled) } } + +// TestSubConnShutdown confirms that the Shutdown method on subconns properly +// initiates their shutdown. +func (s) TestSubConnShutdown(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + gotShutdown := grpcsync.NewEvent() + + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + var sc balancer.SubConn + opts := balancer.NewSubConnOptions{ + StateListener: func(scs balancer.SubConnState) { + switch scs.ConnectivityState { + case connectivity.Connecting: + // Ignored. + case connectivity.Ready: + sc.Shutdown() + case connectivity.Shutdown: + gotShutdown.Fire() + default: + t.Errorf("got unexpected state %q in listener", scs.ConnectivityState) + } + }, + } + sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts) + if err != nil { + return err + } + sc.Connect() + // Report the state as READY to unblock ss.Start(), which waits for ready. + bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) + return nil + }, + } + + const testBalName = "shutdown-test-balancer" + stub.Register(testBalName, bf) + t.Logf("Registered balancer %s...", testBalName) + + ss := &stubserver.StubServer{} + if err := ss.Start(nil, grpc.WithDefaultServiceConfig( + fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName), + )); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + select { + case <-gotShutdown.Done(): + // Success + case <-ctx.Done(): + t.Fatalf("Timed out waiting for gotShutdown to be fired.") + } +} diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go index 92fed10ffed0..a23f2058bedc 100644 --- a/test/roundrobin_test.go +++ b/test/roundrobin_test.go @@ -225,8 +225,8 @@ func (s) TestRoundRobin_AllServersDown(t *testing.T) { // Failfast RPCs should fail with Unavailable. client := testgrpc.NewTestServiceClient(cc) - if _, err := client.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable { - return + if _, err := client.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { + t.Fatalf("EmptyCall got err: %v; want Unavailable", err) } } diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 2c18fbe1a973..67521be91ebb 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -403,19 +403,7 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer } func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) { - scw, ok := sc.(*scWrapper) - if !ok { - b.ClientConn.RemoveSubConn(sc) - return - } - // Remove the original SubConn from the parent ClientConn. - // - // Note that we don't remove this SubConn from the scWrappers map. We will - // need it to forward the final SubConn state Shutdown to the child policy. - // - // This entry is kept in the map until it's state is changes to Shutdown, - // and will be deleted in UpdateSubConnState(). - b.ClientConn.RemoveSubConn(scw.SubConn) + sc.Shutdown() } func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 46903bb7acc9..12509fe599a3 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -502,14 +502,7 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal } func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) { - scw, ok := sc.(*subConnWrapper) - if !ok { // Shouldn't happen - return - } - // Remove the wrapped SubConn from the parent Client Conn. We don't remove - // from map entry until we get a Shutdown state for the SubConn, as we need - // that data to forward that state down. - b.cc.RemoveSubConn(scw.SubConn) + sc.Shutdown() } // appendIfPresent appends the scw to the address, if the address is present in