diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example index e919de1d..91a90ec2 100644 --- a/conf/carbon.conf.example +++ b/conf/carbon.conf.example @@ -637,9 +637,16 @@ USE_FLOW_CONTROL = True # You shouldn't need to tune this unless you really know what you're doing. MAX_DATAPOINTS_PER_MESSAGE = 500 -# This defines how many datapoints the aggregator remembers for -# each metric. Aggregation only happens for datapoints that fall in -# the past MAX_AGGREGATION_INTERVALS * intervalSize seconds. +# This defines how many intervals behind the current one the aggregator +# remembers for each metric. Intervals are expired when either: +# * They have seen no new datapoints in the last +# (MAX_AGGREGATION_INTERVALS * configured_frequency) seconds, or +# * There are more than (MAX_AGGREGATION_INTERVALS + 2) intervals with +# datapoints in them. (This allows an application to replay past metrics with +# sensible behaviour.) +# Intervals are expired only when they are aggregated. +# Expired intervals will be treated as empty if new datapoints arrive. +# See WRITE_BACK_FREQUENCY to control the aggregation frequency. MAX_AGGREGATION_INTERVALS = 5 # Limit the number of open connections the receiver can handle as any time. diff --git a/lib/carbon/aggregator/buffers.py b/lib/carbon/aggregator/buffers.py index 096dd6c7..baeb707f 100644 --- a/lib/carbon/aggregator/buffers.py +++ b/lib/carbon/aggregator/buffers.py @@ -61,23 +61,30 @@ def configure_aggregation(self, frequency, func): def compute_value(self): now = int(time.time()) current_interval = now - (now % self.aggregation_frequency) + max_aggregation_intervals = settings['MAX_AGGREGATION_INTERVALS'] age_threshold = current_interval - ( - settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency) + max_aggregation_intervals * self.aggregation_frequency) for buffer in list(self.interval_buffers.values()): - if buffer.active: + if buffer.inactive_since is None: value = self.aggregation_func(buffer.values) datapoint = (buffer.interval, value) state.events.metricGenerated(self.metric_path, datapoint) state.instrumentation.increment('aggregateDatapointsSent') - buffer.mark_inactive() + buffer.mark_inactive(current_interval) - if buffer.interval < age_threshold: + elif buffer.inactive_since < age_threshold: del self.interval_buffers[buffer.interval] - if not self.interval_buffers: - self.close() - self.configured = False - del BufferManager.buffers[self.metric_path] + + if len(self.interval_buffers) > max_aggregation_intervals + 2: + ordered_intervals = sorted(self.interval_buffers) + for interval in ordered_intervals[:-max_aggregation_intervals - 2]: + del self.interval_buffers[interval] + + if not self.interval_buffers: + self.close() + self.configured = False + del BufferManager.buffers[self.metric_path] def close(self): if self.compute_task and self.compute_task.running: @@ -89,19 +96,19 @@ def size(self): class IntervalBuffer: - __slots__ = ('interval', 'values', 'active') + __slots__ = ('interval', 'values', 'inactive_since') def __init__(self, interval): self.interval = interval self.values = [] - self.active = True + self.inactive_since = None def input(self, datapoint): self.values.append(datapoint[1]) - self.active = True + self.inactive_since = None - def mark_inactive(self): - self.active = False + def mark_inactive(self, interval): + self.inactive_since = interval # Shared importable singleton diff --git a/lib/carbon/tests/test_aggregator_buffers.py b/lib/carbon/tests/test_aggregator_buffers.py index ffac75e5..da04ff88 100644 --- a/lib/carbon/tests/test_aggregator_buffers.py +++ b/lib/carbon/tests/test_aggregator_buffers.py @@ -109,7 +109,7 @@ def test_compute_value_marks_buffer_inactive(self): with patch.object(IntervalBuffer, 'mark_inactive') as mark_inactive_mock: self.metric_buffer.compute_value() - mark_inactive_mock.assert_called_once_with() + mark_inactive_mock.assert_called_once_with(600) @patch("time.time", new=Mock(return_value=600)) @patch("carbon.state.events.metricGenerated", new=Mock()) @@ -129,7 +129,7 @@ def test_compute_value_computes_aggregate(self): def test_compute_value_skips_inactive_buffers(self, metric_generated_mock): interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer self.metric_buffer.compute_value() @@ -177,26 +177,47 @@ def test_compute_value_deletes_expired_buffers(self): interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer # 2nd interval for current time interval_buffer = IntervalBuffer(current_interval) interval_buffer.input((current_interval, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(current_interval) self.metric_buffer.interval_buffers[current_interval] = interval_buffer with patch("time.time", new=Mock(return_value=current_interval + 60)): self.metric_buffer.compute_value() self.assertFalse(600 in self.metric_buffer.interval_buffers) + @patch("carbon.state.events.metricGenerated") + def test_compute_value_deletes_too_many_buffers(self, metric_generated_mock): + from carbon.conf import settings + current_interval = 600 + 60 * settings['MAX_AGGREGATION_INTERVALS'] + + # We should keep 2 more than MAX_AGGREGATION_INTERVALS. + calls = [] + for i in range(settings['MAX_AGGREGATION_INTERVALS'] + 4): + interval = 600 + i * 60 + interval_buffer = IntervalBuffer(interval) + interval_buffer.input((interval, 1.0)) + self.metric_buffer.interval_buffers[interval] = interval_buffer + calls = [call("carbon.foo.bar", (interval, 1.0))] + + with patch("time.time", new=Mock(return_value=600)): + self.metric_buffer.compute_value() + metric_generated_mock.assert_has_calls(calls) + self.assertFalse(600 in self.metric_buffer.interval_buffers) + self.assertFalse(660 in self.metric_buffer.interval_buffers) + self.assertTrue(720 in self.metric_buffer.interval_buffers) + def test_compute_value_closes_metric_if_last_buffer_deleted(self): from carbon.conf import settings current_interval = 600 + 60 * settings['MAX_AGGREGATION_INTERVALS'] interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer BufferManager.buffers['carbon.foo.bar'] = self.metric_buffer @@ -211,7 +232,7 @@ def test_compute_value_unregisters_metric_if_last_buffer_deleted(self): interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer BufferManager.buffers['carbon.foo.bar'] = self.metric_buffer