diff --git a/balancer/balancer.go b/balancer/balancer.go index 788759bde4b5..ab531f4c0b80 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -101,6 +101,9 @@ type SubConn interface { // a new connection will be created. // // This will trigger a state transition for the SubConn. + // + // Deprecated: This method is now part of the ClientConn interface and will + // eventually be removed from here. UpdateAddresses([]resolver.Address) // Connect starts the connecting for this SubConn. Connect() @@ -143,6 +146,13 @@ type ClientConn interface { // RemoveSubConn removes the SubConn from ClientConn. // The SubConn will be shutdown. RemoveSubConn(SubConn) + // UpdateAddresses updates the addresses used in the passed in SubConn. + // gRPC checks if the currently connected address is still in the new list. + // If so, the connection will be kept. Else, the connection will be + // gracefully closed, and a new connection will be created. + // + // This will trigger a state transition for the SubConn. + UpdateAddresses(SubConn, []resolver.Address) // UpdateState notifies gRPC that the balancer's internal state has // changed. diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index e0d34288ccfc..383d02ec2bf5 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -135,7 +135,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // The SubConn does a reflect.DeepEqual of the new and old // addresses. So this is a noop if the current address is the same // as the old one (including attributes). - sc.UpdateAddresses([]resolver.Address{a}) + b.cc.UpdateAddresses(sc, []resolver.Address{a}) } } for a, sc := range b.subConns { diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 08d326ab40b0..5ac8d86bd570 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -140,7 +140,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback break } if sc != nil { - sc.UpdateAddresses(backendAddrs) + lb.cc.cc.UpdateAddresses(sc, backendAddrs) sc.Connect() return } diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 11e592aabb01..70e07dbe1d63 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -163,6 +163,22 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } +func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + acbw, ok := sc.(*acBalancerWrapper) + if !ok { + return + } + + ccb.mu.Lock() + if ccb.subConns == nil { + ccb.mu.Unlock() + return + } + ccb.mu.Unlock() + + acbw.UpdateAddresses(addrs) +} + func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { ccb.mu.Lock() defer ccb.mu.Unlock() diff --git a/pickfirst.go b/pickfirst.go index 56e33f6c76b7..b858c2a5e63b 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -84,7 +84,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}}) b.sc.Connect() } else { - b.sc.UpdateAddresses(cs.ResolverState.Addresses) + b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses) b.sc.Connect() } return nil diff --git a/xds/internal/testutils/balancer.go b/xds/internal/testutils/balancer.go index 673c4ee9a9b6..4973cbc96e5a 100644 --- a/xds/internal/testutils/balancer.go +++ b/xds/internal/testutils/balancer.go @@ -123,6 +123,9 @@ func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) { } } +// UpdateAddresses updates the addresses on the SubConn. +func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {} + // UpdateState updates connectivity state and picker. func (tcc *TestClientConn) UpdateState(bs balancer.State) { tcc.logger.Logf("testClientConn: UpdateState(%v)", bs)