From 420fbcd73dad9b74245af0eae28a1b3a8e09cba6 Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 16:03:39 +0100 Subject: [PATCH] enable USE_FLOW_CONTROL on aggregator, rewriter and relay --- lib/carbon/service.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/carbon/service.py b/lib/carbon/service.py index 0c0fb8948..f7b85ea7e 100644 --- a/lib/carbon/service.py +++ b/lib/carbon/service.py @@ -170,6 +170,10 @@ def setupAggregatorProcessor(root_service, settings): "aggregation processor: file does not exist {0}".format(aggregation_rules_path)) RuleManager.read_from(aggregation_rules_path) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupRewriterProcessor(root_service, settings): from carbon.rewrite import RewriteRuleManager @@ -177,6 +181,10 @@ def setupRewriterProcessor(root_service, settings): rewrite_rules_path = settings["rewrite-rules"] RewriteRuleManager.read_from(rewrite_rules_path) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupRelayProcessor(root_service, settings): from carbon.routers import DatapointRouter @@ -191,6 +199,10 @@ def setupRelayProcessor(root_service, settings): for destination in util.parseDestinations(settings.DESTINATIONS): state.client_manager.startClient(destination) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupWriterProcessor(root_service, settings): from carbon import cache # NOQA Register CacheFeedingProcessor