Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roundrobin: Delegate subchannel creation to pickfirst #7966

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package endpointsharding
package endpointsharding_test

import (
"context"
Expand All @@ -28,6 +28,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -55,7 +56,7 @@ var logger = grpclog.Component("endpoint-sharding-test")

func init() {
var err error
gracefulSwitchPickFirst, err = ParseConfig(json.RawMessage(PickFirstConfig))
gracefulSwitchPickFirst, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
if err != nil {
logger.Fatal(err)
}
Expand All @@ -75,7 +76,7 @@ func (fakePetioleBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
ClientConn: cc,
bOpts: opts,
}
fp.Balancer = NewBalancer(fp, opts)
fp.Balancer = endpointsharding.NewBalancer(fp, opts)
return fp
}

Expand Down Expand Up @@ -105,7 +106,7 @@ func (fp *fakePetiole) UpdateClientConnState(state balancer.ClientConnState) err
}

func (fp *fakePetiole) UpdateState(state balancer.State) {
childStates := ChildStatesFromPicker(state.Picker)
childStates := endpointsharding.ChildStatesFromPicker(state.Picker)
// Both child states should be present in the child picker. States and
// picker change over the lifecycle of test, but there should always be two.
if len(childStates) != 2 {
Expand Down
77 changes: 39 additions & 38 deletions balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,61 @@
package roundrobin

import (
rand "math/rand/v2"
"sync/atomic"
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

var logger = grpclog.Component("roundrobin")

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
var (
logger = grpclog.Component("roundrobin")
// endpointSharding which specifies pick first children.
endpointShardingLBConfig serviceconfig.LoadBalancingConfig
)

func init() {
balancer.Register(newBuilder())
var err error
endpointShardingLBConfig, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was in the other PR, too.

Probably we should have endpointsharding produce a parsed PF config if it's going to be used frequently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a nice idea. I've raised a PR with this change: #8007

PTAL, I'll rebase both the PRs once #8007 is merged.

if err != nil {
logger.Fatal(err)
}

Check warning on line 50 in balancer/roundrobin/roundrobin.go

View check run for this annotation

Codecov / codecov/patch

balancer/roundrobin/roundrobin.go#L49-L50

Added lines #L49 - L50 were not covered by tests
balancer.Register(builder{})
}

type rrPickerBuilder struct{}
type builder struct{}

func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("roundrobinPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, sc)
}
return &rrPicker{
subConns: scs,
// Start at a random index, as the same RR balancer rebuilds a new
// picker when SubConn states change, and we don't want to apply excess
// load to the first server in the list.
next: uint32(rand.IntN(len(scs))),
}
func (bb builder) Name() string {
return Name
}

type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
next uint32
func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bal := &rrBalancer{
cc: cc,
Balancer: endpointsharding.NewBalancer(cc, opts),
}
bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal))
bal.logger.Infof("Created")
return bal
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
subConnsLen := uint32(len(p.subConns))
nextIndex := atomic.AddUint32(&p.next, 1)
type rrBalancer struct {
balancer.Balancer
cc balancer.ClientConn
logger *internalgrpclog.PrefixLogger
}

sc := p.subConns[nextIndex%subConnsLen]
return balancer.PickResult{SubConn: sc}, nil
func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Enable the health listener in pickfirst children for client side health
// checks and outlier detection, if configured.
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
ccs.BalancerConfig = endpointShardingLBConfig
return b.Balancer.UpdateClientConnState(ccs)
}
1 change: 1 addition & 0 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func (b *wrrBalancer) Close() {
ew.stopORCAListener()
}
}
b.child.Close()
}

func (b *wrrBalancer) ExitIdle() {
Expand Down
Loading
Loading