Skip to content

Commit

Permalink
Fix load shedding
Browse files Browse the repository at this point in the history
  • Loading branch information
hczhu-db committed Dec 19, 2024
1 parent 95cb360 commit 209f0e8
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,8 @@ func quorumReached(successes []int, successThreshold int) bool {

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
if h.Limiter.ShouldRejectNewRequest() {
return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
if rejected, msg := h.Limiter.ShouldRejectNewRequest(); rejected {
return nil, status.Error(codes.ResourceExhausted, msg)
}
// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
defer h.Limiter.DecrementPendingRequests()
Expand Down
17 changes: 11 additions & 6 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,25 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
return l.headSeriesLimiter
}

func (l *Limiter) ShouldRejectNewRequest() bool {
func (l *Limiter) ShouldRejectNewRequest() (bool, string) {
// maxPendingRequests doesn't change once set when a limiter instance is created.
// So, it's safe to read it without a lock.
if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
if l.maxPendingRequestLimitHit != nil {
l.maxPendingRequestLimitHit.Inc()
if l.maxPendingRequests > 0 {
if pendingRequests := l.pendingRequests.Load(); pendingRequests >= l.maxPendingRequests {
if l.maxPendingRequestLimitHit != nil {
l.maxPendingRequestLimitHit.Inc()
}
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(pendingRequests))
}
return true, fmt.Sprintf("too many pending write requests: %d >= %d", l.pendingRequests.Load(), l.maxPendingRequests)
}
return true
}
newValue := l.pendingRequests.Add(1)
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(newValue))
}
return false
return false, ""
}

func (l *Limiter) DecrementPendingRequests() {
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package store

import (
"fmt"
"sync"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -160,8 +161,15 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
}

func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
return status.Error(codes.ResourceExhausted, "too many pending series requests")
if s.maxPendingRequests > 0 {
if pendingRequests := s.pendingRequests.Load(); pendingRequests >= s.maxPendingRequests {
s.maxPendingRequestLimitHit.Inc()
s.pendingRequestsGauge.Set(float64(pendingRequests))
return status.Error(
codes.ResourceExhausted,
fmt.Sprintf("too many pending series requests: %d >= %d", s.pendingRequests.Load(), s.maxPendingRequests),
)
}
}
s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
defer s.pendingRequests.Add(-1)
Expand Down

0 comments on commit 209f0e8

Please sign in to comment.