Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic periodic exporting metric_reader #1603

Merged
merged 20 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
32a6582
feat: add basic periodic-reader
xuan-cao-swi Feb 21, 2024
3566cd2
feat: lint
xuan-cao-swi Feb 21, 2024
cb84455
feat: make thread properly close
xuan-cao-swi Feb 22, 2024
0ecb543
Merge branch 'main' of github.com:xuan-cao-swi/opentelemetry-ruby int…
xuan-cao-swi Mar 26, 2024
3731304
Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metr…
xuan-cao-swi Apr 5, 2024
2aef8bf
Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metr…
xuan-cao-swi Apr 5, 2024
8e90ebb
Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metr…
xuan-cao-swi Apr 5, 2024
23d9a1f
Update metrics_sdk/test/integration/periodic_metric_reader_test.rb
xuan-cao-swi Apr 5, 2024
02055b7
Merge branch 'main' into periodic-reader
xuan-cao-swi Apr 5, 2024
2d31c72
Merge branch 'periodic-reader' of github.com:xuan-cao-swi/opentelemet…
xuan-cao-swi Apr 5, 2024
512f959
feat: periodic reader - revision
xuan-cao-swi Apr 6, 2024
da09c44
Merge branch 'main' into periodic-reader
xuan-cao-swi Apr 10, 2024
ed9029c
Merge branch 'main' into periodic-reader
xuan-cao-swi Apr 23, 2024
63405d5
feat: change interval and timeout name
xuan-cao-swi Apr 23, 2024
9fad788
Merge branch 'main' into periodic-reader
xuan-cao-swi May 15, 2024
2662097
feat: change back to millis
xuan-cao-swi May 22, 2024
4fd4e2f
Merge branch 'main' into periodic-reader
xuan-cao-swi Aug 7, 2024
81edbc5
revision
xuan-cao-swi Aug 7, 2024
cfa4974
lint
xuan-cao-swi Aug 7, 2024
0cb373c
Merge branch 'main' into periodic-reader
mwear Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ module Export
require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/periodic_metric_reader'
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
# PeriodicMetricReader provides a minimal example implementation.
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
class PeriodicMetricReader < MetricReader
def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)),
export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)),
exporter: nil)
super()

@export_interval = export_interval_millis / 1000.0
@export_timeout = export_timeout_millis / 1000.0
@exporter = exporter
@thread = nil
@continue = false

start
end

def start
@continue = true
if @exporter.nil?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please close it if it needs to restart.'
else
@thread = Thread.new do
while @continue
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
sleep(@export_interval)
begin
Timeout.timeout(@export_timeout) { @exporter.export(collect) }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also pass timeout: @export_timeout to the @exporter?
Same as here

result_code = @export_mutex.synchronize { @exporter.export(batch, timeout: timeout) }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the Timeout.timeout(@export_timeout) { ... } will serve the purpose of timeout exporter if the exporter doesn't behave correctly (e.g. hanging).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I was just saying maybe we should pass the timeout also to the exporter to give it a chance to behave properly. We still keep the surrounding Timeout.timeout(@export_timeout) { ... } block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I have updated the part that pass timeout to exporter

rescue Timeout::Error => e
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') evaluating to FAILURE ? if not then we should return FAILURE explicitly here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think metric_reader shouldn't return anything (similar to metric_reader); only exporter will return either fail or success from export

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, you are absolutely right.

end
end
end
end
end

def close
@continue = false # force termination in next iteration
@thread.join(@export_interval) # wait 5 seconds for collecting and exporting
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.')
end

def shutdown(timeout: nil)
close
@exporter.force_flush if @exporter.respond_to?(:force_flush)
@exporter.shutdown
Export::SUCCESS
rescue StandardError
Export::FAILURE
end

def force_flush(timeout: nil)
@exporter.export(collect)
Export::SUCCESS
rescue StandardError
Export::FAILURE
end
end
end
end
end
end
85 changes: 85 additions & 0 deletions metrics_sdk/test/integration/periodic_metric_reader_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK do
describe '#periodic_metric_reader' do
before { reset_metrics_sdk }

it 'emits 2 metrics after 10 seconds' do
OpenTelemetry::SDK.configure
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(8)
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(2)

first_snapshot = snapshot[0]
_(first_snapshot[0].name).must_equal('counter')
_(first_snapshot[0].unit).must_equal('smidgen')
_(first_snapshot[0].description).must_equal('a small amount of something')

_(first_snapshot[0].instrumentation_scope.name).must_equal('test')

_(first_snapshot[0].data_points[0].value).must_equal(1)
_(first_snapshot[0].data_points[0].attributes).must_equal({})

_(first_snapshot[0].data_points[1].value).must_equal(4)
_(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b')

_(first_snapshot[0].data_points[2].value).must_equal(3)
_(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c')

_(first_snapshot[0].data_points[3].value).must_equal(4)
_(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e')

_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end

it 'emits 1 metric after 1 second when interval is > 1 second' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(1)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(1)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end
end
end
Loading