Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: use interceptor for circuit breaker #8936

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 4 additions & 24 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -42,7 +40,6 @@ import (
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
cb "github.com/tikv/pd/client/pkg/circuitbreaker"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
)
Expand Down Expand Up @@ -461,12 +458,6 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.inner.option.SetTSOClientRPCConcurrency(value)
case opt.RegionMetadataCircuitBreakerSettings:
applySettingsChange, ok := value.(func(config *cb.Settings))
if !ok {
return errors.New("[pd] invalid value type for RegionMetadataCircuitBreakerSettings option, it should be pd.Settings")
}
c.inner.regionMetaCircuitBreaker.ChangeSettings(applySettingsChange)
Comment on lines -464 to -469
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rleungx how do you plan to update circuit breaker settings? Would it be defined at the caller layer (tidb) and passed through the context, so we won't need to plumb ChangeSettings at the client level at all?

Copy link
Member Author

@rleungx rleungx Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the circuit breaker can be defined in the caller layer. Once TiDB variable is changed, it can update the circuit breaker in the caller layer and we don't need to change the client. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I just confirming that I've read the PR intent correctly.

default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -661,13 +652,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
failpoint.Inject("triggerCircuitBreaker", func() {
err = status.Error(codes.ResourceExhausted, "resource exhausted")
})
return region, isOverloaded(err), err
})
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -707,10 +692,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
return resp, isOverloaded(err), err
})
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -750,10 +732,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
return resp, isOverloaded(err), err
})

resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down
22 changes: 4 additions & 18 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -19,7 +17,6 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
cb "github.com/tikv/pd/client/pkg/circuitbreaker"
sd "github.com/tikv/pd/client/servicediscovery"
)

Expand All @@ -29,11 +26,10 @@ const (
)

type innerClient struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher
regionMetaCircuitBreaker *cb.CircuitBreaker[*pdpb.GetRegionResponse]
keyspaceID uint32
svrUrls []string
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
serviceModeKeeper
Expand All @@ -59,7 +55,6 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
}
return err
}
c.regionMetaCircuitBreaker = cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", c.option.RegionMetaCircuitBreakerSettings)

return nil
}
Expand Down Expand Up @@ -252,12 +247,3 @@ func (c *innerClient) dispatchTSORequestWithRetry(ctx context.Context) tso.TSFut
}
return req
}

func isOverloaded(err error) cb.Overloading {
switch status.Code(errors.Cause(err)) {
case codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted:
return cb.Yes
default:
return cb.No
}
}
27 changes: 5 additions & 22 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/pingcap/errors"

cb "github.com/tikv/pd/client/pkg/circuitbreaker"
"github.com/tikv/pd/client/pkg/retry"
)

Expand All @@ -50,8 +49,6 @@ const (
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency
// RegionMetadataCircuitBreakerSettings controls settings for circuit breaker for region metadata requests.
RegionMetadataCircuitBreakerSettings

dynamicOptionCount
)
Expand All @@ -72,18 +69,16 @@ type Option struct {
// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value

EnableTSOFollowerProxyCh chan struct{}
RegionMetaCircuitBreakerSettings cb.Settings
EnableTSOFollowerProxyCh chan struct{}
}

// NewOption creates a new PD client option with the default values set.
func NewOption() *Option {
co := &Option{
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
InitMetrics: true,
RegionMetaCircuitBreakerSettings: cb.AlwaysClosedSettings,
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
InitMetrics: true,
}

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
Expand Down Expand Up @@ -154,11 +149,6 @@ func (o *Option) GetTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}

// GetRegionMetadataCircuitBreakerSettings gets circuit breaker settings for PD region metadata calls.
func (o *Option) GetRegionMetadataCircuitBreakerSettings() cb.Settings {
return o.dynamicOptions[RegionMetadataCircuitBreakerSettings].Load().(cb.Settings)
}

// ClientOption configures client.
type ClientOption func(*Option)

Expand Down Expand Up @@ -213,13 +203,6 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithRegionMetaCircuitBreaker configures the client with circuit breaker for region meta calls
func WithRegionMetaCircuitBreaker(config cb.Settings) ClientOption {
return func(op *Option) {
op.RegionMetaCircuitBreakerSettings = config
}
}

