diff --git a/go/pkg/balancer/BUILD.bazel b/go/pkg/balancer/BUILD.bazel index f02d95603..f43fdf804 100644 --- a/go/pkg/balancer/BUILD.bazel +++ b/go/pkg/balancer/BUILD.bazel @@ -3,32 +3,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "balancer", srcs = [ - "gcp_balancer.go", - "gcp_interceptor.go", - "gcp_picker.go", "roundrobin.go", ], importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer", visibility = ["//visibility:public"], deps = [ - "//go/pkg/balancer/proto", "@org_golang_google_grpc//:go_default_library", - "@org_golang_google_grpc//balancer", - "@org_golang_google_grpc//connectivity", - "@org_golang_google_grpc//grpclog", - "@org_golang_google_grpc//resolver", - "@org_golang_google_protobuf//encoding/protojson", ], ) -go_test( - name = "balancer_test", - srcs = ["gcp_balancer_test.go"], - embed = [":balancer"], - deps = [ - "@com_github_google_uuid//:uuid", - "@org_golang_google_grpc//balancer", - "@org_golang_google_grpc//connectivity", - "@org_golang_google_grpc//resolver", - ], -) diff --git a/go/pkg/balancer/README.md b/go/pkg/balancer/README.md deleted file mode 100644 index e3ee37a7d..000000000 --- a/go/pkg/balancer/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# GRPC Balancer -This package is a forked version of https://github.com/GoogleCloudPlatform/grpc-gcp-go. - -We use this primarily to create new sub-connections when we reach -maximum number of streams (100 for GFE) on a given connection. - -Refer to https://github.com/grpc/grpc/issues/21386 for status on the long-term fix -for this issue. diff --git a/go/pkg/balancer/gcp_balancer.go b/go/pkg/balancer/gcp_balancer.go deleted file mode 100644 index 7d03801ba..000000000 --- a/go/pkg/balancer/gcp_balancer.go +++ /dev/null @@ -1,331 +0,0 @@ -// Package balancer is a forked version of https://github.com/GoogleCloudPlatform/grpc-gcp-go. -package balancer - -import ( - "sync" - "sync/atomic" - - "google.golang.org/grpc/balancer" - - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/resolver" -) - -const ( - // Name is the name of grpc_gcp balancer. - Name = "grpc_gcp" - - healthCheckEnabled = true -) - -var ( - // DefaultMinConnections is the default number of gRPC sub-connections the - // gRPC balancer should create during SDK initialization. - DefaultMinConnections = 5 - - // MinConnections is the minimum number of gRPC sub-connections the gRPC balancer - // should create during SDK initialization. - // It is initialized in flags package. - MinConnections = DefaultMinConnections -) - -func init() { - balancer.Register(newBuilder()) -} - -type gcpBalancerBuilder struct { - name string -} - -// Build returns a grpc balancer initialized with given build options. -func (bb *gcpBalancerBuilder) Build( - cc balancer.ClientConn, - opt balancer.BuildOptions, -) balancer.Balancer { - return &gcpBalancer{ - cc: cc, - affinityMap: make(map[string]balancer.SubConn), - scRefs: make(map[balancer.SubConn]*subConnRef), - scStates: make(map[balancer.SubConn]connectivity.State), - csEvltr: &connectivityStateEvaluator{}, - // Initialize picker to a picker that always return - // ErrNoSubConnAvailable, because when state of a SubConn changes, we - // may call UpdateState with this picker. - picker: newErrPicker(balancer.ErrNoSubConnAvailable), - } -} - -// Name returns the name of the balancer. -func (*gcpBalancerBuilder) Name() string { - return Name -} - -// newBuilder creates a new grpcgcp balancer builder. -func newBuilder() balancer.Builder { - return &gcpBalancerBuilder{ - name: Name, - } -} - -// connectivityStateEvaluator gets updated by addrConns when their -// states transition, based on which it evaluates the state of -// ClientConn. -type connectivityStateEvaluator struct { - numReady uint64 // Number of addrConns in ready state. - numConnecting uint64 // Number of addrConns in connecting state. - numTransientFailure uint64 // Number of addrConns in transientFailure. -} - -// recordTransition records state change happening in every subConn and based on -// that it evaluates what aggregated state should be. -// It can only transition between Ready, Connecting and TransientFailure. Other states, -// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection -// before any subConn is created ClientConn is in idle state. In the end when ClientConn -// closes it is in Shutdown state. -// -// recordTransition should only be called synchronously from the same goroutine. -func (cse *connectivityStateEvaluator) recordTransition( - oldState, - newState connectivity.State, -) connectivity.State { - // Update counters. - for idx, state := range []connectivity.State{oldState, newState} { - updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. - switch state { - case connectivity.Ready: - cse.numReady += updateVal - case connectivity.Connecting: - cse.numConnecting += updateVal - case connectivity.TransientFailure: - cse.numTransientFailure += updateVal - } - } - - // Evaluate. - if cse.numReady > 0 { - return connectivity.Ready - } - if cse.numConnecting > 0 { - return connectivity.Connecting - } - return connectivity.TransientFailure -} - -// subConnRef keeps reference to the real SubConn with its -// connectivity state, affinity count and streams count. -type subConnRef struct { - subConn balancer.SubConn - affinityCnt int32 // Keeps track of the number of keys bound to the subConn - streamsCnt int32 // Keeps track of the number of streams opened on the subConn -} - -func (ref *subConnRef) getAffinityCnt() int32 { - return atomic.LoadInt32(&ref.affinityCnt) -} - -func (ref *subConnRef) getStreamsCnt() int32 { - return atomic.LoadInt32(&ref.streamsCnt) -} - -func (ref *subConnRef) affinityIncr() { - atomic.AddInt32(&ref.affinityCnt, 1) -} - -func (ref *subConnRef) affinityDecr() { - atomic.AddInt32(&ref.affinityCnt, -1) -} - -func (ref *subConnRef) streamsIncr() { - atomic.AddInt32(&ref.streamsCnt, 1) -} - -func (ref *subConnRef) streamsDecr() { - atomic.AddInt32(&ref.streamsCnt, -1) -} - -type gcpBalancer struct { - balancer.Balancer // Embed V1 Balancer so it compiles with Builder - addrs []resolver.Address - cc balancer.ClientConn - csEvltr *connectivityStateEvaluator - state connectivity.State - - mu sync.RWMutex - affinityMap map[string]balancer.SubConn - scStates map[balancer.SubConn]connectivity.State - scRefs map[balancer.SubConn]*subConnRef - - picker balancer.Picker -} - -func (gb *gcpBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { - addrs := ccs.ResolverState.Addresses - gb.addrs = addrs - - createdSubCons := false - for len(gb.scRefs) < MinConnections { - gb.newSubConn() - createdSubCons = true - } - if createdSubCons { - return nil - } - - for _, scRef := range gb.scRefs { - scRef.subConn.UpdateAddresses(addrs) - scRef.subConn.Connect() - } - - return nil -} - -func (gb *gcpBalancer) ResolverError(err error) { - grpclog.Warningf( - "grpcgcp.gcpBalancer: ResolverError: %v", - err, - ) -} - -// check current connection pool size -func (gb *gcpBalancer) getConnectionPoolSize() int { - gb.mu.Lock() - defer gb.mu.Unlock() - return len(gb.scRefs) -} - -// newSubConn creates a new SubConn using cc.NewSubConn and initialize the subConnRef. -func (gb *gcpBalancer) newSubConn() { - gb.mu.Lock() - defer gb.mu.Unlock() - - // there are chances the newly created subconns are still connecting, - // we can wait on those new subconns. - for _, scState := range gb.scStates { - if scState == connectivity.Connecting { - return - } - } - - sc, err := gb.cc.NewSubConn( - gb.addrs, - balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled}, - ) - if err != nil { - grpclog.Errorf("grpcgcp.gcpBalancer: failed to NewSubConn: %v", err) - return - } - gb.scRefs[sc] = &subConnRef{ - subConn: sc, - } - gb.scStates[sc] = connectivity.Idle - sc.Connect() -} - -// getReadySubConnRef returns a subConnRef and a bool. The bool indicates whether -// the boundKey exists in the affinityMap. If returned subConnRef is a nil, it -// means the underlying subconn is not READY yet. -func (gb *gcpBalancer) getReadySubConnRef(boundKey string) (*subConnRef, bool) { - gb.mu.RLock() - defer gb.mu.RUnlock() - - if sc, ok := gb.affinityMap[boundKey]; ok { - if gb.scStates[sc] != connectivity.Ready { - // It's possible that the bound subconn is not in the readySubConns list, - // If it's not ready yet, we throw ErrNoSubConnAvailable - return nil, true - } - return gb.scRefs[sc], true - } - return nil, false -} - -// bindSubConn binds the given affinity key to an existing subConnRef. -func (gb *gcpBalancer) bindSubConn(bindKey string, sc balancer.SubConn) { - gb.mu.Lock() - defer gb.mu.Unlock() - _, ok := gb.affinityMap[bindKey] - if !ok { - gb.affinityMap[bindKey] = sc - } - gb.scRefs[sc].affinityIncr() -} - -// unbindSubConn removes the existing binding associated with the key. -func (gb *gcpBalancer) unbindSubConn(boundKey string) { - gb.mu.Lock() - defer gb.mu.Unlock() - boundSC, ok := gb.affinityMap[boundKey] - if ok { - gb.scRefs[boundSC].affinityDecr() - if gb.scRefs[boundSC].getAffinityCnt() <= 0 { - delete(gb.affinityMap, boundKey) - } - } -} - -// regeneratePicker takes a snapshot of the balancer, and generates a picker -// from it. The picker is -// - errPicker with ErrTransientFailure if the balancer is in TransientFailure, -// - built by the pickerBuilder with all READY SubConns otherwise. -func (gb *gcpBalancer) regeneratePicker() { - gb.mu.RLock() - defer gb.mu.RUnlock() - - if gb.state == connectivity.TransientFailure { - gb.picker = newErrPicker(balancer.ErrTransientFailure) - return - } - readyRefs := []*subConnRef{} - - // Select ready subConns from subConn map. - for sc, scState := range gb.scStates { - if scState == connectivity.Ready { - readyRefs = append(readyRefs, gb.scRefs[sc]) - } - } - gb.picker = newGCPPicker(readyRefs, gb) -} - -func (gb *gcpBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { - s := scs.ConnectivityState - grpclog.Infof("grpcgcp.gcpBalancer: handle SubConn state change: %p, %v", sc, s) - - gb.mu.Lock() - oldS, ok := gb.scStates[sc] - if !ok { - grpclog.Infof( - "grpcgcp.gcpBalancer: got state changes for an unknown SubConn: %p, %v", - sc, - s, - ) - gb.mu.Unlock() - return - } - gb.scStates[sc] = s - switch s { - case connectivity.Idle: - sc.Connect() - case connectivity.Shutdown: - delete(gb.scRefs, sc) - delete(gb.scStates, sc) - } - gb.mu.Unlock() - - oldAggrState := gb.state - gb.state = gb.csEvltr.recordTransition(oldS, s) - - // Regenerate picker when one of the following happens: - // - this sc became ready from not-ready - // - this sc became not-ready from ready - // - the aggregated state of balancer became TransientFailure from non-TransientFailure - // - the aggregated state of balancer became non-TransientFailure from TransientFailure - if (s == connectivity.Ready) != (oldS == connectivity.Ready) || - (gb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { - gb.regeneratePicker() - gb.cc.UpdateState(balancer.State{ConnectivityState: gb.state, Picker: gb.picker}) - } -} - -func (gb *gcpBalancer) Close() { -} diff --git a/go/pkg/balancer/gcp_balancer_test.go b/go/pkg/balancer/gcp_balancer_test.go deleted file mode 100644 index 8ab541145..000000000 --- a/go/pkg/balancer/gcp_balancer_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package balancer - -import ( - "sync" - "testing" - "time" - - "github.com/google/uuid" - - grpcbalancer "google.golang.org/grpc/balancer" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/resolver" -) - -func TestGCPBalancer_UpdatingConnectionStateIsMutuallyExclusive(t *testing.T) { - builder := newBuilder() - var subconns []*fakeSubConn - for i := 0; i < MinConnections; i++ { - subconns = append(subconns, &fakeSubConn{id: uuid.New().String()}) - } - - cc := fakeClientConn{subconns: subconns} - balancer := builder.Build(cc, grpcbalancer.BuildOptions{}) - var wg sync.WaitGroup - wg.Add(2) - - // Invoke both UpdateClientConnState() and UpdatesubConnState() simultaneously - // and make sure that it doesn't result in a race condition. The test would fail - // if there's a race condition. - go func() { - balancer.UpdateClientConnState(grpcbalancer.ClientConnState{}) - wg.Done() - }() - go func() { - for i := 0; i < MinConnections; i++ { - // Sending a connectivity.Ready state will actually trigger regenerating the picker - // which is where we expect the race condition to occur. - balancer.UpdateSubConnState(subconns[i], grpcbalancer.SubConnState{ConnectivityState: connectivity.Ready}) - } - wg.Done() - }() - wg.Wait() -} - -type fakeClientConn struct { - grpcbalancer.ClientConn - - subconns []*fakeSubConn -} - -var ( - // Note: These cannot be moved to fakeClientConn since gRPC Builder() interface - // does not accept a pointer to fakeClientConn and so the mutex will be copied if - // passed by value. - // https://godoc.org/google.golang.org/grpc/balancer#Builder - idx int - mu sync.Mutex -) - -func (f fakeClientConn) NewSubConn([]resolver.Address, grpcbalancer.NewSubConnOptions) (grpcbalancer.SubConn, error) { - mu.Lock() - defer mu.Unlock() - idx++ - idx = idx % MinConnections - return f.subconns[idx], nil -} - -func (f fakeClientConn) RemoveSubConn(grpcbalancer.SubConn) {} - -func (f fakeClientConn) UpdateState(grpcbalancer.State) {} - -func (f fakeClientConn) ResolveNow(resolver.ResolveNowOptions) {} - -func (f fakeClientConn) Target() string { - return "" -} - -type fakeSubConn struct { - id string -} - -func (fakeSubConn) UpdateAddresses([]resolver.Address) {} - -func (fakeSubConn) Connect() { - // Sleep to simulate connecting to an actual server. - time.Sleep(100 * time.Millisecond) -} - -func (fakeSubConn) GetOrBuildProducer(grpcbalancer.ProducerBuilder) (grpcbalancer.Producer, func()) { - return nil, nil -} - -func (fakeSubConn) Shutdown() {} diff --git a/go/pkg/balancer/gcp_interceptor.go b/go/pkg/balancer/gcp_interceptor.go deleted file mode 100644 index dcc131b35..000000000 --- a/go/pkg/balancer/gcp_interceptor.go +++ /dev/null @@ -1,215 +0,0 @@ -package balancer - -import ( - "context" - "os" - "sync" - - pb "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer/proto" - "google.golang.org/grpc" - "google.golang.org/protobuf/encoding/protojson" -) - -const ( - // Default max number of connections is 0, meaning "no limit" - defaultMaxConn = 0 - - // Default max stream watermark is 100, which is the current stream limit for GFE. - // Any value >100 will be rounded down to 100. - defaultMaxStream = 100 -) - -type key int - -var gcpKey key - -type poolConfig struct { - maxConn uint32 - maxStream uint32 -} - -type gcpContext struct { - affinityCfg *pb.AffinityConfig - poolCfg *poolConfig - // request message used for pre-process of an affinity call - reqMsg interface{} - // response message used for post-process of an affinity call - replyMsg interface{} -} - -// GCPInterceptor provides functions for intercepting client requests -// in order to support GCP specific features -type GCPInterceptor struct { - poolCfg *poolConfig - - // Maps method path to AffinityConfig - methodToAffinity map[string]*pb.AffinityConfig -} - -// NewGCPInterceptor creates a new GCPInterceptor with a given ApiConfig -func NewGCPInterceptor(config *pb.ApiConfig) *GCPInterceptor { - mp := make(map[string]*pb.AffinityConfig) - methodCfgs := config.GetMethod() - for _, methodCfg := range methodCfgs { - methodNames := methodCfg.GetName() - affinityCfg := methodCfg.GetAffinity() - if methodNames != nil && affinityCfg != nil { - for _, method := range methodNames { - mp[method] = affinityCfg - } - } - } - - poolCfg := &poolConfig{ - maxConn: defaultMaxConn, - maxStream: defaultMaxStream, - } - - userPoolCfg := config.GetChannelPool() - - // Set user defined MaxSize. - poolCfg.maxConn = userPoolCfg.GetMaxSize() - - // Set user defined MaxConcurrentStreamsLowWatermark if ranged in [1, defaultMaxStream], - // otherwise use the defaultMaxStream. - watermarkValue := userPoolCfg.GetMaxConcurrentStreamsLowWatermark() - if watermarkValue >= 1 && watermarkValue <= defaultMaxStream { - poolCfg.maxStream = watermarkValue - } - return &GCPInterceptor{ - poolCfg: poolCfg, - methodToAffinity: mp, - } -} - -// GCPUnaryClientInterceptor intercepts the execution of a unary RPC -// and injects necessary information to be used by the picker. -func (gcpInt *GCPInterceptor) GCPUnaryClientInterceptor( - ctx context.Context, - method string, - req interface{}, - reply interface{}, - cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, - opts ...grpc.CallOption, -) error { - affinityCfg, _ := gcpInt.methodToAffinity[method] - gcpCtx := &gcpContext{ - affinityCfg: affinityCfg, - reqMsg: req, - replyMsg: reply, - poolCfg: gcpInt.poolCfg, - } - ctx = context.WithValue(ctx, gcpKey, gcpCtx) - - return invoker(ctx, method, req, reply, cc, opts...) -} - -// GCPStreamClientInterceptor intercepts the execution of a client streaming RPC -// and injects necessary information to be used by the picker. -func (gcpInt *GCPInterceptor) GCPStreamClientInterceptor( - ctx context.Context, - desc *grpc.StreamDesc, - cc *grpc.ClientConn, - method string, - streamer grpc.Streamer, - opts ...grpc.CallOption, -) (grpc.ClientStream, error) { - // This constructor does not create a real ClientStream, - // it only stores all parameters and let SendMsg() to create ClientStream. - affinityCfg, _ := gcpInt.methodToAffinity[method] - gcpCtx := &gcpContext{ - affinityCfg: affinityCfg, - poolCfg: gcpInt.poolCfg, - } - ctx = context.WithValue(ctx, gcpKey, gcpCtx) - cs := &gcpClientStream{ - gcpInt: gcpInt, - ctx: ctx, - desc: desc, - cc: cc, - method: method, - streamer: streamer, - opts: opts, - } - cs.cond = sync.NewCond(cs) - return cs, nil -} - -type gcpClientStream struct { - sync.Mutex - grpc.ClientStream - - cond *sync.Cond - initStreamErr error - gcpInt *GCPInterceptor - ctx context.Context - desc *grpc.StreamDesc - cc *grpc.ClientConn - method string - streamer grpc.Streamer - opts []grpc.CallOption -} - -func (cs *gcpClientStream) SendMsg(m interface{}) error { - cs.Lock() - // Initialize underlying ClientStream when getting the first request. - if cs.ClientStream == nil { - affinityCfg, ok := cs.gcpInt.methodToAffinity[cs.method] - ctx := cs.ctx - if ok { - gcpCtx := &gcpContext{ - affinityCfg: affinityCfg, - reqMsg: m, - poolCfg: cs.gcpInt.poolCfg, - } - ctx = context.WithValue(cs.ctx, gcpKey, gcpCtx) - } - realCS, err := cs.streamer(ctx, cs.desc, cs.cc, cs.method, cs.opts...) - if err != nil { - cs.initStreamErr = err - cs.Unlock() - cs.cond.Broadcast() - return err - } - cs.ClientStream = realCS - } - cs.Unlock() - cs.cond.Broadcast() - return cs.ClientStream.SendMsg(m) -} - -func (cs *gcpClientStream) RecvMsg(m interface{}) error { - // If RecvMsg is called before SendMsg, it should wait until cs.ClientStream - // is initialized or the initialization failed. - cs.Lock() - for cs.initStreamErr == nil && cs.ClientStream == nil { - cs.cond.Wait() - } - if cs.initStreamErr != nil { - cs.Unlock() - return cs.initStreamErr - } - cs.Unlock() - return cs.ClientStream.RecvMsg(m) -} - -func (cs *gcpClientStream) CloseSend() error { - cs.Lock() - defer cs.Unlock() - if cs.ClientStream != nil { - return cs.ClientStream.CloseSend() - } - return nil -} - -// ParseAPIConfig parses a json config file into ApiConfig proto message. -func ParseAPIConfig(path string) (*pb.ApiConfig, error) { - jsonFile, err := os.ReadFile(path) - if err != nil { - return nil, err - } - result := &pb.ApiConfig{} - protojson.Unmarshal(jsonFile, result) - return result, nil -} diff --git a/go/pkg/balancer/gcp_picker.go b/go/pkg/balancer/gcp_picker.go deleted file mode 100644 index f3031e90c..000000000 --- a/go/pkg/balancer/gcp_picker.go +++ /dev/null @@ -1,168 +0,0 @@ -package balancer - -import ( - "fmt" - "reflect" - "sort" - "strings" - "sync" - - pb "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer/proto" - "google.golang.org/grpc/balancer" -) - -func newGCPPicker(readySCRefs []*subConnRef, gb *gcpBalancer) balancer.Picker { - return &gcpPicker{ - gcpBalancer: gb, - scRefs: readySCRefs, - poolCfg: nil, - } -} - -type gcpPicker struct { - gcpBalancer *gcpBalancer - mu sync.Mutex - scRefs []*subConnRef - poolCfg *poolConfig -} - -// Pick picks the appropriate subconnection. -func (p *gcpPicker) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) { - if len(p.scRefs) <= 0 { - return balancer.PickResult{}, balancer.ErrNoSubConnAvailable - } - - p.mu.Lock() - defer p.mu.Unlock() - - gcpCtx, hasGcpCtx := info.Ctx.Value(gcpKey).(*gcpContext) - boundKey := "" - - if hasGcpCtx { - if p.poolCfg == nil { - // Initialize poolConfig for picker. - p.poolCfg = gcpCtx.poolCfg - } - affinity := gcpCtx.affinityCfg - if affinity != nil { - locator := affinity.GetAffinityKey() - cmd := affinity.GetCommand() - if cmd == pb.AffinityConfig_BOUND || cmd == pb.AffinityConfig_UNBIND { - a, err := getAffinityKeyFromMessage(locator, gcpCtx.reqMsg) - if err != nil { - return balancer.PickResult{}, fmt.Errorf( - "failed to retrieve affinity key from request message: %v", err) - } - boundKey = a - } - } - } - - var scRef *subConnRef - scRef, err = p.getSubConnRef(boundKey) - if err != nil { - return balancer.PickResult{}, err - } - result.SubConn = scRef.subConn - scRef.streamsIncr() - - // define callback for post process once call is done - result.Done = func(info balancer.DoneInfo) { - if info.Err == nil { - if hasGcpCtx { - affinity := gcpCtx.affinityCfg - locator := affinity.GetAffinityKey() - cmd := affinity.GetCommand() - if cmd == pb.AffinityConfig_BIND { - bindKey, err := getAffinityKeyFromMessage(locator, gcpCtx.replyMsg) - if err == nil { - p.gcpBalancer.bindSubConn(bindKey, scRef.subConn) - } - } else if cmd == pb.AffinityConfig_UNBIND { - p.gcpBalancer.unbindSubConn(boundKey) - } - } - } - scRef.streamsDecr() - } - return result, err -} - -// getSubConnRef returns the subConnRef object that contains the subconn -// ready to be used by picker. -func (p *gcpPicker) getSubConnRef(boundKey string) (*subConnRef, error) { - if boundKey != "" { - if ref, ok := p.gcpBalancer.getReadySubConnRef(boundKey); ok { - return ref, nil - } - } - - sort.Slice(p.scRefs, func(i, j int) bool { - return p.scRefs[i].getStreamsCnt() < p.scRefs[j].getStreamsCnt() - }) - - // If the least busy connection still has capacity, use it - if len(p.scRefs) > 0 && p.scRefs[0].getStreamsCnt() < int32(p.poolCfg.maxStream) { - return p.scRefs[0], nil - } - - if p.poolCfg.maxConn == 0 || p.gcpBalancer.getConnectionPoolSize() < int(p.poolCfg.maxConn) { - // Ask balancer to create new subconn when all current subconns are busy and - // the connection pool still has capacity (either unlimited or maxSize is not reached). - p.gcpBalancer.newSubConn() - - // Let this picker return ErrNoSubConnAvailable because it needs some time - // for the subconn to be READY. - return nil, balancer.ErrNoSubConnAvailable - } - - if len(p.scRefs) == 0 { - return nil, balancer.ErrNoSubConnAvailable - } - - // If no capacity for the pool size and every connection reachs the soft limit, - // Then picks the least busy one anyway. - return p.scRefs[0], nil -} - -// getAffinityKeyFromMessage retrieves the affinity key from proto message using -// the key locator defined in the affinity config. -func getAffinityKeyFromMessage( - locator string, - msg interface{}, -) (affinityKey string, err error) { - names := strings.Split(locator, ".") - if len(names) == 0 { - return "", fmt.Errorf("Empty affinityKey locator") - } - - val := reflect.ValueOf(msg).Elem() - - // Fields in names except for the last one. - for _, name := range names[:len(names)-1] { - valField := val.FieldByName(strings.Title(name)) - if valField.Kind() != reflect.Ptr && valField.Kind() != reflect.Struct { - return "", fmt.Errorf("Invalid locator path for %v", locator) - } - val = valField.Elem() - } - - valField := val.FieldByName(strings.Title(names[len(names)-1])) - if valField.Kind() != reflect.String { - return "", fmt.Errorf("Cannot get string value from %v", locator) - } - return valField.String(), nil -} - -// NewErrPicker returns a picker that always returns err on Pick(). -func newErrPicker(err error) balancer.Picker { - return &errPicker{err: err} -} - -type errPicker struct { - err error // Pick() always returns this err. -} - -func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - return balancer.PickResult{}, p.err -} diff --git a/go/pkg/balancer/proto/BUILD.bazel b/go/pkg/balancer/proto/BUILD.bazel deleted file mode 100644 index b6e74fd7c..000000000 --- a/go/pkg/balancer/proto/BUILD.bazel +++ /dev/null @@ -1,23 +0,0 @@ -load("@rules_proto//proto:defs.bzl", "proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") -load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") - -go_library( - name = "proto", - embed = [":grpcbalancer_go_proto"], - importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer/proto", - visibility = ["//visibility:public"], -) - -proto_library( - name = "grpcbalancer_proto", - srcs = ["grpcbalancer.proto"], - visibility = ["//visibility:public"], -) - -go_proto_library( - name = "grpcbalancer_go_proto", - importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer/proto", - proto = ":grpcbalancer_proto", - visibility = ["//visibility:public"], -) diff --git a/go/pkg/balancer/proto/grpcbalancer.pb.go b/go/pkg/balancer/proto/grpcbalancer.pb.go deleted file mode 100755 index 5ee5eece9..000000000 --- a/go/pkg/balancer/proto/grpcbalancer.pb.go +++ /dev/null @@ -1,451 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.27.1 -// protoc v3.17.0 -// source: go/pkg/balancer/proto/grpcbalancer.proto - -package proto - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type AffinityConfig_Command int32 - -const ( - AffinityConfig_BOUND AffinityConfig_Command = 0 - AffinityConfig_BIND AffinityConfig_Command = 1 - AffinityConfig_UNBIND AffinityConfig_Command = 2 -) - -// Enum value maps for AffinityConfig_Command. -var ( - AffinityConfig_Command_name = map[int32]string{ - 0: "BOUND", - 1: "BIND", - 2: "UNBIND", - } - AffinityConfig_Command_value = map[string]int32{ - "BOUND": 0, - "BIND": 1, - "UNBIND": 2, - } -) - -func (x AffinityConfig_Command) Enum() *AffinityConfig_Command { - p := new(AffinityConfig_Command) - *p = x - return p -} - -func (x AffinityConfig_Command) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (AffinityConfig_Command) Descriptor() protoreflect.EnumDescriptor { - return file_go_pkg_balancer_proto_grpcbalancer_proto_enumTypes[0].Descriptor() -} - -func (AffinityConfig_Command) Type() protoreflect.EnumType { - return &file_go_pkg_balancer_proto_grpcbalancer_proto_enumTypes[0] -} - -func (x AffinityConfig_Command) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use AffinityConfig_Command.Descriptor instead. -func (AffinityConfig_Command) EnumDescriptor() ([]byte, []int) { - return file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescGZIP(), []int{3, 0} -} - -type ApiConfig struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ChannelPool *ChannelPoolConfig `protobuf:"bytes,2,opt,name=channel_pool,json=channelPool,proto3" json:"channel_pool,omitempty"` - Method []*MethodConfig `protobuf:"bytes,1001,rep,name=method,proto3" json:"method,omitempty"` -} - -func (x *ApiConfig) Reset() { - *x = ApiConfig{} - if protoimpl.UnsafeEnabled { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ApiConfig) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ApiConfig) ProtoMessage() {} - -func (x *ApiConfig) ProtoReflect() protoreflect.Message { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ApiConfig.ProtoReflect.Descriptor instead. -func (*ApiConfig) Descriptor() ([]byte, []int) { - return file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescGZIP(), []int{0} -} - -func (x *ApiConfig) GetChannelPool() *ChannelPoolConfig { - if x != nil { - return x.ChannelPool - } - return nil -} - -func (x *ApiConfig) GetMethod() []*MethodConfig { - if x != nil { - return x.Method - } - return nil -} - -type ChannelPoolConfig struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - MaxSize uint32 `protobuf:"varint,1,opt,name=max_size,json=maxSize,proto3" json:"max_size,omitempty"` - IdleTimeout uint64 `protobuf:"varint,2,opt,name=idle_timeout,json=idleTimeout,proto3" json:"idle_timeout,omitempty"` - MaxConcurrentStreamsLowWatermark uint32 `protobuf:"varint,3,opt,name=max_concurrent_streams_low_watermark,json=maxConcurrentStreamsLowWatermark,proto3" json:"max_concurrent_streams_low_watermark,omitempty"` -} - -func (x *ChannelPoolConfig) Reset() { - *x = ChannelPoolConfig{} - if protoimpl.UnsafeEnabled { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ChannelPoolConfig) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ChannelPoolConfig) ProtoMessage() {} - -func (x *ChannelPoolConfig) ProtoReflect() protoreflect.Message { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ChannelPoolConfig.ProtoReflect.Descriptor instead. -func (*ChannelPoolConfig) Descriptor() ([]byte, []int) { - return file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescGZIP(), []int{1} -} - -func (x *ChannelPoolConfig) GetMaxSize() uint32 { - if x != nil { - return x.MaxSize - } - return 0 -} - -func (x *ChannelPoolConfig) GetIdleTimeout() uint64 { - if x != nil { - return x.IdleTimeout - } - return 0 -} - -func (x *ChannelPoolConfig) GetMaxConcurrentStreamsLowWatermark() uint32 { - if x != nil { - return x.MaxConcurrentStreamsLowWatermark - } - return 0 -} - -type MethodConfig struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Name []string `protobuf:"bytes,1,rep,name=name,proto3" json:"name,omitempty"` - Affinity *AffinityConfig `protobuf:"bytes,1001,opt,name=affinity,proto3" json:"affinity,omitempty"` -} - -func (x *MethodConfig) Reset() { - *x = MethodConfig{} - if protoimpl.UnsafeEnabled { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MethodConfig) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MethodConfig) ProtoMessage() {} - -func (x *MethodConfig) ProtoReflect() protoreflect.Message { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MethodConfig.ProtoReflect.Descriptor instead. -func (*MethodConfig) Descriptor() ([]byte, []int) { - return file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescGZIP(), []int{2} -} - -func (x *MethodConfig) GetName() []string { - if x != nil { - return x.Name - } - return nil -} - -func (x *MethodConfig) GetAffinity() *AffinityConfig { - if x != nil { - return x.Affinity - } - return nil -} - -type AffinityConfig struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Command AffinityConfig_Command `protobuf:"varint,2,opt,name=command,proto3,enum=grpcbalancer.AffinityConfig_Command" json:"command,omitempty"` - AffinityKey string `protobuf:"bytes,3,opt,name=affinity_key,json=affinityKey,proto3" json:"affinity_key,omitempty"` -} - -func (x *AffinityConfig) Reset() { - *x = AffinityConfig{} - if protoimpl.UnsafeEnabled { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *AffinityConfig) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*AffinityConfig) ProtoMessage() {} - -func (x *AffinityConfig) ProtoReflect() protoreflect.Message { - mi := &file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use AffinityConfig.ProtoReflect.Descriptor instead. -func (*AffinityConfig) Descriptor() ([]byte, []int) { - return file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescGZIP(), []int{3} -} - -func (x *AffinityConfig) GetCommand() AffinityConfig_Command { - if x != nil { - return x.Command - } - return AffinityConfig_BOUND -} - -func (x *AffinityConfig) GetAffinityKey() string { - if x != nil { - return x.AffinityKey - } - return "" -} - -var File_go_pkg_balancer_proto_grpcbalancer_proto protoreflect.FileDescriptor - -var file_go_pkg_balancer_proto_grpcbalancer_proto_rawDesc = []byte{ - 0x0a, 0x28, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, - 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x61, 0x6c, 0x61, - 0x6e, 0x63, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x67, 0x72, 0x70, 0x63, - 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x22, 0x84, 0x01, 0x0a, 0x09, 0x41, 0x70, 0x69, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x42, 0x0a, 0x0c, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, - 0x6c, 0x5f, 0x70, 0x6f, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x67, - 0x72, 0x70, 0x63, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x2e, 0x43, 0x68, 0x61, 0x6e, - 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x63, - 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x12, 0x33, 0x0a, 0x06, 0x6d, 0x65, - 0x74, 0x68, 0x6f, 0x64, 0x18, 0xe9, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x72, - 0x70, 0x63, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, - 0x64, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x22, - 0xa1, 0x01, 0x0a, 0x11, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x19, 0x0a, 0x08, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x69, 0x7a, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x6d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65, - 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x64, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x69, 0x64, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x12, 0x4e, 0x0a, 0x24, 0x6d, 0x61, 0x78, 0x5f, 0x63, 0x6f, 0x6e, 0x63, 0x75, - 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5f, 0x6c, 0x6f, - 0x77, 0x5f, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x20, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x4c, 0x6f, 0x77, 0x57, 0x61, 0x74, 0x65, 0x72, 0x6d, - 0x61, 0x72, 0x6b, 0x22, 0x5d, 0x0a, 0x0c, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x39, 0x0a, 0x08, 0x61, 0x66, 0x66, 0x69, 0x6e, - 0x69, 0x74, 0x79, 0x18, 0xe9, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x67, 0x72, 0x70, - 0x63, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69, - 0x74, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x08, 0x61, 0x66, 0x66, 0x69, 0x6e, 0x69, - 0x74, 0x79, 0x22, 0x9f, 0x01, 0x0a, 0x0e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x62, 0x61, 0x6c, - 0x61, 0x6e, 0x63, 0x65, 0x72, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x07, 0x63, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, - 0x79, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x66, 0x66, - 0x69, 0x6e, 0x69, 0x74, 0x79, 0x4b, 0x65, 0x79, 0x22, 0x2a, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x08, - 0x0a, 0x04, 0x42, 0x49, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x42, 0x49, - 0x4e, 0x44, 0x10, 0x02, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescOnce sync.Once - file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescData = file_go_pkg_balancer_proto_grpcbalancer_proto_rawDesc -) - -func file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescGZIP() []byte { - file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescOnce.Do(func() { - file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescData = protoimpl.X.CompressGZIP(file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescData) - }) - return file_go_pkg_balancer_proto_grpcbalancer_proto_rawDescData -} - -var file_go_pkg_balancer_proto_grpcbalancer_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_go_pkg_balancer_proto_grpcbalancer_proto_goTypes = []interface{}{ - (AffinityConfig_Command)(0), // 0: grpcbalancer.AffinityConfig.Command - (*ApiConfig)(nil), // 1: grpcbalancer.ApiConfig - (*ChannelPoolConfig)(nil), // 2: grpcbalancer.ChannelPoolConfig - (*MethodConfig)(nil), // 3: grpcbalancer.MethodConfig - (*AffinityConfig)(nil), // 4: grpcbalancer.AffinityConfig -} -var file_go_pkg_balancer_proto_grpcbalancer_proto_depIdxs = []int32{ - 2, // 0: grpcbalancer.ApiConfig.channel_pool:type_name -> grpcbalancer.ChannelPoolConfig - 3, // 1: grpcbalancer.ApiConfig.method:type_name -> grpcbalancer.MethodConfig - 4, // 2: grpcbalancer.MethodConfig.affinity:type_name -> grpcbalancer.AffinityConfig - 0, // 3: grpcbalancer.AffinityConfig.command:type_name -> grpcbalancer.AffinityConfig.Command - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_go_pkg_balancer_proto_grpcbalancer_proto_init() } -func file_go_pkg_balancer_proto_grpcbalancer_proto_init() { - if File_go_pkg_balancer_proto_grpcbalancer_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ApiConfig); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChannelPoolConfig); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MethodConfig); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AffinityConfig); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_go_pkg_balancer_proto_grpcbalancer_proto_rawDesc, - NumEnums: 1, - NumMessages: 4, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_go_pkg_balancer_proto_grpcbalancer_proto_goTypes, - DependencyIndexes: file_go_pkg_balancer_proto_grpcbalancer_proto_depIdxs, - EnumInfos: file_go_pkg_balancer_proto_grpcbalancer_proto_enumTypes, - MessageInfos: file_go_pkg_balancer_proto_grpcbalancer_proto_msgTypes, - }.Build() - File_go_pkg_balancer_proto_grpcbalancer_proto = out.File - file_go_pkg_balancer_proto_grpcbalancer_proto_rawDesc = nil - file_go_pkg_balancer_proto_grpcbalancer_proto_goTypes = nil - file_go_pkg_balancer_proto_grpcbalancer_proto_depIdxs = nil -} diff --git a/go/pkg/balancer/proto/grpcbalancer.proto b/go/pkg/balancer/proto/grpcbalancer.proto deleted file mode 100644 index e5f1fcdae..000000000 --- a/go/pkg/balancer/proto/grpcbalancer.proto +++ /dev/null @@ -1,66 +0,0 @@ -syntax = "proto3"; - -package grpcbalancer; - -message ApiConfig { - // The channel pool configurations. - ChannelPoolConfig channel_pool = 2; - - // The method configurations. - repeated MethodConfig method = 1001; -} - -// ChannelPoolConfig are options for configuring the channel pool. -// RPCs will be scheduled onto existing channels in the pool until all channels -// have number of streams. At this point -// a new channel is spun out. Once channels have been spun out and -// each has streams, subsequent RPCs will -// hang until any of the in-flight RPCs is finished, freeing up a channel. -message ChannelPoolConfig { - // The max number of channels in the pool. - // Default value is 0, meaning 'unlimited' size. - uint32 max_size = 1; - - // The idle timeout (seconds) of channels without bound affinity sessions. - uint64 idle_timeout = 2; - - // The low watermark of max number of concurrent streams in a channel. - // New channel will be created once it get hit, until we reach the max size of the channel pool. - // Default value is 100. The valid range is [1, 100]. Any value outside the range will be ignored and the default value will be used. - // Note: It is not recommended that users adjust this value, since a single channel should generally have no trouble managing the default (maximum) number of streams. - uint32 max_concurrent_streams_low_watermark = 3; -} - -message MethodConfig { - // A fully qualified name of a gRPC method, or a wildcard pattern ending - // with .*, such as foo.bar.A, foo.bar.*. Method configs are evaluated - // sequentially, and the first one takes precedence. - repeated string name = 1; - - // The channel affinity configurations. - AffinityConfig affinity = 1001; -} - -message AffinityConfig { - enum Command { - // The annotated method will be required to be bound to an existing session - // to execute the RPC. The corresponding will be - // used to find the affinity key from the request message. - BOUND = 0; - // The annotated method will establish the channel affinity with the - // channel which is used to execute the RPC. The corresponding - // will be used to find the affinity key from the - // response message. - BIND = 1; - // The annotated method will remove the channel affinity with the - // channel which is used to execute the RPC. The corresponding - // will be used to find the affinity key from the - // request message. - UNBIND = 2; - } - // The affinity command applies on the selected gRPC methods. - Command command = 2; - // The field path of the affinity key in the request/response message. - // For example: "f.a", "f.b.d", etc. - string affinity_key = 3; -} diff --git a/go/pkg/balancer/roundrobin.go b/go/pkg/balancer/roundrobin.go index 9c8d0d753..579630a48 100644 --- a/go/pkg/balancer/roundrobin.go +++ b/go/pkg/balancer/roundrobin.go @@ -1,3 +1,6 @@ +// Package balancer implements a simple gRPC client-side load balancer. +// It mitigates https://github.com/grpc/grpc/issues/21386 by having multiple connections +// which effectively multplies the concurrent streams limit. package balancer import ( diff --git a/go/pkg/client/BUILD.bazel b/go/pkg/client/BUILD.bazel index 8bc11a69d..d7412a6f8 100644 --- a/go/pkg/client/BUILD.bazel +++ b/go/pkg/client/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//go/api/command", "//go/pkg/actas", "//go/pkg/balancer", - "//go/pkg/balancer/proto", "//go/pkg/chunker", "//go/pkg/command", "//go/pkg/contextmd", diff --git a/go/pkg/client/client.go b/go/pkg/client/client.go index f99f25858..f0fc8dab7 100644 --- a/go/pkg/client/client.go +++ b/go/pkg/client/client.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/status" // Redundant imports are required for the google3 mirror. Aliases should not be changed. - configpb "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer/proto" regrpc "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" log "github.com/golang/glog" @@ -428,10 +427,6 @@ const DefaultCASConcurrency = 500 // that the GRPC balancer can perform. const DefaultMaxConcurrentRequests = 25 -// DefaultMaxConcurrentStreams specifies the default threshold value at which the GRPC balancer should create -// new sub-connections. -const DefaultMaxConcurrentStreams = 25 - // Apply sets the CASConcurrency flag on a client. func (cy CASConcurrency) Apply(c *Client) { c.casConcurrency = int64(cy) @@ -547,9 +542,6 @@ type DialParams struct { // MaxConcurrentRequests specifies the maximum number of concurrent RPCs on a single connection. MaxConcurrentRequests uint32 - // MaxConcurrentStreams specifies the maximum number of concurrent stream RPCs on a single connection. - MaxConcurrentStreams uint32 - // TLSClientAuthCert specifies the public key in PEM format for using mTLS auth to connect to the RBE service. // // If this is specified, TLSClientAuthKey must also be specified. @@ -559,31 +551,6 @@ type DialParams struct { // // If this is specified, TLSClientAuthCert must also be specified. TLSClientAuthKey string - - // RoundRobinBalancer enables the simplified gRPC balancer instead of the default one. - RoundRobinBalancer bool - - // RoundRobinPoolSize specifies the pool size for the round-robin load balancer. - RoundRobinPoolSize int -} - -func createGRPCInterceptor(p DialParams) *balancer.GCPInterceptor { - apiConfig := &configpb.ApiConfig{ - ChannelPool: &configpb.ChannelPoolConfig{ - MaxSize: p.MaxConcurrentRequests, - MaxConcurrentStreamsLowWatermark: p.MaxConcurrentStreams, - }, - Method: []*configpb.MethodConfig{ - { - Name: []string{".*"}, - Affinity: &configpb.AffinityConfig{ - Command: configpb.AffinityConfig_BIND, - AffinityKey: "bind-affinity", - }, - }, - }, - } - return balancer.NewGCPInterceptor(apiConfig) } func createTLSConfig(params DialParams) (*tls.Config, error) { @@ -630,9 +597,6 @@ func OptsFromParams(ctx context.Context, params DialParams) ([]grpc.DialOption, if params.MaxConcurrentRequests == 0 { params.MaxConcurrentRequests = DefaultMaxConcurrentRequests } - if params.MaxConcurrentStreams == 0 { - params.MaxConcurrentStreams = DefaultMaxConcurrentStreams - } if params.NoSecurity { authUsed = NoAuth opts = append(opts, grpc.WithInsecure()) @@ -690,12 +654,6 @@ func OptsFromParams(ctx context.Context, params DialParams) ([]grpc.DialOption, opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) } - grpcInt := createGRPCInterceptor(params) - opts = append(opts, grpc.WithDisableServiceConfig()) - opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancer.Name))) - opts = append(opts, grpc.WithUnaryInterceptor(grpcInt.GCPUnaryClientInterceptor)) - opts = append(opts, grpc.WithStreamInterceptor(grpcInt.GCPStreamClientInterceptor)) - return opts, authUsed, nil } @@ -716,14 +674,10 @@ func NewClient(ctx context.Context, instanceName string, params DialParams, opts } var conn, casConn GrpcClientConn - if params.RoundRobinBalancer { dial := func(ctx context.Context) (*grpc.ClientConn, error) { return grpc.DialContext(ctx, params.Service, dialOpts...) } - conn, err = balancer.NewRRConnPool(ctx, params.RoundRobinPoolSize, dial) - } else { - conn, err = grpc.Dial(params.Service, dialOpts...) - } + conn, err = balancer.NewRRConnPool(ctx, int(params.MaxConcurrentRequests), dial) if err != nil { return nil, fmt.Errorf("couldn't dial gRPC %q: %v", params.Service, err) } @@ -731,14 +685,10 @@ func NewClient(ctx context.Context, instanceName string, params DialParams, opts casConn = conn if params.CASService != "" && params.CASService != params.Service { log.Infof("Connecting to CAS service %s", params.CASService) - if params.RoundRobinBalancer { dial := func(ctx context.Context) (*grpc.ClientConn, error) { return grpc.DialContext(ctx, params.CASService, dialOpts...) } - casConn, err = balancer.NewRRConnPool(ctx, params.RoundRobinPoolSize, dial) - } else { - casConn, err = grpc.Dial(params.CASService, dialOpts...) - } + casConn, err = balancer.NewRRConnPool(ctx, int(params.MaxConcurrentRequests), dial) } if err != nil { return nil, &InitError{Err: statusWrap(err), AuthUsed: authUsed} diff --git a/go/pkg/client/client_test.go b/go/pkg/client/client_test.go index cb7308d8c..d8e500313 100644 --- a/go/pkg/client/client_test.go +++ b/go/pkg/client/client_test.go @@ -209,8 +209,6 @@ func TestNewClientRR(t *testing.T) { c, err := NewClient(ctx, instance, DialParams{ Service: "server", NoSecurity: true, - RoundRobinBalancer: true, - RoundRobinPoolSize: 25, }, StartupCapabilities(false)) if err != nil { t.Fatalf("Error creating client: %v", err) diff --git a/go/pkg/flags/BUILD.bazel b/go/pkg/flags/BUILD.bazel index c52eb3109..4df4e71cd 100644 --- a/go/pkg/flags/BUILD.bazel +++ b/go/pkg/flags/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/flags", visibility = ["//visibility:public"], deps = [ - "//go/pkg/balancer", "//go/pkg/client", "//go/pkg/moreflag", "@com_github_golang_glog//:glog", diff --git a/go/pkg/flags/flags.go b/go/pkg/flags/flags.go index b3bb6705f..afb07df7b 100644 --- a/go/pkg/flags/flags.go +++ b/go/pkg/flags/flags.go @@ -6,7 +6,6 @@ import ( "flag" "time" - "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer" "github.com/bazelbuild/remote-apis-sdks/go/pkg/client" "github.com/bazelbuild/remote-apis-sdks/go/pkg/moreflag" "google.golang.org/grpc" @@ -54,8 +53,6 @@ var ( CASConcurrency = flag.Int("cas_concurrency", client.DefaultCASConcurrency, "Num concurrent upload / download RPCs that the SDK is allowed to do.") // MaxConcurrentRequests denotes the maximum number of concurrent RPCs on a single gRPC connection. MaxConcurrentRequests = flag.Uint("max_concurrent_requests_per_conn", client.DefaultMaxConcurrentRequests, "Maximum number of concurrent RPCs on a single gRPC connection.") - // MaxConcurrentStreams denotes the maximum number of concurrent stream RPCs on a single gRPC connection. - MaxConcurrentStreams = flag.Uint("max_concurrent_streams_per_conn", client.DefaultMaxConcurrentStreams, "Maximum number of concurrent stream RPCs on a single gRPC connection.") // TLSServerName overrides the server name sent in the TLS session. TLSServerName = flag.String("tls_server_name", "", "Override the TLS server name") // TLSCACert loads CA certificates from a file @@ -74,17 +71,9 @@ var ( KeepAliveTimeout = flag.Duration("grpc_keepalive_timeout", 20*time.Second, "After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that the connection is closed. Default is 20s.") // KeepAlivePermitWithoutStream specifies gRPCs keepalive permitWithoutStream parameter. KeepAlivePermitWithoutStream = flag.Bool("grpc_keepalive_permit_without_stream", false, "If true, client sends keepalive pings even with no active RPCs; otherwise, doesn't send pings even if time and timeout are set. Default is false.") - // UseRoundRobinBalancer is a temporary feature flag to rollout a simplified load balancer. - // See http://go/remote-apis-sdks/issues/499 - UseRoundRobinBalancer = flag.Bool("use_round_robin_balancer", true, "If true (default), a round-robin connection bool is used for gRPC. Otherwise, the existing load balancer is used.") - // RoundRobinBalancerPoolSize specifies the pool size for the round robin balancer. - RoundRobinBalancerPoolSize = flag.Int("round_robin_balancer_pool_size", client.DefaultMaxConcurrentRequests, "pool size for round robin grpc balacner") ) func init() { - // MinConnections denotes the minimum number of gRPC sub-connections the gRPC balancer should create during SDK initialization. - flag.IntVar(&balancer.MinConnections, "min_grpc_connections", balancer.DefaultMinConnections, "Minimum number of gRPC sub-connections the gRPC balancer should create during SDK initialization.") - // RPCTimeouts stores the per-RPC timeout values. The flag allows users to override the defaults // set in client.DefaultRPCTimeouts. This is in order to not force the users to familiarize // themselves with every RPC, otherwise it is easy to accidentally enforce a timeout on // WaitExecution, for example. @@ -149,8 +138,5 @@ func NewClientFromFlags(ctx context.Context, opts ...client.Opt) (*client.Client TLSClientAuthCert: *TLSClientAuthCert, TLSClientAuthKey: *TLSClientAuthKey, MaxConcurrentRequests: uint32(*MaxConcurrentRequests), - MaxConcurrentStreams: uint32(*MaxConcurrentStreams), - RoundRobinBalancer: *UseRoundRobinBalancer, - RoundRobinPoolSize: *RoundRobinBalancerPoolSize, }, opts...) }