Skip to content

Commit

Permalink
aggregator -- allow batches of metrics from the past to be sent
Browse files Browse the repository at this point in the history
```
== warmup ==
1 3.7179997889325023e-06
1000 0.00032810999982757494
10000 0.003592125000068336
100000 0.03169123900079285
1000000 0.31713802899867005
10000000 3.157578217000264

== noop ==
1 2.0410006982274354e-06
1000 0.00042683100036811084
10000 0.003553029999238788
100000 0.031913518001601915
1000000 0.31525603199952457
10000000 3.1432045359997574

== sum ==
1 9.91100023384206e-06
1000 0.00038492900057462975
10000 0.003426478999244864
100000 0.031299194999519386
1000000 0.3173040299989225
10000000 3.1426827399991453

== fake ==
1 4.47000093117822e-06
1000 0.00035055999978794716
10000 0.0035677849991770927
100000 0.031690031000835006
1000000 0.31694292799875257
10000000 3.1394117309992
```

No appreciable difference in benchmark output. If anything, it's faster.
  • Loading branch information
bucko909 committed Aug 21, 2024
1 parent 7533eee commit 1618009
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 22 deletions.
13 changes: 10 additions & 3 deletions conf/carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -637,9 +637,16 @@ USE_FLOW_CONTROL = True
# You shouldn't need to tune this unless you really know what you're doing.
MAX_DATAPOINTS_PER_MESSAGE = 500

# This defines how many datapoints the aggregator remembers for
# each metric. Aggregation only happens for datapoints that fall in
# the past MAX_AGGREGATION_INTERVALS * intervalSize seconds.
# This defines how many intervals behind the current one the aggregator
# remembers for each metric. Intervals are expired when either:
# * They have seen no new datapoints in the last
# (MAX_AGGREGATION_INTERVALS * configured_frequency) seconds, or
# * There are more than (MAX_AGGREGATION_INTERVALS + 2) intervals with
# datapoints in them. (This allows an application to replay past metrics with
# sensible behaviour.)
# Expired intervals are only removed when an aggregation pass runs.
# Expired intervals will be treated as empty if new datapoints arrive.
# See WRITE_BACK_FREQUENCY to control the aggregation frequency.
MAX_AGGREGATION_INTERVALS = 5

# Limit the number of open connections the receiver can handle at any time.
Expand Down
33 changes: 20 additions & 13 deletions lib/carbon/aggregator/buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,30 @@ def configure_aggregation(self, frequency, func):
def compute_value(self):
now = int(time.time())
current_interval = now - (now % self.aggregation_frequency)
max_aggregation_intervals = settings['MAX_AGGREGATION_INTERVALS']
age_threshold = current_interval - (
settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
max_aggregation_intervals * self.aggregation_frequency)

for buffer in list(self.interval_buffers.values()):
if buffer.active:
if buffer.inactive_since is None:
value = self.aggregation_func(buffer.values)
datapoint = (buffer.interval, value)
state.events.metricGenerated(self.metric_path, datapoint)
state.instrumentation.increment('aggregateDatapointsSent')
buffer.mark_inactive()
buffer.mark_inactive(current_interval)

if buffer.interval < age_threshold:
elif buffer.inactive_since < age_threshold:
del self.interval_buffers[buffer.interval]
if not self.interval_buffers:
self.close()
self.configured = False
del BufferManager.buffers[self.metric_path]

if len(self.interval_buffers) > max_aggregation_intervals + 2:
ordered_intervals = sorted(self.interval_buffers)
for interval in ordered_intervals[:-max_aggregation_intervals - 2]:
del self.interval_buffers[interval]

if not self.interval_buffers:
self.close()
self.configured = False
del BufferManager.buffers[self.metric_path]

def close(self):
if self.compute_task and self.compute_task.running:
Expand All @@ -89,19 +96,19 @@ def size(self):


class IntervalBuffer:
__slots__ = ('interval', 'values', 'active')
__slots__ = ('interval', 'values', 'inactive_since')

def __init__(self, interval):
self.interval = interval
self.values = []
self.active = True
self.inactive_since = None

def input(self, datapoint):
self.values.append(datapoint[1])
self.active = True
self.inactive_since = None

def mark_inactive(self):
self.active = False
def mark_inactive(self, interval):
self.inactive_since = interval


