From ceccd6ba716065c9b049f4c6790ee9c3e98568f5 Mon Sep 17 00:00:00 2001 From: Eddie Torres Date: Tue, 3 Oct 2023 15:31:58 +0000 Subject: [PATCH] Metric instrumentation framework Signed-off-by: Eddie Torres --- cmd/main.go | 16 +-- pkg/cloud/aws_metrics.go | 75 ------------- pkg/cloud/cloud.go | 1 - pkg/cloud/handlers.go | 11 +- pkg/metrics/metrics.go | 209 ++++++++++++++++++++++++++++++++++++ pkg/metrics/metrics_test.go | 106 ++++++++++++++++++ 6 files changed, 326 insertions(+), 92 deletions(-) delete mode 100644 pkg/cloud/aws_metrics.go create mode 100644 pkg/metrics/metrics.go create mode 100644 pkg/metrics/metrics_test.go diff --git a/cmd/main.go b/cmd/main.go index 2dc8e8403..5194e20a6 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,16 +18,14 @@ package main import ( "context" - "net/http" "time" flag "github.com/spf13/pflag" - "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics" logsapi "k8s.io/component-base/logs/api/v1" json "k8s.io/component-base/logs/json" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" ) @@ -58,17 +56,9 @@ func main() { }() } - cloud.RegisterMetrics() if options.ServerOptions.HttpEndpoint != "" { - mux := http.NewServeMux() - mux.Handle("/metrics", legacyregistry.HandlerWithReset()) - go func() { - err := http.ListenAndServe(options.ServerOptions.HttpEndpoint, mux) - if err != nil { - klog.ErrorS(err, "failed to listen & serve metrics", "endpoint", options.ServerOptions.HttpEndpoint) - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - }() + r := metrics.Recorder() + r.InitializeHttpHandler(options.ServerOptions.HttpEndpoint, "/metrics") } drv, err := driver.NewDriver( diff --git a/pkg/cloud/aws_metrics.go b/pkg/cloud/aws_metrics.go deleted file mode 100644 index 5f48369ab..000000000 --- a/pkg/cloud/aws_metrics.go +++ /dev/null @@ -1,75 +0,0 @@ -//go:build !providerless -// +build !providerless - 
-/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "sync" - - "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" -) - -var ( - awsAPIMetric = metrics.NewHistogramVec( - &metrics.HistogramOpts{ - Name: "cloudprovider_aws_api_request_duration_seconds", - Help: "Latency of AWS API calls", - StabilityLevel: metrics.ALPHA, - }, - []string{"request"}) - - awsAPIErrorMetric = metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: "cloudprovider_aws_api_request_errors", - Help: "AWS API errors", - StabilityLevel: metrics.ALPHA, - }, - []string{"request"}) - - awsAPIThrottlesMetric = metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: "cloudprovider_aws_api_throttled_requests_total", - Help: "AWS API throttled requests", - StabilityLevel: metrics.ALPHA, - }, - []string{"operation_name"}) -) - -func recordAWSMetric(actionName string, timeTaken float64, err error) { - if err != nil { - awsAPIErrorMetric.With(metrics.Labels{"request": actionName}).Inc() - } else { - awsAPIMetric.With(metrics.Labels{"request": actionName}).Observe(timeTaken) - } -} - -func recordAWSThrottlesMetric(operation string) { - awsAPIThrottlesMetric.With(metrics.Labels{"operation_name": operation}).Inc() -} - -var registerOnce sync.Once - -func RegisterMetrics() { - registerOnce.Do(func() { - legacyregistry.MustRegister(awsAPIMetric) - legacyregistry.MustRegister(awsAPIErrorMetric) - 
legacyregistry.MustRegister(awsAPIThrottlesMetric) - }) -} diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 30baee8b3..aa493497f 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -245,7 +245,6 @@ var _ Cloud = &cloud{} // NewCloud returns a new instance of AWS cloud // It panics if session is invalid func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) { - RegisterMetrics() return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra) } diff --git a/pkg/cloud/handlers.go b/pkg/cloud/handlers.go index d90797fde..0127cefb7 100644 --- a/pkg/cloud/handlers.go +++ b/pkg/cloud/handlers.go @@ -20,19 +20,24 @@ import ( "time" "github.com/aws/aws-sdk-go/aws/request" - + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics" "k8s.io/klog/v2" ) // RecordRequestsComplete is added to the Complete chain; called after any request func RecordRequestsHandler(r *request.Request) { - recordAWSMetric(operationName(r), time.Since(r.Time).Seconds(), r.Error) + if r.Error != nil { + metrics.Recorder().Error(metrics.AWS, operationName(r)) + } else { + duration := time.Since(r.Time).Seconds() + metrics.Recorder().Duration(metrics.AWS, operationName(r), duration) + } } // RecordThrottlesAfterRetry is added to the AfterRetry chain; called after any error func RecordThrottledRequestsHandler(r *request.Request) { if r.IsErrorThrottle() { - recordAWSThrottlesMetric(operationName(r)) + metrics.Recorder().Throttle(metrics.AWS, operationName(r)) klog.InfoS("Got RequestLimitExceeded error on AWS request", "request", describeRequest(r)) } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 000000000..30251bb29 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,209 @@ +// Package metrics provides functionality for recording and serving +// metric information related to AWS API calls and internal driver operations. 
+package metrics
+
+import (
+	"net/http"
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/klog/v2"
+)
+
+const (
+	// Metric types. These values are the keys of metricRecorder.metrics and
+	// are passed as the first argument of the recording methods.
+	//
+	// AWS selects the cloudprovider_aws_api_* metrics.
+	AWS metricType = iota
+	// Internal selects the ebs_csi_aws_com_* driver metrics.
+	Internal
+)
+const (
+	// AWS API metrics
+	awsDurationMetric = "cloudprovider_aws_api_request_duration_seconds"
+	awsThrottleMetric = "cloudprovider_aws_api_throttled_requests_total"
+	awsErrorMetric    = "cloudprovider_aws_api_request_errors"
+
+	// Internal driver metrics
+	internalDurationMetric = "ebs_csi_aws_com_duration_seconds"
+	internalValueMetric    = "ebs_csi_aws_com_operation"
+	internalErrorMetric    = "ebs_csi_aws_com_errors_total"
+)
+
+var (
+	// r is the singleton instance of metricRecorder.
+	// It is assigned exactly once, inside Recorder, guarded by initialize.
+	r *metricRecorder
+
+	// initialize ensures that metricRecorder is initialized only once.
+	initialize sync.Once
+)
+
+// metricType is an enum for the different types of metrics.
+// The defined values are AWS and Internal.
+type metricType int
+
+// metricRecorder provides an organized way of recording and managing metrics related to both AWS and driver operations.
+type metricRecorder struct {
+	// registry is the central store where all metrics are registered and exposed for collection.
+	// It is a dedicated registry created by newRecorder and served by InitializeHttpHandler.
+	registry metrics.KubeRegistry
+
+	// metrics maps a metric type (AWS or Internal) to its associated measurement data -- duration, values, error counts, and throttling events.
+	metrics map[metricType]metricFields
+}
+
+// metricFields holds the metrics for a specific metric type.
+//
+// Not every field is populated for every metric type: newRecorder leaves
+// value unset for AWS and throttle unset for Internal, and registerMetrics
+// skips nil fields.
+type metricFields struct {
+	// duration is the histogram observed by Duration.
+	duration *metrics.HistogramVec
+	// value is the histogram observed by Value.
+	value *metrics.HistogramVec
+	// err is the counter incremented by Error.
+	err *metrics.CounterVec
+	// throttle is the counter incremented by Throttle.
+	throttle *metrics.CounterVec
+}
+
+// Recorder returns the singleton instance of metricRecorder.
+// Metrics are registered only once throughout the driver's lifetime.
+//
+// The first call builds the recorder with newRecorder and registers its
+// metrics; every later call returns the same pointer.
+func Recorder() *metricRecorder {
+	initialize.Do(func() {
+		r = newRecorder()
+		r.registerMetrics()
+	})
+	return r
+}
+
+// Duration logs the duration of an operation for the specified metric type.
+//
+// Parameters:
+//   - t: The type of the metric (e.g., AWS or Internal).
+//   - o: The name of the operation for which the metric is being recorded.
+//   - d: The duration taken for the action in seconds.
+//
+// The observation is labelled request=o. A warning is logged and nothing is
+// recorded when t is unknown or has no duration histogram.
+func (m *metricRecorder) Duration(t metricType, o string, d float64) {
+	metric, ok := m.metrics[t]
+	if !ok {
+		klog.Warningf("Metric type %d not found", t)
+		return
+	}
+	// Not every metric type defines every kind of measurement; calling With
+	// on a nil vector would panic.
+	if metric.duration == nil {
+		klog.Warningf("Metric type %d does not record durations", t)
+		return
+	}
+	metric.duration.With(metrics.Labels{"request": o}).Observe(d)
+}
+
+// Error logs an error for the specified metric type.
+//
+// Parameters:
+//   - t: The type of the metric.
+//   - o: The name of the operation where the error occurred.
+//
+// The counter is labelled request=o. A warning is logged and nothing is
+// recorded when t is unknown or has no error counter.
+func (m *metricRecorder) Error(t metricType, o string) {
+	metric, ok := m.metrics[t]
+	if !ok {
+		klog.Warningf("Metric type %d not found", t)
+		return
+	}
+	if metric.err == nil {
+		klog.Warningf("Metric type %d does not record errors", t)
+		return
+	}
+	metric.err.With(metrics.Labels{"request": o}).Inc()
+}
+
+// Throttle logs a throttling event for the metric type.
+//
+// Parameters:
+//   - t: The type of the metric.
+//   - o: The name of the operation that was throttled.
+//
+// The counter is labelled operation_name=o. A warning is logged and nothing
+// is recorded when t is unknown or has no throttle counter (the Internal
+// type defines none).
+func (m *metricRecorder) Throttle(t metricType, o string) {
+	metric, ok := m.metrics[t]
+	if !ok {
+		klog.Warningf("Metric type %d not found", t)
+		return
+	}
+	if metric.throttle == nil {
+		klog.Warningf("Metric type %d does not record throttling events", t)
+		return
+	}
+	metric.throttle.With(metrics.Labels{"operation_name": o}).Inc()
+}
+
+// Value logs a value observation for the specified metric type.
+//
+// Parameters:
+//   - t: The type of the metric.
+//   - o: The operation name.
+//   - v: The observed value to be recorded.
+//
+// The observation is labelled operation=o. A warning is logged and nothing
+// is recorded when t is unknown or has no value histogram (the AWS type
+// defines none).
+func (m *metricRecorder) Value(t metricType, o string, v float64) {
+	metric, ok := m.metrics[t]
+	if !ok {
+		klog.Warningf("Metric type %d not found", t)
+		return
+	}
+	if metric.value == nil {
+		klog.Warningf("Metric type %d does not record values", t)
+		return
+	}
+	metric.value.With(metrics.Labels{"operation": o}).Observe(v)
+}
+
+// InitializeHttpHandler sets up and starts an HTTP server to serve the recorded metrics.
+//
+// Parameters:
+//   - address: The server's listening address.
+//   - path: The URL path to access the metrics.
+//
+// The server runs in a background goroutine, so this method returns
+// immediately. If the server stops with an error, the error is logged and
+// the process exits with status 1.
+func (m *metricRecorder) InitializeHttpHandler(address, path string) {
+	mux := http.NewServeMux()
+
+	// Serve only this recorder's own registry.
+	mux.Handle(path, metrics.HandlerFor(
+		m.registry,
+		metrics.HandlerOpts{
+			ErrorHandling: metrics.ContinueOnError}))
+
+	go func() {
+		klog.InfoS("Metric server listening", "address", address, "path", path)
+
+		if err := http.ListenAndServe(address, mux); err != nil {
+			klog.ErrorS(err, "Failed to start metric server", "address", address, "path", path)
+			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		}
+	}()
+}
+
+// newRecorder returns a new instance of metricRecorder.
+//
+// The recorder gets its own registry and one metricFields entry per metric
+// type. Fields left unset stay nil: AWS has no value histogram and Internal
+// has no throttle counter. The metrics are created here but not registered.
+//
+// Each vector's label name must match the label its recording method
+// supplies: "request" for Duration and Error, "operation_name" for Throttle
+// and "operation" for Value. A mismatch makes the recording call panic.
+func newRecorder() *metricRecorder {
+	// Explicit buckets for the Internal value histogram. Histograms created
+	// with nil buckets keep the library's default buckets.
+	valueBucket := []float64{.1, .25, .5, 1, 2.5, 5, 10, 100, 250, 500, 1000}
+
+	return &metricRecorder{
+		registry: metrics.NewKubeRegistry(),
+
+		metrics: map[metricType]metricFields{
+			AWS: {
+				duration: createHistogramVec(awsDurationMetric, "Latency of AWS API calls", []string{"request"}, nil),
+				throttle: createCounterVec(awsThrottleMetric, "AWS API throttled requests", []string{"operation_name"}),
+				err:      createCounterVec(awsErrorMetric, "AWS API errors", []string{"request"}),
+			},
+			Internal: {
+				value: createHistogramVec(internalValueMetric, "Driver operation metric values", []string{"operation"}, valueBucket),
+				// Duration always records with the "request" label, so the
+				// histogram must be declared with that label name.
+				duration: createHistogramVec(internalDurationMetric, "Driver operation duration", []string{"request"}, nil),
+				err:      createCounterVec(internalErrorMetric, "Driver operation errors", []string{"request"}),
+			},
+		},
+	}
+}
+
+// registerMetrics registers metrics to the metricRecorder's internal KubeRegistry.
+func (m *metricRecorder) registerMetrics() {
+	// A metric type may leave some kinds of measurement unset, so each
+	// field is registered only when it is non-nil.
+	for _, metric := range m.metrics {
+		if metric.duration != nil {
+			m.registry.MustRegister(metric.duration)
+		}
+		if metric.value != nil {
+			m.registry.MustRegister(metric.value)
+		}
+		if metric.err != nil {
+			m.registry.MustRegister(metric.err)
+		}
+		if metric.throttle != nil {
+			m.registry.MustRegister(metric.throttle)
+		}
+	}
+}
+
+// createHistogramVec helper to create a histogram vector for a specific metric.
+//
+// The histogram is created at ALPHA stability with the given name, help text
+// and label names. When buckets is nil the Buckets option is left unset;
+// otherwise the supplied bucket boundaries are used.
+func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec {
+	opts := &metrics.HistogramOpts{
+		Name:           name,
+		Help:           help,
+		StabilityLevel: metrics.ALPHA,
+	}
+
+	if buckets != nil {
+		opts.Buckets = buckets
+	}
+
+	return metrics.NewHistogramVec(opts, labels)
+}
+
+// createCounterVec helper to create a counter vector for a specific metric.
+//
+// The counter is created at ALPHA stability with the given name, help text
+// and label names.
+func createCounterVec(name, help string, labels []string) *metrics.CounterVec {
+	return metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Name:           name,
+			Help:           help,
+			StabilityLevel: metrics.ALPHA,
+		},
+		labels,
+	)
+}
diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go
new file mode 100644
index 000000000..394dcf38c
--- /dev/null
+++ b/pkg/metrics/metrics_test.go
@@ -0,0 +1,106 @@
+package metrics
+
+import (
+	"strings"
+	"testing"
+
+	"k8s.io/component-base/metrics/testutil"
+)
+
+// test describes one table-driven case for TestMetricRecorder.
+type test struct {
+	// name is the subtest name passed to t.Run.
+	name string
+	// metric is the metric name whose gathered output is compared.
+	metric string
+	// exec performs the recording call under test on the recorder.
+	exec func(m *metricRecorder)
+	// expected is the metric's expected exposition text after exec has run.
+	expected string
+}
+
+// TestMetricRecorder records one sample through each recording method and
+// compares the recorder's gathered output for that metric with the expected
+// text.
+//
+// Every subtest uses the recorder returned by Recorder, which is a
+// process-wide singleton, so recorded samples persist between subtests. Each
+// case targets a different metric name, so the expected counts stay at one.
+func TestMetricRecorder(t *testing.T) {
+	tests := []test{
+		{
+			name:   "TestMetricRecorder: Duration",
+			metric: awsDurationMetric,
+			exec: func(m *metricRecorder) {
+				m.Duration(AWS, "TestAction", 1.5)
+			},
+			expected: `
+				# HELP cloudprovider_aws_api_request_duration_seconds [ALPHA] Latency of AWS API calls
+				# TYPE cloudprovider_aws_api_request_duration_seconds histogram
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.005"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.01"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.025"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.05"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.1"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.25"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="0.5"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="1"} 0
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="2.5"} 1
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="5"} 1
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="10"} 1
+				cloudprovider_aws_api_request_duration_seconds_bucket{request="TestAction",le="+Inf"} 1
+				cloudprovider_aws_api_request_duration_seconds_sum{request="TestAction"} 1.5
+				cloudprovider_aws_api_request_duration_seconds_count{request="TestAction"} 1
+			`,
+		},
+		{
+			name:   "TestMetricRecorder: Error",
+			metric: internalErrorMetric,
+			exec: func(m *metricRecorder) {
+				m.Error(Internal, "TestAction")
+			},
+			expected: `
+				# HELP ebs_csi_aws_com_errors_total [ALPHA] Driver operation errors
+				# TYPE ebs_csi_aws_com_errors_total counter
+				ebs_csi_aws_com_errors_total{request="TestAction"} 1
+			`,
+		},
+		{
+			name:   "TestMetricRecorder: Throttle",
+			metric: awsThrottleMetric,
+			exec: func(m *metricRecorder) {
+				m.Throttle(AWS, "TestAction")
+			},
+			expected: `
+				# HELP cloudprovider_aws_api_throttled_requests_total [ALPHA] AWS API throttled requests
+				# TYPE cloudprovider_aws_api_throttled_requests_total counter
+				cloudprovider_aws_api_throttled_requests_total{operation_name="TestAction"} 1
+			`,
+		},
+		{
+			name:   "TestMetricRecorder: Value",
+			metric: internalValueMetric,
+			exec: func(m *metricRecorder) {
+				m.Value(Internal, "TestOperation", 51.0)
+			},
+			expected: `
+				# HELP ebs_csi_aws_com_operation [ALPHA] Driver operation metric values
+				# TYPE ebs_csi_aws_com_operation histogram
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="0.1"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="0.25"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="0.5"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="1"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="2.5"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="5"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="10"} 0
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="100"} 1
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="250"} 1
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="500"} 1
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="1000"} 1
+				ebs_csi_aws_com_operation_bucket{operation="TestOperation",le="+Inf"} 1
+				ebs_csi_aws_com_operation_sum{operation="TestOperation"} 51
+				ebs_csi_aws_com_operation_count{operation="TestOperation"} 1
+			`,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Recorder returns the shared singleton, not a fresh recorder.
+			m := Recorder()
+
+			tt.exec(m)
+
+			// Only the metric named by tt.metric is compared.
+			if err := testutil.GatherAndCompare(m.registry, strings.NewReader(tt.expected), tt.metric); err != nil {
+				t.Fatal(err)
+			}
+		})
+	}
+}