Skip to content

Commit

Permalink
Make least_request delegate to pickfirst
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Dec 25, 2024
1 parent 724f450 commit 1fcd5db
Showing 1 changed file with 133 additions and 58 deletions.
191 changes: 133 additions & 58 deletions balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,35 @@ import (
"encoding/json"
"fmt"
rand "math/rand/v2"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// randuint32 is a global to stub out in tests.
var randuint32 = rand.Uint32

// Name is the name of the least request balancer.
const Name = "least_request_experimental"

var logger = grpclog.Component("least-request")
var (
// randuint32 is a global to stub out in tests.
randuint32 = rand.Uint32
endpointShardingLBConfig serviceconfig.LoadBalancingConfig
logger = grpclog.Component("least-request")
)

func init() {
var err error
endpointShardingLBConfig, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
if err != nil {
logger.Fatal(err)
}

Check warning on line 54 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L53-L54

Added lines #L53 - L54 were not covered by tests
balancer.Register(bb{})
}

Expand Down Expand Up @@ -80,104 +92,167 @@ func (bb) Name() string {
}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
b := &leastRequestBalancer{
ClientConn: cc,
endpointRPCCounts: resolver.NewEndpointMap(),
choiceCount: 2,
}
b.child = endpointsharding.NewBalancer(b, bOpts)
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", b))
b.logger.Infof("Created")
return b
}

type leastRequestBalancer struct {
// Embeds balancer.Balancer because needs to intercept UpdateClientConnState
// to learn about choiceCount.
balancer.Balancer
// Embeds balancer.ClientConn because it needs to intercept UpdateState calls
// from the child balancer.
balancer.ClientConn
child balancer.Balancer
logger *internalgrpclog.PrefixLogger

mu sync.Mutex
choiceCount uint32
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
// endpointRPCCounts holds the per-endpoint in-flight RPC counters so that
// they are carried over from one picker update to the next.
endpointRPCCounts *resolver.EndpointMap // endpoint -> *atomic.Int32
}

// Close implements balancer.Balancer. It closes the child balancer and then
// drops this balancer's reference to the per-endpoint RPC counters.
//
// NOTE(review): endpointRPCCounts is cleared here without holding lrb.mu,
// while UpdateState reads and writes the same field with lrb.mu held — confirm
// that UpdateState cannot run concurrently with or after Close.
func (lrb *leastRequestBalancer) Close() {
lrb.child.Close()
lrb.endpointRPCCounts = nil
}

func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
lrCfg, ok := s.BalancerConfig.(*LBConfig)
// ResolverError implements balancer.Balancer. The error is forwarded to the
// child balancer unchanged.
func (lrb *leastRequestBalancer) ResolverError(err error) {
lrb.child.ResolverError(err)

Check warning on line 128 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L127-L128

Added lines #L127 - L128 were not covered by tests
}

// UpdateSubConnState implements balancer.Balancer. It only logs an error: the
// log message marks any call to this method as unexpected.
func (lrb *leastRequestBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
lrb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)

Check warning on line 133 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L132-L133

Added lines #L132 - L133 were not covered by tests
}

func (lrb *leastRequestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
lrCfg, ok := ccs.BalancerConfig.(*LBConfig)
if !ok {
logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
logger.Errorf("least-request: received config with unexpected type %T: %v", ccs.BalancerConfig, ccs.BalancerConfig)

Check warning on line 139 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L139

Added line #L139 was not covered by tests
return balancer.ErrBadResolverState
}

lrb.mu.Lock()
lrb.choiceCount = lrCfg.ChoiceCount
return lrb.Balancer.UpdateClientConnState(s)
lrb.mu.Unlock()
// Enable the health listener in pickfirst children for client side health
// checks and outlier detection, if configured.
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
ccs.BalancerConfig = endpointShardingLBConfig
return lrb.child.UpdateClientConnState(ccs)
}

