From ea4148fd5b60a849b3046c9e366ecbc2e254a8e2 Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:53:07 +0100 Subject: [PATCH 1/5] make config file more consistent --- conf/carbon.conf.example | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example index e919de1d..ebb76e46 100644 --- a/conf/carbon.conf.example +++ b/conf/carbon.conf.example @@ -476,6 +476,21 @@ DESTINATIONS = 127.0.0.1:2004 # we will drop any subsequently received datapoints. MAX_QUEUE_SIZE = 10000 +# This is the factor that the queue must be empty before it will accept +# more messages. For a larger site, if the queue is very large it makes sense +# to tune this to allow for incoming stats. So if you have an average +# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense +# to allow stats to start flowing when you've cleared the queue to 95% since +# you should have space to accommodate the next minute's worth of stats +# even before the relay incrementally clears more of the queue +QUEUE_LOW_WATERMARK_PCT = 0.8 + +# Set this to False to drop datapoints when any send queue (sending datapoints +# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the +# default) then sockets over which metrics are received will temporarily stop accepting +# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE. +USE_FLOW_CONTROL = True + # This defines the maximum "message size" between carbon daemons. If # your queue is large, setting this to a lower number will cause the # relay to forward smaller discrete chunks of stats, which may prevent @@ -492,26 +507,11 @@ MAX_DATAPOINTS_PER_MESSAGE = 500 # If this is blank carbon-relay runs as the user that invokes it # USER = -# This is the percentage that the queue must be empty before it will accept -# more messages. For a larger site, if the queue is very large it makes sense -# to tune this to allow for incoming stats. So if you have an average -# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense -# to allow stats to start flowing when you've cleared the queue to 95% since -# you should have space to accommodate the next minute's worth of stats -# even before the relay incrementally clears more of the queue -QUEUE_LOW_WATERMARK_PCT = 0.8 - # To allow for batch efficiency from the pickle protocol and to benefit from # other batching advantages, all writes are deferred by putting them into a queue, # and then the queue is flushed and sent a small fraction of a second later. TIME_TO_DEFER_SENDING = 0.0001 -# Set this to False to drop datapoints when any send queue (sending datapoints -# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the -# default) then sockets over which metrics are received will temporarily stop accepting -# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE. -USE_FLOW_CONTROL = True - # If enabled this setting is used to timeout metric client connection if no # metrics have been sent in specified time in seconds #METRIC_CLIENT_IDLE_TIMEOUT = None @@ -623,6 +623,15 @@ REPLICATION_FACTOR = 1 # we will drop any subsequently received datapoints. MAX_QUEUE_SIZE = 10000 +# This is the factor that the queue must be empty before it will accept +# more messages. For a larger site, if the queue is very large it makes sense +# to tune this to allow for incoming stats. So if you have an average +# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense +# to allow stats to start flowing when you've cleared the queue to 95% since +# you should have space to accommodate the next minute's worth of stats +# even before the relay incrementally clears more of the queue +QUEUE_LOW_WATERMARK_PCT = 0.8 + # Set this to False to drop datapoints when any send queue (sending datapoints # to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the # default) then sockets over which metrics are received will temporarily stop accepting From 1ae7548750dbc9e954c79bc49c195ebbb6dd5b5b Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:53:13 +0100 Subject: [PATCH 2/5] fix MAX_QUEUE_SIZE to match config file default It's not really clear what to do here. If someone has altered their config file to remove this line, this patch will alter behaviour of their instance. However, the example config default has been 10000 since first commit, and that's probably a more sensible default value. It might nonetheless be more sensible to just document the discrepancy. --- lib/carbon/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index aef5e684..c474cd91 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -68,7 +68,7 @@ MAX_DATAPOINTS_PER_MESSAGE=500, MAX_AGGREGATION_INTERVALS=5, FORWARD_ALL=True, - MAX_QUEUE_SIZE=1000, + MAX_QUEUE_SIZE=10000, QUEUE_LOW_WATERMARK_PCT=0.8, TIME_TO_DEFER_SENDING=0.0001, ENABLE_AMQP=False, From 84d5f94baddd4709f13514e6d0be8309f474ff88 Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:53:15 +0100 Subject: [PATCH 3/5] [#956] allow instances to go 25% over their hard max --- conf/carbon.conf.example | 24 ++++++++++++++++++++---- lib/carbon/client.py | 9 ++++++++- lib/carbon/conf.py | 1 + 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example index ebb76e46..561c3e48 100644 --- a/conf/carbon.conf.example +++ b/conf/carbon.conf.example @@ -472,8 +472,11 @@ DESTINATIONS = 127.0.0.1:2004 # This is the maximum number of datapoints that can be queued up # for a single destination. Once this limit is hit, we will -# stop accepting new data if USE_FLOW_CONTROL is True, otherwise -# we will drop any subsequently received datapoints. +# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and +# internally-generated datapoints will still be processed, and data +# will still be dropped if MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE +# is hit. If USE_FLOW_CONTROL is False, metrics are immediately dropped +# after MAX_QUEUE_SIZE, and MAX_QUEUE_SIZE_HARD_PCT is unused. MAX_QUEUE_SIZE = 10000 # This is the factor that the queue must be empty before it will accept @@ -485,6 +488,11 @@ MAX_QUEUE_SIZE = 10000 # even before the relay incrementally clears more of the queue QUEUE_LOW_WATERMARK_PCT = 0.8 +# This is the factor of the max length of a queue before data will be dropped +# with USE_FLOW_CONTROL enabled. When incoming data is paused, in-flight data +# is still processed, which can send a queue slightly over the configured max. +MAX_QUEUE_SIZE_HARD_PCT = 1.25 + # Set this to False to drop datapoints when any send queue (sending datapoints # to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the # default) then sockets over which metrics are received will temporarily stop accepting @@ -619,8 +627,11 @@ REPLICATION_FACTOR = 1 # This is the maximum number of datapoints that can be queued up # for a single destination. Once this limit is hit, we will -# stop accepting new data if USE_FLOW_CONTROL is True, otherwise -# we will drop any subsequently received datapoints. +# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and +# internally-generated datapoints will still be processed, and data +# will still be dropped if MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE +# is hit. If USE_FLOW_CONTROL is False, metrics are immediately dropped +# after MAX_QUEUE_SIZE, and MAX_QUEUE_SIZE_HARD_PCT is unused. MAX_QUEUE_SIZE = 10000 # This is the factor that the queue must be empty before it will accept @@ -632,6 +643,11 @@ MAX_QUEUE_SIZE = 10000 # even before the relay incrementally clears more of the queue QUEUE_LOW_WATERMARK_PCT = 0.8 +# This is the factor of the max length of a queue before data will be dropped +# with USE_FLOW_CONTROL enabled. When incoming data is paused, in-flight data +# is still processed, which can send a queue slightly over the configured max. +MAX_QUEUE_SIZE_HARD_PCT = 1.25 + # Set this to False to drop datapoints when any send queue (sending datapoints # to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the # default) then sockets over which metrics are received will temporarily stop accepting diff --git a/lib/carbon/client.py b/lib/carbon/client.py index d9ba5a99..6bed7b7f 100644 --- a/lib/carbon/client.py +++ b/lib/carbon/client.py @@ -35,6 +35,10 @@ SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT +if settings.USE_FLOW_CONTROL: + SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE * settings.MAX_QUEUE_SIZE_HARD_PCT +else: + SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE class CarbonClientProtocol(object): @@ -350,7 +354,10 @@ def sendDatapoint(self, metric, datapoint): if self.queueSize >= settings.MAX_QUEUE_SIZE: if not self.queueFull.called: self.queueFull.callback(self.queueSize) - instrumentation.increment(self.fullQueueDrops) + if self.queueSize < SEND_QUEUE_HARD_MAX: + self.enqueue(metric, datapoint) + else: + instrumentation.increment(self.fullQueueDrops) else: self.enqueue(metric, datapoint) diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index c474cd91..8a99df06 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -70,6 +70,7 @@ FORWARD_ALL=True, MAX_QUEUE_SIZE=10000, QUEUE_LOW_WATERMARK_PCT=0.8, + MAX_QUEUE_SIZE_HARD_PCT=1.25, TIME_TO_DEFER_SENDING=0.0001, ENABLE_AMQP=False, AMQP_METRIC_NAME_IN_BODY=False, From 4d321ef6dc581ddd14b5d389319bad034fb1592f Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:53:16 +0100 Subject: [PATCH 4/5] [#956] allow cache to slightly exceed max size I've used a smaller overage for this setting, as the cache is a much slower process than the relay, so hopefully its size is much higher. --- lib/carbon/cache.py | 13 ++++++++++++- lib/carbon/conf.py | 4 ++++ lib/carbon/events.py | 3 ++- lib/carbon/tests/benchmark_cache.py | 4 ++++ lib/carbon/tests/test_cache.py | 30 ++++++++++++++++++++++++++++- 5 files changed, 51 insertions(+), 3 deletions(-) diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py index 72ccf6cf..0304143a 100644 --- a/lib/carbon/cache.py +++ b/lib/carbon/cache.py @@ -208,6 +208,13 @@ def watermarks(self): @property def is_full(self): + if settings.CACHE_SIZE_HARD_MAX == float('inf'): + return False + else: + return self.size >= settings.CACHE_SIZE_HARD_MAX + + @property + def is_nearly_full(self): if settings.MAX_CACHE_SIZE == float('inf'): return False else: @@ -252,8 +259,12 @@ def store(self, metric, datapoint): # Not a duplicate, hence process if cache is not full if self.is_full: log.msg("MetricCache is full: self.size=%d" % self.size) - events.cacheFull() + events.cacheOverflow() else: + if self.is_nearly_full: + # This will disable reading when flow control is enabled + log.msg("MetricCache is nearly full: self.size=%d" % self.size) + events.cacheFull() if not self[metric]: self.new_metrics.append(metric) self.size += 1 diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index 8a99df06..43532255 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -297,6 +297,10 @@ def cleanpath(path): state.database = database_class(settings) settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95 + if settings.USE_FLOW_CONTROL: + settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05 + else: + settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE if "action" not in self: self["action"] = "start" diff --git a/lib/carbon/events.py b/lib/carbon/events.py index a7057ae3..ae36f473 100644 --- a/lib/carbon/events.py +++ b/lib/carbon/events.py @@ -22,6 +22,7 @@ def __call__(self, *args, **kwargs): metricReceived = Event('metricReceived') metricGenerated = Event('metricGenerated') +cacheOverflow = Event('cacheOverflow') cacheFull = Event('cacheFull') cacheSpaceAvailable = Event('cacheSpaceAvailable') pauseReceivingMetrics = Event('pauseReceivingMetrics') @@ -32,7 +33,7 @@ def __call__(self, *args, **kwargs): lambda metric, datapoint: state.instrumentation.increment('metricsReceived')) -cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow')) +cacheOverflow.addHandler(lambda: state.instrumentation.increment('cache.overflow')) cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True)) cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False)) diff --git a/lib/carbon/tests/benchmark_cache.py b/lib/carbon/tests/benchmark_cache.py index 1c2b1379..aa7948f5 100644 --- a/lib/carbon/tests/benchmark_cache.py +++ b/lib/carbon/tests/benchmark_cache.py @@ -4,6 +4,10 @@ NaiveStrategy, MaxStrategy, RandomStrategy, SortedStrategy, \ TimeSortedStrategy, BucketMaxStrategy +from carbon.conf import settings + +settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95 +settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05 metric_cache = _MetricCache(DrainStrategy) count = 0 diff --git a/lib/carbon/tests/test_cache.py b/lib/carbon/tests/test_cache.py index 4eb391da..8bd8bb9d 100644 --- a/lib/carbon/tests/test_cache.py +++ b/lib/carbon/tests/test_cache.py @@ -11,6 +11,7 @@ class MetricCacheTest(TestCase): def setUp(self): settings = { 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), 'CACHE_SIZE_LOW_WATERMARK': float('inf') } self._settings_patch = patch.dict('carbon.conf.settings', settings) @@ -67,6 +68,13 @@ def test_store_checks_fullness(self): def test_store_on_full_triggers_events(self): is_full_mock = PropertyMock(return_value=True) with patch.object(_MetricCache, 'is_full', is_full_mock): + with patch('carbon.cache.events') as events_mock: + self.metric_cache.store('foo', (123456, 1.0)) + events_mock.cacheOverflow.assert_called_with() + + def test_store_on_nearly_full_triggers_events(self): + is_nearly_full_mock = PropertyMock(return_value=True) + with patch.object(_MetricCache, 'is_nearly_full', is_nearly_full_mock): with patch('carbon.cache.events') as events_mock: self.metric_cache.store('foo', (123456, 1.0)) events_mock.cacheFull.assert_called_with() @@ -150,7 +158,7 @@ def test_is_full_short_circuits_on_inf(self): size_mock.assert_not_called() def test_is_full(self): - self._settings_patch.values['MAX_CACHE_SIZE'] = 2.0 + self._settings_patch.values['CACHE_SIZE_HARD_MAX'] = 2.0 self._settings_patch.start() with patch('carbon.cache.events'): self.assertFalse(self.metric_cache.is_full) @@ -178,8 +186,18 @@ def test_counts_multiple_datapoints(self): class DrainStrategyTest(TestCase): def setUp(self): + settings = { + 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), + 'CACHE_SIZE_LOW_WATERMARK': float('inf') + } + self._settings_patch = patch.dict('carbon.conf.settings', settings) + self._settings_patch.start() self.metric_cache = _MetricCache() + def tearDown(self): + self._settings_patch.stop() + def test_bucketmax_strategy(self): bucketmax_strategy = BucketMaxStrategy(self.metric_cache) self.metric_cache.strategy = bucketmax_strategy @@ -303,8 +321,18 @@ def test_time_sorted_strategy_min_lag(self): class RandomStrategyTest(TestCase): def setUp(self): + settings = { + 'MAX_CACHE_SIZE': float('inf'), + 'CACHE_SIZE_HARD_MAX': float('inf'), + 'CACHE_SIZE_LOW_WATERMARK': float('inf') + } + self._settings_patch = patch.dict('carbon.conf.settings', settings) + self._settings_patch.start() self.metric_cache = _MetricCache() + def tearDown(self): + self._settings_patch.stop() + def test_random_strategy(self): self.metric_cache.store('foo', (123456, 1.0)) self.metric_cache.store('bar', (123457, 2.0)) From dbf292521276916da843858299379b61c9297adf Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:53:18 +0100 Subject: [PATCH 5/5] enable USE_FLOW_CONTROL on aggregator, rewriter and relay --- lib/carbon/service.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/carbon/service.py b/lib/carbon/service.py index 0c0fb894..f7b85ea7 100644 --- a/lib/carbon/service.py +++ b/lib/carbon/service.py @@ -170,6 +170,10 @@ def setupAggregatorProcessor(root_service, settings): "aggregation processor: file does not exist {0}".format(aggregation_rules_path)) RuleManager.read_from(aggregation_rules_path) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupRewriterProcessor(root_service, settings): from carbon.rewrite import RewriteRuleManager @@ -177,6 +181,10 @@ def setupRewriterProcessor(root_service, settings): rewrite_rules_path = settings["rewrite-rules"] RewriteRuleManager.read_from(rewrite_rules_path) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupRelayProcessor(root_service, settings): from carbon.routers import DatapointRouter @@ -191,6 +199,10 @@ def setupRelayProcessor(root_service, settings): for destination in util.parseDestinations(settings.DESTINATIONS): state.client_manager.startClient(destination) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupWriterProcessor(root_service, settings): from carbon import cache # NOQA Register CacheFeedingProcessor