From 82fcf3246f8aa09c3488bc35c2f554e239d24c0d Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:52:04 +0100 Subject: [PATCH] make metrics from carbon-cache go via relay config --- lib/carbon/instrumentation.py | 2 +- lib/carbon/routers.py | 23 +++++++++++++++++++++++ lib/carbon/service.py | 1 + 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/lib/carbon/instrumentation.py b/lib/carbon/instrumentation.py index 5e4f83e44..49c9abcfb 100644 --- a/lib/carbon/instrumentation.py +++ b/lib/carbon/instrumentation.py @@ -170,7 +170,7 @@ def cache_record(metric, value): else: fullMetric = '%s.agents.%s-%s.%s' % (prefix, HOSTNAME, settings.instance, metric) datapoint = (time.time(), value) - cache.MetricCache().store(fullMetric, datapoint) + state.client_manager.sendDatapoint(fullMetric, datapoint) def relay_record(metric, value): diff --git a/lib/carbon/routers.py b/lib/carbon/routers.py index 9b8d8ef1d..72d93cfa2 100644 --- a/lib/carbon/routers.py +++ b/lib/carbon/routers.py @@ -31,6 +31,29 @@ def getDestinations(self, key): raise NotImplementedError() +class ConstantRouter(DatapointRouter): + plugin_name = 'constant' + + def __init__(self, settings): + self.destinations = set() + + def addDestination(self, destination): + self.destinations.add(destination) + + def removeDestination(self, destination): + self.destinations.discard(destination) + + def hasDestination(self, destination): + return destination in self.destinations + + def countDestinations(self): + return len(self.destinations) + + def getDestinations(self, key): + for destination in self.destinations: + yield destination + + class RelayRulesRouter(DatapointRouter): plugin_name = 'rules' diff --git a/lib/carbon/service.py b/lib/carbon/service.py index 0c0fb8948..5cd6bbb5d 100644 --- a/lib/carbon/service.py +++ b/lib/carbon/service.py @@ -87,6 +87,7 @@ def setupPipeline(pipeline, root_service, settings): setupRelayProcessor(root_service, settings) elif processor == 'write': setupWriterProcessor(root_service, settings) + setupRelayProcessor(root_service, settings) else: raise ValueError("Invalid pipeline processor '%s'" % processor)