From 50fc8c1766cd71fc6829e414409976a39a02b480 Mon Sep 17 00:00:00 2001 From: Marcin Owsiany Date: Tue, 30 Apr 2024 12:54:49 +0200 Subject: [PATCH] Add support for metrics. --- cmd/fetch.go | 15 +- cmd/metrics.go | 29 +++ deploy/deployment.yaml.gotpl | 45 +++++ deploy/main.go | 9 +- go.mod | 6 +- go.sum | 4 + internal/logging/main.go | 22 +++ internal/main.go | 53 ++++- internal/metrics/.gitignore | 1 + internal/metrics/Makefile | 53 +++++ internal/metrics/fetcher/go.mod | 17 ++ internal/metrics/fetcher/go.sum | 16 ++ internal/metrics/fetcher/main.go | 39 ++++ internal/metrics/gen/metrics.pb.go | 252 ++++++++++++++++++++++++ internal/metrics/gen/metrics_grpc.pb.go | 207 +++++++++++++++++++ internal/metrics/metrics.proto | 19 ++ internal/metrics/server/main.go | 78 ++++++++ internal/metrics/submitter/main.go | 100 ++++++++++ internal/metrics/tools/.gitignore | 2 + internal/metrics/tools/github.mk | 22 +++ internal/metrics/tools/go.mod | 10 + internal/metrics/tools/go.sum | 6 + internal/metrics/tools/gotools.mk | 129 ++++++++++++ internal/metrics/tools/tools-import.go | 13 ++ internal/metrics/tools/tools.go | 3 + 25 files changed, 1129 insertions(+), 21 deletions(-) create mode 100644 cmd/metrics.go create mode 100644 internal/logging/main.go create mode 100644 internal/metrics/.gitignore create mode 100644 internal/metrics/Makefile create mode 100644 internal/metrics/fetcher/go.mod create mode 100644 internal/metrics/fetcher/go.sum create mode 100644 internal/metrics/fetcher/main.go create mode 100644 internal/metrics/gen/metrics.pb.go create mode 100644 internal/metrics/gen/metrics_grpc.pb.go create mode 100644 internal/metrics/metrics.proto create mode 100644 internal/metrics/server/main.go create mode 100644 internal/metrics/submitter/main.go create mode 100644 internal/metrics/tools/.gitignore create mode 100644 internal/metrics/tools/github.mk create mode 100644 internal/metrics/tools/go.mod create mode 100644 internal/metrics/tools/go.sum create mode 100644 internal/metrics/tools/gotools.mk create mode 100644 internal/metrics/tools/tools-import.go create mode 100644 internal/metrics/tools/tools.go diff --git a/cmd/fetch.go b/cmd/fetch.go index 4312451..4ff806b 100644 --- a/cmd/fetch.go +++ b/cmd/fetch.go @@ -1,12 +1,12 @@ package cmd import ( - "log/slog" "os" "strings" "time" "github.com/stackrox/image-prefetcher/internal" + "github.com/stackrox/image-prefetcher/internal/logging" "github.com/spf13/cobra" ) @@ -19,11 +19,7 @@ var fetchCmd = &cobra.Command{ It talks to Container Runtime Interface API to pull images in parallel, with retries.`, RunE: func(cmd *cobra.Command, args []string) error { - opts := &slog.HandlerOptions{AddSource: true} - if debug { - opts.Level = slog.LevelDebug - } - logger := slog.New(slog.NewTextHandler(os.Stderr, opts)) + logger := logging.GetLogger() timing := internal.TimingConfig{ ImageListTimeout: imageListTimeout, InitialPullAttemptTimeout: initialPullAttemptTimeout, @@ -37,7 +33,7 @@ It talks to Container Runtime Interface API to pull images in parallel, with ret return err } imageList = append(imageList, args...) - return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, imageList...) + return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, metricsEndpoint, imageList...) }, } @@ -45,7 +41,7 @@ var ( criSocket string dockerConfigJSONPath string imageListFile string - debug bool + metricsEndpoint string imageListTimeout = time.Minute initialPullAttemptTimeout = 30 * time.Second maxPullAttemptTimeout = 5 * time.Minute @@ -56,11 +52,12 @@ var ( func init() { rootCmd.AddCommand(fetchCmd) + logging.AddFlags(fetchCmd.Flags()) fetchCmd.Flags().StringVar(&criSocket, "cri-socket", "/run/containerd/containerd.sock", "Path to CRI UNIX socket.") fetchCmd.Flags().StringVar(&dockerConfigJSONPath, "docker-config", "", "Path to docker config json file.") fetchCmd.Flags().StringVar(&imageListFile, "image-list-file", "", "Path to text file containing images to pull (one per line).") - fetchCmd.Flags().BoolVar(&debug, "debug", false, "Whether to enable debug logging.") + fetchCmd.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "", "A host:port to submit image pull metrics to.") fetchCmd.Flags().DurationVar(&imageListTimeout, "image-list-timeout", imageListTimeout, "Timeout for image list calls (for debugging).") fetchCmd.Flags().DurationVar(&initialPullAttemptTimeout, "initial-pull-attempt-timeout", initialPullAttemptTimeout, "Timeout for initial image pull call. Each subsequent attempt doubles it until max.") diff --git a/cmd/metrics.go b/cmd/metrics.go new file mode 100644 index 0000000..2ef3c95 --- /dev/null +++ b/cmd/metrics.go @@ -0,0 +1,29 @@ +package cmd + +import ( + "github.com/stackrox/image-prefetcher/internal/logging" + "github.com/stackrox/image-prefetcher/internal/metrics/server" + + "github.com/spf13/cobra" +) + +// aggregateMetricsCmd represents the aggregate-metrics command +var aggregateMetricsCmd = &cobra.Command{ + Use: "aggregate-metrics", + Short: "Accept metrics submissions and serve them.", + Long: `This subcommand is intended to run in a single pod. + +It serves a gRPC endpoint to which individual metrics can be submitted, +and from which the aggregate metrics can be fetched.`, + RunE: func(cmd *cobra.Command, args []string) error { + return server.Run(logging.GetLogger(), port) + }, +} + +var port int + +func init() { + rootCmd.AddCommand(aggregateMetricsCmd) + logging.AddFlags(aggregateMetricsCmd.Flags()) + aggregateMetricsCmd.Flags().IntVar(&port, "port", 8443, "Port for metrics gRPC endpoint to listen on.") +} diff --git a/deploy/deployment.yaml.gotpl b/deploy/deployment.yaml.gotpl index 8edf071..8685da8 100644 --- a/deploy/deployment.yaml.gotpl +++ b/deploy/deployment.yaml.gotpl @@ -26,6 +26,48 @@ roleRef: name: privileged-scc-use --- {{ end }} +{{ if .CollectMetrics }} +apiVersion: v1 +kind: Pod +metadata: + name: {{ .Name }}-metrics + labels: + app: {{ .Name }}-metrics +spec: + containers: + - name: aggregator + image: {{ .Image }}:{{ .Version }} + args: + - "aggregate-metrics" + - "--debug" + ports: + - containerPort: 8443 + readinessProbe: + grpc: + port: 8443 + resources: + requests: + cpu: "5m" + memory: "16Mi" + limits: + cpu: "100m" + memory: "64Mi" + securityContext: + readOnlyRootFilesystem: true +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }}-metrics +spec: + ports: + - port: 8443 + protocol: TCP + selector: + app: {{ .Name }}-metrics + type: LoadBalancer +--- +{{ end }} apiVersion: apps/v1 kind: DaemonSet metadata: @@ -64,6 +106,9 @@ spec: {{ else }} - "--cri-socket=/tmp/cri/containerd.sock" {{ end }} + {{ if .CollectMetrics }} + - "--metrics-endpoint={{ .Name }}-metrics:8443" + {{ end }} resources: requests: cpu: "20m" diff --git a/deploy/main.go b/deploy/main.go index c4090ce..c083068 100644 --- a/deploy/main.go +++ b/deploy/main.go @@ -14,16 +14,17 @@ type settings struct { Secret string IsCRIO bool NeedsPrivileged bool + CollectMetrics bool } const imageRepo = "quay.io/stackrox-io/image-prefetcher" //go:embed deployment.yaml.gotpl -var daemonSetTemplate string +var deploymentTemplate string func main() { if len(os.Args) < 4 { - println("Usage:", os.Args[0], " vanilla|ocp [secret]") + println("Usage:", os.Args[0], " vanilla|ocp [secret] [collect-metrics]") os.Exit(1) } name := os.Args[1] @@ -33,6 +34,7 @@ func main() { if len(os.Args) > 4 { secret = os.Args[4] } + collectMetrics := len(os.Args) > 5 && os.Args[5] == "collect-metrics" s := settings{ Name: name, @@ -41,8 +43,9 @@ func main() { Secret: secret, IsCRIO: isOcp, NeedsPrivileged: isOcp, + CollectMetrics: collectMetrics, } - tmpl := template.Must(template.New("deployment").Parse(daemonSetTemplate)) + tmpl := template.Must(template.New("deployment").Parse(deploymentTemplate)) if err := tmpl.Execute(os.Stdout, s); err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index f7bc8d3..072e2cf 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,12 @@ go 1.21 toolchain go1.21.7 require ( + github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/uuid v1.6.0 github.com/spf13/cobra v1.8.0 + github.com/spf13/pflag v1.0.5 google.golang.org/grpc v1.63.2 + google.golang.org/protobuf v1.33.0 k8s.io/cri-api v0.29.3 k8s.io/kubernetes v1.29.3 ) @@ -51,12 +55,10 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/apiextensions-apiserver v0.29.3 // indirect k8s.io/apimachinery v0.29.3 // indirect diff --git a/go.sum b/go.sum index 53ee923..4c45736 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -17,6 +19,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/internal/logging/main.go b/internal/logging/main.go new file mode 100644 index 0000000..9746225 --- /dev/null +++ b/internal/logging/main.go @@ -0,0 +1,22 @@ +package logging + +import ( + "log/slog" + "os" + + "github.com/spf13/pflag" +) + +var debug bool + +func GetLogger() *slog.Logger { + opts := &slog.HandlerOptions{AddSource: true} + if debug { + opts.Level = slog.LevelDebug + } + return slog.New(slog.NewTextHandler(os.Stderr, opts)) +} + +func AddFlags(flags *pflag.FlagSet) { + flags.BoolVar(&debug, "debug", false, "Whether to enable debug logging.") +} diff --git a/internal/main.go b/internal/main.go index 05f67bd..1c1d25e 100644 --- a/internal/main.go +++ b/internal/main.go @@ -9,6 +9,10 @@ import ( "sync" "time" + metricsProto "github.com/stackrox/image-prefetcher/internal/metrics/gen" + "github.com/stackrox/image-prefetcher/internal/metrics/submitter" + + "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" criV1 "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -24,20 +28,30 @@ type TimingConfig struct { MaxPullAttemptDelay time.Duration } -func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, imageNames ...string) error { +func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, metricsEndpoint string, imageNames ...string) error { ctx, cancel := context.WithTimeout(context.Background(), timing.OverallTimeout) defer cancel() - clientConn, err := grpc.DialContext(ctx, "unix://"+criSocketPath, grpc.WithTransportCredentials(insecure.NewCredentials())) + criConn, err := grpc.DialContext(ctx, "unix://"+criSocketPath, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return fmt.Errorf("failed to dial CRI socket %q: %w", criSocketPath, err) } - client := criV1.NewImageServiceClient(clientConn) + criClient := criV1.NewImageServiceClient(criConn) - if err := listImagesForDebugging(ctx, logger, client, timing.ImageListTimeout, "before"); err != nil { + if err := listImagesForDebugging(ctx, logger, criClient, timing.ImageListTimeout, "before"); err != nil { return fmt.Errorf("failed to list images for debugging before pulling: %w", err) } + var metricsSink *submitter.Submitter + if metricsEndpoint != "" { + metricsConn, err := grpc.DialContext(ctx, metricsEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("failed to dial metrics endpoint %q: %w", metricsEndpoint, err) + } + metricsSink = submitter.NewSubmitter(logger, metricsProto.NewMetricsClient(metricsConn)) + go metricsSink.Run(ctx) + } + kr := credentialprovider.BasicDockerKeyring{} if err := loadPullSecret(logger, &kr, dockerConfigJSONPath); err != nil { return fmt.Errorf("failed to load image pull secrets: %w", err) @@ -54,12 +68,13 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, }, Auth: auth, } - go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, client, request, timing) + go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, criClient, metricsSink.Chan(), imageName, request, timing) } } wg.Wait() logger.Info("pulling images finished") - if err := listImagesForDebugging(ctx, logger, client, timing.ImageListTimeout, "after"); err != nil { + metricsSink.Await() + if err := listImagesForDebugging(ctx, logger, criClient, timing.ImageListTimeout, "after"); err != nil { return fmt.Errorf("failed to list images for debugging after pulling: %w", err) } return nil @@ -122,7 +137,7 @@ func getAuthsForImage(ctx context.Context, logger *slog.Logger, kr credentialpro return auths } -func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, request *criV1.PullImageRequest, timing TimingConfig) { +func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, metricsSink chan<- *metricsProto.Metric, name string, request *criV1.PullImageRequest, timing TimingConfig) { defer wg.Done() attemptTimeout := timing.InitialPullAttemptTimeout delay := timing.InitialPullAttemptDelay @@ -135,9 +150,33 @@ func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.Wai cancel() if err == nil { logger.InfoContext(ctx, "image pulled successfully", "response", response, "elapsed", elapsed) + imageStatus, err := client.ImageStatus(ctx, &criV1.ImageStatusRequest{ + Image: &criV1.ImageSpec{ + Image: response.ImageRef, + }, + Verbose: true, + }) + var size uint64 + if err != nil { + logger.WarnContext(ctx, "failed to obtain image status", "image", response.ImageRef, "error", err) + } else { + size = imageStatus.GetImage().GetSize_() + } + metricsSink <- &metricsProto.Metric{ + AttemptId: uuid.NewString(), + Image: name, + DurationMs: uint64(elapsed.Milliseconds()), + Size: size, + } return } logger.ErrorContext(ctx, "image failed to pull", "error", err, "timeout", attemptTimeout, "elapsed", elapsed) + metricsSink <- &metricsProto.Metric{ + AttemptId: uuid.NewString(), + Image: name, + DurationMs: uint64(elapsed.Milliseconds()), + Error: err.Error(), + } if ctx.Err() != nil { logger.ErrorContext(ctx, "not retrying any more", "error", ctx.Err()) return diff --git a/internal/metrics/.gitignore b/internal/metrics/.gitignore new file mode 100644 index 0000000..f81b0a3 --- /dev/null +++ b/internal/metrics/.gitignore @@ -0,0 +1 @@ +/.gotools/ diff --git a/internal/metrics/Makefile b/internal/metrics/Makefile new file mode 100644 index 0000000..c41e9b7 --- /dev/null +++ b/internal/metrics/Makefile @@ -0,0 +1,53 @@ +BASE_PATH ?= $(CURDIR) +SILENT ?= @ + +include $(BASE_PATH)/tools/gotools.mk +include $(BASE_PATH)/tools/github.mk + +$(call go-tool, PROTOC_GEN_GO_BIN, google.golang.org/protobuf/cmd/protoc-gen-go, tools) +$(call go-tool, PROTOC_GEN_GO_GRPC_BIN, google.golang.org/grpc/cmd/protoc-gen-go-grpc, tools) + +PROTOC_VERSION := 26.1 +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Linux) +PROTOC_OS = linux +endif +ifeq ($(UNAME_S),Darwin) +PROTOC_OS = osx +endif +PROTOC_ARCH=$(shell case $$(uname -m) in (arm64) echo aarch_64 ;; (s390x) echo s390_64 ;; (*) uname -m ;; esac) + +PROTO_PRIVATE_DIR := $(BASE_PATH)/tools + +PROTOC_DIR := $(PROTO_PRIVATE_DIR)/protoc-$(PROTOC_OS)-$(PROTOC_ARCH)-$(PROTOC_VERSION) + +PROTOC := $(PROTOC_DIR)/bin/protoc + +PROTOC_DOWNLOADS_DIR := $(PROTO_PRIVATE_DIR)/.downloads + +.PHONY: all +all: $(PROTOC) $(PROTOC_GEN_GO_BIN) $(PROTOC_GEN_GO_GRPC_BIN) + mkdir -p gen + export PATH=$(BASE_PATH)/tools:$(BASE_PATH)/.gotools/bin:$$PATH; \ + $(PROTOC) --go_out=gen --go_opt=paths=source_relative --go-grpc_out=gen --go-grpc_opt=paths=source_relative metrics.proto + +$(PROTOC_DOWNLOADS_DIR): + @echo "+ $@" + $(SILENT)mkdir -p "$@" + +PROTOC_ZIP := protoc-$(PROTOC_VERSION)-$(PROTOC_OS)-$(PROTOC_ARCH).zip +PROTOC_FILE := $(PROTOC_DOWNLOADS_DIR)/$(PROTOC_ZIP) + +$(PROTOC_FILE): $(PROTOC_DOWNLOADS_DIR) + @$(GET_GITHUB_RELEASE_FN); \ + get_github_release "$@" "https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/$(PROTOC_ZIP)" + +.PRECIOUS: $(PROTOC_FILE) + +$(PROTOC): + @echo "+ $@" + $(SILENT)$(MAKE) "$(PROTOC_FILE)" + $(SILENT)mkdir -p "$(PROTOC_DIR)" + $(SILENT)unzip -q -o -d "$(PROTOC_DIR)" "$(PROTOC_FILE)" + $(SILENT)test -x "$@" + diff --git a/internal/metrics/fetcher/go.mod b/internal/metrics/fetcher/go.mod new file mode 100644 index 0000000..a4a2365 --- /dev/null +++ b/internal/metrics/fetcher/go.mod @@ -0,0 +1,17 @@ +module github.com/stackrox/image-prefetcher/internal/metrics/fetcher + +go 1.22 + +replace github.com/stackrox/image-prefetcher v0.1.0 => ../../../ + +require github.com/stackrox/image-prefetcher v0.1.0 + +require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.17.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/grpc v1.63.2 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/internal/metrics/fetcher/go.sum b/internal/metrics/fetcher/go.sum new file mode 100644 index 0000000..6a0ccff --- /dev/null +++ b/internal/metrics/fetcher/go.sum @@ -0,0 +1,16 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/internal/metrics/fetcher/main.go b/internal/metrics/fetcher/main.go new file mode 100644 index 0000000..005fd63 --- /dev/null +++ b/internal/metrics/fetcher/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "os" + "time" + + metricsProto "github.com/stackrox/image-prefetcher/internal/metrics/gen" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func main() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + metricsEndpoint := os.Args[1] + metricsConn, err := grpc.DialContext(ctx, metricsEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("failed to dial metrics endpoint %q: %v", metricsEndpoint, err) + } + client := metricsProto.NewMetricsClient(metricsConn) + fetch, err := client.Fetch(ctx, &metricsProto.Empty{}) + if err != nil { + log.Fatalf("failed to start fetching metrics from endpoint %q: %v", metricsEndpoint, err) + } + for { + metric, err := fetch.Recv() + if err == io.EOF { + return + } + if err != nil { + log.Fatalf("failed to fetch metrics from endpoint %q: %v", metricsEndpoint, err) + } + fmt.Println(metric) + } +} diff --git a/internal/metrics/gen/metrics.pb.go b/internal/metrics/gen/metrics.pb.go new file mode 100644 index 0000000..6a346eb --- /dev/null +++ b/internal/metrics/gen/metrics.pb.go @@ -0,0 +1,252 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.26.1 +// source: metrics.proto + +package gen + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Metric struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AttemptId string `protobuf:"bytes,1,opt,name=attempt_id,json=attemptId,proto3" json:"attempt_id,omitempty"` + Image string `protobuf:"bytes,2,opt,name=image,proto3" json:"image,omitempty"` + Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` + DurationMs uint64 `protobuf:"varint,4,opt,name=duration_ms,json=durationMs,proto3" json:"duration_ms,omitempty"` + Node string `protobuf:"bytes,5,opt,name=node,proto3" json:"node,omitempty"` + Size uint64 `protobuf:"varint,6,opt,name=size,proto3" json:"size,omitempty"` +} + +func (x *Metric) Reset() { + *x = Metric{} + if protoimpl.UnsafeEnabled { + mi := &file_metrics_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Metric) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Metric) ProtoMessage() {} + +func (x *Metric) ProtoReflect() protoreflect.Message { + mi := &file_metrics_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Metric.ProtoReflect.Descriptor instead. +func (*Metric) Descriptor() ([]byte, []int) { + return file_metrics_proto_rawDescGZIP(), []int{0} +} + +func (x *Metric) GetAttemptId() string { + if x != nil { + return x.AttemptId + } + return "" +} + +func (x *Metric) GetImage() string { + if x != nil { + return x.Image + } + return "" +} + +func (x *Metric) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *Metric) GetDurationMs() uint64 { + if x != nil { + return x.DurationMs + } + return 0 +} + +func (x *Metric) GetNode() string { + if x != nil { + return x.Node + } + return "" +} + +func (x *Metric) GetSize() uint64 { + if x != nil { + return x.Size + } + return 0 +} + +type Empty struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Empty) Reset() { + *x = Empty{} + if protoimpl.UnsafeEnabled { + mi := &file_metrics_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Empty) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Empty) ProtoMessage() {} + +func (x *Empty) ProtoReflect() protoreflect.Message { + mi := &file_metrics_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. +func (*Empty) Descriptor() ([]byte, []int) { + return file_metrics_proto_rawDescGZIP(), []int{1} +} + +var File_metrics_proto protoreflect.FileDescriptor + +var file_metrics_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x9c, 0x01, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x74, + 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6d, 0x61, + 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x64, 0x75, 0x72, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, + 0x7a, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x22, 0x07, + 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0x46, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x12, 0x1d, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x07, 0x2e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x1a, 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x28, + 0x01, 0x12, 0x1c, 0x0a, 0x05, 0x46, 0x65, 0x74, 0x63, 0x68, 0x12, 0x06, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x1a, 0x07, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x22, 0x00, 0x30, 0x01, 0x42, + 0x3b, 0x5a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, + 0x61, 0x63, 0x6b, 0x72, 0x6f, 0x78, 0x2f, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x2d, 0x70, 0x72, 0x65, + 0x66, 0x65, 0x74, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x3b, 0x67, 0x65, 0x6e, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_metrics_proto_rawDescOnce sync.Once + file_metrics_proto_rawDescData = file_metrics_proto_rawDesc +) + +func file_metrics_proto_rawDescGZIP() []byte { + file_metrics_proto_rawDescOnce.Do(func() { + file_metrics_proto_rawDescData = protoimpl.X.CompressGZIP(file_metrics_proto_rawDescData) + }) + return file_metrics_proto_rawDescData +} + +var file_metrics_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_metrics_proto_goTypes = []interface{}{ + (*Metric)(nil), // 0: Metric + (*Empty)(nil), // 1: Empty +} +var file_metrics_proto_depIdxs = []int32{ + 0, // 0: Metrics.Submit:input_type -> Metric + 1, // 1: Metrics.Fetch:input_type -> Empty + 1, // 2: Metrics.Submit:output_type -> Empty + 0, // 3: Metrics.Fetch:output_type -> Metric + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_metrics_proto_init() } +func file_metrics_proto_init() { + if File_metrics_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_metrics_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Metric); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_metrics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Empty); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_metrics_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_metrics_proto_goTypes, + DependencyIndexes: file_metrics_proto_depIdxs, + MessageInfos: file_metrics_proto_msgTypes, + }.Build() + File_metrics_proto = out.File + file_metrics_proto_rawDesc = nil + file_metrics_proto_goTypes = nil + file_metrics_proto_depIdxs = nil +} diff --git a/internal/metrics/gen/metrics_grpc.pb.go b/internal/metrics/gen/metrics_grpc.pb.go new file mode 100644 index 0000000..15b0ca9 --- /dev/null +++ b/internal/metrics/gen/metrics_grpc.pb.go @@ -0,0 +1,207 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.26.1 +// source: metrics.proto + +package gen + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Metrics_Submit_FullMethodName = "/Metrics/Submit" + Metrics_Fetch_FullMethodName = "/Metrics/Fetch" +) + +// MetricsClient is the client API for Metrics service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MetricsClient interface { + Submit(ctx context.Context, opts ...grpc.CallOption) (Metrics_SubmitClient, error) + Fetch(ctx context.Context, in *Empty, opts ...grpc.CallOption) (Metrics_FetchClient, error) +} + +type metricsClient struct { + cc grpc.ClientConnInterface +} + +func NewMetricsClient(cc grpc.ClientConnInterface) MetricsClient { + return &metricsClient{cc} +} + +func (c *metricsClient) Submit(ctx context.Context, opts ...grpc.CallOption) (Metrics_SubmitClient, error) { + stream, err := c.cc.NewStream(ctx, &Metrics_ServiceDesc.Streams[0], Metrics_Submit_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &metricsSubmitClient{stream} + return x, nil +} + +type Metrics_SubmitClient interface { + Send(*Metric) error + CloseAndRecv() (*Empty, error) + grpc.ClientStream +} + +type metricsSubmitClient struct { + grpc.ClientStream +} + +func (x *metricsSubmitClient) Send(m *Metric) error { + return x.ClientStream.SendMsg(m) +} + +func (x *metricsSubmitClient) CloseAndRecv() (*Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *metricsClient) Fetch(ctx context.Context, in *Empty, opts ...grpc.CallOption) (Metrics_FetchClient, error) { + stream, err := c.cc.NewStream(ctx, &Metrics_ServiceDesc.Streams[1], Metrics_Fetch_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &metricsFetchClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Metrics_FetchClient interface { + Recv() (*Metric, error) + grpc.ClientStream +} + +type metricsFetchClient struct { + grpc.ClientStream +} + +func (x *metricsFetchClient) Recv() (*Metric, error) { + m := new(Metric) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MetricsServer is the server API for Metrics service. +// All implementations must embed UnimplementedMetricsServer +// for forward compatibility +type MetricsServer interface { + Submit(Metrics_SubmitServer) error + Fetch(*Empty, Metrics_FetchServer) error + mustEmbedUnimplementedMetricsServer() +} + +// UnimplementedMetricsServer must be embedded to have forward compatible implementations. +type UnimplementedMetricsServer struct { +} + +func (UnimplementedMetricsServer) Submit(Metrics_SubmitServer) error { + return status.Errorf(codes.Unimplemented, "method Submit not implemented") +} +func (UnimplementedMetricsServer) Fetch(*Empty, Metrics_FetchServer) error { + return status.Errorf(codes.Unimplemented, "method Fetch not implemented") +} +func (UnimplementedMetricsServer) mustEmbedUnimplementedMetricsServer() {} + +// UnsafeMetricsServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MetricsServer will +// result in compilation errors. +type UnsafeMetricsServer interface { + mustEmbedUnimplementedMetricsServer() +} + +func RegisterMetricsServer(s grpc.ServiceRegistrar, srv MetricsServer) { + s.RegisterService(&Metrics_ServiceDesc, srv) +} + +func _Metrics_Submit_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MetricsServer).Submit(&metricsSubmitServer{stream}) +} + +type Metrics_SubmitServer interface { + SendAndClose(*Empty) error + Recv() (*Metric, error) + grpc.ServerStream +} + +type metricsSubmitServer struct { + grpc.ServerStream +} + +func (x *metricsSubmitServer) SendAndClose(m *Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *metricsSubmitServer) Recv() (*Metric, error) { + m := new(Metric) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Metrics_Fetch_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MetricsServer).Fetch(m, &metricsFetchServer{stream}) +} + +type Metrics_FetchServer interface { + Send(*Metric) error + grpc.ServerStream +} + +type metricsFetchServer struct { + grpc.ServerStream +} + +func (x *metricsFetchServer) Send(m *Metric) error { + return x.ServerStream.SendMsg(m) +} + +// Metrics_ServiceDesc is the grpc.ServiceDesc for Metrics service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Metrics_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Metrics", + HandlerType: (*MetricsServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Submit", + Handler: _Metrics_Submit_Handler, + ClientStreams: true, + }, + { + StreamName: "Fetch", + Handler: _Metrics_Fetch_Handler, + ServerStreams: true, + }, + }, + Metadata: "metrics.proto", +} diff --git a/internal/metrics/metrics.proto b/internal/metrics/metrics.proto new file mode 100644 index 0000000..283a41d --- /dev/null +++ b/internal/metrics/metrics.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "github.com/stackrox/image-prefetcher/internal/metrics;gen"; + +message Metric { + string attempt_id = 1; + string image = 2; + string error = 3; + uint64 duration_ms = 4; + string node = 5; + uint64 size = 6; +} + +message Empty {} + +service Metrics { + rpc Submit(stream Metric) returns (Empty) {} + rpc Fetch(Empty) returns (stream Metric) {} +} \ No newline at end of file diff --git a/internal/metrics/server/main.go b/internal/metrics/server/main.go new file mode 100644 index 0000000..179db52 --- /dev/null +++ b/internal/metrics/server/main.go @@ -0,0 +1,78 @@ +package server + +import ( + "fmt" + "io" + "log/slog" + "net" + "sync" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "google.golang.org/grpc" +) + +type metricsServer struct { + mutex sync.Mutex + metrics map[string]*gen.Metric + logger *slog.Logger + gen.UnimplementedMetricsServer +} + +func (s *metricsServer) Submit(stream gen.Metrics_SubmitServer) error { + for { + metric, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&gen.Empty{}) + } + if err != nil { + return err + } + s.metricSubmitted(metric) + } +} + +func (s *metricsServer) metricSubmitted(metric *gen.Metric) { + s.logger.Debug("metric submitted", "metric", metric) + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.metrics[metric.AttemptId]; ok { + s.logger.Info("duplicate metric submitted", "metric", metric) + } + s.metrics[metric.AttemptId] = metric +} + +func (s *metricsServer) Fetch(_ *gen.Empty, server gen.Metrics_FetchServer) error { + for _, m := range s.currentMetrics() { + if err := server.Send(m); err != nil { + return err + } + } + return nil +} + +func (s *metricsServer) currentMetrics() []*gen.Metric { + s.mutex.Lock() + defer s.mutex.Unlock() + metrics := make([]*gen.Metric, 0, len(s.metrics)) + for _, metric := range s.metrics { + metrics = append(metrics, metric) + } + return metrics +} + +func Run(logger *slog.Logger, port int) error { + spec := fmt.Sprintf(":%d", port) + listener, err := net.Listen("tcp", spec) + if err != nil { + return fmt.Errorf("failed to listen on %s", spec) + } + grpcServer := grpc.NewServer() + server := &metricsServer{ + logger: logger, + metrics: make(map[string]*gen.Metric), + } + gen.RegisterMetricsServer(grpcServer, server) + logger.Info("starting to serve", "spec", spec) + return grpcServer.Serve(listener) +} diff --git a/internal/metrics/submitter/main.go b/internal/metrics/submitter/main.go new file mode 100644 index 0000000..2305dce --- /dev/null +++ b/internal/metrics/submitter/main.go @@ -0,0 +1,100 @@ +package submitter + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "github.com/cenkalti/backoff/v4" +) + +type Submitter struct { + c chan *gen.Metric + done chan struct{} + client gen.MetricsClient + logger *slog.Logger +} + +func NewSubmitter(logger *slog.Logger, client gen.MetricsClient) *Submitter { + return &Submitter{ + c: make(chan *gen.Metric, 1), + done: make(chan struct{}), + client: client, + logger: logger, + } +} + +func (s *Submitter) Run(ctx context.Context) { + defer func() { s.done <- struct{}{} }() + if s.client == nil { + for range s.c { + } + return + } + hostName, err := os.Hostname() + if err != nil { + s.logger.WarnContext(ctx, "could not obtain hostname", "error", err) + hostName = "unknown" + } + + var metrics []*gen.Metric + for metric := range s.c { + metric.Node = hostName + s.logger.DebugContext(ctx, "metric received", "metric", metric) + metrics = append(metrics, metric) + } + + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + b := backoff.NewExponentialBackOff() + b.InitialInterval = 10 * time.Second + b.MaxElapsedTime = 0 + ticker := backoff.NewTicker(backoff.WithContext(b, ctx)) + defer ticker.Stop() + for range ticker.C { + if ctx.Err() != nil { + s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) + } + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + } +} + +func (s *Submitter) Await() { + if s == nil { + return + } + close(s.c) + s.logger.Info("waiting for metrics to be submitted") + <-s.done +} + +func (s *Submitter) submit(ctx context.Context, metrics []*gen.Metric) error { + submitClient, err := s.client.Submit(ctx) + if err != nil { + return fmt.Errorf("invoking metric Submit RPC failed: %w", err) + } + for _, metric := range metrics { + if err := submitClient.Send(metric); err != nil { + return fmt.Errorf("streaming metric to Submit RPC failed: %w", err) + } + } + if _, err := submitClient.CloseAndRecv(); err != nil { + return fmt.Errorf("finishing metric Submit RPC failed: %w", err) + } + return nil +} + +func (s *Submitter) Chan() chan<- *gen.Metric { + return s.c +} diff --git a/internal/metrics/tools/.gitignore b/internal/metrics/tools/.gitignore new file mode 100644 index 0000000..580cecf --- /dev/null +++ b/internal/metrics/tools/.gitignore @@ -0,0 +1,2 @@ +/protoc-*/ +/.downloads/ diff --git a/internal/metrics/tools/github.mk b/internal/metrics/tools/github.mk new file mode 100644 index 0000000..20e33ad --- /dev/null +++ b/internal/metrics/tools/github.mk @@ -0,0 +1,22 @@ +# github.mk +# Helpers for fetching released binaries from github projects. +# Copied from https://github.com/stackrox/stackrox/blob/master/make/github.mk + +# For usage instructions, see uses of this macro elsewhere, since there is no +# single standard for architecture naming. +GET_GITHUB_RELEASE_FN = get_github_release() { \ + [[ -x $${1} ]] || { \ + set -euo pipefail ;\ + echo "+ $${1}" ;\ + mkdir -p bin ;\ + attempts=5 ;\ + for i in $$(seq $$attempts); do \ + curl --silent --show-error --fail --location --output "$${1}" "$${2}" && break ;\ + [[ $$i -eq $$attempts ]] && exit 1; \ + echo "Retrying after $$((i*i)) seconds..."; \ + sleep "$$((i*i))"; \ + done ;\ + [[ $$(uname -s) != Darwin ]] || xattr -c "$${1}" ;\ + chmod +x "$${1}" ;\ + } \ +} diff --git a/internal/metrics/tools/go.mod b/internal/metrics/tools/go.mod new file mode 100644 index 0000000..c2171e6 --- /dev/null +++ b/internal/metrics/tools/go.mod @@ -0,0 +1,10 @@ +module github.com/stackrox/image-prefetcher/internal/metrics/tools + +go 1.20 + +require ( + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 + google.golang.org/protobuf v1.33.0 +) + +require github.com/google/go-cmp v0.6.0 // indirect diff --git a/internal/metrics/tools/go.sum b/internal/metrics/tools/go.sum new file mode 100644 index 0000000..e8e124c --- /dev/null +++ b/internal/metrics/tools/go.sum @@ -0,0 +1,6 @@ +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/internal/metrics/tools/gotools.mk b/internal/metrics/tools/gotools.mk new file mode 100644 index 0000000..afa9df8 --- /dev/null +++ b/internal/metrics/tools/gotools.mk @@ -0,0 +1,129 @@ +# gotools.mk +# Simplified installation & usage of Go-based tools +# Copied from https://github.com/stackrox/stackrox/blob/master/make/gotools.mk +# +# Input variables: +# GOTOOLS_PROJECT_ROOT: the project root directory; defaults to $(CURDIR) +# GOTOOLS_ROOT: the directory in which this file stores auxiliary data (should be .gitignore'd); defaults to +# $(GOTOOLS_PROJECT_ROOT)/.gotools +# GOTOOLS_BIN: the directory in which binaries are stored; defaults to $(GOTOOLS_ROOT)/bin. +# +# This file defines a single (user-facing) macro, `go-tool`, which can be invoked via +# $(call go-tool VARNAME, go-pkg [, module-root]) +# where go-pkg can be: +# - an absolute Go import path with an explicit version, e.g., +# github.com/golangci/golangci-lint/cmd/golangci-lint@v1.49.0. In this case, the tool is installed via `go install`, +# and module information from the local workspace is ignored, in accordance with the normal behavior of go install +# with an explicit version given. +# - an absolute Go import path WITHOUT a version, e.g., github.com/golangci/golangci-lint/cmd/golangci-lint. In this +# case, the tool is installed via `go install` from the module rooted at $(GOTOOLS_PROJECT_ROOT), or, if +# module-root is given, from the module rooted at that (relative) path. I.e., go-pkg must be provided by a module +# listed as a requirement in /go.mod. +# - a relative Go import path (WITHOUT a version), e.g., ./tools/roxvet. In this case, the tool is installed via +# `go install` from the module rooted at $(GOTOOLS_PROJECT_ROOT). +# +# Invoking go-tool will set up Makefile rules to build the tools, using reasonable strategies for caching to avoid +# building a tool multiple times. In particular: +# - when using an absolute Go import path with a version, the rule is set up such that the `go install` command is only +# run once. +# - when using an absolute Go import path without a version, the rule is set up such that the `go install` command is +# re-run only when the respective go.mod file changes. +# - when using a relative Go import path, the rule is set up such that the `go install` command is re-run on every +# `make` invocation. +# Note that `go install` uses a pretty effective caching strategy under the hood, so even with relative import path, +# you should not expect noticeable latency. +# +# In addition to setting up the rules for building, invoking go-tool will also set the value of the variable `VARNAME` +# to the (canonical) location of the respective tool's binary, which is $(GOTOOLS_BIN)/. `$(VARNAME)` +# should be used as the only way of both invoking the tool in the Makefile as well as expressing a dependency on the +# installation of the tool. +# For use in non-Makefile scripts, a target `which-` is added, whhere is the basename of the tool binary. +# This target prints the canonical location of the binary and, if necessary, builds it. Note that invocations of +# `make which-tool` should be made with the flags `--quiet --no-print-directory` set, as otherwise the output gets +# clobbered. +# +# This file also defines two static, global targets: +# gotools-clean: this removes all gotools-related data +# gotools-all: this builds all gotools. + +GOTOOLS_PROJECT_ROOT ?= $(CURDIR) +GOTOOLS_ROOT ?= $(GOTOOLS_PROJECT_ROOT)/.gotools +GOTOOLS_BIN ?= $(GOTOOLS_ROOT)/bin + +_GOTOOLS_ALL_GOTOOLS := + +define go-tool-impl +# The variable via which the executable can be referenced +_gotools_var_name := $(strip $(1)) +# The importable Go package path that contains the "main" package for the tool +_gotools_pkg := $(firstword $(subst @, ,$(strip $(2)))) +# The version of the tool (if a version was explicitly specified) +_gotools_version := $(word 2,$(subst @, ,$(strip $(2)))) +# The folder containing the go.mod file, if not the root folder +ifeq ($(strip $(3)),) +_gotools_mod_root := $(GOTOOLS_PROJECT_ROOT) +else +_gotools_mod_root := $(strip $(3)) +endif + +# We need to strip a `/v2` (etc.) suffix to derive the tool binary's basename. +_gotools_bin_name := $$(notdir $$(shell echo "$$(_gotools_pkg)" | sed -E 's@/v[[:digit:]]+$$$$@@g')) +_gotools_canonical_bin_path := $(GOTOOLS_BIN)/$$(_gotools_bin_name) +$$(_gotools_var_name) := $$(_gotools_canonical_bin_path) + +.PHONY: which-$$(_gotools_bin_name) +which-$$(_gotools_bin_name): + @$(MAKE) $$($(strip $(1))) >&2 + @echo $$($(strip $(1))) + +ifneq ($(filter ./%,$(2)),) +# Tool is built from local files. We have to introduce a phony target and let the Go compiler +# do all the caching. +.PHONY: $$(_gotools_canonical_bin_path) +$$(_gotools_canonical_bin_path): + @echo "+ $$(notdir $$@)" + $$(SILENT)GOBIN="$$(dir $$@)" go install "$(strip $(2))" +else +# Tool is specified with version, so we don't take any info from the go.mod file. +# We install the tool into a location that is version-dependent, and build it via this target. Since the name of +# the tool under that path is version-dependent, we never have to rebuild it, as it's either the correct version, or +# does not exist. +ifneq ($$(_gotools_version),) +_gotools_versioned_bin_path := $(GOTOOLS_ROOT)/versioned/$$(_gotools_pkg)/$$(_gotools_version)/$$(_gotools_bin_name) +$$(_gotools_versioned_bin_path): + @echo "+ $$(notdir $$@)" + $$(SILENT)GOBIN="$$(dir $$@)" go install "$(strip $(2))" + +# To make the tool accessible in the canonical location, we create a symlink. This only depends on the versioned path, +# i.e., only needs to be recreated when the version is bumped. +$$(_gotools_canonical_bin_path): $$(_gotools_versioned_bin_path) + @mkdir -p "$(GOTOOLS_BIN)" + $$(SILENT)ln -sf "$$<" "$$@" + +else + +# Tool is specified with an absolute path without a version. Take info from go.mod file in the respective directory. +$$(_gotools_canonical_bin_path): $$(_gotools_mod_root)/go.mod $$(_gotools_mod_root)/go.sum + @echo "+ $$(notdir $$@)" + $$(SILENT)cd "$$(dir $$<)" && GOBIN="$$(dir $$@)" go install "$(strip $(2))" + +endif +endif + +_GOTOOLS_ALL_GOTOOLS += $$(_gotools_canonical_bin_path) + +endef + +go-tool = $(eval $(call go-tool-impl,$(1),$(2),$(3))) + + +.PHONY: gotools-clean +gotools-clean: + @echo "+ $@" + @git clean -dfX "$(GOTOOLS_ROOT)" # don't use rm -rf to avoid catastrophes + +.PHONY: gotools-all +gotools-all: + @# these cannot be dependencies, as we need `$(_GOTOOLS_ALL_GOTOOLS)` to be + @# evaluated when the target is actually run. + $(MAKE) $(_GOTOOLS_ALL_GOTOOLS) diff --git a/internal/metrics/tools/tools-import.go b/internal/metrics/tools/tools-import.go new file mode 100644 index 0000000..7006aaf --- /dev/null +++ b/internal/metrics/tools/tools-import.go @@ -0,0 +1,13 @@ +//go:build tools + +package tools + +// This file declares dependencies on tool for `go mod` purposes. +// See https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module +// for an explanation of the approach. + +import ( + // Tool dependencies, not used anywhere in the code. + _ "google.golang.org/grpc/cmd/protoc-gen-go-grpc" + _ "google.golang.org/protobuf/cmd/protoc-gen-go" +) diff --git a/internal/metrics/tools/tools.go b/internal/metrics/tools/tools.go new file mode 100644 index 0000000..b2918ed --- /dev/null +++ b/internal/metrics/tools/tools.go @@ -0,0 +1,3 @@ +package tools + +// This file only exists to prevent package loading errors for this directory.