From 13a820a2e247ae51d880afacf047fb163e04d73b Mon Sep 17 00:00:00 2001
From: David Buckley
Date: Wed, 21 Aug 2024 18:03:58 +0100
Subject: [PATCH 1/2] save some calls to time() by only updating _tokens when
 needed

The old behaviour recomputed _tokens (and so called time()) on every
call to drain; now we do so only when the cached count is not enough to
service the request.

As a side-effect, we now have .peek(), which is cheap, non-blocking and
non-destructive.
---
 lib/carbon/util.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/lib/carbon/util.py b/lib/carbon/util.py
index 2b6708248..a7eade8d3 100644
--- a/lib/carbon/util.py
+++ b/lib/carbon/util.py
@@ -287,7 +287,7 @@ def drain(self, cost, blocking=False):
     '''Given a number of tokens (or fractions) drain will return True and
     drain the number of tokens from the bucket if the capacity allows,
     otherwise we return false and leave the contents of the bucket.'''
-    if cost <= self.tokens:
+    if self.peek(cost):
       self._tokens -= cost
       return True
 
@@ -310,16 +310,16 @@ def setCapacityAndFillRate(self, new_capacity, new_fill_rate):
     self.fill_rate = float(new_fill_rate)
     self._tokens = delta + self._tokens
 
-  @property
-  def tokens(self):
-    '''The tokens property will return the current number of tokens in the
-    bucket.'''
-    if self._tokens < self.capacity:
+  def peek(self, cost):
+    '''Return true if the bucket can drain cost without blocking.'''
+    if self._tokens >= cost:
+      return True
+    else:
       now = time()
       delta = self.fill_rate * (now - self.timestamp)
       self._tokens = min(self.capacity, self._tokens + delta)
       self.timestamp = now
-    return self._tokens
+      return self._tokens >= cost
 
 
 class PluginRegistrar(type):

From 37d144fa5678cd8a49b29e67b9f91f485e37d8f1 Mon Sep 17 00:00:00 2001
From: David Buckley
Date: Wed, 21 Aug 2024 18:04:03 +0100
Subject: [PATCH 2/2] pre-emptively create new metrics

Based on a patch at https://github.com/graphite-project/carbon/pull/888
by ploxiln; I've fixed the blocking issue, and metrics are now dropped
only when their data comes up for writing and the file still has not
been created after repeated failures to get a create token.

If carbon is configured with a create rate of 0, this deque will grow
without bound. In pretty much all other circumstances, behaviour should
closely match the old code.
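To make the peek()/drain() contract concrete (the create loop below
leans on peek() to avoid both blocking and wasted tokens), here is a
minimal self-contained sketch of the lazily-refilled bucket from
PATCH 1. It is an illustration only, not the class in
lib/carbon/util.py, though the refill arithmetic mirrors it:

    from time import time

    class Bucket(object):
      '''Sketch of a token bucket with a lazy refill.'''

      def __init__(self, capacity, fill_rate):
        self.capacity = float(capacity)
        self.fill_rate = float(fill_rate)  # tokens added per second
        self._tokens = self.capacity
        self.timestamp = time()

      def peek(self, cost):
        '''Non-destructive: touch the clock only when the cached count
        cannot already cover cost.'''
        if self._tokens >= cost:
          return True
        now = time()
        delta = self.fill_rate * (now - self.timestamp)
        self._tokens = min(self.capacity, self._tokens + delta)
        self.timestamp = now
        return self._tokens >= cost

      def drain(self, cost):
        '''Destructive: consume cost tokens if peek() says they are there.'''
        if self.peek(cost):
          self._tokens -= cost
          return True
        return False

With a single consumer, a drain(1) issued right after a successful
peek(1) always succeeds, which is what the writer loop below relies on.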
---
 lib/carbon/cache.py  |  5 ++++-
 lib/carbon/writer.py | 41 +++++++++++++++++++++++++++--------------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index cf8b7d1e2..72ccf6cf0 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -16,7 +16,7 @@
 import threading
 from operator import itemgetter
 from random import choice
-from collections import defaultdict
+from collections import defaultdict, deque
 
 from carbon.conf import settings
 from carbon import events, log
@@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
   def __init__(self, strategy=None):
     self.lock = threading.Lock()
     self.size = 0
+    self.new_metrics = deque()
     self.strategy = None
     if strategy:
       self.strategy = strategy(self)
@@ -253,6 +254,8 @@ def store(self, metric, datapoint):
           log.msg("MetricCache is full: self.size=%d" % self.size)
           events.cacheFull()
         else:
+          if not self[metric]:
+            self.new_metrics.append(metric)
           self.size += 1
           self[metric][timestamp] = value
     if self.strategy:
diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 9161d95a0..adcc0a011 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -95,24 +95,25 @@ def writeCachedDataPoints():
   cache = MetricCache()
 
   while cache:
-    (metric, datapoints) = cache.drain_metric()
-    if metric is None:
-      # end the loop
-      break
+    # First, create new metrics files, which is helpful for graphite-web
+    while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
+      metric = cache.new_metrics.popleft()
 
-    dbFileExists = state.database.exists(metric)
+      if metric not in cache:
+        # This metric has already been drained. There's no sense in creating it.
+        continue
 
-    if not dbFileExists:
-      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
-        # If our tokenbucket doesn't have enough tokens available to create a new metric
-        # file then we'll just drop the metric on the ground and move on to the next
-        # metric.
-        # XXX This behavior should probably be configurable to no tdrop metrics
-        # when rate limiting unless our cache is too big or some other legit
-        # reason.
-        instrumentation.increment('droppedCreates')
+      if state.database.exists(metric):
         continue
 
+      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
+        # This should never actually happen as no other thread should be
+        # draining our tokens, and we just checked for a token.
+        # Just put the new metric back in the create list and we'll try again
+        # after writing an update.
+        cache.new_metrics.appendleft(metric)
+        break
+
       archiveConfig = None
       xFilesFactor, aggregationMethod = None, None
 
@@ -150,6 +151,18 @@ def writeCachedDataPoints():
         instrumentation.increment('errors')
         continue
 
+    # now drain and persist some data
+    (metric, datapoints) = cache.drain_metric()
+    if metric is None:
+      # end the loop
+      break
+
+    if not state.database.exists(metric):
+      # If we get here, the metric must still be in new_metrics. We're
+      # creating too fast, and we'll drop this data.
+      instrumentation.increment('droppedCreates')
+      continue
+
     # If we've got a rate limit configured lets makes sure we enforce it
     waitTime = 0
     if UPDATE_BUCKET:
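For reference, the control flow that the two writer.py hunks above
produce, collapsed into one self-contained sketch. The helper names
(database.create, database.write, the cache and create_bucket
arguments) are hypothetical stand-ins for carbon's internals, and
cache.new_metrics is assumed to be a collections.deque as in PATCH 2:

    def write_cached_datapoints(cache, database, create_bucket):
      while cache:
        # Phase 1: pre-emptively create files for brand-new metrics.
        # peek(1) consumes nothing, so the drain(1) below should always
        # succeed while this loop is the bucket's only consumer.
        while cache.new_metrics and (not create_bucket or create_bucket.peek(1)):
          metric = cache.new_metrics.popleft()
          if metric not in cache or database.exists(metric):
            continue  # already drained, or the file is already on disk
          if create_bucket and not create_bucket.drain(1):
            cache.new_metrics.appendleft(metric)  # retry after one write
            break
          database.create(metric)

        # Phase 2: drain one metric and persist its datapoints. Data is
        # dropped here, at write time, only if the file still does not
        # exist because creates could not keep up.
        metric, datapoints = cache.drain_metric()
        if metric is None:
          break
        if not database.exists(metric):
          continue  # counted as droppedCreates in the real code
        database.write(metric, datapoints)

Note the failure mode the commit message warns about: with a create
rate of 0 the bucket never gains a token, phase 1 never makes progress,
and cache.new_metrics grows without bound.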