diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example
index e919de1d..561c3e48 100644
--- a/conf/carbon.conf.example
+++ b/conf/carbon.conf.example
@@ -472,10 +472,33 @@ DESTINATIONS = 127.0.0.1:2004

 # This is the maximum number of datapoints that can be queued up
 # for a single destination. Once this limit is hit, we will
-# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
-# we will drop any subsequently received datapoints.
+# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and
+# internally-generated datapoints will still be processed, and data will
+# be dropped once MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE is reached.
+# If USE_FLOW_CONTROL is False, metrics are immediately dropped after
+# MAX_QUEUE_SIZE, and MAX_QUEUE_SIZE_HARD_PCT is unused.
 MAX_QUEUE_SIZE = 10000

+# This is the fraction of MAX_QUEUE_SIZE below which the queue must drain
+# before it will accept more messages. For a larger site with a very large
+# queue it makes sense to tune this to allow for incoming stats. If you have
+# an average flow of 100k stats/minute and a MAX_QUEUE_SIZE of 3,000,000, it
+# makes sense to let stats start flowing again once the queue has drained to
+# 95%, since you should have space to accommodate the next minute's worth of
+# stats even before the relay incrementally clears more of the queue.
+QUEUE_LOW_WATERMARK_PCT = 0.8
+
+# This is the factor applied to MAX_QUEUE_SIZE beyond which data is dropped
+# even with USE_FLOW_CONTROL enabled. While incoming data is paused, in-flight
+# data is still processed, which can push a queue slightly over the configured max.
+MAX_QUEUE_SIZE_HARD_PCT = 1.25
+
+# Set this to False to drop datapoints when any send queue (sending datapoints
+# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
+# default) then sockets over which metrics are received will temporarily stop accepting
+# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
+USE_FLOW_CONTROL = True
+
 # This defines the maximum "message size" between carbon daemons. If
 # your queue is large, setting this to a lower number will cause the
 # relay to forward smaller discrete chunks of stats, which may prevent
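Taken together, these settings give each destination queue three thresholds: a resume point, a pause point, and a drop point. A quick sketch of the arithmetic with the defaults above (plain Python for illustration; the variable names are not carbon's):

```python
MAX_QUEUE_SIZE = 10000
QUEUE_LOW_WATERMARK_PCT = 0.8
MAX_QUEUE_SIZE_HARD_PCT = 1.25

# Resume point: paused receivers start accepting data again once the
# queue drains below this size.
low_watermark = MAX_QUEUE_SIZE * QUEUE_LOW_WATERMARK_PCT  # ~8000

# Pause point: at MAX_QUEUE_SIZE itself, receiving pauses if
# USE_FLOW_CONTROL is True, or drops begin immediately if it is False.

# Drop point: with flow control on, in-flight data is still enqueued
# up to the hard cap; beyond it, datapoints are dropped.
hard_max = MAX_QUEUE_SIZE * MAX_QUEUE_SIZE_HARD_PCT  # 12500

assert low_watermark < MAX_QUEUE_SIZE < hard_max
```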
@@ -492,26 +515,11 @@ MAX_DATAPOINTS_PER_MESSAGE = 500
 # If this is blank carbon-relay runs as the user that invokes it
 # USER =

-# This is the percentage that the queue must be empty before it will accept
-# more messages. For a larger site, if the queue is very large it makes sense
-# to tune this to allow for incoming stats. So if you have an average
-# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense
-# to allow stats to start flowing when you've cleared the queue to 95% since
-# you should have space to accommodate the next minute's worth of stats
-# even before the relay incrementally clears more of the queue
-QUEUE_LOW_WATERMARK_PCT = 0.8
-
 # To allow for batch efficiency from the pickle protocol and to benefit from
 # other batching advantages, all writes are deferred by putting them into a queue,
 # and then the queue is flushed and sent a small fraction of a second later.
 TIME_TO_DEFER_SENDING = 0.0001

-# Set this to False to drop datapoints when any send queue (sending datapoints
-# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
-# default) then sockets over which metrics are received will temporarily stop accepting
-# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
-USE_FLOW_CONTROL = True
-
 # If enabled this setting is used to timeout metric client connection if no
 # metrics have been sent in specified time in seconds
 #METRIC_CLIENT_IDLE_TIMEOUT = None
@@ -619,10 +627,27 @@ REPLICATION_FACTOR = 1

 # This is the maximum number of datapoints that can be queued up
 # for a single destination. Once this limit is hit, we will
-# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
-# we will drop any subsequently received datapoints.
+# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and
+# internally-generated datapoints will still be processed, and data will
+# be dropped once MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE is reached.
+# If USE_FLOW_CONTROL is False, metrics are immediately dropped after
+# MAX_QUEUE_SIZE, and MAX_QUEUE_SIZE_HARD_PCT is unused.
 MAX_QUEUE_SIZE = 10000

+# This is the fraction of MAX_QUEUE_SIZE below which the queue must drain
+# before it will accept more messages. For a larger site with a very large
+# queue it makes sense to tune this to allow for incoming stats. If you have
+# an average flow of 100k stats/minute and a MAX_QUEUE_SIZE of 3,000,000, it
+# makes sense to let stats start flowing again once the queue has drained to
+# 95%, since you should have space to accommodate the next minute's worth of
+# stats even before the relay incrementally clears more of the queue.
+QUEUE_LOW_WATERMARK_PCT = 0.8
+
+# This is the factor applied to MAX_QUEUE_SIZE beyond which data is dropped
+# even with USE_FLOW_CONTROL enabled. While incoming data is paused, in-flight
+# data is still processed, which can push a queue slightly over the configured max.
+MAX_QUEUE_SIZE_HARD_PCT = 1.25
+
 # Set this to False to drop datapoints when any send queue (sending datapoints
 # to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
 # default) then sockets over which metrics are received will temporarily stop accepting
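The QUEUE_LOW_WATERMARK_PCT guidance above can be sanity-checked numerically. Here is a hypothetical helper (not part of this change) that tests whether the gap between the resume point and MAX_QUEUE_SIZE covers a minute of incoming traffic:

```python
def watermark_headroom(max_queue_size, low_watermark_pct, stats_per_minute):
    """Return the datapoint headroom above the resume point, and whether
    it covers one minute of incoming stats."""
    headroom = max_queue_size * (1.0 - low_watermark_pct)
    return headroom, headroom >= stats_per_minute

# The example from the comment: 100k stats/minute against a 3M queue.
# A 0.95 watermark leaves ~150k datapoints of headroom, so a full
# minute of stats fits before the queue hits MAX_QUEUE_SIZE again.
print(watermark_headroom(3_000_000, 0.95, 100_000))
```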
diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index 72ccf6cf..0304143a 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -208,6 +208,13 @@ def watermarks(self):

   @property
   def is_full(self):
+    if settings.CACHE_SIZE_HARD_MAX == float('inf'):
+      return False
+    else:
+      return self.size >= settings.CACHE_SIZE_HARD_MAX
+
+  @property
+  def is_nearly_full(self):
     if settings.MAX_CACHE_SIZE == float('inf'):
       return False
     else:
@@ -252,8 +259,12 @@ def store(self, metric, datapoint):
     # Not a duplicate, hence process if cache is not full
     if self.is_full:
       log.msg("MetricCache is full: self.size=%d" % self.size)
-      events.cacheFull()
+      events.cacheOverflow()
     else:
+      if self.is_nearly_full:
+        # This will disable reading when flow control is enabled
+        log.msg("MetricCache is nearly full: self.size=%d" % self.size)
+        events.cacheFull()
       if not self[metric]:
         self.new_metrics.append(metric)
       self.size += 1
diff --git a/lib/carbon/client.py b/lib/carbon/client.py
index d9ba5a99..6bed7b7f 100644
--- a/lib/carbon/client.py
+++ b/lib/carbon/client.py
@@ -35,6 +35,10 @@

 SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT
+if settings.USE_FLOW_CONTROL:
+  SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE * settings.MAX_QUEUE_SIZE_HARD_PCT
+else:
+  SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE


 class CarbonClientProtocol(object):
@@ -350,7 +354,10 @@ def sendDatapoint(self, metric, datapoint):
     if self.queueSize >= settings.MAX_QUEUE_SIZE:
       if not self.queueFull.called:
         self.queueFull.callback(self.queueSize)
-      instrumentation.increment(self.fullQueueDrops)
+      if self.queueSize < SEND_QUEUE_HARD_MAX:
+        self.enqueue(metric, datapoint)
+      else:
+        instrumentation.increment(self.fullQueueDrops)
     else:
       self.enqueue(metric, datapoint)
diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py
index aef5e684..43532255 100644
--- a/lib/carbon/conf.py
+++ b/lib/carbon/conf.py
@@ -68,8 +68,9 @@
   MAX_DATAPOINTS_PER_MESSAGE=500,
   MAX_AGGREGATION_INTERVALS=5,
   FORWARD_ALL=True,
-  MAX_QUEUE_SIZE=1000,
+  MAX_QUEUE_SIZE=10000,
   QUEUE_LOW_WATERMARK_PCT=0.8,
+  MAX_QUEUE_SIZE_HARD_PCT=1.25,
   TIME_TO_DEFER_SENDING=0.0001,
   ENABLE_AMQP=False,
   AMQP_METRIC_NAME_IN_BODY=False,
@@ -296,6 +297,10 @@ def cleanpath(path):
       state.database = database_class(settings)

     settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
+    if settings.USE_FLOW_CONTROL:
+      settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05
+    else:
+      settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE

     if "action" not in self:
       self["action"] = "start"
diff --git a/lib/carbon/events.py b/lib/carbon/events.py
index a7057ae3..ae36f473 100644
--- a/lib/carbon/events.py
+++ b/lib/carbon/events.py
@@ -22,6 +22,7 @@ def __call__(self, *args, **kwargs):

 metricReceived = Event('metricReceived')
 metricGenerated = Event('metricGenerated')
+cacheOverflow = Event('cacheOverflow')
 cacheFull = Event('cacheFull')
 cacheSpaceAvailable = Event('cacheSpaceAvailable')
 pauseReceivingMetrics = Event('pauseReceivingMetrics')
@@ -32,7 +33,7 @@ def __call__(self, *args, **kwargs):
   lambda metric, datapoint: state.instrumentation.increment('metricsReceived'))


-cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
+cacheOverflow.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
 cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True))
 cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False))
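The client-side change is easiest to read as a three-band decision on the current queue size. A standalone sketch of the same logic (illustrative only; the real code is the sendDatapoint hunk above, and SEND_QUEUE_HARD_MAX collapses to MAX_QUEUE_SIZE when USE_FLOW_CONTROL is False, restoring the old immediate-drop behavior):

```python
def route_datapoint(queue_size, max_queue_size, hard_max):
    """Mirror the three bands in sendDatapoint."""
    if queue_size < max_queue_size:
        return "enqueue"              # normal operation
    elif queue_size < hard_max:
        return "enqueue, queue full"  # queueFull fires, but in-flight data still lands
    else:
        return "drop"                 # hard cap reached; fullQueueDrops is incremented

assert route_datapoint(9_999, 10_000, 12_500) == "enqueue"
assert route_datapoint(10_000, 10_000, 12_500) == "enqueue, queue full"
assert route_datapoint(12_500, 10_000, 12_500) == "drop"
```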
diff --git a/lib/carbon/service.py b/lib/carbon/service.py
index 0c0fb894..f7b85ea7 100644
--- a/lib/carbon/service.py
+++ b/lib/carbon/service.py
@@ -170,6 +170,10 @@ def setupAggregatorProcessor(root_service, settings):
       "aggregation processor: file does not exist {0}".format(aggregation_rules_path))
   RuleManager.read_from(aggregation_rules_path)

+  if settings.USE_FLOW_CONTROL:
+    events.cacheFull.addHandler(events.pauseReceivingMetrics)
+    events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)
+

 def setupRewriterProcessor(root_service, settings):
   from carbon.rewrite import RewriteRuleManager
@@ -177,6 +181,10 @@ def setupRewriterProcessor(root_service, settings):
   rewrite_rules_path = settings["rewrite-rules"]
   RewriteRuleManager.read_from(rewrite_rules_path)

+  if settings.USE_FLOW_CONTROL:
+    events.cacheFull.addHandler(events.pauseReceivingMetrics)
+    events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)
+

 def setupRelayProcessor(root_service, settings):
   from carbon.routers import DatapointRouter
@@ -191,6 +199,10 @@ def setupRelayProcessor(root_service, settings):
   for destination in util.parseDestinations(settings.DESTINATIONS):
     state.client_manager.startClient(destination)

+  if settings.USE_FLOW_CONTROL:
+    events.cacheFull.addHandler(events.pauseReceivingMetrics)
+    events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)
+

 def setupWriterProcessor(root_service, settings):
   from carbon import cache  # NOQA Register CacheFeedingProcessor
diff --git a/lib/carbon/tests/benchmark_cache.py b/lib/carbon/tests/benchmark_cache.py
index 1c2b1379..aa7948f5 100644
--- a/lib/carbon/tests/benchmark_cache.py
+++ b/lib/carbon/tests/benchmark_cache.py
@@ -4,6 +4,10 @@
     NaiveStrategy, MaxStrategy, RandomStrategy, SortedStrategy, \
     TimeSortedStrategy, BucketMaxStrategy

+from carbon.conf import settings
+
+settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
+settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05

 metric_cache = _MetricCache(DrainStrategy)
 count = 0
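The same two-handler wiring now appears in the aggregator, rewriter, and relay setup paths: cacheFull pauses receivers and cacheSpaceAvailable resumes them. A minimal standalone model of that pattern (simplified from carbon's Event class in lib/carbon/events.py; the receivers dict is invented for the demo):

```python
class Event:
    def __init__(self, name):
        self.name = name
        self.handlers = []

    def addHandler(self, handler):
        self.handlers.append(handler)

    def __call__(self, *args, **kwargs):
        for handler in self.handlers:
            handler(*args, **kwargs)


cacheFull = Event('cacheFull')
cacheSpaceAvailable = Event('cacheSpaceAvailable')

receivers = {'paused': False}
cacheFull.addHandler(lambda: receivers.update(paused=True))
cacheSpaceAvailable.addHandler(lambda: receivers.update(paused=False))

cacheFull()            # soft limit hit -> stop reading from clients
assert receivers['paused']
cacheSpaceAvailable()  # drained below the low watermark -> resume
assert not receivers['paused']
```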
diff --git a/lib/carbon/tests/test_cache.py b/lib/carbon/tests/test_cache.py
index 4eb391da..8bd8bb9d 100644
--- a/lib/carbon/tests/test_cache.py
+++ b/lib/carbon/tests/test_cache.py
@@ -11,6 +11,7 @@ class MetricCacheTest(TestCase):
   def setUp(self):
     settings = {
       'MAX_CACHE_SIZE': float('inf'),
+      'CACHE_SIZE_HARD_MAX': float('inf'),
       'CACHE_SIZE_LOW_WATERMARK': float('inf')
     }
     self._settings_patch = patch.dict('carbon.conf.settings', settings)
@@ -67,6 +68,13 @@ def test_store_checks_fullness(self):
   def test_store_on_full_triggers_events(self):
     is_full_mock = PropertyMock(return_value=True)
     with patch.object(_MetricCache, 'is_full', is_full_mock):
+      with patch('carbon.cache.events') as events_mock:
+        self.metric_cache.store('foo', (123456, 1.0))
+        events_mock.cacheOverflow.assert_called_with()
+
+  def test_store_on_nearly_full_triggers_events(self):
+    is_nearly_full_mock = PropertyMock(return_value=True)
+    with patch.object(_MetricCache, 'is_nearly_full', is_nearly_full_mock):
       with patch('carbon.cache.events') as events_mock:
         self.metric_cache.store('foo', (123456, 1.0))
         events_mock.cacheFull.assert_called_with()
@@ -150,7 +158,7 @@ def test_is_full_short_circuits_on_inf(self):
       size_mock.assert_not_called()

   def test_is_full(self):
-    self._settings_patch.values['MAX_CACHE_SIZE'] = 2.0
+    self._settings_patch.values['CACHE_SIZE_HARD_MAX'] = 2.0
     self._settings_patch.start()
     with patch('carbon.cache.events'):
       self.assertFalse(self.metric_cache.is_full)
@@ -178,8 +186,18 @@ def test_counts_multiple_datapoints(self):

 class DrainStrategyTest(TestCase):
   def setUp(self):
+    settings = {
+      'MAX_CACHE_SIZE': float('inf'),
+      'CACHE_SIZE_HARD_MAX': float('inf'),
+      'CACHE_SIZE_LOW_WATERMARK': float('inf')
+    }
+    self._settings_patch = patch.dict('carbon.conf.settings', settings)
+    self._settings_patch.start()
     self.metric_cache = _MetricCache()

+  def tearDown(self):
+    self._settings_patch.stop()
+
   def test_bucketmax_strategy(self):
     bucketmax_strategy = BucketMaxStrategy(self.metric_cache)
     self.metric_cache.strategy = bucketmax_strategy
@@ -303,8 +321,18 @@ def test_time_sorted_strategy_min_lag(self):

 class RandomStrategyTest(TestCase):
   def setUp(self):
+    settings = {
+      'MAX_CACHE_SIZE': float('inf'),
+      'CACHE_SIZE_HARD_MAX': float('inf'),
+      'CACHE_SIZE_LOW_WATERMARK': float('inf')
+    }
+    self._settings_patch = patch.dict('carbon.conf.settings', settings)
+    self._settings_patch.start()
     self.metric_cache = _MetricCache()

+  def tearDown(self):
+    self._settings_patch.stop()
+
   def test_random_strategy(self):
     self.metric_cache.store('foo', (123456, 1.0))
     self.metric_cache.store('bar', (123457, 2.0))
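On the writer side, the cache now mirrors the queue's soft/hard split: MAX_CACHE_SIZE is the soft limit (is_nearly_full fires cacheFull, pausing reads), and CACHE_SIZE_HARD_MAX, derived as 1.05x in conf.py when flow control is on, is the hard limit (is_full fires cacheOverflow and drops data). A condensed model of the two properties these tests exercise (illustrative; the real ones live on _MetricCache):

```python
class CacheModel:
    """Toy stand-in for _MetricCache's fullness checks."""

    def __init__(self, max_cache_size, use_flow_control=True):
        self.size = 0
        self.max_cache_size = max_cache_size
        self.hard_max = max_cache_size * (1.05 if use_flow_control else 1.0)

    @property
    def is_nearly_full(self):
        # Soft limit: triggers cacheFull, which pauses receiving.
        return self.max_cache_size != float('inf') and self.size >= self.max_cache_size

    @property
    def is_full(self):
        # Hard limit: triggers cacheOverflow, which drops datapoints.
        return self.hard_max != float('inf') and self.size >= self.hard_max


cache = CacheModel(2_000_000)
cache.size = 2_000_000
assert cache.is_nearly_full and not cache.is_full  # paused, nothing dropped yet
cache.size = 2_200_000
assert cache.is_full  # well past the 1.05x hard cap: datapoints are dropped
```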