Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: add SubConn.Shutdown; deprecate Balancer.RemoveSubConn #6493

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ type SubConn interface {
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
// allowed to complete. No future calls should be made on the SubConn.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No future calls should be made on the SubConn

Do we handle calls on a closed subConn gracefully?
From what I see UpdateAddresses and Connect become no-ops if the state is SHUTDOWN. It would have been nicer if these methods returned an error though. At least Connect() could have returned an error, as the underlying addrConn.connect() already returns an error if the addrConn is shutdown, but we drop that error in our acBalancerWrapper.

Would it be better to say that future calls will be no-ops instead of saying future calls shouldn't be made?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t would have been nicer if these methods returned an error though.

UpdateAddresses is going away, but note that adding error return values to things typically makes the calling code more complex and the API harder to reason about.

Would it be better to say that future calls will be no-ops instead of saying future calls shouldn't be made?

I'm of the opinion that telling people not to do things is better than stating what will happen if they do things that are not sensible. This is more straightforward and also gives us flexibility in how we decide to behave.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that adding error return values to things typically makes the calling code more complex and the API harder to reason about

It feels completely logical for an operation like Connect to return something to its caller. But I do agree that in this case though, the caller could (and does) rely on subsequent state updates to figure out how things went. But for UpdateAddresses, there is no clean way to let the caller know that the update did not happen. Good that it is going away :)

// One final state update will be delivered to the StateListener (or
// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to
// indicate the shutdown operation. This will be delivered before
// in-progress operations are complete and the actual connection is closed.
easwars marked this conversation as resolved.
Show resolved Hide resolved
Shutdown()
}

// NewSubConnOptions contains options to create new SubConn.
Expand Down Expand Up @@ -161,6 +168,8 @@ type ClientConn interface {
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
//
// Deprecated: use SubConn.Shutdown instead.
RemoveSubConn(SubConn)
// UpdateAddresses updates the addresses used in the passed in SubConn.
// gRPC checks if the currently connected address is still in the new list.
Expand Down
2 changes: 2 additions & 0 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

func (sc *testSubConn) Shutdown() {}

func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}
Expand Down
13 changes: 11 additions & 2 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,11 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
return connectivity.TransientFailure
}

func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
// UpdateSubConnState is unused; NewSubConn's options always specifies
// updateSubConnState as the listener.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {}
easwars marked this conversation as resolved.
Show resolved Hide resolved

func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
Expand Down Expand Up @@ -373,8 +377,13 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop
if forceRegeneratePicker || (lb.state != oldAggrState) {
lb.regeneratePicker(resetDrop)
}
var cc balancer.ClientConn = lb.cc
if lb.usePickFirst {
// Bypass the caching layer that would wrap the picker.
cc = lb.cc.cc
}

lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
Expand Down
3 changes: 3 additions & 0 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
sc.Connect()
return
}
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
// This bypasses the cc wrapper with SubConn cache.
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
if err != nil {
Expand All @@ -176,6 +177,8 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback

if _, ok := lb.subConns[addrWithoutAttrs]; !ok {
// Use addrWithMD to create the SubConn.
var sc balancer.SubConn
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
Expand Down
36 changes: 32 additions & 4 deletions balancer/grpclb/grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ const subConnCacheTime = time.Second * 10
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
//
// Its new and remove methods are updated to do cache first.
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
type lbCacheClientConn struct {
balancer.ClientConn

cc balancer.ClientConn
easwars marked this conversation as resolved.
Show resolved Hide resolved
timeout time.Duration

Expand All @@ -113,6 +115,7 @@ type subConnCacheEntry struct {

func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
return &lbCacheClientConn{
ClientConn: cc,
cc: cc,
timeout: subConnCacheTime,
subConnCache: make(map[resolver.Address]*subConnCacheEntry),
Expand Down Expand Up @@ -141,12 +144,23 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
if err != nil {
return nil, err
}
scNew = &lbCacheSubConn{SubConn: scNew, ccc: ccc}

ccc.subConnToAddr[scNew] = addrWithoutAttrs
return scNew, nil
}

func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
}

type lbCacheSubConn struct {
balancer.SubConn
ccc *lbCacheClientConn
}

func (sc *lbCacheSubConn) Shutdown() {
ccc := sc.ccc
ccc.mu.Lock()
defer ccc.mu.Unlock()
addr, ok := ccc.subConnToAddr[sc]
Expand All @@ -160,7 +174,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
// same address, and those SubConns are all removed. We remove sc
// immediately here.
delete(ccc.subConnToAddr, sc)
ccc.cc.RemoveSubConn(sc)
sc.SubConn.Shutdown()
}
return
}
Expand All @@ -176,7 +190,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
if entry.abortDeleting {
return
}
ccc.cc.RemoveSubConn(sc)
sc.SubConn.Shutdown()
delete(ccc.subConnToAddr, sc)
delete(ccc.subConnCache, addr)
})
Expand All @@ -195,14 +209,28 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
}

func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
s.Picker = &lbCachePicker{Picker: s.Picker}
ccc.cc.UpdateState(s)
}

func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
// Only cancel all existing timers. There's no need to remove SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
ccc.mu.Unlock()
}