// WithBackoffer configures the client with backoffer.
func WithBackoffer(bo *retry.Backoffer) ClientOption {
return func(op *Option) {
Expand Down
60 changes: 41 additions & 19 deletions client/pkg/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package circuitbreaker

import (
"context"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -62,12 +63,12 @@
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker[T any] struct {
type CircuitBreaker struct {
config *Settings
name string

mutex sync.Mutex
state *State[T]
state *State

successCounter prometheus.Counter
errorCounter prometheus.Counter
Expand Down Expand Up @@ -102,8 +103,8 @@
var replacer = strings.NewReplacer(" ", "_", "-", "_")

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker[T any](name string, st Settings) *CircuitBreaker[T] {
cb := new(CircuitBreaker[T])
func NewCircuitBreaker(name string, st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
cb.name = name
cb.config = &st
cb.state = cb.newState(time.Now(), StateClosed)
Expand All @@ -118,7 +119,7 @@

// ChangeSettings changes the CircuitBreaker settings.
// The changes will be reflected only in the next evaluation window.
func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
func (cb *CircuitBreaker) ChangeSettings(apply func(config *Settings)) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -129,12 +130,11 @@
// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
// Execute returns an error instantly if the CircuitBreaker is open.
// https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md
func (cb *CircuitBreaker[T]) Execute(call func() (T, Overloading, error)) (T, error) {
func (cb *CircuitBreaker) Execute(call func() (Overloading, error)) error {
state, err := cb.onRequest()
if err != nil {
cb.fastFailCounter.Inc()
var defaultValue T
return defaultValue, err
return err
}

defer func() {
Expand All @@ -146,13 +146,13 @@
}
}()

result, overloaded, err := call()
overloaded, err := call()
cb.emitMetric(overloaded, err)
cb.onResult(state, overloaded)
return result, err
return err
}

func (cb *CircuitBreaker[T]) onRequest() (*State[T], error) {
func (cb *CircuitBreaker) onRequest() (*State, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -161,7 +161,7 @@
return state, err
}

func (cb *CircuitBreaker[T]) onResult(state *State[T], overloaded Overloading) {
func (cb *CircuitBreaker) onResult(state *State, overloaded Overloading) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -170,7 +170,7 @@
state.onResult(overloaded)
}

func (cb *CircuitBreaker[T]) emitMetric(overloaded Overloading, err error) {
func (cb *CircuitBreaker) emitMetric(overloaded Overloading, err error) {
switch overloaded {
case No:
cb.successCounter.Inc()
Expand All @@ -185,9 +185,9 @@
}

// State represents the state of CircuitBreaker.
type State[T any] struct {
type State struct {
stateType StateType
cb *CircuitBreaker[T]
cb *CircuitBreaker
end time.Time

pendingCount uint32
Expand All @@ -196,7 +196,7 @@
}

// newState creates a new State with the given configuration and reset all success/failure counters.
func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State[T] {
func (cb *CircuitBreaker) newState(now time.Time, stateType StateType) *State {
var end time.Time
var pendingCount uint32
switch stateType {
Expand All @@ -211,7 +211,7 @@
default:
panic("unknown state")
}
return &State[T]{
return &State{
cb: cb,
stateType: stateType,
pendingCount: pendingCount,
Expand All @@ -227,7 +227,7 @@
// Open state fails all request, it has a fixed duration of `Settings.CoolDownInterval` and always moves to HalfOpen state at the end of the interval.
// HalfOpen state does not have a fixed duration and lasts till `Settings.HalfOpenSuccessCount` are evaluated.
// If any of `Settings.HalfOpenSuccessCount` fails then it moves back to Open state, otherwise it moves to Closed state.
func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
func (s *State) onRequest(cb *CircuitBreaker) (*State, error) {
var now = time.Now()
switch s.stateType {
case StateClosed:
Expand Down Expand Up @@ -299,7 +299,7 @@
}
}

func (s *State[T]) onResult(overloaded Overloading) {
func (s *State) onResult(overloaded Overloading) {
switch overloaded {
case No:
s.successCount++
Expand All @@ -309,3 +309,25 @@
panic("unknown state")
}
}

// Define context key type
type cbCtxKey struct{}

// Key used to store circuit breaker
var CircuitBreakerKey = cbCtxKey{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you foresee the need have multiple circuit breakers per context for different operations?

While this provide a lot flexibility, asking caller to provide the target circuit breaker for each operation is a bit cumbersome and easy to miss on each call path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just another way to do it. If it's complicated, I'm ok to keep the status quo.

Copy link
Member Author

@rleungx rleungx Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pros:

  1. align with the retry mechanism
  2. use the interceptor, the order is easier to control
  3. it can be changed to request level instead of client level
  4. we can only modify TiDB to add a new type if we remove the generic

cons:

  1. it's more complicated, each caller needs to maintain the circuit breaker
  2. client level is easier to maintain

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the approach with interceptor and context as it is a way more elegant. The only concern that it would be easy to miss to pass a circuit break into a context when a new invocation is added at the client layer. Do your think that over time we can enforce presence of the circuit breaker in context for certain calls like get region?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be very convenient for the caller to switch during configuration changes.🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can get it directly from the SystemVar?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need switch it for every client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we use a global variable?


// FromContext retrieves the circuit breaker from the context
func FromContext(ctx context.Context) *CircuitBreaker {
if ctx == nil {
return nil
}

Check warning on line 323 in client/pkg/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/pkg/circuitbreaker/circuit_breaker.go#L322-L323

Added lines #L322 - L323 were not covered by tests
if cb, ok := ctx.Value(CircuitBreakerKey).(*CircuitBreaker); ok {
return cb
}
return nil
}

// WithCircuitBreaker stores the circuit breaker into a new context
func WithCircuitBreaker(ctx context.Context, cb *CircuitBreaker) context.Context {
return context.WithValue(ctx, CircuitBreakerKey, cb)
}
Loading
Loading