diff --git a/lib/carbon/service.py b/lib/carbon/service.py index 0c0fb894..f7b85ea7 100644 --- a/lib/carbon/service.py +++ b/lib/carbon/service.py @@ -170,6 +170,10 @@ def setupAggregatorProcessor(root_service, settings): "aggregation processor: file does not exist {0}".format(aggregation_rules_path)) RuleManager.read_from(aggregation_rules_path) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupRewriterProcessor(root_service, settings): from carbon.rewrite import RewriteRuleManager @@ -177,6 +181,10 @@ def setupRewriterProcessor(root_service, settings): rewrite_rules_path = settings["rewrite-rules"] RewriteRuleManager.read_from(rewrite_rules_path) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupRelayProcessor(root_service, settings): from carbon.routers import DatapointRouter @@ -191,6 +199,10 @@ def setupRelayProcessor(root_service, settings): for destination in util.parseDestinations(settings.DESTINATIONS): state.client_manager.startClient(destination) + if settings.USE_FLOW_CONTROL: + events.cacheFull.addHandler(events.pauseReceivingMetrics) + events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics) + def setupWriterProcessor(root_service, settings): from carbon import cache # NOQA Register CacheFeedingProcessor