Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: replace RemoveSubConn with Shutdown as much as possible #6505

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,9 @@ type Balancer interface {
// Deprecated: Use NewSubConnOptions.StateListener when creating the
// SubConn instead.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
// Close closes the balancer. The balancer is not currently required to
// call SubConn.Shutdown for its existing SubConns; however, this will be
// required in a future release, so it is recommended.
Close()
}

Expand Down
8 changes: 4 additions & 4 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
sc.Shutdown()
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down Expand Up @@ -204,8 +204,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
Expand All @@ -226,7 +226,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}

// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
// and it doesn't need to call Shutdown for the SubConns.
func (b *baseBalancer) Close() {
}

Expand Down
8 changes: 4 additions & 4 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ type lbBalancer struct {
backendAddrsWithoutMetadata []resolver.Address
// Roundrobin functionalities.
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
subConns map[resolver.Address]balancer.SubConn // Used to new/shutdown SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
Expand Down Expand Up @@ -290,7 +290,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
// aggregateSubConnStats calculate the aggregated state of SubConns in
// lb.SubConns. These SubConns are subconns in use (when switching between
// fallback and grpclb). lb.scState contains states for all SubConns, including
// those in cache (SubConns are cached for 10 seconds after remove).
// those in cache (SubConns are cached for 10 seconds after shutdown).
//
// The aggregated state is:
// - If at least one SubConn in Ready, the aggregated state is Ready;
Expand Down Expand Up @@ -345,8 +345,8 @@ func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubCo
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(lb.scStates, sc)
case connectivity.TransientFailure:
lb.connErr = scs.ConnectionError
Expand Down
13 changes: 3 additions & 10 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}

balancingPolicyChanged := lb.usePickFirst != pickFirst
oldUsePickFirst := lb.usePickFirst
lb.usePickFirst = pickFirst

if fallbackModeChanged || balancingPolicyChanged {
Expand All @@ -123,13 +122,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// For fallback mode switching with pickfirst, we want to recreate the
// SubConn because the creds could be different.
for a, sc := range lb.subConns {
if oldUsePickFirst {
// If old SubConn were created for pickfirst, bypass cache and
// remove directly.
lb.cc.ClientConn.RemoveSubConn(sc)
} else {
lb.cc.RemoveSubConn(sc)
}
sc.Shutdown()
delete(lb.subConns, a)
}
}
Expand All @@ -144,7 +137,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.ClientConn.RemoveSubConn(sc)
sc.Shutdown()
delete(lb.subConns, scKey)
return
}
Expand Down Expand Up @@ -197,7 +190,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
lb.cc.RemoveSubConn(sc)
sc.Shutdown()
delete(lb.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down
12 changes: 6 additions & 6 deletions balancer/grpclb/grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *lbManualResolver) UpdateState(s resolver.State) {
const subConnCacheTime = time.Second * 10

// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
// SubConns will be kept in cache for subConnCacheTime before being shut down.
//
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
type lbCacheClientConn struct {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
}

func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}

type lbCacheSubConn struct {
Expand All @@ -168,9 +168,9 @@ func (sc *lbCacheSubConn) Shutdown() {

if entry, ok := ccc.subConnCache[addr]; ok {
if entry.sc != sc {
// This could happen if NewSubConn was called multiple times for the
// same address, and those SubConns are all removed. We remove sc
// immediately here.
// This could happen if NewSubConn was called multiple times for
// the same address, and those SubConns are all shut down. We
// remove sc immediately here.
delete(ccc.subConnToAddr, sc)
sc.SubConn.Shutdown()
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
// Only cancel all existing timers. There's no need to remove SubConns.
// Only cancel all existing timers. There's no need to shut down SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
Expand Down
26 changes: 13 additions & 13 deletions balancer/grpclb/grpclb_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.Ne
}

func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
panic(fmt.Sprintf("RemoveSubConn(%v) called unexpectedly", sc))
}

const testCacheTimeout = 100 * time.Millisecond
Expand All @@ -87,7 +87,7 @@ func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
return nil
}

// Test that SubConn won't be immediately removed.
// Test that SubConn won't be immediately shut down.
func (s) TestLBCacheClientConnExpire(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
Expand All @@ -110,7 +110,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) {
t.Fatal(err)
}

ccc.RemoveSubConn(sc)
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) {
}
}

