pre-emptively create new metrics
Based on a patch at graphite-project#888 by ploxiln. I've fixed the
blocking issue, and also made it drop a metric's datapoints only when it
comes to write them after repeatedly failing to get enough create
tokens.

If carbon is configured with a create rate of 0, this deque will grow
indefinitely. In virtually all other circumstances, behaviour should be
much the same as before.
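
For illustration, a minimal sketch of the token-bucket gating this patch relies on. `TokenBucket` below is a hypothetical stand-in, not carbon's implementation; only the `peek(1)`/`drain(1)` calls mirror the diff. With a create rate of 0 the bucket never refills, so once empty `peek(1)` always fails and the creation loop never dequeues anything:

```python
import time
from collections import deque


class TokenBucket(object):
  """Illustrative stand-in for a token bucket; not carbon's implementation."""

  def __init__(self, capacity, fill_rate):
    self.capacity = capacity    # maximum tokens the bucket can hold
    self.fill_rate = fill_rate  # tokens added per second (the "create rate")
    self.tokens = capacity
    self.timestamp = time.time()

  def _refill(self):
    now = time.time()
    self.tokens = min(self.capacity,
                      self.tokens + self.fill_rate * (now - self.timestamp))
    self.timestamp = now

  def peek(self, cost):
    """Return True if `cost` tokens are available, consuming none."""
    self._refill()
    return self.tokens >= cost

  def drain(self, cost):
    """Consume `cost` tokens; return False, consuming none, if short."""
    self._refill()
    if self.tokens >= cost:
      self.tokens -= cost
      return True
    return False


# With a create rate of 0 the bucket never gains tokens, peek(1) is always
# False, and the deque of pending creates can only grow.
new_metrics = deque(['some.new.metric'])
bucket = TokenBucket(capacity=0, fill_rate=0)
while new_metrics and bucket.peek(1):  # loop body never runs
  new_metrics.popleft()
assert list(new_metrics) == ['some.new.metric']
```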
bucko909 committed Aug 21, 2024
1 parent 13a820a commit 37d144f
Showing 2 changed files with 31 additions and 15 deletions.
lib/carbon/cache.py: 4 additions & 1 deletion
@@ -16,7 +16,7 @@
 import threading
 from operator import itemgetter
 from random import choice
-from collections import defaultdict
+from collections import defaultdict, deque
 
 from carbon.conf import settings
 from carbon import events, log
@@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
   def __init__(self, strategy=None):
     self.lock = threading.Lock()
     self.size = 0
+    self.new_metrics = deque()
     self.strategy = None
     if strategy:
       self.strategy = strategy(self)
@@ -253,6 +254,8 @@ def store(self, metric, datapoint):
         log.msg("MetricCache is full: self.size=%d" % self.size)
         events.cacheFull()
       else:
+        if not self[metric]:
+          self.new_metrics.append(metric)
         self.size += 1
         self[metric][timestamp] = value
         if self.strategy:
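A note on the `store()` change above: `_MetricCache` maps each metric to a dict of datapoints, so `not self[metric]` is true exactly when the metric has no cached datapoints yet, and a metric is appended to `new_metrics` once, on its first datapoint. A toy demonstration of that first-seen behaviour (`MiniCache` is a hypothetical stand-in, not carbon's class):

```python
from collections import defaultdict, deque


class MiniCache(defaultdict):
  """Toy stand-in for _MetricCache: maps metric -> {timestamp: value}."""

  def __init__(self):
    defaultdict.__init__(self, dict)
    self.new_metrics = deque()

  def store(self, metric, datapoint):
    timestamp, value = datapoint
    if not self[metric]:  # empty dict: first datapoint for this metric
      self.new_metrics.append(metric)
    self[metric][timestamp] = value


cache = MiniCache()
cache.store('carbon.foo', (1724227200, 1.0))
cache.store('carbon.foo', (1724227260, 2.0))  # known metric, not re-queued
cache.store('carbon.bar', (1724227200, 3.0))
print(list(cache.new_metrics))  # ['carbon.foo', 'carbon.bar']
```

A metric that is drained to empty and later receives new datapoints would be queued again; the writer below tolerates that, since it skips queued metrics whose files already exist.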
lib/carbon/writer.py: 27 additions & 14 deletions
@@ -95,24 +95,25 @@ def writeCachedDataPoints():
 
   cache = MetricCache()
   while cache:
-    (metric, datapoints) = cache.drain_metric()
-    if metric is None:
-      # end the loop
-      break
+    # First, create new metrics files, which is helpful for graphite-web
+    while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
+      metric = cache.new_metrics.popleft()
 
-    dbFileExists = state.database.exists(metric)
+      if metric not in cache:
+        # This metric has already been drained. There's no sense in creating it.
+        continue
 
-    if not dbFileExists:
-      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
-        # If our tokenbucket doesn't have enough tokens available to create a new metric
-        # file then we'll just drop the metric on the ground and move on to the next
-        # metric.
-        # XXX This behavior should probably be configurable to no tdrop metrics
-        # when rate limiting unless our cache is too big or some other legit
-        # reason.
-        instrumentation.increment('droppedCreates')
+      if state.database.exists(metric):
+        continue
 
+      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
+        # This should never actually happen as no other thread should be
+        # draining our tokens, and we just checked for a token.
+        # Just put the new metric back in the create list and we'll try again
+        # after writing an update.
+        cache.new_metrics.appendleft(metric)
+        break
 
       archiveConfig = None
       xFilesFactor, aggregationMethod = None, None
@@ -150,6 +151,18 @@ def writeCachedDataPoints():
         instrumentation.increment('errors')
         continue
 
+    # now drain and persist some data
+    (metric, datapoints) = cache.drain_metric()
+    if metric is None:
+      # end the loop
+      break
+
+    if not state.database.exists(metric):
+      # If we get here, the metric must still be in new_metrics. We're
+      # creating too fast, and we'll drop this data.
+      instrumentation.increment('droppedCreates')
+      continue
+
     # If we've got a rate limit configured lets makes sure we enforce it
     waitTime = 0
     if UPDATE_BUCKET:
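Read together, the two hunks give each pass of the writer loop two phases: a create phase that pre-creates database files for queued new metrics while tokens are available, then a single drain-and-write step. A condensed sketch of the resulting control flow, using names from the diff but with the schema lookup and error handling elided (a simplification, not the literal carbon code):

```python
from carbon import instrumentation, state
from carbon.cache import MetricCache

CREATE_BUCKET = None  # writer.py sets this to a token bucket when a create
                      # rate limit is configured; None disables the limit.


def writeCachedDataPoints():
  "Condensed sketch of the post-patch loop; details elided."
  cache = MetricCache()
  while cache:
    # Phase 1: create files for first-seen metrics while tokens allow.
    while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
      metric = cache.new_metrics.popleft()
      if metric not in cache or state.database.exists(metric):
        continue  # already drained, or already on disk
      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
        cache.new_metrics.appendleft(metric)  # retry after the next write
        break
      # ... resolve archiveConfig / xFilesFactor / aggregationMethod from
      # the storage schemas, then call:
      #   state.database.create(metric, archiveConfig, xFilesFactor,
      #                         aggregationMethod)

    # Phase 2: drain one metric and persist its datapoints.
    metric, datapoints = cache.drain_metric()
    if metric is None:
      break
    if not state.database.exists(metric):
      # Still queued in new_metrics: creates can't keep up, drop the data.
      instrumentation.increment('droppedCreates')
      continue
    state.database.write(metric, datapoints)
```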
