diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py index cf8b7d1e..a7a0c7e5 100644 --- a/lib/carbon/cache.py +++ b/lib/carbon/cache.py @@ -207,6 +207,13 @@ def watermarks(self): @property def is_full(self): + if settings.CACHE_SIZE_HARD_MAX == float('inf'): + return False + else: + return self.size >= settings.CACHE_SIZE_HARD_MAX + + @property + def is_nearly_full(self): if settings.MAX_CACHE_SIZE == float('inf'): return False else: @@ -251,8 +258,12 @@ def store(self, metric, datapoint): # Not a duplicate, hence process if cache is not full if self.is_full: log.msg("MetricCache is full: self.size=%d" % self.size) - events.cacheFull() + events.cacheOverflow() else: + if self.is_nearly_full: + # This will disable reading when flow control is enabled + log.msg("MetricCache is nearly full: self.size=%d" % self.size) + events.cacheFull() self.size += 1 self[metric][timestamp] = value if self.strategy: diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index 8a99df06..43532255 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -297,6 +297,10 @@ def cleanpath(path): state.database = database_class(settings) settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95 + if settings.USE_FLOW_CONTROL: + settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05 + else: + settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE if "action" not in self: self["action"] = "start" diff --git a/lib/carbon/events.py b/lib/carbon/events.py index a7057ae3..ae36f473 100644 --- a/lib/carbon/events.py +++ b/lib/carbon/events.py @@ -22,6 +22,7 @@ def __call__(self, *args, **kwargs): metricReceived = Event('metricReceived') metricGenerated = Event('metricGenerated') +cacheOverflow = Event('cacheOverflow') cacheFull = Event('cacheFull') cacheSpaceAvailable = Event('cacheSpaceAvailable') pauseReceivingMetrics = Event('pauseReceivingMetrics') @@ -32,7 +33,7 @@ def __call__(self, *args, **kwargs): lambda metric, datapoint: state.instrumentation.increment('metricsReceived')) -cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow')) +cacheOverflow.addHandler(lambda: state.instrumentation.increment('cache.overflow')) cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True)) cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False)) diff --git a/lib/carbon/tests/test_cache.py b/lib/carbon/tests/test_cache.py index 4eb391da..355cb642 100644 --- a/lib/carbon/tests/test_cache.py +++ b/lib/carbon/tests/test_cache.py @@ -11,6 +11,7 @@ class MetricCacheTest(TestCase): def setUp(self): settings = { 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), 'CACHE_SIZE_LOW_WATERMARK': float('inf') } self._settings_patch = patch.dict('carbon.conf.settings', settings) @@ -67,6 +68,13 @@ def test_store_checks_fullness(self): def test_store_on_full_triggers_events(self): is_full_mock = PropertyMock(return_value=True) with patch.object(_MetricCache, 'is_full', is_full_mock): + with patch('carbon.cache.events') as events_mock: + self.metric_cache.store('foo', (123456, 1.0)) + events_mock.cacheOverflow.assert_called_with() + + def test_store_on_nearly_full_triggers_events(self): + is_nearly_full_mock = PropertyMock(return_value=True) + with patch.object(_MetricCache, 'is_nearly_full', is_nearly_full_mock): with patch('carbon.cache.events') as events_mock: self.metric_cache.store('foo', (123456, 1.0)) events_mock.cacheFull.assert_called_with() @@ -150,7 +158,7 @@ def test_is_full_short_circuits_on_inf(self): size_mock.assert_not_called() def test_is_full(self): - self._settings_patch.values['MAX_CACHE_SIZE'] = 2.0 + self._settings_patch.values['CACHE_SIZE_HARD_MAX'] = 2.0 self._settings_patch.start() with patch('carbon.cache.events'): self.assertFalse(self.metric_cache.is_full)