Skip to content

Commit

Permalink
Metric instrumentation framework
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Oct 3, 2023
1 parent 27ad245 commit ceccd6b
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 92 deletions.
16 changes: 3 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ package main

import (
"context"
"net/http"
"time"

flag "github.com/spf13/pflag"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
logsapi "k8s.io/component-base/logs/api/v1"
json "k8s.io/component-base/logs/json"
"k8s.io/component-base/metrics/legacyregistry"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -58,17 +56,9 @@ func main() {
}()
}

cloud.RegisterMetrics()
if options.ServerOptions.HttpEndpoint != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", legacyregistry.HandlerWithReset())
go func() {
err := http.ListenAndServe(options.ServerOptions.HttpEndpoint, mux)
if err != nil {
klog.ErrorS(err, "failed to listen & serve metrics", "endpoint", options.ServerOptions.HttpEndpoint)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}()
r := metrics.Recorder()
r.InitializeHttpHandler(options.ServerOptions.HttpEndpoint, "/metrics")
}

drv, err := driver.NewDriver(
Expand Down
75 changes: 0 additions & 75 deletions pkg/cloud/aws_metrics.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ var _ Cloud = &cloud{}
// NewCloud returns a new instance of AWS cloud
// It panics if session is invalid
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
RegisterMetrics()
return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/cloud/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,24 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/request"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
"k8s.io/klog/v2"
)

// RecordRequestsComplete is added to the Complete chain; called after any request
func RecordRequestsHandler(r *request.Request) {
recordAWSMetric(operationName(r), time.Since(r.Time).Seconds(), r.Error)
if r.Error != nil {
metrics.Recorder().Error(metrics.AWS, operationName(r))
} else {
duration := time.Since(r.Time).Seconds()
metrics.Recorder().Duration(metrics.AWS, operationName(r), duration)
}
}

// RecordThrottlesAfterRetry is added to the AfterRetry chain; called after any error
func RecordThrottledRequestsHandler(r *request.Request) {
if r.IsErrorThrottle() {
recordAWSThrottlesMetric(operationName(r))
metrics.Recorder().Throttle(metrics.AWS, operationName(r))
klog.InfoS("Got RequestLimitExceeded error on AWS request", "request", describeRequest(r))
}
}
Expand Down
209 changes: 209 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Package metrics provides functionality for recording and serving
// metric information related to AWS API calls and internal driver operations.
package metrics

