Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric Instrumentation Framework #1767

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ package main

import (
"context"
"net/http"
"time"

flag "github.com/spf13/pflag"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
logsapi "k8s.io/component-base/logs/api/v1"
json "k8s.io/component-base/logs/json"
"k8s.io/component-base/metrics/legacyregistry"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -58,17 +56,9 @@ func main() {
}()
}

cloud.RegisterMetrics()
if options.ServerOptions.HttpEndpoint != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", legacyregistry.HandlerWithReset())
go func() {
err := http.ListenAndServe(options.ServerOptions.HttpEndpoint, mux)
if err != nil {
klog.ErrorS(err, "failed to listen & serve metrics", "endpoint", options.ServerOptions.HttpEndpoint)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}()
r := metrics.InitializeRecorder()
r.InitializeMetricsHandler(options.ServerOptions.HttpEndpoint, "/metrics")
}

drv, err := driver.NewDriver(
Expand Down
75 changes: 0 additions & 75 deletions pkg/cloud/aws_metrics.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ var _ Cloud = &cloud{}
// NewCloud returns a new Cloud for the given region by delegating to
// newEC2Cloud, forwarding awsSdkDebugLog and userAgentExtra unchanged.
// The earlier comment states that it panics if the session is invalid; that
// behavior is not visible in this hunk — TODO confirm in newEC2Cloud.
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
// NOTE(review): the hunk header reports 0 additions and 1 deletion, so the
// RegisterMetrics call below appears to be the line this change removes.
RegisterMetrics()
return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)
}

Expand Down
23 changes: 18 additions & 5 deletions pkg/cloud/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,32 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/request"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
"k8s.io/klog/v2"
)

// RecordRequestsHandler is added to the Complete chain; called after any request.
// It labels the sample with the operation name under the "request" key, then
// either increments the cloudprovider_aws_api_request_errors counter (when
// r.Error is set) or observes the seconds elapsed since r.Time in the
// cloudprovider_aws_api_request_duration_seconds histogram.
func RecordRequestsHandler(r *request.Request) {
// NOTE(review): this diff view has no +/- markers; the recordAWSMetric call
// below looks like the pre-change line that the Recorder code replaces — confirm.
recordAWSMetric(operationName(r), time.Since(r.Time).Seconds(), r.Error)
labels := map[string]string{
"request": operationName(r),
}

// Failed requests only bump the error counter; no duration is recorded for them.
if r.Error != nil {
metrics.Recorder().IncreaseCount("cloudprovider_aws_api_request_errors", labels)
} else {
duration := time.Since(r.Time).Seconds()
// nil buckets are passed through to the recorder unchanged.
metrics.Recorder().ObserveHistogram("cloudprovider_aws_api_request_duration_seconds", duration, labels, nil)
}
}

// RecordThrottledRequestsHandler is added to the AfterRetry chain; called after any error.
// When r.IsErrorThrottle() reports true it increments the
// cloudprovider_aws_api_throttled_requests_total counter, labelled with the
// operation name under the "operation_name" key, and logs the request.
func RecordThrottledRequestsHandler(r *request.Request) {
labels := map[string]string{
"operation_name": operationName(r),
}

if r.IsErrorThrottle() {
// NOTE(review): this diff view has no +/- markers; the recordAWSThrottlesMetric
// call below looks like the pre-change line replaced by the Recorder call — confirm.
recordAWSThrottlesMetric(operationName(r))
metrics.Recorder().IncreaseCount("cloudprovider_aws_api_throttled_requests_total", labels)
klog.InfoS("Got RequestLimitExceeded error on AWS request", "request", describeRequest(r))
}
}
Expand Down
149 changes: 149 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package metrics

