Skip to content

Commit

Permalink
xds: Plumb EDS endpoints through xDS Balancer Tree (#7816)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Nov 11, 2024
1 parent c2a2d20 commit d2c1aae
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 1 deletion.
2 changes: 2 additions & 0 deletions balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints)

b.stateAggregator.PauseStateUpdates()
defer b.stateAggregator.ResumeStateUpdates()
Expand Down Expand Up @@ -155,6 +156,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
Endpoints: endpointsSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes.WithValue(localityKey, name),
},
Expand Down
59 changes: 59 additions & 0 deletions internal/hierarchy/hierarchy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ func (p pathValue) Equal(o any) bool {
return true
}

// FromEndpoint returns the hierarchical path of endpoint.
func FromEndpoint(endpoint resolver.Endpoint) []string {
path, _ := endpoint.Attributes.Value(pathKey).(pathValue)
return path
}

// SetInEndpoint overrides the hierarchical path in endpoint with path.
func SetInEndpoint(endpoint resolver.Endpoint, path []string) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(pathKey, pathValue(path))
return endpoint
}

// Get returns the hierarchical path of addr.
func Get(addr resolver.Address) []string {
attrs := addr.BalancerAttributes
Expand Down Expand Up @@ -110,3 +122,50 @@ func Group(addrs []resolver.Address) map[string][]resolver.Address {
}
return ret
}

// GroupEndpoints splits a slice of endpoints into groups based on
// the first hierarchy path. The first hierarchy path will be removed from the
// result.
//
// Input:
// [
//
// {endpoint0, path: [p0, wt0]}
// {endpoint1, path: [p0, wt1]}
// {endpoint2, path: [p1, wt2]}
// {endpoint3, path: [p1, wt3]}
//
// ]
//
// Endpoints will be split into p0/p1, and the p0/p1 will be removed from the
// path.
//
// Output:
//
// {
// p0: [
// {endpoint0, path: [wt0]},
// {endpoint1, path: [wt1]},
// ],
// p1: [
// {endpoint2, path: [wt2]},
// {endpoint3, path: [wt3]},
// ],
// }
//
// If hierarchical path is not set, or has no path in it, the endpoint is
// dropped.
func GroupEndpoints(endpoints []resolver.Endpoint) map[string][]resolver.Endpoint {
ret := make(map[string][]resolver.Endpoint)
for _, endpoint := range endpoints {
oldPath := FromEndpoint(endpoint)
if len(oldPath) == 0 {
continue
}
curPath := oldPath[0]
newPath := oldPath[1:]
newEndpoint := SetInEndpoint(endpoint, newPath)
ret[curPath] = append(ret[curPath], newEndpoint)
}
return ret
}
2 changes: 2 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er
// TODO: Get rid of handling hierarchy in addresses. This LB policy never
// gets addresses from the resolver.
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints)

// Remove sub-balancers that are not in the new list from the aggregator and
// balancergroup.
Expand Down Expand Up @@ -139,6 +140,7 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er
if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[childName],
Endpoints: endpointsSplit[childName],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
},
Expand Down
1 change: 0 additions & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ func (b *clusterResolverBalancer) updateChildConfig() {
for i, a := range addrs {
endpoints[i].Attributes = a.BalancerAttributes
endpoints[i].Addresses = []resolver.Address{a}
endpoints[i].Addresses[0].BalancerAttributes = nil
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down
3 changes: 3 additions & 0 deletions xds/internal/balancer/priority/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
endpointsSplit := hierarchy.GroupEndpoints(s.ResolverState.Endpoints)

b.mu.Lock()
// Create and remove children, since we know all children from the config
Expand All @@ -142,6 +143,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
cb := newChildBalancer(name, b, bb.Name(), b.cc)
cb.updateConfig(newSubConfig, resolver.State{
Addresses: addressesSplit[name],
Endpoints: endpointsSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
})
Expand All @@ -163,6 +165,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
// be built, if it's a low priority).
currentChild.updateConfig(newSubConfig, resolver.State{
Addresses: addressesSplit[name],
Endpoints: endpointsSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
})
Expand Down

0 comments on commit d2c1aae

Please sign in to comment.