import (
"net/http"
"sync"

"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)

const (
// Metric types
AWS metricType = iota
Internal
)
const (
// AWS API metrics
awsDurationMetric = "cloudprovider_aws_api_request_duration_seconds"
awsThrottleMetric = "cloudprovider_aws_api_throttled_requests_total"
awsErrorMetric = "cloudprovider_aws_api_request_errors"

// Internal driver metrics
internalDurationMetric = "ebs_csi_aws_com_duration_seconds"
internalValueMetric = "ebs_csi_aws_com_operation"
internalErrorMetric = "ebs_csi_aws_com_errors_total"
)

var (
// r is the singleton instance of metricRecorder.
r *metricRecorder

// initialize ensures that metricRecorder is initialized only once.
initialize sync.Once
)

// metricType is an enum for the different types of metrics.
type metricType int

// metricRecorder provides an organized way of recording and managing metrics related to both AWS and driver operations.
type metricRecorder struct {
// registry is the central store where all metrics are registered and exposed for collection.
registry metrics.KubeRegistry

// metrics maps a metric type (AWS or Internal) to its associated measurement data -- duration, values, error counts, and throttling events.
metrics map[metricType]metricFields
}

// metricFields holds the metrics for a specific metric type.
type metricFields struct {
duration *metrics.HistogramVec
value *metrics.HistogramVec
err *metrics.CounterVec
throttle *metrics.CounterVec
}

// Recorder returns the singleton instance of metricRecorder.
// Metrics are registered only once throughout the driver's lifetime.
func Recorder() *metricRecorder {
initialize.Do(func() {
r = newRecorder()
r.registerMetrics()
})
return r
}

// Duration logs the duration of an operation for the specified metric type.
//
// Parameters:
// - t: The type of the metric (e.g., AWS or Internal).
// - o: The name of the operation for which the metric is being recorded.
// - d: The duration taken for the action in seconds.
func (m *metricRecorder) Duration(t metricType, o string, d float64) {
if metric, ok := m.metrics[t]; ok {
metric.duration.With(metrics.Labels{"request": o}).Observe(d)
} else {
klog.Warningf("Metric type %d not found", t)
}
}

// Error logs an error for the specified metric type.
//
// Parameters:
// - t: The type of the metric.
// - o: The name of the operation where the error occurred.
func (m *metricRecorder) Error(t metricType, o string) {
if metric, ok := m.metrics[t]; ok {
metric.err.With(metrics.Labels{"request": o}).Inc()
} else {
klog.Warningf("Metric type %d not found", t)
}
}

// Throttle logs a throttling event for the metric type.
//
// Parameters:
// - t: The type of the metric.
// - o: The name of the operation that was throttled.
func (m *metricRecorder) Throttle(t metricType, o string) {
if metric, ok := m.metrics[t]; ok {
metric.throttle.With(metrics.Labels{"operation_name": o}).Inc()
} else {
klog.Warningf("Metric type %d not found", t)
}
}

// Value logs a value observation for the specified metric type.
// Parameters:
// - t: The type of the metric.
// - o: The operation name.
// - v: The observed value to be recorded.
func (m *metricRecorder) Value(t metricType, o string, v float64) {
if metric, ok := m.metrics[t]; ok {
metric.value.With(metrics.Labels{"operation": o}).Observe(v)
} else {
klog.Warningf("Metric type %d not found", t)
}
}

// InitializeHttpHandler sets up and starts an HTTP server to serve the recorded metrics.
//
// Parameters:
// - address: The server's listening address.
// - path: The URL path to access the metrics.
func (m *metricRecorder) InitializeHttpHandler(address, path string) {
mux := http.NewServeMux()

mux.Handle(path, metrics.HandlerFor(
m.registry,
metrics.HandlerOpts{
ErrorHandling: metrics.ContinueOnError}))

go func() {
klog.InfoS("Metric server listening", "address", address, "path", path)

if err := http.ListenAndServe(address, mux); err != nil {
klog.ErrorS(err, "Failed to start metric server", "address", address, "path", path)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}()
}

// newRecorder returns a new instance of metricRecorder.
func newRecorder() *metricRecorder {
valueBucket := []float64{.1, .25, .5, 1, 2.5, 5, 10, 100, 250, 500, 1000}

return &metricRecorder{
registry: metrics.NewKubeRegistry(),

metrics: map[metricType]metricFields{
AWS: {
duration: createHistogramVec(awsDurationMetric, "Latency of AWS API calls", []string{"request"}, nil),
throttle: createCounterVec(awsThrottleMetric, "AWS API throttled requests", []string{"operation_name"}),
err: createCounterVec(awsErrorMetric, "AWS API errors", []string{"request"}),
},
Internal: {
value: createHistogramVec(internalValueMetric, "Driver operation metric values", []string{"operation"}, valueBucket),
duration: createHistogramVec(internalDurationMetric, "Driver operation duration", []string{"operation"}, nil),
err: createCounterVec(internalErrorMetric, "Driver operation errors", []string{"request"}),
},
},
}
}

// registerMetrics registers metrics to the metricRecorder's internal KubeRegistry.
func (m *metricRecorder) registerMetrics() {
for _, metric := range m.metrics {
if metric.duration != nil {
m.registry.MustRegister(metric.duration)
}
if metric.value != nil {
m.registry.MustRegister(metric.value)
}
if metric.err != nil {
m.registry.MustRegister(metric.err)
}
if metric.throttle != nil {
m.registry.MustRegister(metric.throttle)
}
}
}

// createHistogramVec helper to create a histogram vector for a specific metric.
func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec {
opts := &metrics.HistogramOpts{
Name: name,
Help: help,
StabilityLevel: metrics.ALPHA,
}

if buckets != nil {
opts.Buckets = buckets
}

return metrics.NewHistogramVec(opts, labels)
}

// createCounterVec helper to create a counter vector for a specific metric.
func createCounterVec(name, help string, labels []string) *metrics.CounterVec {
return metrics.NewCounterVec(
&metrics.CounterOpts{
Name: name,
Help: help,
StabilityLevel: metrics.ALPHA,
},
labels,
)
}
Loading

0 comments on commit ceccd6b

Please sign in to comment.