diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py index 72ccf6cf..0304143a 100644 --- a/lib/carbon/cache.py +++ b/lib/carbon/cache.py @@ -208,6 +208,13 @@ def watermarks(self): @property def is_full(self): + if settings.CACHE_SIZE_HARD_MAX == float('inf'): + return False + else: + return self.size >= settings.CACHE_SIZE_HARD_MAX + + @property + def is_nearly_full(self): if settings.MAX_CACHE_SIZE == float('inf'): return False else: @@ -252,8 +259,12 @@ def store(self, metric, datapoint): # Not a duplicate, hence process if cache is not full if self.is_full: log.msg("MetricCache is full: self.size=%d" % self.size) - events.cacheFull() + events.cacheOverflow() else: + if self.is_nearly_full: + # This will disable reading when flow control is enabled + log.msg("MetricCache is nearly full: self.size=%d" % self.size) + events.cacheFull() if not self[metric]: self.new_metrics.append(metric) self.size += 1 diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index 8a99df06..43532255 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -297,6 +297,10 @@ def cleanpath(path): state.database = database_class(settings) settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95 + if settings.USE_FLOW_CONTROL: + settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05 + else: + settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE if "action" not in self: self["action"] = "start" diff --git a/lib/carbon/events.py b/lib/carbon/events.py index a7057ae3..ae36f473 100644 --- a/lib/carbon/events.py +++ b/lib/carbon/events.py @@ -22,6 +22,7 @@ def __call__(self, *args, **kwargs): metricReceived = Event('metricReceived') metricGenerated = Event('metricGenerated') +cacheOverflow = Event('cacheOverflow') cacheFull = Event('cacheFull') cacheSpaceAvailable = Event('cacheSpaceAvailable') pauseReceivingMetrics = Event('pauseReceivingMetrics') @@ -32,7 +33,7 @@ def __call__(self, *args, **kwargs): lambda metric, datapoint: state.instrumentation.increment('metricsReceived')) -cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow')) +cacheOverflow.addHandler(lambda: state.instrumentation.increment('cache.overflow')) cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True)) cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False)) diff --git a/lib/carbon/tests/benchmark_cache.py b/lib/carbon/tests/benchmark_cache.py index 1c2b1379..aa7948f5 100644 --- a/lib/carbon/tests/benchmark_cache.py +++ b/lib/carbon/tests/benchmark_cache.py @@ -4,6 +4,10 @@ NaiveStrategy, MaxStrategy, RandomStrategy, SortedStrategy, \ TimeSortedStrategy, BucketMaxStrategy +from carbon.conf import settings + +settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95 +settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05 metric_cache = _MetricCache(DrainStrategy) count = 0 diff --git a/lib/carbon/tests/test_cache.py b/lib/carbon/tests/test_cache.py index 4eb391da..8bd8bb9d 100644 --- a/lib/carbon/tests/test_cache.py +++ b/lib/carbon/tests/test_cache.py @@ -11,6 +11,7 @@ class MetricCacheTest(TestCase): def setUp(self): settings = { 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), 'CACHE_SIZE_LOW_WATERMARK': float('inf') } self._settings_patch = patch.dict('carbon.conf.settings', settings) @@ -67,6 +68,13 @@ def test_store_checks_fullness(self): def test_store_on_full_triggers_events(self): is_full_mock = PropertyMock(return_value=True) with patch.object(_MetricCache, 'is_full', is_full_mock): + with patch('carbon.cache.events') as events_mock: + self.metric_cache.store('foo', (123456, 1.0)) + events_mock.cacheOverflow.assert_called_with() + + def test_store_on_nearly_full_triggers_events(self): + is_nearly_full_mock = PropertyMock(return_value=True) + with patch.object(_MetricCache, 'is_nearly_full', is_nearly_full_mock): with patch('carbon.cache.events') as events_mock: self.metric_cache.store('foo', (123456, 1.0)) events_mock.cacheFull.assert_called_with() @@ -150,7 +158,7 @@ def test_is_full_short_circuits_on_inf(self): size_mock.assert_not_called() def test_is_full(self): - self._settings_patch.values['MAX_CACHE_SIZE'] = 2.0 + self._settings_patch.values['CACHE_SIZE_HARD_MAX'] = 2.0 self._settings_patch.start() with patch('carbon.cache.events'): self.assertFalse(self.metric_cache.is_full) @@ -178,8 +186,18 @@ def test_counts_multiple_datapoints(self): class DrainStrategyTest(TestCase): def setUp(self): + settings = { + 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), + 'CACHE_SIZE_LOW_WATERMARK': float('inf') + } + self._settings_patch = patch.dict('carbon.conf.settings', settings) + self._settings_patch.start() self.metric_cache = _MetricCache() + def tearDown(self): + self._settings_patch.stop() + def test_bucketmax_strategy(self): bucketmax_strategy = BucketMaxStrategy(self.metric_cache) self.metric_cache.strategy = bucketmax_strategy @@ -303,8 +321,18 @@ def test_time_sorted_strategy_min_lag(self): class RandomStrategyTest(TestCase): def setUp(self): + settings = { + 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), + 'CACHE_SIZE_LOW_WATERMARK': float('inf') + } + self._settings_patch = patch.dict('carbon.conf.settings', settings) + self._settings_patch.start() self.metric_cache = _MetricCache() + def tearDown(self): + self._settings_patch.stop() + def test_random_strategy(self): self.metric_cache.store('foo', (123456, 1.0)) self.metric_cache.store('bar', (123457, 2.0))