Skip to content

Commit

Permalink
client: use interceptor for circuit breaker (#8936)
Browse files Browse the repository at this point in the history
ref #8678

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Jan 7, 2025
1 parent 6a0ed86 commit 5c4ab57
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 147 deletions.
28 changes: 4 additions & 24 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -42,7 +40,6 @@ import (
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
cb "github.com/tikv/pd/client/pkg/circuitbreaker"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
)
Expand Down Expand Up @@ -460,12 +457,6 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.inner.option.SetTSOClientRPCConcurrency(value)
case opt.RegionMetadataCircuitBreakerSettings:
applySettingsChange, ok := value.(func(config *cb.Settings))
if !ok {
return errors.New("[pd] invalid value type for RegionMetadataCircuitBreakerSettings option, it should be pd.Settings")
}
c.inner.regionMetaCircuitBreaker.ChangeSettings(applySettingsChange)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -660,13 +651,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
failpoint.Inject("triggerCircuitBreaker", func() {
err = status.Error(codes.ResourceExhausted, "resource exhausted")
})
return region, isOverloaded(err), err
})
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -706,10 +691,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
return resp, isOverloaded(err), err
})
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -749,10 +731,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
return resp, isOverloaded(err), err
})

resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down
22 changes: 4 additions & 18 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand All @@ -19,7 +17,6 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
cb "github.com/tikv/pd/client/pkg/circuitbreaker"
sd "github.com/tikv/pd/client/servicediscovery"
)

Expand All @@ -29,11 +26,10 @@ const (
)

type innerClient struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher
regionMetaCircuitBreaker *cb.CircuitBreaker[*pdpb.GetRegionResponse]
keyspaceID uint32
svrUrls []string
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
serviceModeKeeper
Expand All @@ -59,7 +55,6 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
}
return err
}
c.regionMetaCircuitBreaker = cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", c.option.RegionMetaCircuitBreakerSettings)

return nil
}
Expand Down Expand Up @@ -252,12 +247,3 @@ func (c *innerClient) dispatchTSORequestWithRetry(ctx context.Context) tso.TSFut
}
return req
}

// isOverloaded classifies err for the circuit breaker. It returns cb.Yes when
// the gRPC status code of err's underlying cause is DeadlineExceeded,
// Unavailable or ResourceExhausted, and cb.No for every other code.
func isOverloaded(err error) cb.Overloading {
	code := status.Code(errors.Cause(err))
	if code == codes.DeadlineExceeded || code == codes.Unavailable || code == codes.ResourceExhausted {
		return cb.Yes
	}
	return cb.No
}
27 changes: 5 additions & 22 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/pingcap/errors"

cb "github.com/tikv/pd/client/pkg/circuitbreaker"
"github.com/tikv/pd/client/pkg/retry"
)

Expand All @@ -50,8 +49,6 @@ const (
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency
// RegionMetadataCircuitBreakerSettings controls settings for circuit breaker for region metadata requests.
RegionMetadataCircuitBreakerSettings

dynamicOptionCount
)
Expand All @@ -72,18 +69,16 @@ type Option struct {
// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value

EnableTSOFollowerProxyCh chan struct{}
RegionMetaCircuitBreakerSettings cb.Settings
EnableTSOFollowerProxyCh chan struct{}
}

// NewOption creates a new PD client option with the default values set.
func NewOption() *Option {
co := &Option{
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
InitMetrics: true,
RegionMetaCircuitBreakerSettings: cb.AlwaysClosedSettings,
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
InitMetrics: true,
}

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
Expand Down Expand Up @@ -154,11 +149,6 @@ func (o *Option) GetTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}

// GetRegionMetadataCircuitBreakerSettings returns the circuit breaker settings
// held in the RegionMetadataCircuitBreakerSettings dynamic option slot.
// The stored value is asserted to cb.Settings without a check, so the call
// panics if the slot holds anything else.
func (o *Option) GetRegionMetadataCircuitBreakerSettings() cb.Settings {
	stored := o.dynamicOptions[RegionMetadataCircuitBreakerSettings].Load()
	return stored.(cb.Settings)
}

// ClientOption configures client.
type ClientOption func(*Option)

Expand Down Expand Up @@ -213,13 +203,6 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithRegionMetaCircuitBreaker returns a ClientOption that sets the Option's
// RegionMetaCircuitBreakerSettings field to config.
func WithRegionMetaCircuitBreaker(config cb.Settings) ClientOption {
	apply := func(target *Option) {
		target.RegionMetaCircuitBreakerSettings = config
	}
	return apply
}