type scWithRPCCount struct {
sc balancer.SubConn
// endpointState pairs the picker of one ready endpoint with the counter of
// that endpoint's in-flight RPCs.
type endpointState struct {
// picker is the child picker the pick is delegated to once this endpoint is
// chosen.
picker balancer.Picker
// numRPCs is incremented after a successful pick and decremented when the
// RPC's Done callback runs.
numRPCs *atomic.Int32
}

func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
if logger.V(2) {
logger.Infof("least-request: Build called with info: %v", info)
func (lrb *leastRequestBalancer) UpdateState(state balancer.State) {
var readyEndpoints []endpointsharding.ChildState
for _, child := range endpointsharding.ChildStatesFromPicker(state.Picker) {
if child.State.ConnectivityState == connectivity.Ready {
readyEndpoints = append(readyEndpoints, child)
}
}
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)

// If no ready pickers are present, simply defer to the round robin picker
// from endpoint sharding, which will round robin across the most relevant
// pick first children in the highest precedence connectivity state.
if len(readyEndpoints) == 0 {
lrb.ClientConn.UpdateState(state)
return
}

for sc := range lrb.scRPCCounts {
if _, ok := info.ReadySCs[sc]; !ok { // If no longer ready, no more need for the ref to count active RPCs.
delete(lrb.scRPCCounts, sc)
}
lrb.mu.Lock()
defer lrb.mu.Unlock()

if logger.V(2) {
lrb.logger.Infof("UpdateState called with ready endpoints: %v", readyEndpoints)

Check warning on line 178 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L178

Added line #L178 was not covered by tests
}

// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(atomic.Int32)
// Reconcile endpoints.
newEndpoints := resolver.NewEndpointMap() // endpoint -> nil
for _, child := range readyEndpoints {
newEndpoints.Set(child.Endpoint, nil)
}

// If endpoints are no longer ready, no need to count their active RPCs.
for _, endpoint := range lrb.endpointRPCCounts.Keys() {
if _, ok := newEndpoints.Get(endpoint); !ok {
lrb.endpointRPCCounts.Delete(endpoint)

Check warning on line 190 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L190

Added line #L190 was not covered by tests
}
}

// Copy refs to counters into picker.
scs := make([]scWithRPCCount, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, scWithRPCCount{
sc: sc,
numRPCs: lrb.scRPCCounts[sc], // guaranteed to be present due to algorithm
endpointStates := make([]endpointState, 0, len(readyEndpoints))
for _, child := range readyEndpoints {
var counter *atomic.Int32
if val, ok := lrb.endpointRPCCounts.Get(child.Endpoint); !ok {
// Create new counts if needed.
counter = new(atomic.Int32)
lrb.endpointRPCCounts.Set(child.Endpoint, counter)
} else {
counter = val.(*atomic.Int32)
}
endpointStates = append(endpointStates, endpointState{
picker: child.State.Picker,
numRPCs: counter,
})
}

return &picker{
choiceCount: lrb.choiceCount,
subConns: scs,
}
lrb.ClientConn.UpdateState(balancer.State{
Picker: &picker{
choiceCount: lrb.choiceCount,
endpointStates: endpointStates,
},
ConnectivityState: connectivity.Ready,
})
}

type picker struct {
// choiceCount is the number of endpoints sampled at random for each pick;
// the sampled endpoint with the fewest in-flight RPCs is chosen.
choiceCount uint32
// Built out when receives list of ready RPCs.
subConns []scWithRPCCount
// Built out when the balancer receives the list of ready child pickers.
endpointStates []endpointState
}

func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
var pickedSC *scWithRPCCount
var pickedSCNumRPCs int32
func (p *picker) Pick(pInfo balancer.PickInfo) (balancer.PickResult, error) {
var pickedEndpointState *endpointState
var pickedEndpointNumRPCs int32
for i := 0; i < int(p.choiceCount); i++ {
index := randuint32() % uint32(len(p.subConns))
sc := p.subConns[index]
n := sc.numRPCs.Load()
if pickedSC == nil || n < pickedSCNumRPCs {
pickedSC = &sc
pickedSCNumRPCs = n
index := randuint32() % uint32(len(p.endpointStates))
endpointState := p.endpointStates[index]
n := endpointState.numRPCs.Load()
if pickedEndpointState == nil || n < pickedEndpointNumRPCs {
pickedEndpointState = &endpointState
pickedEndpointNumRPCs = n
}
}
result, err := pickedEndpointState.picker.Pick(pInfo)
if err != nil {
return result, err
}

Check warning on line 243 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L242-L243

Added lines #L242 - L243 were not covered by tests
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
pickedSC.numRPCs.Add(1)
pickedEndpointState.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
pickedSC.numRPCs.Add(-1)
originalDone := result.Done
result.Done = func(info balancer.DoneInfo) {
pickedEndpointState.numRPCs.Add(-1)
if originalDone != nil {
originalDone(info)
}

Check warning on line 255 in balancer/leastrequest/leastrequest.go

View check run for this annotation

Codecov / codecov/patch

balancer/leastrequest/leastrequest.go#L254-L255

Added lines #L254 - L255 were not covered by tests
}
return balancer.PickResult{
SubConn: pickedSC.sc,
Done: done,
}, nil
return result, nil
}

0 comments on commit 1fcd5db

Please sign in to comment.