diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example index e919de1d..e8be3066 100644 --- a/conf/carbon.conf.example +++ b/conf/carbon.conf.example @@ -274,10 +274,27 @@ WHISPER_FALLOCATE_CREATE = True # By default, carbon itself will log statistics (such as a count, # metricsReceived) with the top level prefix of 'carbon' at an interval of 60 -# seconds. Set CARBON_METRIC_INTERVAL to 0 to disable instrumentation +# seconds. Set CARBON_METRIC_INTERVAL to 0 to disable instrumentation. See +# also: RELAY_CACHE_METRICS. # CARBON_METRIC_PREFIX = carbon # CARBON_METRIC_INTERVAL = 60 +# By default (RELAY_CACHE_METRICS = False), each cache will keep its internal +# metrics locally. If you run multiple instances, this is likely to violate +# your RELAY_METHOD, which means the webapp will query the wrong cache to find +# recent data, and if the caches don't share storage, it will become impossible +# to query many metrics at all. In order to get metrics to the right place, +# instead configure the cache to forward its internal metrics to an external +# instance (RELAY_CACHE_METRICS = True). RELAY_METHOD, REPLICATION_FACTOR, +# DESTINATIONS and DESTINATION_PROTOCOL will be used to determine the endpoint +# for each datapoint -- either make them consistent with a relay, or set up a +# relay as the sole destination. +# RELAY_METHOD = constant +# REPLICATION_FACTOR = 1 +# DESTINATIONS = 127.0.0.1:2014 +# DESTINATION_PROTOCOL = pickle +RELAY_CACHE_METRICS = False + # Enable AMQP if you want to receive metrics using an amqp broker # ENABLE_AMQP = False @@ -611,6 +628,10 @@ FORWARD_ALL = True # Note that if the destinations are all carbon-caches then this should # exactly match the webapp's CARBONLINK_HOSTS setting in terms of # instances listed (order matters!). +# +# Also note that if there are multiple DESTINATIONS, it's likely that +# RELAY_CACHE_METRICS will need to be set on the caches for correct webapp +# behaviour. DESTINATIONS = 127.0.0.1:2004 # If you want to add redundancy to your data by replicating every diff --git a/lib/carbon/conf.py b/lib/carbon/conf.py index aef5e684..aaa72adb 100644 --- a/lib/carbon/conf.py +++ b/lib/carbon/conf.py @@ -106,6 +106,7 @@ USE_WHITELIST=False, CARBON_METRIC_PREFIX='carbon', CARBON_METRIC_INTERVAL=60, + RELAY_CACHE_METRICS=False, CACHE_WRITE_STRATEGY='sorted', WRITE_BACK_FREQUENCY=None, MIN_RESET_STAT_FLOW=1000, diff --git a/lib/carbon/instrumentation.py b/lib/carbon/instrumentation.py index 5e4f83e4..62d625d3 100644 --- a/lib/carbon/instrumentation.py +++ b/lib/carbon/instrumentation.py @@ -170,7 +170,10 @@ def cache_record(metric, value): else: fullMetric = '%s.agents.%s-%s.%s' % (prefix, HOSTNAME, settings.instance, metric) datapoint = (time.time(), value) - cache.MetricCache().store(fullMetric, datapoint) + if settings.RELAY_CACHE_METRICS: + state.client_manager.sendDatapoint(fullMetric, datapoint) + else: + cache.MetricCache().store(fullMetric, datapoint) def relay_record(metric, value): diff --git a/lib/carbon/routers.py b/lib/carbon/routers.py index 9b8d8ef1..72d93cfa 100644 --- a/lib/carbon/routers.py +++ b/lib/carbon/routers.py @@ -31,6 +31,29 @@ def getDestinations(self, key): raise NotImplementedError() +class ConstantRouter(DatapointRouter): + plugin_name = 'constant' + + def __init__(self, settings): + self.destinations = set() + + def addDestination(self, destination): + self.destinations.add(destination) + + def removeDestination(self, destination): + self.destinations.discard(destination) + + def hasDestination(self, destination): + return destination in self.destinations + + def countDestinations(self): + return len(self.destinations) + + def getDestinations(self, key): + for destination in self.destinations: + yield destination + + class RelayRulesRouter(DatapointRouter): plugin_name = 'rules' diff --git a/lib/carbon/service.py b/lib/carbon/service.py index 0c0fb894..35abd07c 100644 --- a/lib/carbon/service.py +++ b/lib/carbon/service.py @@ -87,6 +87,8 @@ def setupPipeline(pipeline, root_service, settings): setupRelayProcessor(root_service, settings) elif processor == 'write': setupWriterProcessor(root_service, settings) + if settings.RELAY_CACHE_METRICS: + setupRelayProcessor(root_service, settings) else: raise ValueError("Invalid pipeline processor '%s'" % processor) diff --git a/lib/carbon/tests/test_routers.py b/lib/carbon/tests/test_routers.py index f6309e8f..8cc3e476 100644 --- a/lib/carbon/tests/test_routers.py +++ b/lib/carbon/tests/test_routers.py @@ -54,5 +54,5 @@ def testBasic(self): router.addDestination(parseDestination(destination)) self.assertEqual( len(list(router.getDestinations('foo.bar'))), - settings['REPLICATION_FACTOR'] + len(DESTINATIONS) if plugin == 'constant' else settings['REPLICATION_FACTOR'] )