// WithBackoffer configures the client with backoffer.
func WithBackoffer(bo *retry.Backoffer) ClientOption {
return func(op *Option) {
Expand Down
60 changes: 41 additions & 19 deletions client/pkg/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package circuitbreaker

import (
"context"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -62,12 +63,12 @@ var AlwaysClosedSettings = Settings{
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker[T any] struct {
type CircuitBreaker struct {
config *Settings
name string

mutex sync.Mutex
state *State[T]
state *State

successCounter prometheus.Counter
errorCounter prometheus.Counter
Expand Down Expand Up @@ -102,8 +103,8 @@ func (s StateType) String() string {
var replacer = strings.NewReplacer(" ", "_", "-", "_")

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker[T any](name string, st Settings) *CircuitBreaker[T] {
cb := new(CircuitBreaker[T])
func NewCircuitBreaker(name string, st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
cb.name = name
cb.config = &st
cb.state = cb.newState(time.Now(), StateClosed)
Expand All @@ -118,7 +119,7 @@ func NewCircuitBreaker[T any](name string, st Settings) *CircuitBreaker[T] {

// ChangeSettings changes the CircuitBreaker settings.
// The changes will be reflected only in the next evaluation window.
func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
func (cb *CircuitBreaker) ChangeSettings(apply func(config *Settings)) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -129,12 +130,11 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
// Execute returns an error instantly if the CircuitBreaker is open.
// https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md
func (cb *CircuitBreaker[T]) Execute(call func() (T, Overloading, error)) (T, error) {
func (cb *CircuitBreaker) Execute(call func() (Overloading, error)) error {
state, err := cb.onRequest()
if err != nil {
cb.fastFailCounter.Inc()
var defaultValue T
return defaultValue, err
return err
}

defer func() {
Expand All @@ -146,13 +146,13 @@ func (cb *CircuitBreaker[T]) Execute(call func() (T, Overloading, error)) (T, er
}
}()

result, overloaded, err := call()
overloaded, err := call()
cb.emitMetric(overloaded, err)
cb.onResult(state, overloaded)
return result, err
return err
}

func (cb *CircuitBreaker[T]) onRequest() (*State[T], error) {
func (cb *CircuitBreaker) onRequest() (*State, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -161,7 +161,7 @@ func (cb *CircuitBreaker[T]) onRequest() (*State[T], error) {
return state, err
}

func (cb *CircuitBreaker[T]) onResult(state *State[T], overloaded Overloading) {
func (cb *CircuitBreaker) onResult(state *State, overloaded Overloading) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -170,7 +170,7 @@ func (cb *CircuitBreaker[T]) onResult(state *State[T], overloaded Overloading) {
state.onResult(overloaded)
}

func (cb *CircuitBreaker[T]) emitMetric(overloaded Overloading, err error) {
func (cb *CircuitBreaker) emitMetric(overloaded Overloading, err error) {
switch overloaded {
case No:
cb.successCounter.Inc()
Expand All @@ -185,9 +185,9 @@ func (cb *CircuitBreaker[T]) emitMetric(overloaded Overloading, err error) {
}

// State represents the state of CircuitBreaker.
type State[T any] struct {
type State struct {
stateType StateType
cb *CircuitBreaker[T]
cb *CircuitBreaker
end time.Time

pendingCount uint32
Expand All @@ -196,7 +196,7 @@ type State[T any] struct {
}

// newState creates a new State with the given configuration and resets all success/failure counters.
func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State[T] {
func (cb *CircuitBreaker) newState(now time.Time, stateType StateType) *State {
var end time.Time
var pendingCount uint32
switch stateType {
Expand All @@ -211,7 +211,7 @@ func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State
default:
panic("unknown state")
}
return &State[T]{
return &State{
cb: cb,
stateType: stateType,
pendingCount: pendingCount,
Expand All @@ -227,7 +227,7 @@ func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State
// Open state fails all requests; it has a fixed duration of `Settings.CoolDownInterval` and always moves to HalfOpen state at the end of the interval.
// HalfOpen state does not have a fixed duration and lasts until `Settings.HalfOpenSuccessCount` requests are evaluated.
// If any of those `Settings.HalfOpenSuccessCount` requests fails then it moves back to Open state, otherwise it moves to Closed state.
func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
func (s *State) onRequest(cb *CircuitBreaker) (*State, error) {
var now = time.Now()
switch s.stateType {
case StateClosed:
Expand Down Expand Up @@ -299,7 +299,7 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
}
}

func (s *State[T]) onResult(overloaded Overloading) {
func (s *State) onResult(overloaded Overloading) {
switch overloaded {
case No:
s.successCount++
Expand All @@ -309,3 +309,25 @@ func (s *State[T]) onResult(overloaded Overloading) {
panic("unknown state")
}
}

// cbCtxKey is the dedicated, unexported type of the context key used for
// circuit breakers.
type cbCtxKey struct{}

// CircuitBreakerKey is the context key under which WithCircuitBreaker stores a
// *CircuitBreaker and from which FromContext reads it back.
var CircuitBreakerKey = cbCtxKey{}

// FromContext returns the *CircuitBreaker stored in ctx under
// CircuitBreakerKey. It returns nil when ctx is nil or when no value of that
// type is stored under the key.
func FromContext(ctx context.Context) *CircuitBreaker {
	if ctx == nil {
		return nil
	}
	// A failed assertion leaves breaker at its zero value, which is nil.
	breaker, _ := ctx.Value(CircuitBreakerKey).(*CircuitBreaker)
	return breaker
}

// WithCircuitBreaker returns a context derived from ctx that carries cb under
// CircuitBreakerKey, so that FromContext can retrieve it later.
func WithCircuitBreaker(ctx context.Context, cb *CircuitBreaker) context.Context {
	derived := context.WithValue(ctx, CircuitBreakerKey, cb)
	return derived
}
Loading

0 comments on commit 5c4ab57

Please sign in to comment.