Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: use locality from the connected address for load reporting #7378

Merged
merged 9 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,21 @@ func unregisterForTesting(name string) {
delete(m, name)
}

// getConnectedAddress returns the connected address for a SubConnState and
easwars marked this conversation as resolved.
Show resolved Hide resolved
// whether or not it is valid.
func getConnectedAddress(scs SubConnState) (resolver.Address, bool) {
return scs.connectedAddress, scs.ConnectivityState == connectivity.Ready
}

// setConnectedAddress sets the connected address for a SubConnState.
func setConnectedAddress(scs *SubConnState, addr resolver.Address) {
scs.connectedAddress = addr
}

func init() {
internal.BalancerUnregister = unregisterForTesting
internal.GetConnectedAddress = getConnectedAddress
internal.SetConnectedAddress = setConnectedAddress
}

// Get returns the resolver builder registered with the given name.
Expand Down Expand Up @@ -410,6 +423,15 @@ type SubConnState struct {
// ConnectionError is set if the ConnectivityState is TransientFailure,
// describing the reason the SubConn failed. Otherwise, it is nil.
ConnectionError error
// connectedAddr contains the connected address when ConnectivityState is
// Ready. Otherwise, it is indeterminate.
connectedAddress resolver.Address
}

func (lhs SubConnState) Equal(rhs SubConnState) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we delete this entirely when connectedAddress is removed? If so, ignore. If not, then should this accept pointers instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed entirely.

This was only added to get this single check in xds/internal/balancer/outlierdetection/balancer_test.go to pass, but that doesn't work if it uses pointers. I've removed this in favor of adding balancer.SubConnState{} to the cmp.AllowUnexported.

return lhs.ConnectivityState == rhs.ConnectivityState &&
lhs.ConnectionError == rhs.ConnectionError &&
lhs.connectedAddress.Addr == rhs.connectedAddress.Addr
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

// ClientConnState describes the state of a ClientConn relevant to the
Expand Down
11 changes: 9 additions & 2 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -252,15 +253,21 @@ type acBalancerWrapper struct {

// updateState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) {
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
acbw.ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
// Even though it is optional for balancers, gracefulswitch ensures
// opts.StateListener is set, so this cannot ever be nil.
// TODO: delete this comment when UpdateSubConnState is removed.
acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err}
if s == connectivity.Ready {
if sca, ok := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)); ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define this as a global instead.

var setConnectedAddress = internal.SetConnectedAddress.(func(*.........))

That way we don't have a conditional here and a runtime unknown. Instead it's an init time assertion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

sca(&scs, curAddr)
}
}
acbw.stateListener(scs)
})
}

Expand Down
30 changes: 15 additions & 15 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,17 +812,11 @@ func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
cc.csMgr.updateState(connectivity.TransientFailure)
}

// Makes a copy of the input addresses slice and clears out the balancer
// attributes field. Addresses are passed during subconn creation and address
// update operations. In both cases, we will clear the balancer attributes by
// calling this function, and therefore we will be able to use the Equal method
// provided by the resolver.Address type for comparison.
func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address {
// Makes a copy of the input addresses slice. Addresses are passed during
// subconn creation and address update operations.
func copyAddresses(in []resolver.Address) []resolver.Address {
out := make([]resolver.Address, len(in))
for i := range in {
out[i] = in[i]
out[i].BalancerAttributes = nil
}
copy(out, in)
return out
}

Expand All @@ -837,7 +831,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: copyAddressesWithoutBalancerAttributes(addrs),
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
Expand Down Expand Up @@ -924,12 +918,18 @@ func (ac *addrConn) connect() error {
return nil
}

func equalAddress(a, b resolver.Address) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably makes sense to add a function level comment here saying why we are not using the resolver.Address.Equal and instead defining our own. Something as simple as:

// equalAddress returns true is a and b are considered equal.
// This is different from the Equal method on the resolver.Address type 
// which considers all fields to determine equality. Here, we only consider
// fields that are meaningful to the subConn.
func equalAddress(a, b resolver.Address) bool { ... }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming the function might be an even nicer improvement. equalIgnoringBalAttributes?

Copy link
Contributor Author

@townba townba Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Used equalAddressIgnoringBalAttributes so I could also have equalAddressesIgnoringBalAttributes below.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should probably be pointers to avoid the copy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return a.Addr == b.Addr && a.ServerName == b.ServerName &&
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
}

