Skip to content

Commit

Permalink
base: update base balancer for new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 3, 2023
1 parent c88a003 commit bff5391
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
22 changes: 16 additions & 6 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
HealthCheckEnabled: b.config.HealthCheck,
StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
}
sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
Expand All @@ -121,10 +126,10 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
sc.Shutdown()
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
// The entry will be deleted in updateSubConnState.
}
}
// If resolver state contains no addresses, return an error so ClientConn
Expand Down Expand Up @@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() {
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if logger.V(2) {
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
Expand All @@ -204,8 +214,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
Expand All @@ -226,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}

// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
// and it doesn't need to call Shutdown for the SubConns.
func (b *baseBalancer) Close() {
}

Expand Down
28 changes: 23 additions & 5 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package base

import (
"context"
"testing"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
Expand All @@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS

func (c *testClientConn) UpdateState(balancer.State) {}

type testSubConn struct{}
type testSubConn struct {
updateState func(balancer.SubConnState)
}

func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

Expand All @@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
}

func TestBaseBalancerReserveAttributes(t *testing.T) {
var v = func(info PickerBuildInfo) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
validated := make(chan struct{}, 1)
v := func(info PickerBuildInfo) {
defer func() { validated <- struct{}{} }()
for _, sc := range info.ReadySCs {
if sc.Address.Addr == "1.1.1.1" {
if sc.Address.Attributes == nil {
Expand All @@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
}
pickBuilder := &testPickBuilder{validate: v}
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{}, nil
newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{updateState: opts.StateListener}, nil
},
}, balancer.BuildOptions{}).(*baseBalancer)

Expand All @@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
},
},
})
select {
case <-validated:
case <-ctx.Done():
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
}

for sc := range b.scStates {
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
select {
case <-validated:
case <-ctx.Done():
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
}
}
}

0 comments on commit bff5391

Please sign in to comment.