From 32a6582836a1c98b79ec7b4835e9da5eace5cd8f Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 21 Feb 2024 16:18:38 -0500 Subject: [PATCH 01/12] feat: add basic periodic-reader --- .../lib/opentelemetry/sdk/metrics/export.rb | 1 + .../metrics/export/periodic_metric_reader.rb | 70 +++++++++++++++ .../periodic_metric_reader_test.rb | 89 +++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb create mode 100644 metrics_sdk/test/integration/periodic_metric_reader_test.rb diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb index 64a038f819..782a75aae4 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb @@ -26,3 +26,4 @@ module Export require 'opentelemetry/sdk/metrics/export/metric_reader' require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter' require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter' +require 'opentelemetry/sdk/metrics/export/periodic_metric_reader' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb new file mode 100644 index 0000000000..dc34f4035b --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module Export + # PeriodicMetricReader provides a minimal example implementation. + class PeriodicMetricReader < MetricReader + + def initialize(interval_millis: 60, timout_millis: 30, exporter: nil) + super() + + @interval_millis = interval_millis + @timout_millis = timout_millis + @exporter = exporter + @thread = nil + + start_reader + end + + def start_reader + if @exporter.nil? + OpenTelemetry.logger.warn "Missing exporter in PeriodicMetricReader." + elsif !@thread.nil? + OpenTelemetry.logger.warn "PeriodicMetricReader is running. Please close it if need to restart." + else + @thread = Thread.new { perodic_collect } + end + end + + def perodic_collect + while true + sleep(@interval_millis) + Timeout.timeout(@timout_millis) { @exporter.export(collect) } + end + end + + def close_reader + @thread.kill + @thread.join + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.') + ensure + @thread = nil + end + + # TODO: determine correctness: directly kill the reader without waiting for next metrics collection + def shutdown(timeout: nil) + close_reader + @exporter.shutdown + Export::SUCCESS + rescue + Export::FAILURE + end + + def force_flush(timeout: nil) + @exporter.export(collect) + Export::SUCCESS + rescue + Export::FAILURE + end + end + end + end + end +end diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb new file mode 100644 index 0000000000..45cef2a078 --- /dev/null +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK do + describe '#periodic_metric_reader' do + before { reset_metrics_sdk } + + it 'emits 1 metrics after 10 seconds' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timout_millis: 5, exporter: metric_exporter) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) + + _(periodic_metric_reader.instance_variable_get(:@thread).class).must_equal Thread + + sleep(8) + + periodic_metric_reader.shutdown + snapshot = metric_exporter.metric_snapshots + first_snapshot = snapshot[0] + + _(snapshot).wont_be_empty + + _(first_snapshot[0].name).must_equal('counter') + _(first_snapshot[0].unit).must_equal('smidgen') + _(first_snapshot[0].description).must_equal('a small amount of something') + + _(first_snapshot[0].instrumentation_scope.name).must_equal('test') + + _(first_snapshot[0].data_points[0].value).must_equal(1) + _(first_snapshot[0].data_points[0].attributes).must_equal({}) + + _(first_snapshot[0].data_points[1].value).must_equal(4) + _(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b') + + _(first_snapshot[0].data_points[2].value).must_equal(3) + _(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c') + + _(first_snapshot[0].data_points[3].value).must_equal(4) + _(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e') + + _(periodic_metric_reader.instance_variable_get(:@thread)).must_be_nil + end + + it 'emits 0 metrics after 1 seconds when interval is > 1 seconds' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timout_millis: 5, exporter: metric_exporter) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) + + _(periodic_metric_reader.instance_variable_get(:@thread).class).must_equal Thread + + sleep(1) + + periodic_metric_reader.shutdown + snapshot = metric_exporter.metric_snapshots + + _(snapshot.size).must_equal(0) + _(periodic_metric_reader.instance_variable_get(:@thread)).must_be_nil + end + end +end From 3566cd22f6ec133956e16d8f59264f372b4226a2 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 21 Feb 2024 16:21:29 -0500 Subject: [PATCH 02/12] feat: lint --- .../sdk/metrics/export/periodic_metric_reader.rb | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index dc34f4035b..14e01b2a6f 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -10,7 +10,6 @@ module Metrics module Export # PeriodicMetricReader provides a minimal example implementation. class PeriodicMetricReader < MetricReader - def initialize(interval_millis: 60, timout_millis: 30, exporter: nil) super() @@ -24,16 +23,16 @@ def initialize(interval_millis: 60, timout_millis: 30, exporter: nil) def start_reader if @exporter.nil? - OpenTelemetry.logger.warn "Missing exporter in PeriodicMetricReader." + OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.' elsif !@thread.nil? - OpenTelemetry.logger.warn "PeriodicMetricReader is running. Please close it if need to restart." + OpenTelemetry.logger.warn 'PeriodicMetricReader is running. Please close it if need to restart.' else @thread = Thread.new { perodic_collect } end end def perodic_collect - while true + loop do sleep(@interval_millis) Timeout.timeout(@timout_millis) { @exporter.export(collect) } end @@ -53,14 +52,14 @@ def shutdown(timeout: nil) close_reader @exporter.shutdown Export::SUCCESS - rescue + rescue StandardError Export::FAILURE end def force_flush(timeout: nil) @exporter.export(collect) Export::SUCCESS - rescue + rescue StandardError Export::FAILURE end end From cb844551136dd13027c1122eba2604bd0fb576e5 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Thu, 22 Feb 2024 14:35:55 -0500 Subject: [PATCH 03/12] feat: make thread properly close --- .../metrics/export/periodic_metric_reader.rb | 40 ++++++++++--------- .../periodic_metric_reader_test.rb | 21 +++++----- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 14e01b2a6f..4d463ee200 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -16,40 +16,42 @@ def initialize(interval_millis: 60, timout_millis: 30, exporter: nil) @interval_millis = interval_millis @timout_millis = timout_millis @exporter = exporter - @thread = nil + @thread = nil + @continue = false - start_reader + start end - def start_reader + def start + @continue = true if @exporter.nil? OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.' - elsif !@thread.nil? - OpenTelemetry.logger.warn 'PeriodicMetricReader is running. Please close it if need to restart.' + elsif @thread&.alive? + OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please close it if it needs to restart.' else - @thread = Thread.new { perodic_collect } + @thread = Thread.new do + while @continue + sleep(@interval_millis) + begin + Timeout.timeout(@timout_millis) { @exporter.export(collect) } + rescue Timeout::Error => e + OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') + end + end + end end end - def perodic_collect - loop do - sleep(@interval_millis) - Timeout.timeout(@timout_millis) { @exporter.export(collect) } - end - end - - def close_reader - @thread.kill - @thread.join + def close + @continue = false # force termination in next iteration + @thread.join(5) # wait 5 seconds for collecting and exporting rescue StandardError => e OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.') - ensure - @thread = nil end # TODO: determine correctness: directly kill the reader without waiting for next metrics collection def shutdown(timeout: nil) - close_reader + close @exporter.shutdown Export::SUCCESS rescue StandardError diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 45cef2a078..bb78f06d9f 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -10,7 +10,7 @@ describe '#periodic_metric_reader' do before { reset_metrics_sdk } - it 'emits 1 metrics after 10 seconds' do + it 'emits 2 metrics after 10 seconds' do OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new @@ -27,16 +27,14 @@ counter.add(3, attributes: { 'b' => 'c' }) counter.add(4, attributes: { 'd' => 'e' }) - _(periodic_metric_reader.instance_variable_get(:@thread).class).must_equal Thread - sleep(8) periodic_metric_reader.shutdown - snapshot = metric_exporter.metric_snapshots - first_snapshot = snapshot[0] + snapshot = metric_exporter.metric_snapshots - _(snapshot).wont_be_empty + _(snapshot.size).must_equal(2) + first_snapshot = snapshot[0] _(first_snapshot[0].name).must_equal('counter') _(first_snapshot[0].unit).must_equal('smidgen') _(first_snapshot[0].description).must_equal('a small amount of something') @@ -55,10 +53,11 @@ _(first_snapshot[0].data_points[3].value).must_equal(4) _(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e') - _(periodic_metric_reader.instance_variable_get(:@thread)).must_be_nil + puts periodic_metric_reader.instance_variable_get(:@thread) + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end - it 'emits 0 metrics after 1 seconds when interval is > 1 seconds' do + it 'emits 1 metrics after 1 seconds when interval is > 1 seconds' do OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new @@ -75,15 +74,13 @@ counter.add(3, attributes: { 'b' => 'c' }) counter.add(4, attributes: { 'd' => 'e' }) - _(periodic_metric_reader.instance_variable_get(:@thread).class).must_equal Thread - sleep(1) periodic_metric_reader.shutdown snapshot = metric_exporter.metric_snapshots - _(snapshot.size).must_equal(0) - _(periodic_metric_reader.instance_variable_get(:@thread)).must_be_nil + _(snapshot.size).must_equal(1) + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end end end From 3731304fc7388e4d177923e269c3e1fe00af38de Mon Sep 17 00:00:00 2001 From: Xuan <112967240+xuan-cao-swi@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:19:14 -0400 Subject: [PATCH 04/12] Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> --- .../opentelemetry/sdk/metrics/export/periodic_metric_reader.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 4d463ee200..27e5b192cf 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -10,7 +10,7 @@ module Metrics module Export # PeriodicMetricReader provides a minimal example implementation. class PeriodicMetricReader < MetricReader - def initialize(interval_millis: 60, timout_millis: 30, exporter: nil) + def initialize(interval_millis: 60, timeout_millis: 30, exporter: nil) super() @interval_millis = interval_millis From 2aef8bf4d6894ead30363de6c3713608b56890bc Mon Sep 17 00:00:00 2001 From: Xuan <112967240+xuan-cao-swi@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:19:19 -0400 Subject: [PATCH 05/12] Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> --- .../opentelemetry/sdk/metrics/export/periodic_metric_reader.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 27e5b192cf..a106366eae 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -14,7 +14,7 @@ def initialize(interval_millis: 60, timeout_millis: 30, exporter: nil) super() @interval_millis = interval_millis - @timout_millis = timout_millis + @timeout_millis = timeout_millis @exporter = exporter @thread = nil @continue = false From 8e90ebb898cea62da9c8c22f24f5723ee7867d33 Mon Sep 17 00:00:00 2001 From: Xuan <112967240+xuan-cao-swi@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:19:26 -0400 Subject: [PATCH 06/12] Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> --- .../opentelemetry/sdk/metrics/export/periodic_metric_reader.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index a106366eae..07d67327c3 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -33,7 +33,7 @@ def start while @continue sleep(@interval_millis) begin - Timeout.timeout(@timout_millis) { @exporter.export(collect) } + Timeout.timeout(@timeout_millis) { @exporter.export(collect) } rescue Timeout::Error => e OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') end From 23d9a1f50166944052b80846d143813e35fe4946 Mon Sep 17 00:00:00 2001 From: Xuan <112967240+xuan-cao-swi@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:19:35 -0400 Subject: [PATCH 07/12] Update metrics_sdk/test/integration/periodic_metric_reader_test.rb Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> --- metrics_sdk/test/integration/periodic_metric_reader_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index bb78f06d9f..0603ffc2b7 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -57,7 +57,7 @@ _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end - it 'emits 1 metrics after 1 seconds when interval is > 1 seconds' do + it 'emits 1 metric after 1 second when interval is > 1 second' do OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new From 512f959e4a1ae99ea9c2a5da4f6568a1c9ad62dc Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Sat, 6 Apr 2024 16:56:31 -0400 Subject: [PATCH 08/12] feat: periodic reader - revision --- .../sdk/metrics/export/periodic_metric_reader.rb | 10 ++++++---- .../test/integration/periodic_metric_reader_test.rb | 5 ++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 07d67327c3..23cfe0f2d8 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -10,7 +10,9 @@ module Metrics module Export # PeriodicMetricReader provides a minimal example implementation. class PeriodicMetricReader < MetricReader - def initialize(interval_millis: 60, timeout_millis: 30, exporter: nil) + def initialize(interval_millis: ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60), + timeout_millis: ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30), + exporter: nil) super() @interval_millis = interval_millis @@ -43,15 +45,15 @@ def start end def close - @continue = false # force termination in next iteration - @thread.join(5) # wait 5 seconds for collecting and exporting + @continue = false # force termination in next iteration + @thread.join(@interval_millis) # wait 5 seconds for collecting and exporting rescue StandardError => e OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.') end - # TODO: determine correctness: directly kill the reader without waiting for next metrics collection def shutdown(timeout: nil) close + @exporter.force_flush if @exporter.respond_to?(:force_flush) @exporter.shutdown Export::SUCCESS rescue StandardError diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 0603ffc2b7..85c0d8bf51 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -14,7 +14,7 @@ OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timout_millis: 5, exporter: metric_exporter) + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timeout_millis: 5, exporter: metric_exporter) OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) @@ -53,7 +53,6 @@ _(first_snapshot[0].data_points[3].value).must_equal(4) _(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e') - puts periodic_metric_reader.instance_variable_get(:@thread) _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end @@ -61,7 +60,7 @@ OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timout_millis: 5, exporter: metric_exporter) + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timeout_millis: 5, exporter: metric_exporter) OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) From 63405d53402f2f432288c3ff793557d70f7497a4 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Tue, 23 Apr 2024 11:25:26 -0400 Subject: [PATCH 09/12] feat: change interval and timeout name --- .../sdk/metrics/export/periodic_metric_reader.rb | 14 +++++++------- .../integration/periodic_metric_reader_test.rb | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 23cfe0f2d8..ea5c7ecc6f 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -10,13 +10,13 @@ module Metrics module Export # PeriodicMetricReader provides a minimal example implementation. class PeriodicMetricReader < MetricReader - def initialize(interval_millis: ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60), - timeout_millis: ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30), + def initialize(export_interval: ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60), + export_timeout: ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30), exporter: nil) super() - @interval_millis = interval_millis - @timeout_millis = timeout_millis + @export_interval = export_interval + @export_timeout = export_timeout @exporter = exporter @thread = nil @continue = false @@ -33,9 +33,9 @@ def start else @thread = Thread.new do while @continue - sleep(@interval_millis) + sleep(@export_interval) begin - Timeout.timeout(@timeout_millis) { @exporter.export(collect) } + Timeout.timeout(@export_timeout) { @exporter.export(collect) } rescue Timeout::Error => e OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') end @@ -46,7 +46,7 @@ def start def close @continue = false # force termination in next iteration - @thread.join(@interval_millis) # wait 5 seconds for collecting and exporting + @thread.join(@export_interval) # wait 5 seconds for collecting and exporting rescue StandardError => e OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.') end diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 85c0d8bf51..f711cf266f 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -14,7 +14,7 @@ OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timeout_millis: 5, exporter: metric_exporter) + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval: 5, export_timeout: 5, exporter: metric_exporter) OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) @@ -60,7 +60,7 @@ OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(interval_millis: 5, timeout_millis: 5, exporter: metric_exporter) + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval: 5, export_timeout: 5, exporter: metric_exporter) OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) From 26620978155bac2d7c13d7efa6f0a5d556f3a07d Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 22 May 2024 14:07:22 -0400 Subject: [PATCH 10/12] feat: change back to millis --- .../sdk/metrics/export/periodic_metric_reader.rb | 8 ++++---- .../test/integration/periodic_metric_reader_test.rb | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index ea5c7ecc6f..4abcb4e3bf 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -10,13 +10,13 @@ module Metrics module Export # PeriodicMetricReader provides a minimal example implementation. class PeriodicMetricReader < MetricReader - def initialize(export_interval: ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60), - export_timeout: ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30), + def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)), + export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)), exporter: nil) super() - @export_interval = export_interval - @export_timeout = export_timeout + @export_interval = export_interval_millis / 1000.0 + @export_timeout = export_timeout_millis / 1000.0 @exporter = exporter @thread = nil @continue = false diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index f711cf266f..7bf00a08fc 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -14,7 +14,7 @@ OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval: 5, export_timeout: 5, exporter: metric_exporter) + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) @@ -60,7 +60,7 @@ OpenTelemetry::SDK.configure metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval: 5, export_timeout: 5, exporter: metric_exporter) + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) From 81edbc520be342f7260fb9be4592a61a0aa498de Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 7 Aug 2024 13:56:28 -0400 Subject: [PATCH 11/12] revision --- .../export/in_memory_metric_pull_exporter.rb | 2 +- .../metrics/export/periodic_metric_reader.rb | 71 +++++++++++++------ 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb index 35c2dfe575..8291ed9fe7 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb @@ -23,7 +23,7 @@ def pull export(collect) end - def export(metrics) + def export(metrics, timeout: nil) @mutex.synchronize do @metric_snapshots << metrics end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 4abcb4e3bf..bfaa81764c 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -10,6 +10,18 @@ module Metrics module Export # PeriodicMetricReader provides a minimal example implementation. class PeriodicMetricReader < MetricReader + # Returns a new instance of the {PeriodicMetricReader}. + # + # @param [Integer] export_interval_millis the maximum interval time. + # Defaults to the value of the OTEL_METRIC_EXPORT_INTERVAL environment + # variable, if set, or 60_000. + # @param [Integer] export_timeout_millis the maximum export timeout. + # Defaults to the value of the OTEL_METRIC_EXPORT_TIMEOUT environment + # variable, if set, or 30_000. + # @param [MetricReader] exporter the (duck type) MetricReader to where the + # recorded metrics are pushed after certain interval. + # + # @return a new instance of the {PeriodicMetricReader}. def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)), export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)), exporter: nil) @@ -20,22 +32,49 @@ def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTER @exporter = exporter @thread = nil @continue = false + @mutex = Mutex.new + @export_mutex = Mutex.new start end + def shutdown(timeout: nil) + thread = lock do + @continue = false # force termination in next iteration + @thread + end + thread&.join(@export_interval) + @exporter.force_flush if @exporter.respond_to?(:force_flush) + @exporter.shutdown + Export::SUCCESS + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'Fail to shutdown PeriodicMetricReader.') + Export::FAILURE + end + + def force_flush(timeout: nil) + export(timeout: timeout) + Export::SUCCESS + rescue StandardError + Export::FAILURE + end + + private + def start @continue = true if @exporter.nil? OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.' elsif @thread&.alive? - OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please close it if it needs to restart.' + OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please shutdown it if it needs to restart.' else @thread = Thread.new do while @continue sleep(@export_interval) begin - Timeout.timeout(@export_timeout) { @exporter.export(collect) } + Timeout.timeout(@export_timeout) { + export(timeout: @export_timeout) + } rescue Timeout::Error => e OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') end @@ -44,27 +83,17 @@ def start end end - def close - @continue = false # force termination in next iteration - @thread.join(@export_interval) # wait 5 seconds for collecting and exporting - rescue StandardError => e - OpenTelemetry.handle_error(exception: e, message: 'Fail to close PeriodicMetricReader.') + def export(timeout: nil) + @export_mutex.synchronize { + collected_metrics = collect + @exporter.export(collected_metrics, timeout: timeout || @export_timeout) if collected_metrics.size > 0 + } end - def shutdown(timeout: nil) - close - @exporter.force_flush if @exporter.respond_to?(:force_flush) - @exporter.shutdown - Export::SUCCESS - rescue StandardError - Export::FAILURE - end - - def force_flush(timeout: nil) - @exporter.export(collect) - Export::SUCCESS - rescue StandardError - Export::FAILURE + def lock + @mutex.synchronize do + yield + end end end end From cfa4974a554add29146c8a3e6b4adb30c8783fe7 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 7 Aug 2024 14:06:06 -0400 Subject: [PATCH 12/12] lint --- .../sdk/metrics/export/periodic_metric_reader.rb | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index bfaa81764c..948f98976c 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -72,9 +72,9 @@ def start while @continue sleep(@export_interval) begin - Timeout.timeout(@export_timeout) { + Timeout.timeout(@export_timeout) do export(timeout: @export_timeout) - } + end rescue Timeout::Error => e OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') end @@ -84,16 +84,14 @@ def start end def export(timeout: nil) - @export_mutex.synchronize { + @export_mutex.synchronize do collected_metrics = collect - @exporter.export(collected_metrics, timeout: timeout || @export_timeout) if collected_metrics.size > 0 - } + @exporter.export(collected_metrics, timeout: timeout || @export_timeout) unless collected_metrics.empty? + end end - def lock - @mutex.synchronize do - yield - end + def lock(&block) + @mutex.synchronize(&block) end end end