type lbCachePicker struct {
balancer.Picker
}

func (cp *lbCachePicker) Pick(i balancer.PickInfo) (balancer.PickResult, error) {
res, err := cp.Picker.Pick(i)
if err != nil {
return res, err
}
res.SubConn = res.SubConn.(*lbCacheSubConn).SubConn
return res, nil
}
13 changes: 9 additions & 4 deletions balancer/grpclb/grpclb_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ import (

type mockSubConn struct {
balancer.SubConn
mcc *mockClientConn
}

func (msc *mockSubConn) Shutdown() {
msc.mcc.mu.Lock()
defer msc.mcc.mu.Unlock()
delete(msc.mcc.subConns, msc)
}

type mockClientConn struct {
Expand All @@ -46,17 +53,15 @@ func newMockClientConn() *mockClientConn {
}

func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &mockSubConn{}
sc := &mockSubConn{mcc: mcc}
mcc.mu.Lock()
defer mcc.mu.Unlock()
mcc.subConns[sc] = addrs[0]
return sc, nil
}

func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
mcc.mu.Lock()
defer mcc.mu.Unlock()
delete(mcc.subConns, sc)
sc.Shutdown()
}

const testCacheTimeout = 100 * time.Millisecond
Expand Down
8 changes: 7 additions & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
acbw := &acBalancerWrapper{
ccb: ccb,
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
Expand Down Expand Up @@ -372,7 +373,8 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
ac *addrConn // read-only
ac *addrConn // read-only
ccb *ccBalancerWrapper // read-only
stateListener func(balancer.SubConnState)

mu sync.Mutex
Expand All @@ -391,6 +393,10 @@ func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}

func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
Expand Down
8 changes: 1 addition & 7 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,7 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
}

func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) {
bw.gsb.mu.Unlock()
return
}
bw.gsb.mu.Unlock()
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
}

func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down
18 changes: 13 additions & 5 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type testingLogger interface {

// TestSubConn implements the SubConn interface, to be used in tests.
type TestSubConn struct {
tcc *TestClientConn // the CC that owns this SubConn
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
Expand Down Expand Up @@ -66,6 +67,16 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
}
}

// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent
// TestClientConn.
func (tsc *TestSubConn) Shutdown() {
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
easwars marked this conversation as resolved.
Show resolved Hide resolved
select {
case tsc.tcc.RemoveSubConnCh <- tsc:
default:
}
}

// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
Expand Down Expand Up @@ -106,6 +117,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
// NewSubConn creates a new SubConn.
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &TestSubConn{
tcc: tcc,
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
Expand All @@ -127,11 +139,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon

// RemoveSubConn removes the SubConn.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc)
select {
case tcc.RemoveSubConnCh <- sc.(*TestSubConn):
default:
}
sc.(*TestSubConn).Shutdown()
}

// UpdateAddresses updates the addresses on the SubConn.
Expand Down
56 changes: 56 additions & 0 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,3 +1061,59 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) {
t.Fatalf("RPC error: %v; want status.Code(err)=%v", err, codes.Canceled)
}
}

// TestSubConnShutdown confirms that the Shutdown method on subconns properly
// initiates their shutdown.
func (s) TestSubConnShutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

gotShutdown := grpcsync.NewEvent()

bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
StateListener: func(scs balancer.SubConnState) {
switch scs.ConnectivityState {
case connectivity.Connecting:
// Ignored.
case connectivity.Ready:
sc.Shutdown()
case connectivity.Shutdown:
gotShutdown.Fire()
default:
t.Errorf("got unexpected state %q in listener", scs.ConnectivityState)
}
},
}
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts)
if err != nil {
return err
}
sc.Connect()
// Report the state as READY to unblock ss.Start(), which waits for ready.
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
return nil
},
}

const testBalName = "shutdown-test-balancer"
stub.Register(testBalName, bf)
t.Logf("Registered balancer %s...", testBalName)

ss := &stubserver.StubServer{}
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName),
)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

select {
case <-gotShutdown.Done():
// Success
case <-ctx.Done():
t.Fatalf("Timed out waiting for gotShutdown to be fired.")
}
}
14 changes: 1 addition & 13 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,19 +403,7 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
}

func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
scw, ok := sc.(*scWrapper)
if !ok {
b.ClientConn.RemoveSubConn(sc)
return
}
// Remove the original SubConn from the parent ClientConn.
//
// Note that we don't remove this SubConn from the scWrappers map. We will
// need it to forward the final SubConn state Shutdown to the child policy.
//
// This entry is kept in the map until it's state is changes to Shutdown,
// and will be deleted in UpdateSubConnState().
b.ClientConn.RemoveSubConn(scw.SubConn)
sc.Shutdown()
}

func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down
9 changes: 1 addition & 8 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,7 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
}

func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) {
scw, ok := sc.(*subConnWrapper)
if !ok { // Shouldn't happen
return
}
// Remove the wrapped SubConn from the parent Client Conn. We don't remove
// from map entry until we get a Shutdown state for the SubConn, as we need
// that data to forward that state down.
b.cc.RemoveSubConn(scw.SubConn)
sc.Shutdown()
}

// appendIfPresent appends the scw to the address, if the address is present in
Expand Down
Loading