Skip to content

Commit

Permalink
Add max pending request limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
hczhu-db committed Dec 15, 2024
1 parent a26ab5f commit 8b8943a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
23 changes: 22 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func runReceive(
return errors.Wrap(err, "creating limiter")
}

if conf.maxPendingGrpcWriteRequests > 0 {
level.Info(logger).Log("msg", "set max pending gRPC write request in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
limiter.SetMaxPendingRequests(conf.maxPendingGrpcWriteRequests)
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Expand Down Expand Up @@ -408,7 +413,17 @@ func runReceive(
store.LazyRetrieval,
options...,
)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
instrumentedStoreServerConfig := store.InstrumentedStoreServerConfig{
MaxPendingRequests: int32(conf.maxPendingGrpcReadRequests),
}
if conf.maxPendingGrpcReadRequests > 0 {
level.Info(logger).Log(
"msg", "set max pending gRPC read request in instrumented store server",
"max_pending_requests", conf.maxPendingGrpcReadRequests,
)
}
instrumentedStoreServer := store.NewInstrumentedStoreServerWithConfig(reg, proxy, instrumentedStoreServerConfig)
mts := store.NewLimitedStoreServer(instrumentedStoreServer, reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
Expand Down Expand Up @@ -974,6 +989,8 @@ type receiveConfig struct {
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcReadRequests int
maxPendingGrpcWriteRequests int

featureList *[]string
}
Expand Down Expand Up @@ -1138,6 +1155,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
Default("0").IntVar(&rc.matcherConverterCacheCapacity)
cmd.Flag("receive.max-pending-grcp-read-requests", "Throttle gRPC read requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcReadRequests)
cmd.Flag("receive.max-pending-grcp-write-requests", "Throttle gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,10 @@ func quorumReached(successes []int, successThreshold int) bool {

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
if h.Limiter.ShouldRejectRequest(int(h.pendingWriteRequestsCounter.Load())) {
return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
}

span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

Expand Down
30 changes: 30 additions & 0 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type Limiter struct {
configReloadFailedCounter prometheus.Counter
receiverMode ReceiverMode
configReloadTimer time.Duration

// Reject a request if this limit is reached.
maxPendingRequests int
maxPendingRequestLimitHit prometheus.Counter
}

// headSeriesLimiter encompasses active/head series limiting logic.
Expand Down Expand Up @@ -62,6 +66,24 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
return l.headSeriesLimiter
}

// SetMaxPendingRequests stores the pending-request ceiling that
// ShouldRejectRequest compares against. The assignment is performed while
// holding the limiter's lock.
func (l *Limiter) SetMaxPendingRequests(maxPendingRequests int) {
	l.Lock()
	l.maxPendingRequests = maxPendingRequests
	l.Unlock()
}

// ShouldRejectRequest reports whether a request must be turned away given the
// current number of pending requests. It returns true only when a positive
// limit is configured and pendingRequests has reached it; each such rejection
// is counted when the hit counter is available.
func (l *Limiter) ShouldRejectRequest(pendingRequests int) bool {
	// maxPendingRequests is expected to be set once, right after the limiter
	// is created, which is why it is read here without taking the lock.
	limit := l.maxPendingRequests
	if limit <= 0 || pendingRequests < limit {
		return false
	}
	if hits := l.maxPendingRequestLimitHit; hits != nil {
		hits.Inc()
	}
	return true
}

// NewLimiter creates a new *Limiter given a configuration and prometheus
// registerer.
func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
Expand Down Expand Up @@ -92,6 +114,14 @@ func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMod
Help: "How many times the limit configuration failed to reload.",
},
)
// Count the rejections made by ShouldRejectRequest. This counter needs a name
// of its own: the statement directly above already appears to register
// limits_config_reload_err_total, and registering the same collector twice
// through promauto panics. Assigning it here also gives ShouldRejectRequest a
// non-nil counter to increment.
limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
	prometheus.CounterOpts{
		Namespace: "thanos",
		Subsystem: "receive",
		Name:      "max_pending_requests_limit_hit_total",
		Help:      "How many times a request was rejected because the max pending requests limit was reached.",
	},
)
}

if configFile == nil {
Expand Down
37 changes: 35 additions & 2 deletions pkg/store/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/store/storepb"

Expand Down Expand Up @@ -160,10 +162,31 @@ type instrumentedStoreServer struct {

pendingRequests prometheus.Gauge
pendingRequestsCounter atomic.Int32

// TODO: move request limiting to another proper place other than instrumented store server.
// This is a read-only field once it's set.
maxPendingRequests int32
maxPendingRequestLimitHit prometheus.Counter
}

// InstrumentedStoreServerConfig carries the optional settings accepted by
// NewInstrumentedStoreServerWithConfig.
type InstrumentedStoreServerConfig struct {
// MaxPendingRequests is the number of pending Series requests at which new
// Series calls are rejected with ResourceExhausted. Values <= 0 disable the
// limit.
MaxPendingRequests int32
}

// SetMaxPendingRequests overwrites the pending Series request limit.
// The field is assigned here without any synchronization.
func (s *instrumentedStoreServer) SetMaxPendingRequests(maxPendingRequests int32) {
s.maxPendingRequests = maxPendingRequests
}

// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
func NewInstrumentedStoreServer( reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
return NewInstrumentedStoreServerWithConfig(reg, store, InstrumentedStoreServerConfig{})
}

func NewInstrumentedStoreServerWithConfig(
reg prometheus.Registerer,
store storepb.StoreServer,
conf InstrumentedStoreServerConfig,
) storepb.StoreServer {
return &instrumentedStoreServer{
StoreServer: store,
seriesRequested: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Expand All @@ -180,14 +203,24 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
Name: "thanos_store_server_pending_series_requests",
Help: "Number of pending series requests",
}),
maxPendingRequestLimitHit: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_store_server_hit_max_pending_series_request_limit",
Help: "Number of pending series requests that hit the max pending request limit",
}),
maxPendingRequests: conf.MaxPendingRequests,
}
}

func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
instrumented := newInstrumentedServer(srv)
// Shed load before doing any work: once the number of in-flight Series calls
// reaches the configured limit, fail fast with ResourceExhausted.
if s.maxPendingRequests > 0 && s.pendingRequestsCounter.Load() >= s.maxPendingRequests {
	// Record the rejection so the limit-hit metric reflects throttling; the
	// nil guard covers servers built without the counter.
	if s.maxPendingRequestLimitHit != nil {
		s.maxPendingRequestLimitHit.Inc()
	}
	return status.Error(codes.ResourceExhausted, "too many pending series requests")
}

s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
defer s.pendingRequestsCounter.Add(-1)

instrumented := newInstrumentedServer(srv)

if err := s.StoreServer.Series(req, instrumented); err != nil {
return err
}
Expand Down

0 comments on commit 8b8943a

Please sign in to comment.