diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index cf8b7d1e..72ccf6cf 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -16,7 +16,7 @@
 import threading
 from operator import itemgetter
 from random import choice
-from collections import defaultdict
+from collections import defaultdict, deque
 
 from carbon.conf import settings
 from carbon import events, log
@@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
   def __init__(self, strategy=None):
     self.lock = threading.Lock()
     self.size = 0
+    self.new_metrics = deque()
     self.strategy = None
     if strategy:
       self.strategy = strategy(self)
@@ -253,6 +254,8 @@ def store(self, metric, datapoint):
         log.msg("MetricCache is full: self.size=%d" % self.size)
         events.cacheFull()
       else:
+        if not self[metric]:
+          self.new_metrics.append(metric)
         self.size += 1
         self[metric][timestamp] = value
     if self.strategy:
diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 9161d95a..adcc0a01 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -95,24 +95,25 @@ def writeCachedDataPoints():
 
   cache = MetricCache()
   while cache:
-    (metric, datapoints) = cache.drain_metric()
-    if metric is None:
-      # end the loop
-      break
+    # First, create new metrics files, which is helpful for graphite-web
+    while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
+      metric = cache.new_metrics.popleft()
 
-    dbFileExists = state.database.exists(metric)
+      if metric not in cache:
+        # This metric has already been drained. There's no sense in creating it.
+        continue
 
-    if not dbFileExists:
-      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
-        # If our tokenbucket doesn't have enough tokens available to create a new metric
-        # file then we'll just drop the metric on the ground and move on to the next
-        # metric.
-        # XXX This behavior should probably be configurable to no tdrop metrics
-        # when rate limiting unless our cache is too big or some other legit
-        # reason.
-        instrumentation.increment('droppedCreates')
+      if state.database.exists(metric):
         continue
 
+      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
+        # This should never actually happen as no other thread should be
+        # draining our tokens, and we just checked for a token.
+        # Just put the new metric back in the create list and we'll try again
+        # after writing an update.
+        cache.new_metrics.appendleft(metric)
+        break
+
       archiveConfig = None
       xFilesFactor, aggregationMethod = None, None
 
@@ -150,6 +151,18 @@ def writeCachedDataPoints():
       instrumentation.increment('errors')
       continue
 
+    # now drain and persist some data
+    (metric, datapoints) = cache.drain_metric()
+    if metric is None:
+      # end the loop
+      break
+
+    if not state.database.exists(metric):
+      # If we get here, the metric must still be in new_metrics. We're
+      # creating too fast, and we'll drop this data.
+      instrumentation.increment('droppedCreates')
+      continue
+
     # If we've got a rate limit configured lets makes sure we enforce it
     waitTime = 0
     if UPDATE_BUCKET:
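
For reviewers who want to see the pattern in isolation, below is a minimal, self-contained sketch of what this patch does: metrics seen for the first time are recorded in a `deque` at store time, the writer creates their database files first (rate limited by a token bucket via a non-consuming `peek` followed by a consuming `drain`), and only then drains datapoints. Everything here is hypothetical scaffolding for illustration — `SimpleTokenBucket`, `InMemoryDatabase`, and `write_cached_datapoints` are simplified stand-ins, not carbon's actual `TokenBucket`, database adapter, or `writeCachedDataPoints`.

```python
import time
from collections import deque


class SimpleTokenBucket(object):
    """Hypothetical stand-in for carbon's create-rate limiter; it models only
    the two calls the patch relies on: peek() and drain()."""

    def __init__(self, capacity, fill_rate):
        self.capacity = float(capacity)
        self.fill_rate = float(fill_rate)   # tokens replenished per second
        self.tokens = float(capacity)
        self.updated = time.time()

    def _refill(self):
        now = time.time()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.fill_rate)
        self.updated = now

    def peek(self, cost):
        """True if `cost` tokens are available, without consuming them."""
        self._refill()
        return self.tokens >= cost

    def drain(self, cost):
        """Consume `cost` tokens if available; True on success."""
        self._refill()
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


class InMemoryDatabase(object):
    """Hypothetical database: create() makes an empty series visible
    immediately; write() appends datapoints to an existing series."""

    def __init__(self):
        self.series = {}

    def exists(self, metric):
        return metric in self.series

    def create(self, metric):
        self.series[metric] = []

    def write(self, metric, datapoints):
        self.series[metric].extend(datapoints)


def write_cached_datapoints(cache, new_metrics, database, create_bucket):
    """Sketch of the reordered writer loop: create files for never-before-seen
    metrics first, then drain and persist one metric per iteration."""
    dropped = 0
    while cache:
        # Phase 1: create new metric files while create tokens are available.
        while new_metrics and (create_bucket is None or create_bucket.peek(1)):
            metric = new_metrics.popleft()
            if metric not in cache:
                continue            # already drained; nothing worth creating
            if database.exists(metric):
                continue            # file is already on disk
            if not (create_bucket is None or create_bucket.drain(1)):
                new_metrics.appendleft(metric)  # retry after the next update
                break
            database.create(metric)

        # Phase 2: drain one metric's datapoints and persist them.
        metric, datapoints = cache.popitem()
        if not database.exists(metric):
            dropped += 1  # mirrors instrumentation.increment('droppedCreates')
            continue
        database.write(metric, datapoints)
    return dropped


if __name__ == '__main__':
    # Tiny demo with made-up metric names.
    cache = {'servers.a.cpu': [(1, 0.5)], 'servers.b.cpu': [(1, 0.7)]}
    new_metrics = deque(cache)
    db = InMemoryDatabase()
    bucket = SimpleTokenBucket(capacity=10, fill_rate=10)
    dropped = write_cached_datapoints(cache, new_metrics, db, bucket)
    print(sorted(db.series), 'dropped=%d' % dropped)
```

The ordering is the point of the patch: by creating files as soon as a metric is first seen, graphite-web can discover the series before its first datapoints are flushed, and the `appendleft`/`break` retry keeps creates bounded by the token bucket without losing track of pending metrics. As in the patch, data drained for a metric whose file still does not exist is dropped and counted rather than re-queued.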