func equalAddresses(a, b []resolver.Address) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if !v.Equal(b[i]) {
if !equalAddress(v, b[i]) {
return false
}
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -939,7 +939,7 @@ func equalAddresses(a, b []resolver.Address) bool {
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
addrs = copyAddressesWithoutBalancerAttributes(addrs)
addrs = copyAddresses(addrs)
limit := len(addrs)
if limit > 5 {
limit = 5
Expand All @@ -966,7 +966,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
// Try to find the connected address.
for _, a := range addrs {
a.ServerName = ac.cc.getServerName(a)
if a.Equal(ac.curAddr) {
if equalAddress(a, ac.curAddr) {
// We are connected to a valid address, so do nothing but
// update the addresses.
ac.mu.Unlock()
Expand Down Expand Up @@ -1214,7 +1214,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
} else {
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
}
ac.acbw.updateState(s, lastErr)
ac.acbw.updateState(s, ac.curAddr, lastErr)
}

// adjustParams updates parameters used to create transports upon
Expand Down
7 changes: 7 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ var (
// ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n
// is the number of elements. swap swaps the elements with indexes i and j.
ShuffleAddressListForTesting any // func(n int, swap func(i, j int))

// GetConnectedAddress returns the connected address for a SubConnState and
easwars marked this conversation as resolved.
Show resolved Hide resolved
// whether the address is valid based on the state.
easwars marked this conversation as resolved.
Show resolved Hide resolved
GetConnectedAddress any // func (scs SubConnState) (resolver.Address, bool)

// SetConnectedAddress sets the connected address for a SubConnState.
SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address)
)

// HealthChecker defines the signature of the client-side LB channel health
Expand Down
16 changes: 16 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
Expand Down Expand Up @@ -76,6 +77,21 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
}
}

// UpdateStateAndConnectedAddress saves the connected address to state if the
easwars marked this conversation as resolved.
Show resolved Hide resolved
// connectivity state is Ready and pushes the state to the listener if one is
// registered.
func (tsc *TestSubConn) UpdateStateAndConnectedAddress(state balancer.SubConnState, addr resolver.Address) {
<-tsc.connectCalled.Done()
if state.ConnectivityState == connectivity.Ready {
sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
sca(&state, addr)
}
if tsc.stateListener != nil {
tsc.stateListener(state)
return
}
}

// Shutdown pushes the SubConn to the ShutdownSubConn channel in the parent
// TestClientConn.
func (tsc *TestSubConn) Shutdown() {
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (s) TestLoadReporting(t *testing.T) {
t.Fatal(err.Error())
}

sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateStateAndConnectedAddress(balancer.SubConnState{ConnectivityState: connectivity.Ready}, addrs[0])
// Test pick with one backend.
const successCount = 5
const errorCount = 5
Expand Down
26 changes: 19 additions & 7 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
Expand Down Expand Up @@ -360,22 +361,33 @@
func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := b.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
var lID xdsinternal.LocalityID
for i, addr := range addrs {
newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName)
lID = xdsinternal.GetLocalityID(newAddrs[i])
}
var sc balancer.SubConn
scw := &scWrapper{}
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) }
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state, oldListener)
// Read connected address and call updateLocalityID() based on the connected
// address's locality. https://github.com/grpc/grpc-go/issues/7339
if gca, ok := internal.GetConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok {
if addr, ok := gca(state); ok {
lID := xdsinternal.GetLocalityID(addr)
if !lID.Empty() {
scw.updateLocalityID(lID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go style: let's try to avoid all this nesting:

StateListener = func... {
	b.updateSubConnState(...)
	if state != Ready {
		return
	}
	locality := xdsinternal.GetLocalityID(getConnectedAddress(state))
	if locality.Empty() {
		if logger.V(2) { log }
		return
	}
	scw.updateLocalityID(locality)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else if b.logger.V(2) {
b.logger.Infof("Locality ID for %v unexpectedly empty", addr)

Check warning on line 380 in xds/internal/balancer/clusterimpl/clusterimpl.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterimpl/clusterimpl.go#L380

Added line #L380 was not covered by tests
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
if err != nil {
return nil, err
}
// Wrap this SubConn in a wrapper, and add it to the map.
ret := &scWrapper{SubConn: sc}
ret.updateLocalityID(lID)
return ret, nil
scw.SubConn = sc
return scw, nil
}

func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
Expand Down
11 changes: 3 additions & 8 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,9 @@ func (s) TestLoadReportingPickFirstMultiLocality(t *testing.T) {
}
mgmtServer.LRSServer.LRSResponseChan <- &resp

// Wait for load to be reported for locality of server 2.
// We (incorrectly) wait for load report for region-2 because presently
// pickfirst always reports load for the locality of the last address in the
// subconn. This will be fixed by ensuring there is only one address per
// subconn.
// TODO(#7339): Change region to region-1 once fixed.
if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, "region-2"); err != nil {
t.Fatalf("region-2 did not receive load due to error: %v", err)
// Wait for load to be reported for locality of server 1.
if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, "region-1"); err != nil {
t.Fatalf("Server 1 did not receive load due to error: %v", err)
}

// Stop server 1 and send one more rpc. Now the request should go to server 2.
Expand Down
5 changes: 5 additions & 0 deletions xds/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (l LocalityID) Equal(o any) bool {
return l.Region == ol.Region && l.Zone == ol.Zone && l.SubZone == ol.SubZone
}

// Empty returns whether or not the locality ID is empty.
func (l LocalityID) Empty() bool {
return len(l.Region) == 0 && len(l.Zone) == 0 && len(l.SubZone) == 0
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

// LocalityIDFromString converts a json representation of locality, into a
// LocalityID struct.
func LocalityIDFromString(s string) (ret LocalityID, _ error) {
Expand Down
Loading