// Test that NewSubConn with the same address of a SubConn being removed will
// Test that NewSubConn with the same address of a SubConn being shut down will
// reuse the SubConn and cancel the removing.
func (s) TestLBCacheClientConnReuse(t *testing.T) {
mcc := newMockClientConn()
Expand All @@ -162,7 +162,7 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
t.Fatal(err)
}

ccc.RemoveSubConn(sc)
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -195,8 +195,8 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
t.Fatal(err)
}

// Call remove again, will delete after timeout.
ccc.RemoveSubConn(sc)
// Call Shutdown again, will delete after timeout.
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
Expand All @@ -223,9 +223,9 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
}
}

// Test that if the timer to remove a SubConn fires at the same time NewSubConn
// cancels the timer, it doesn't cause deadlock.
func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) {
// Test that if the timer to shut down a SubConn fires at the same time
// NewSubConn cancels the timer, it doesn't cause deadlock.
func (s) TestLBCache_ShutdownTimer_New_Race(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
Expand All @@ -251,9 +251,9 @@ func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) {

go func() {
for i := 0; i < 1000; i++ {
// Remove starts a timer with 1 ns timeout, the NewSubConn will race
// with with the timer.
ccc.RemoveSubConn(sc)
// Shutdown starts a timer with 1 ns timeout, the NewSubConn will
// race with with the timer.
sc.Shutdown()
sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
}
close(done)
Expand Down
2 changes: 1 addition & 1 deletion balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
// addr was removed by resolver. Remove.
wsci, _ := b.subConns.Get(addr)
wsc := wsci.(*weightedSubConn)
b.cc.RemoveSubConn(wsc.SubConn)
wsc.SubConn.Shutdown()
b.subConns.Delete(addr)
}
}
Expand Down
52 changes: 26 additions & 26 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ func (s) TestWeightedTarget(t *testing.T) {
// attribute set to the config that was passed to it.
verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2"))

// The subconn for cluster_1 should be removed.
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// The subconn for cluster_1 should be shut down.
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

sc2 := <-cc.NewSubConnCh
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand Down Expand Up @@ -286,12 +286,12 @@ func (s) TestWeightedTarget(t *testing.T) {
}
verifyAddressInNewSubConn(t, cc, addr3)

// The subconn from the test_config_balancer should be removed.
scRemoved = <-cc.RemoveSubConnCh
if scRemoved != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// The subconn from the test_config_balancer should be shut down.
scShutdown = <-cc.ShutdownSubConnCh
if scShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

// Send subconn state change.
sc3 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -409,12 +409,12 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Expect one SubConn to be removed.
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// Expect one SubConn to be shut down.
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh

// Test pick with only the second SubConn.
Expand Down Expand Up @@ -579,7 +579,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
t.Fatalf("want %v, got %v", want, err)
}

// Remove subConn corresponding to addr3.
// Shut down subConn corresponding to addr3.
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
Expand All @@ -590,11 +590,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc3 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved)
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc3 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc3, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
Expand Down Expand Up @@ -823,9 +823,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// picker which ensures that the removed subBalancer is not picked for RPCs.
p = <-cc.NewPickerCh

scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved)
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown)
}
want = []balancer.SubConn{sc1, sc3}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
Expand Down Expand Up @@ -865,9 +865,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// Removing a subBalancer causes the weighted target LB policy to push a new
// picker which ensures that the removed subBalancer is not picked for RPCs.

scRemoved = <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
scShutdown = <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down
34 changes: 16 additions & 18 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,23 +311,8 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}

func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}

acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
// The graceful switch balancer will never call this.
logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down Expand Up @@ -392,7 +377,20 @@ func (acbw *acBalancerWrapper) Connect() {
}

func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
ccb := acbw.ccb
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}

ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
Expand Down
Loading