# Shared importable singleton
Expand Down
33 changes: 27 additions & 6 deletions lib/carbon/tests/test_aggregator_buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def test_compute_value_marks_buffer_inactive(self):

with patch.object(IntervalBuffer, 'mark_inactive') as mark_inactive_mock:
self.metric_buffer.compute_value()
mark_inactive_mock.assert_called_once_with()
mark_inactive_mock.assert_called_once_with(600)

@patch("time.time", new=Mock(return_value=600))
@patch("carbon.state.events.metricGenerated", new=Mock())
Expand All @@ -129,7 +129,7 @@ def test_compute_value_computes_aggregate(self):
def test_compute_value_skips_inactive_buffers(self, metric_generated_mock):
interval_buffer = IntervalBuffer(600)
interval_buffer.input((600, 1.0))
interval_buffer.mark_inactive()
interval_buffer.mark_inactive(600)
self.metric_buffer.interval_buffers[600] = interval_buffer

self.metric_buffer.compute_value()
Expand Down Expand Up @@ -177,26 +177,47 @@ def test_compute_value_deletes_expired_buffers(self):

interval_buffer = IntervalBuffer(600)
interval_buffer.input((600, 1.0))
interval_buffer.mark_inactive()
interval_buffer.mark_inactive(600)
self.metric_buffer.interval_buffers[600] = interval_buffer

# 2nd interval for current time
interval_buffer = IntervalBuffer(current_interval)
interval_buffer.input((current_interval, 1.0))
interval_buffer.mark_inactive()
interval_buffer.mark_inactive(current_interval)
self.metric_buffer.interval_buffers[current_interval] = interval_buffer

with patch("time.time", new=Mock(return_value=current_interval + 60)):
self.metric_buffer.compute_value()
self.assertFalse(600 in self.metric_buffer.interval_buffers)

@patch("carbon.state.events.metricGenerated")
def test_compute_value_deletes_too_many_buffers(self, metric_generated_mock):
    """Check that surplus interval buffers are dropped, oldest first.

    Populates MAX_AGGREGATION_INTERVALS + 4 fresh buffers, runs one
    compute_value() pass, and verifies that:
      * a datapoint is generated for every buffer, and
      * only the two oldest buffers (600 and 660) are removed, leaving
        MAX_AGGREGATION_INTERVALS + 2 behind.
    """
    from carbon.conf import settings

    # Two buffers more than the MAX_AGGREGATION_INTERVALS + 2 that are kept.
    buffer_count = settings['MAX_AGGREGATION_INTERVALS'] + 4

    expected_calls = []
    for i in range(buffer_count):
        interval = 600 + i * 60
        interval_buffer = IntervalBuffer(interval)
        interval_buffer.input((interval, 1.0))
        self.metric_buffer.interval_buffers[interval] = interval_buffer
        # Accumulate (rather than overwrite) so that every interval's
        # generated datapoint is verified, not just the last one.
        expected_calls.append(call("carbon.foo.bar", (interval, 1.0)))

    with patch("time.time", new=Mock(return_value=600)):
        self.metric_buffer.compute_value()

    metric_generated_mock.assert_has_calls(expected_calls)
    self.assertNotIn(600, self.metric_buffer.interval_buffers)
    self.assertNotIn(660, self.metric_buffer.interval_buffers)
    self.assertIn(720, self.metric_buffer.interval_buffers)

def test_compute_value_closes_metric_if_last_buffer_deleted(self):
from carbon.conf import settings
current_interval = 600 + 60 * settings['MAX_AGGREGATION_INTERVALS']

interval_buffer = IntervalBuffer(600)
interval_buffer.input((600, 1.0))
interval_buffer.mark_inactive()
interval_buffer.mark_inactive(600)
self.metric_buffer.interval_buffers[600] = interval_buffer
BufferManager.buffers['carbon.foo.bar'] = self.metric_buffer

Expand All @@ -211,7 +232,7 @@ def test_compute_value_unregisters_metric_if_last_buffer_deleted(self):

interval_buffer = IntervalBuffer(600)
interval_buffer.input((600, 1.0))
interval_buffer.mark_inactive()
interval_buffer.mark_inactive(600)
self.metric_buffer.interval_buffers[600] = interval_buffer
BufferManager.buffers['carbon.foo.bar'] = self.metric_buffer

Expand Down

0 comments on commit 1618009

Please sign in to comment.