Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roundrobin: Delegate subchannel creation to pickfirst #7966

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package endpointsharding
package endpointsharding_test

import (
"context"
Expand All @@ -28,6 +28,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -55,7 +56,7 @@ var logger = grpclog.Component("endpoint-sharding-test")

func init() {
var err error
gracefulSwitchPickFirst, err = ParseConfig(json.RawMessage(PickFirstConfig))
gracefulSwitchPickFirst, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
if err != nil {
logger.Fatal(err)
}
Expand All @@ -75,7 +76,7 @@ func (fakePetioleBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
ClientConn: cc,
bOpts: opts,
}
fp.Balancer = NewBalancer(fp, opts)
fp.Balancer = endpointsharding.NewBalancer(fp, opts)
return fp
}

Expand Down Expand Up @@ -105,7 +106,7 @@ func (fp *fakePetiole) UpdateClientConnState(state balancer.ClientConnState) err
}

func (fp *fakePetiole) UpdateState(state balancer.State) {
childStates := ChildStatesFromPicker(state.Picker)
childStates := endpointsharding.ChildStatesFromPicker(state.Picker)
// Both child states should be present in the child picker. States and
// picker change over the lifecycle of test, but there should always be two.
if len(childStates) != 2 {
Expand Down
3 changes: 1 addition & 2 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
sd.lastErr = newState.ConnectionError
dfawley marked this conversation as resolved.
Show resolved Hide resolved
connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
}

Expand Down Expand Up @@ -702,7 +703,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
})
}
case connectivity.TransientFailure:
sd.lastErr = newState.ConnectionError
sd.effectiveState = connectivity.TransientFailure
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
Expand All @@ -728,7 +728,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
switch newState.ConnectivityState {
case connectivity.TransientFailure:
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Expand Down
93 changes: 57 additions & 36 deletions balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,81 @@
package roundrobin

import (
rand "math/rand/v2"
"sync/atomic"
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

var logger = grpclog.Component("roundrobin")

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
var (
logger = grpclog.Component("roundrobin")
// endpointSharding which specifies pick first children.
endpointShardingLBConfig serviceconfig.LoadBalancingConfig
)

func init() {
balancer.Register(newBuilder())
var err error
endpointShardingLBConfig, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was in the other PR, too.

Probably we should have endpointsharding produce a parsed PF config if it's going to be used frequently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a nice idea. I've raised a PR with this change: #8007

PTAL, I'll rebase both the PRs once #8007 is merged.

if err != nil {
logger.Fatal(err)
}

Check warning on line 50 in balancer/roundrobin/roundrobin.go

View check run for this annotation

Codecov / codecov/patch

balancer/roundrobin/roundrobin.go#L49-L50

Added lines #L49 - L50 were not covered by tests
balancer.Register(builder{})
}

type rrPickerBuilder struct{}
type builder struct{}

func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("roundrobinPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, sc)
func (bb builder) Name() string {
return Name
}

func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bal := &rrBalancer{
cc: cc,
child: endpointsharding.NewBalancer(cc, opts),
}
return &rrPicker{
subConns: scs,
// Start at a random index, as the same RR balancer rebuilds a new
// picker when SubConn states change, and we don't want to apply excess
// load to the first server in the list.
next: uint32(rand.IntN(len(scs))),
bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal))
bal.logger.Infof("Created")
return bal
}

type rrBalancer struct {
cc balancer.ClientConn
child balancer.Balancer
logger *internalgrpclog.PrefixLogger
}

func (b *rrBalancer) Close() {
b.child.Close()
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *rrBalancer) ExitIdle() {
// Should always be ok, as child is endpoint sharding.
if ei, ok := b.child.(balancer.ExitIdler); ok {
ei.ExitIdle()

Check warning on line 83 in balancer/roundrobin/roundrobin.go

View check run for this annotation

Codecov / codecov/patch

balancer/roundrobin/roundrobin.go#L83

Added line #L83 was not covered by tests
}
}

type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
next uint32
func (b *rrBalancer) ResolverError(err error) {
// Will cause inline picker update from endpoint sharding.
b.child.ResolverError(err)
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
subConnsLen := uint32(len(p.subConns))
nextIndex := atomic.AddUint32(&p.next, 1)
func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Enable the health listener in pickfirst children for client side health
// checks and outlier detection, if configured.
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
ccs.BalancerConfig = endpointShardingLBConfig
return b.child.UpdateClientConnState(ccs)
}

sc := p.subConns[nextIndex%subConnsLen]
return balancer.PickResult{SubConn: sc}, nil
func (b *rrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)

Check warning on line 101 in balancer/roundrobin/roundrobin.go

View check run for this annotation

Codecov / codecov/patch

balancer/roundrobin/roundrobin.go#L100-L101

Added lines #L100 - L101 were not covered by tests
}
1 change: 1 addition & 0 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func (b *wrrBalancer) Close() {
ew.stopORCAListener()
}
}
b.child.Close()
}

func (b *wrrBalancer) ExitIdle() {
Expand Down
Loading
Loading