Skip to content

Commit

Permalink
Add support for metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
porridge committed May 7, 2024
1 parent 4da0963 commit 50fc8c1
Show file tree
Hide file tree
Showing 25 changed files with 1,129 additions and 21 deletions.
15 changes: 6 additions & 9 deletions cmd/fetch.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package cmd

import (
"log/slog"
"os"
"strings"
"time"

"github.com/stackrox/image-prefetcher/internal"
"github.com/stackrox/image-prefetcher/internal/logging"

"github.com/spf13/cobra"
)
Expand All @@ -19,11 +19,7 @@ var fetchCmd = &cobra.Command{
It talks to Container Runtime Interface API to pull images in parallel, with retries.`,
RunE: func(cmd *cobra.Command, args []string) error {
opts := &slog.HandlerOptions{AddSource: true}
if debug {
opts.Level = slog.LevelDebug
}
logger := slog.New(slog.NewTextHandler(os.Stderr, opts))
logger := logging.GetLogger()
timing := internal.TimingConfig{
ImageListTimeout: imageListTimeout,
InitialPullAttemptTimeout: initialPullAttemptTimeout,
Expand All @@ -37,15 +33,15 @@ It talks to Container Runtime Interface API to pull images in parallel, with ret
return err
}
imageList = append(imageList, args...)
return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, imageList...)
return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, metricsEndpoint, imageList...)
},
}

var (
criSocket string
dockerConfigJSONPath string
imageListFile string
debug bool
metricsEndpoint string
imageListTimeout = time.Minute
initialPullAttemptTimeout = 30 * time.Second
maxPullAttemptTimeout = 5 * time.Minute
Expand All @@ -56,11 +52,12 @@ var (

func init() {
rootCmd.AddCommand(fetchCmd)
logging.AddFlags(fetchCmd.Flags())

fetchCmd.Flags().StringVar(&criSocket, "cri-socket", "/run/containerd/containerd.sock", "Path to CRI UNIX socket.")
fetchCmd.Flags().StringVar(&dockerConfigJSONPath, "docker-config", "", "Path to docker config json file.")
fetchCmd.Flags().StringVar(&imageListFile, "image-list-file", "", "Path to text file containing images to pull (one per line).")
fetchCmd.Flags().BoolVar(&debug, "debug", false, "Whether to enable debug logging.")
fetchCmd.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "", "A host:port to submit image pull metrics to.")

fetchCmd.Flags().DurationVar(&imageListTimeout, "image-list-timeout", imageListTimeout, "Timeout for image list calls (for debugging).")
fetchCmd.Flags().DurationVar(&initialPullAttemptTimeout, "initial-pull-attempt-timeout", initialPullAttemptTimeout, "Timeout for initial image pull call. Each subsequent attempt doubles it until max.")
Expand Down
29 changes: 29 additions & 0 deletions cmd/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cmd

import (
"github.com/stackrox/image-prefetcher/internal/logging"
"github.com/stackrox/image-prefetcher/internal/metrics/server"

"github.com/spf13/cobra"
)

// aggregateMetricsCmd represents the aggregate-metrics command. Its RunE
// passes the logger built by logging.GetLogger and the configured port to
// server.Run and returns whatever error that call reports.
var aggregateMetricsCmd = &cobra.Command{
	Use:   "aggregate-metrics",
	Short: "Accept metrics submissions and serve them.",
	Long: `This subcommand is intended to run in a single pod.
It serves a gRPC endpoint to which individual metrics can be submitted,
and from which the aggregate metrics can be fetched.`,
	// The command takes no positional arguments. Reject stray ones (for
	// example a port number given without --port) instead of silently
	// ignoring them and starting with the defaults.
	Args: cobra.NoArgs,
	RunE: func(cmd *cobra.Command, args []string) error {
		return server.Run(logging.GetLogger(), port)
	},
}

// port is the port number handed to the metrics server; it is populated from
// the --port flag registered in init.
var port int

// init attaches the aggregate-metrics subcommand to the root command and
// registers its flags: the shared logging flags plus --port.
func init() {
	rootCmd.AddCommand(aggregateMetricsCmd)

	flagSet := aggregateMetricsCmd.Flags()
	logging.AddFlags(flagSet)
	flagSet.IntVar(&port, "port", 8443, "Port for metrics gRPC endpoint to listen on.")
}
45 changes: 45 additions & 0 deletions deploy/deployment.yaml.gotpl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,48 @@ roleRef:
name: privileged-scc-use
---
{{ end }}
{{ if .CollectMetrics }}
apiVersion: v1
kind: Pod
metadata:
name: {{ .Name }}-metrics
labels:
app: {{ .Name }}-metrics
spec:
containers:
- name: aggregator
image: {{ .Image }}:{{ .Version }}
args:
- "aggregate-metrics"
- "--debug"
ports:
- containerPort: 8443
readinessProbe:
grpc:
port: 8443
resources:
requests:
cpu: "5m"
memory: "16Mi"
limits:
cpu: "100m"
memory: "64Mi"
securityContext:
readOnlyRootFilesystem: true
---
apiVersion: v1
kind: Service
metadata:
name: {{ .Name }}-metrics
spec:
ports:
- port: 8443
protocol: TCP
selector:
app: {{ .Name }}-metrics
type: LoadBalancer
---
{{ end }}
apiVersion: apps/v1
kind: DaemonSet
metadata:
Expand Down Expand Up @@ -64,6 +106,9 @@ spec:
{{ else }}
- "--cri-socket=/tmp/cri/containerd.sock"
{{ end }}
{{ if .CollectMetrics }}
- "--metrics-endpoint={{ .Name }}-metrics:8443"
{{ end }}
resources:
requests:
cpu: "20m"
Expand Down
9 changes: 6 additions & 3 deletions deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ type settings struct {
Secret string
IsCRIO bool
NeedsPrivileged bool
CollectMetrics bool
}

const imageRepo = "quay.io/stackrox-io/image-prefetcher"

//go:embed deployment.yaml.gotpl
var daemonSetTemplate string
var deploymentTemplate string

func main() {
if len(os.Args) < 4 {
println("Usage:", os.Args[0], "<name> <version> vanilla|ocp [secret]")
println("Usage:", os.Args[0], "<name> <version> vanilla|ocp [secret] [collect-metrics]")
os.Exit(1)
}
name := os.Args[1]
Expand All @@ -33,6 +34,7 @@ func main() {
if len(os.Args) > 4 {
secret = os.Args[4]
}
collectMetrics := len(os.Args) > 5 && os.Args[5] == "collect-metrics"

s := settings{
Name: name,
Expand All @@ -41,8 +43,9 @@ func main() {
Secret: secret,
IsCRIO: isOcp,
NeedsPrivileged: isOcp,
CollectMetrics: collectMetrics,
}
tmpl := template.Must(template.New("deployment").Parse(daemonSetTemplate))
tmpl := template.Must(template.New("deployment").Parse(deploymentTemplate))
if err := tmpl.Execute(os.Stdout, s); err != nil {
log.Fatal(err)
}
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ go 1.21
toolchain go1.21.7

require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/google/uuid v1.6.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
k8s.io/cri-api v0.29.3
k8s.io/kubernetes v1.29.3
)
Expand Down Expand Up @@ -51,12 +55,10 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apiextensions-apiserver v0.29.3 // indirect
k8s.io/apimachinery v0.29.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
Expand All @@ -17,6 +19,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down
22 changes: 22 additions & 0 deletions internal/logging/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package logging

import (
"log/slog"
"os"

"github.com/spf13/pflag"
)

// debug records whether debug-level logging was requested; it is set through
// the --debug flag registered by AddFlags.
var debug bool

// GetLogger builds a new text logger that writes to standard error and
// annotates each record with its source location. The minimum level is
// Debug when the debug flag is set and Info otherwise.
func GetLogger() *slog.Logger {
	// A nil HandlerOptions.Level means Info, so leave it unset unless debug
	// logging was requested.
	var level slog.Leveler
	if debug {
		level = slog.LevelDebug
	}
	handler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
		AddSource: true,
		Level:     level,
	})
	return slog.New(handler)
}

// AddFlags registers the logging-related command-line flags on the given
// flag set. Currently that is only --debug, which defaults to false and
// controls the level used by loggers built afterwards with GetLogger.
func AddFlags(flags *pflag.FlagSet) {
	const (
		flagName  = "debug"
		flagUsage = "Whether to enable debug logging."
	)
	flags.BoolVar(&debug, flagName, false, flagUsage)
}
53 changes: 46 additions & 7 deletions internal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"sync"
"time"

metricsProto "github.com/stackrox/image-prefetcher/internal/metrics/gen"
"github.com/stackrox/image-prefetcher/internal/metrics/submitter"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
criV1 "k8s.io/cri-api/pkg/apis/runtime/v1"
Expand All @@ -24,20 +28,30 @@ type TimingConfig struct {
MaxPullAttemptDelay time.Duration
}

func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, imageNames ...string) error {
func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, metricsEndpoint string, imageNames ...string) error {
ctx, cancel := context.WithTimeout(context.Background(), timing.OverallTimeout)
defer cancel()

clientConn, err := grpc.DialContext(ctx, "unix://"+criSocketPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
criConn, err := grpc.DialContext(ctx, "unix://"+criSocketPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("failed to dial CRI socket %q: %w", criSocketPath, err)
}
client := criV1.NewImageServiceClient(clientConn)
criClient := criV1.NewImageServiceClient(criConn)

if err := listImagesForDebugging(ctx, logger, client, timing.ImageListTimeout, "before"); err != nil {
if err := listImagesForDebugging(ctx, logger, criClient, timing.ImageListTimeout, "before"); err != nil {
return fmt.Errorf("failed to list images for debugging before pulling: %w", err)
}

var metricsSink *submitter.Submitter
if metricsEndpoint != "" {
metricsConn, err := grpc.DialContext(ctx, metricsEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("failed to dial metrics endpoint %q: %w", metricsEndpoint, err)
}
metricsSink = submitter.NewSubmitter(logger, metricsProto.NewMetricsClient(metricsConn))
go metricsSink.Run(ctx)
}

kr := credentialprovider.BasicDockerKeyring{}
if err := loadPullSecret(logger, &kr, dockerConfigJSONPath); err != nil {
return fmt.Errorf("failed to load image pull secrets: %w", err)
Expand All @@ -54,12 +68,13 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string,
},
Auth: auth,
}
go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, client, request, timing)
go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, criClient, metricsSink.Chan(), imageName, request, timing)
}
}
wg.Wait()
logger.Info("pulling images finished")
if err := listImagesForDebugging(ctx, logger, client, timing.ImageListTimeout, "after"); err != nil {
metricsSink.Await()
if err := listImagesForDebugging(ctx, logger, criClient, timing.ImageListTimeout, "after"); err != nil {
return fmt.Errorf("failed to list images for debugging after pulling: %w", err)
}
return nil
Expand Down Expand Up @@ -122,7 +137,7 @@ func getAuthsForImage(ctx context.Context, logger *slog.Logger, kr credentialpro
return auths
}

func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, request *criV1.PullImageRequest, timing TimingConfig) {
func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, metricsSink chan<- *metricsProto.Metric, name string, request *criV1.PullImageRequest, timing TimingConfig) {
defer wg.Done()
attemptTimeout := timing.InitialPullAttemptTimeout
delay := timing.InitialPullAttemptDelay
Expand All @@ -135,9 +150,33 @@ func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.Wai
cancel()
if err == nil {
logger.InfoContext(ctx, "image pulled successfully", "response", response, "elapsed", elapsed)
imageStatus, err := client.ImageStatus(ctx, &criV1.ImageStatusRequest{
Image: &criV1.ImageSpec{
Image: response.ImageRef,
},
Verbose: true,
})
var size uint64
if err != nil {
logger.WarnContext(ctx, "failed to obtain image status", "image", response.ImageRef, "error", err)
} else {
size = imageStatus.GetImage().GetSize_()
}
metricsSink <- &metricsProto.Metric{
AttemptId: uuid.NewString(),
Image: name,
DurationMs: uint64(elapsed.Milliseconds()),
Size: size,
}
return
}
logger.ErrorContext(ctx, "image failed to pull", "error", err, "timeout", attemptTimeout, "elapsed", elapsed)
metricsSink <- &metricsProto.Metric{
AttemptId: uuid.NewString(),
Image: name,
DurationMs: uint64(elapsed.Milliseconds()),
Error: err.Error(),
}
if ctx.Err() != nil {
logger.ErrorContext(ctx, "not retrying any more", "error", ctx.Err())
return
Expand Down
1 change: 1 addition & 0 deletions internal/metrics/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/.gotools/
Loading

0 comments on commit 50fc8c1

Please sign in to comment.