Skip to content

Commit

Permalink
Merge pull request #960 from bucko909/early-creates
Browse files Browse the repository at this point in the history
  • Loading branch information
deniszh authored Aug 24, 2024
2 parents 52b40b2 + 37d144f commit ef419d0
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 22 deletions.
5 changes: 4 additions & 1 deletion lib/carbon/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import threading
from operator import itemgetter
from random import choice
from collections import defaultdict
from collections import defaultdict, deque

from carbon.conf import settings
from carbon import events, log
Expand Down Expand Up @@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
def __init__(self, strategy=None):
    '''Set up an empty cache: a fresh lock, zero size, an empty queue of
    newly-seen metric names, and (optionally) a drain strategy bound to
    this cache instance.'''
    self.lock = threading.Lock()
    self.size = 0
    # Metrics seen for the first time are queued here so the writer can
    # create their database files promptly.
    self.new_metrics = deque()
    # Instantiate the strategy class (if given) against this cache.
    self.strategy = strategy(self) if strategy else None
Expand Down Expand Up @@ -253,6 +254,8 @@ def store(self, metric, datapoint):
log.msg("MetricCache is full: self.size=%d" % self.size)
events.cacheFull()
else:
if not self[metric]:
self.new_metrics.append(metric)
self.size += 1
self[metric][timestamp] = value
if self.strategy:
Expand Down
14 changes: 7 additions & 7 deletions lib/carbon/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def drain(self, cost, blocking=False):
'''Given a number of tokens (or fractions) drain will return True and
drain the number of tokens from the bucket if the capacity allows,
otherwise we return false and leave the contents of the bucket.'''
if cost <= self.tokens:
if self.peek(cost):
self._tokens -= cost
return True

Expand All @@ -310,16 +310,16 @@ def setCapacityAndFillRate(self, new_capacity, new_fill_rate):
self.fill_rate = float(new_fill_rate)
self._tokens = delta + self._tokens

@property
def tokens(self):
'''The tokens property will return the current number of tokens in the
bucket.'''
if self._tokens < self.capacity:
def peek(self, cost):
    '''Return true if the bucket can drain cost without blocking.'''
    # Fast path: enough tokens already banked — no refill bookkeeping needed.
    if self._tokens >= cost:
        return True
    # Not enough on hand: credit tokens earned since the last refill
    # (capped at capacity), advance the refill timestamp, then re-check.
    now = time()
    earned = self.fill_rate * (now - self.timestamp)
    self._tokens = min(self.capacity, self._tokens + earned)
    self.timestamp = now
    return self._tokens >= cost


class PluginRegistrar(type):
Expand Down
41 changes: 27 additions & 14 deletions lib/carbon/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,25 @@ def writeCachedDataPoints():

cache = MetricCache()
while cache:
(metric, datapoints) = cache.drain_metric()
if metric is None:
# end the loop
break
# First, create new metrics files, which is helpful for graphite-web
while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
metric = cache.new_metrics.popleft()

dbFileExists = state.database.exists(metric)
if metric not in cache:
# This metric has already been drained. There's no sense in creating it.
continue

if not dbFileExists:
if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
# If our tokenbucket doesn't have enough tokens available to create a new metric
# file then we'll just drop the metric on the ground and move on to the next
# metric.
# XXX This behavior should probably be configurable to not drop metrics
# when rate limiting unless our cache is too big or some other legit
# reason.
instrumentation.increment('droppedCreates')
if state.database.exists(metric):
continue

if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
# This should never actually happen as no other thread should be
# draining our tokens, and we just checked for a token.
# Just put the new metric back in the create list and we'll try again
# after writing an update.
cache.new_metrics.appendleft(metric)
break

archiveConfig = None
xFilesFactor, aggregationMethod = None, None

Expand Down Expand Up @@ -150,6 +151,18 @@ def writeCachedDataPoints():
instrumentation.increment('errors')
continue

# now drain and persist some data
(metric, datapoints) = cache.drain_metric()
if metric is None:
# end the loop
break

if not state.database.exists(metric):
# If we get here, the metric must still be in new_metrics. We're
# creating too fast, and we'll drop this data.
instrumentation.increment('droppedCreates')
continue

# If we've got a rate limit configured lets makes sure we enforce it
waitTime = 0
if UPDATE_BUCKET:
Expand Down

0 comments on commit ef419d0

Please sign in to comment.