import (
"net/http"
"sync"
"time"

"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)

// Package-level state backing the shared recorder returned by Recorder and
// InitializeRecorder.
var (
r *metricRecorder // singleton instance of metricRecorder
// once guards the construction of r inside InitializeRecorder.
once sync.Once
)

// metricRecorder owns a metrics registry together with the metric vectors
// that have been registered in it, keyed by metric name.
type metricRecorder struct {
// registry is where vectors are registered and what the metrics HTTP handler serves.
registry metrics.KubeRegistry
// metrics maps a metric name to the vector stored for it; the register*
// methods store *metrics.CounterVec and *metrics.HistogramVec values.
metrics map[string]interface{}
}

// Recorder exposes the package-wide metricRecorder.
//
// The result is nil until InitializeRecorder has been called; the recorder's
// methods check for a nil receiver, so callers may invoke them on the result
// without checking it first.
func Recorder() *metricRecorder {
	return r
}

// InitializeRecorder builds the package-wide metricRecorder on its first call
// and returns it. Later calls skip construction (guarded by once) and return
// the same instance.
func InitializeRecorder() *metricRecorder {
	build := func() {
		recorder := new(metricRecorder)
		recorder.registry = metrics.NewKubeRegistry()
		recorder.metrics = make(map[string]interface{})
		r = recorder
	}
	once.Do(build)
	return r
}

// IncreaseCount increases the counter metric called name by 1 for the given
// label set.
//
// A nil receiver is a no-op, so it is safe to call on an uninitialized
// recorder. If no metric is stored under name yet, a counter vector is
// registered first, using the keys of labels as its label names. If name is
// already stored as a different metric type, the sample is dropped and an
// error is logged instead of panicking on the type assertion.
//
// NOTE(review): m.metrics is read and written here without any visible
// synchronization — confirm callers are serialized or guard the map.
func (m *metricRecorder) IncreaseCount(name string, labels map[string]string) {
	if m == nil {
		return // recorder is not initialized
	}

	if _, ok := m.metrics[name]; !ok {
		klog.V(4).InfoS("Metric not found, registering", "name", name, "labels", labels)
		m.registerCounterVec(name, "ebs_csi_aws_com metric", getLabelNames(labels))
	}

	// Checked assertion: the map stores vectors of several types under one
	// namespace, so name may refer to something other than a counter.
	counter, ok := m.metrics[name].(*metrics.CounterVec)
	if !ok {
		klog.ErrorS(nil, "Metric is not a CounterVec, dropping sample", "name", name)
		return
	}
	counter.With(metrics.Labels(labels)).Inc()
}

// ObserveHistogram records value in the histogram metric called name for the
// given label set.
//
// A nil receiver is a no-op, so it is safe to call on an uninitialized
// recorder. If no metric is stored under name yet, a histogram vector is
// registered first, using the keys of labels as its label names and buckets
// as its bucket boundaries; buckets is ignored on later calls. If name is
// already stored as a different metric type, the sample is dropped and an
// error is logged instead of panicking on the type assertion.
//
// NOTE(review): m.metrics is read and written here without any visible
// synchronization — confirm callers are serialized or guard the map.
func (m *metricRecorder) ObserveHistogram(name string, value float64, labels map[string]string, buckets []float64) {
	if m == nil {
		return // recorder is not initialized
	}

	if _, ok := m.metrics[name]; !ok {
		klog.V(4).InfoS("Metric not found, registering", "name", name, "labels", labels, "buckets", buckets)
		m.registerHistogramVec(name, "ebs_csi_aws_com metric", getLabelNames(labels), buckets)
	}

	// Checked assertion: the map stores vectors of several types under one
	// namespace, so name may refer to something other than a histogram.
	histogram, ok := m.metrics[name].(*metrics.HistogramVec)
	if !ok {
		klog.ErrorS(nil, "Metric is not a HistogramVec, dropping sample", "name", name)
		return
	}
	histogram.With(metrics.Labels(labels)).Observe(value)
}

// InitializeMetricsHandler serves the recorder's registry over HTTP.
//
// It mounts a metrics handler for m.registry at path on a fresh mux and
// starts an http.Server bound to address in a background goroutine, then
// returns immediately. If the server stops with any error other than
// http.ErrServerClosed, the error is logged and the process exits with
// status 1. On a nil receiver it only logs a message and returns.
func (m *metricRecorder) InitializeMetricsHandler(address, path string) {
	if m == nil {
		klog.InfoS("InitializeMetricsHandler: metric recorder is not initialized")
		return
	}

	handlerOpts := metrics.HandlerOpts{ErrorHandling: metrics.ContinueOnError}

	router := http.NewServeMux()
	router.Handle(path, metrics.HandlerFor(m.registry, handlerOpts))

	srv := &http.Server{
		Addr:        address,
		Handler:     router,
		ReadTimeout: 3 * time.Second,
	}

	serve := func() {
		klog.InfoS("Metric server listening", "address", address, "path", path)

		err := srv.ListenAndServe()
		if err == nil || err == http.ErrServerClosed {
			return
		}
		klog.ErrorS(err, "Failed to start metric server", "address", address, "path", path)
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	go serve()
}

// registerHistogramVec creates a histogram vector for name, stores it in
// m.metrics and registers it with m.registry. It does nothing when an entry
// for name already exists, whatever its type.
func (m *metricRecorder) registerHistogramVec(name, help string, labels []string, buckets []float64) {
	if _, known := m.metrics[name]; !known {
		vec := createHistogramVec(name, help, labels, buckets)
		// Store before registering, as MustRegister may panic.
		m.metrics[name] = vec
		m.registry.MustRegister(vec)
	}
}

// registerCounterVec creates a counter vector for name, stores it in
// m.metrics and registers it with m.registry. It does nothing when an entry
// for name already exists, whatever its type.
func (m *metricRecorder) registerCounterVec(name, help string, labels []string) {
	if _, known := m.metrics[name]; !known {
		vec := createCounterVec(name, help, labels)
		// Store before registering, as MustRegister may panic.
		m.metrics[name] = vec
		m.registry.MustRegister(vec)
	}
}

// createHistogramVec builds an ALPHA-stability histogram vector with the
// given name, help text, label names and bucket boundaries. The vector is
// returned unregistered.
func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec {
	return metrics.NewHistogramVec(
		&metrics.HistogramOpts{
			Name:           name,
			Help:           help,
			StabilityLevel: metrics.ALPHA,
			Buckets:        buckets,
		},
		labels,
	)
}

// createCounterVec builds an ALPHA-stability counter vector with the given
// name, help text and label names. The vector is returned unregistered.
func createCounterVec(name, help string, labels []string) *metrics.CounterVec {
	opts := &metrics.CounterOpts{
		Name:           name,
		Help:           help,
		StabilityLevel: metrics.ALPHA,
	}
	return metrics.NewCounterVec(opts, labels)
}

// getLabelNames returns the keys of labels as a slice. The order follows map
// iteration and is therefore unspecified. The result is always non-nil, and
// is empty when labels is nil or has no entries.
func getLabelNames(labels map[string]string) []string {
	keys := make([]string, len(labels))
	next := 0
	for key := range labels {
		keys[next] = key
		next++
	}
	return keys
}
Loading