From f8b8669119e4f35276cd5e73ffe4b2c14fc9c910 Mon Sep 17 00:00:00 2001 From: Kritka Sahni <122665407+kritkasahni-google@users.noreply.github.com> Date: Thu, 14 Dec 2023 13:59:52 -0800 Subject: [PATCH] Export app image and cache image in parallel (#1247) * Export app image and cache image in parallel Signed-off-by: kritka sahni * Update platform/resolve_inputs.go Co-authored-by: Natalie Arellano Signed-off-by: Kritka Sahni <122665407+kritkasahni-google@users.noreply.github.com> * Fix warning message for parallel export without cache image. Signed-off-by: kritka sahni * Add test for parallel export enabled. Signed-off-by: kritka sahni * Fix test for parallel export without cache image. Signed-off-by: kritka sahni * Fix test message for parallel export without cache image. Signed-off-by: kritka sahni --------- Signed-off-by: kritka sahni Signed-off-by: Kritka Sahni <122665407+kritkasahni-google@users.noreply.github.com> Co-authored-by: Natalie Arellano --- acceptance/exporter_test.go | 24 +++++++ cmd/lifecycle/cli/flags.go | 5 ++ cmd/lifecycle/creator.go | 1 + cmd/lifecycle/exporter.go | 67 +++++++++++++------ layers/factory.go | 92 +++++++++++++++++--------- platform/defaults.go | 3 + platform/lifecycle_inputs.go | 2 + platform/resolve_create_inputs_test.go | 12 ++++ platform/resolve_inputs.go | 10 +++ 9 files changed, 163 insertions(+), 53 deletions(-) diff --git a/acceptance/exporter_test.go b/acceptance/exporter_test.go index e374dd7e5..b6609202c 100644 --- a/acceptance/exporter_test.go +++ b/acceptance/exporter_test.go @@ -309,6 +309,29 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe exportedImageName = exportTest.RegRepoName("some-exported-image-" + h.RandString(10)) exportArgs = append(exportArgs, exportedImageName) + output := h.DockerRun(t, + exportImage, + h.WithFlags( + "--env", "CNB_PLATFORM_API="+platformAPI, + "--env", "CNB_REGISTRY_AUTH="+exportRegAuthConfig, + "--network", exportRegNetwork, + ), + 
h.WithArgs(exportArgs...), + ) + h.AssertStringContains(t, output, "Saving "+exportedImageName) + // To detect whether the export of cacheImage and exportedImage is successful + h.Run(t, exec.Command("docker", "pull", exportedImageName)) + assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime) + h.Run(t, exec.Command("docker", "pull", cacheImageName)) + }) + + it("is created with parallel export enabled", func() { + cacheImageName := exportTest.RegRepoName("some-cache-image-" + h.RandString(10)) + exportFlags := []string{"-cache-image", cacheImageName, "-parallel"} + exportArgs := append([]string{ctrPath(exporterPath)}, exportFlags...) + exportedImageName = exportTest.RegRepoName("some-exported-image-" + h.RandString(10)) + exportArgs = append(exportArgs, exportedImageName) + output := h.DockerRun(t, exportImage, h.WithFlags( @@ -322,6 +345,7 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe h.Run(t, exec.Command("docker", "pull", exportedImageName)) assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime) + h.Run(t, exec.Command("docker", "pull", cacheImageName)) }) it("is created with empty layer", func() { diff --git a/cmd/lifecycle/cli/flags.go b/cmd/lifecycle/cli/flags.go index 411390352..49ba2a698 100644 --- a/cmd/lifecycle/cli/flags.go +++ b/cmd/lifecycle/cli/flags.go @@ -112,6 +112,11 @@ func FlagPlatformDir(platformDir *string) { flagSet.StringVar(platformDir, "platform", *platformDir, "path to platform directory") } +// FlagParallelExport parses `parallel` flag +func FlagParallelExport(parallelExport *bool) { + flagSet.BoolVar(parallelExport, "parallel", *parallelExport, "export app image and cache image in parallel") +} + func FlagPreviousImage(previousImage *string) { flagSet.StringVar(previousImage, "previous-image", *previousImage, "reference to previous image") } diff --git a/cmd/lifecycle/creator.go b/cmd/lifecycle/creator.go index 
8ca977be5..ee5392aca 100644 --- a/cmd/lifecycle/creator.go +++ b/cmd/lifecycle/creator.go @@ -52,6 +52,7 @@ func (c *createCmd) DefineFlags() { cli.FlagLauncherPath(&c.LauncherPath) cli.FlagLayersDir(&c.LayersDir) cli.FlagOrderPath(&c.OrderPath) + cli.FlagParallelExport(&c.ParallelExport) cli.FlagPlatformDir(&c.PlatformDir) cli.FlagPreviousImage(&c.PreviousImageRef) cli.FlagProcessType(&c.DefaultProcessType) diff --git a/cmd/lifecycle/exporter.go b/cmd/lifecycle/exporter.go index 396df08c8..f980faab6 100644 --- a/cmd/lifecycle/exporter.go +++ b/cmd/lifecycle/exporter.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "path/filepath" @@ -18,6 +19,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "github.com/buildpacks/lifecycle/auth" "github.com/buildpacks/lifecycle/buildpack" @@ -72,6 +74,7 @@ func (e *exportCmd) DefineFlags() { cli.FlagLaunchCacheDir(&e.LaunchCacheDir) cli.FlagLauncherPath(&e.LauncherPath) cli.FlagLayersDir(&e.LayersDir) + cli.FlagParallelExport(&e.ParallelExport) cli.FlagProcessType(&e.DefaultProcessType) cli.FlagProjectMetadataPath(&e.ProjectMetadataPath) cli.FlagReportPath(&e.ReportPath) @@ -170,6 +173,12 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz return err } + g := new(errgroup.Group) + var ctx context.Context + + if e.ParallelExport { + g, ctx = errgroup.WithContext(context.Background()) + } exporter := &phase.Exporter{ Buildpacks: group.Group, LayerFactory: &layers.Factory{ @@ -177,6 +186,7 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz UID: e.UID, GID: e.GID, Logger: cmd.DefaultLogger, + Ctx: ctx, }, Logger: cmd.DefaultLogger, PlatformAPI: e.PlatformAPI, @@ -203,31 +213,48 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz return err } - report, err := exporter.Export(phase.ExportOptions{ - 
AdditionalNames: e.AdditionalTags, - AppDir: e.AppDir, - DefaultProcessType: e.DefaultProcessType, - ExtendedDir: e.ExtendedDir, - LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir), - LayersDir: e.LayersDir, - OrigMetadata: analyzedMD.LayersMetadata, - Project: projectMD, - RunImageRef: runImageID, - RunImageForExport: runImageForExport, - WorkingImage: appImage, + g.Go(func() error { + report, err := exporter.Export(phase.ExportOptions{ + AdditionalNames: e.AdditionalTags, + AppDir: e.AppDir, + DefaultProcessType: e.DefaultProcessType, + ExtendedDir: e.ExtendedDir, + LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir), + LayersDir: e.LayersDir, + OrigMetadata: analyzedMD.LayersMetadata, + Project: projectMD, + RunImageRef: runImageID, + RunImageForExport: runImageForExport, + WorkingImage: appImage, + }) + if err != nil { + return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export") + } + if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil { + return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report") + } + return nil }) - if err != nil { - return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export") - } - if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil { - return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report") + + if !e.ParallelExport { + if err := g.Wait(); err != nil { + return err + } } - if cacheStore != nil { - if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil { - cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr) + g.Go(func() error { + if cacheStore != nil { + if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil { + cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr) + } } + return nil + }) + + if err = g.Wait(); err != nil { + return err } + return nil } diff --git a/layers/factory.go b/layers/factory.go index d73922518..8808e01b2 100644 --- 
a/layers/factory.go +++ b/layers/factory.go @@ -1,9 +1,12 @@ package layers import ( + "context" "os" "path/filepath" "strings" + "sync" + "time" v1 "github.com/google/go-containerregistry/pkg/v1" @@ -20,14 +23,15 @@ const ( ProcessTypesLayerName = "Buildpacks Process Types" SBOMLayerName = "Software Bill-of-Materials" SliceLayerName = "Application Slice: %d" + processing = "processing" ) type Factory struct { ArtifactsDir string // ArtifactsDir is the directory where layer files are written UID, GID int // UID and GID are used to normalize layer entries Logger log.Logger - - tarHashes map[string]string // tarHases Stores hashes of layer tarballs for reuse between the export and cache steps. + Ctx context.Context + tarHashes sync.Map // tarHashes stores hashes of layer tarballs for reuse between the export and cache steps. } type Layer struct { @@ -38,44 +42,66 @@ type Layer struct { } func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.NormalizingTarWriter) error) (layer Layer, err error) { + if f.Ctx == nil { + f.Ctx = context.TODO() + } tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar") - if f.tarHashes == nil { - f.tarHashes = make(map[string]string) + for { + sha, loaded := f.tarHashes.LoadOrStore(tarPath, processing) + if loaded { + select { + case <-f.Ctx.Done(): + return Layer{}, f.Ctx.Err() + default: + shaString := sha.(string) + if shaString == processing { + // another goroutine is processing this layer, wait and try again + time.Sleep(500 * time.Millisecond) + continue + } + + f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString) + return Layer{ + ID: id, + TarPath: tarPath, + Digest: shaString, + History: v1.History{CreatedBy: createdBy}, + }, nil + } + } + break } - if sha, ok := f.tarHashes[tarPath]; ok { - f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, sha) + + select { + case <-f.Ctx.Done(): + return Layer{}, f.Ctx.Err() + default: + lw, err := 
newFileLayerWriter(tarPath) + if err != nil { + return Layer{}, err + } + defer func() { + if closeErr := lw.Close(); err == nil { + err = closeErr + } + }() + tw := tarWriter(lw) + if err := addEntries(tw); err != nil { + return Layer{}, err + } + + if err := tw.Close(); err != nil { + return Layer{}, err + } + digest := lw.Digest() + f.tarHashes.Store(tarPath, digest) return Layer{ ID: id, + Digest: digest, TarPath: tarPath, - Digest: sha, History: v1.History{CreatedBy: createdBy}, - }, nil - } - lw, err := newFileLayerWriter(tarPath) - if err != nil { - return Layer{}, err - } - defer func() { - if closeErr := lw.Close(); err == nil { - err = closeErr - } - }() - tw := tarWriter(lw) - if err := addEntries(tw); err != nil { - return Layer{}, err - } - - if err := tw.Close(); err != nil { - return Layer{}, err + }, err } - digest := lw.Digest() - f.tarHashes[tarPath] = digest - return Layer{ - ID: id, - Digest: digest, - TarPath: tarPath, - History: v1.History{CreatedBy: createdBy}, - }, err } func escape(id string) string { diff --git a/platform/defaults.go b/platform/defaults.go index fa3931f75..60c131a60 100644 --- a/platform/defaults.go +++ b/platform/defaults.go @@ -189,6 +189,9 @@ const ( // EnvKanikoCacheTTL is the amount of time to persist layers cached by kaniko during the `extend` phase. EnvKanikoCacheTTL = "CNB_KANIKO_CACHE_TTL" + + // EnvParallelExport is a flag used to instruct the lifecycle to export the application image and cache image in parallel, if true. + EnvParallelExport = "CNB_PARALLEL_EXPORT" ) // DefaultKanikoCacheTTL is the default kaniko cache TTL (2 weeks). 
diff --git a/platform/lifecycle_inputs.go b/platform/lifecycle_inputs.go index faf2e4fe5..0e0d4afa6 100644 --- a/platform/lifecycle_inputs.go +++ b/platform/lifecycle_inputs.go @@ -55,6 +55,7 @@ type LifecycleInputs struct { GID int ForceRebase bool SkipLayers bool + ParallelExport bool UseDaemon bool UseLayout bool AdditionalTags str.Slice // str.Slice satisfies the `Value` interface required by the `flag` package @@ -131,6 +132,7 @@ func NewLifecycleInputs(platformAPI *api.Version) *LifecycleInputs { KanikoDir: "/kaniko", LaunchCacheDir: os.Getenv(EnvLaunchCacheDir), SkipLayers: skipLayers, + ParallelExport: boolEnv(EnvParallelExport), // Images used by the lifecycle during the build diff --git a/platform/resolve_create_inputs_test.go b/platform/resolve_create_inputs_test.go index 105afcad6..cb7e8d04f 100644 --- a/platform/resolve_create_inputs_test.go +++ b/platform/resolve_create_inputs_test.go @@ -45,6 +45,18 @@ func testResolveCreateInputs(platformAPI string) func(t *testing.T, when spec.G, h.SkipIf(t, api.MustParse(platformAPI).LessThan("0.12"), "") }) + when("parallel export is enabled and cache image ref is blank", func() { + it("warns", func() { + inputs.ParallelExport = true + inputs.CacheImageRef = "" + inputs.CacheDir = "" + err := platform.ResolveInputs(platform.Create, inputs, logger) + h.AssertNil(t, err) + expected := "Parallel export has been enabled, but it has not taken effect because no cache has been specified." 
+ h.AssertLogEntry(t, logHandler, expected) + }) + }) + when("run image", func() { when("not provided", func() { it.Before(func() { diff --git a/platform/resolve_inputs.go b/platform/resolve_inputs.go index 257b88ee4..6acf28071 100644 --- a/platform/resolve_inputs.go +++ b/platform/resolve_inputs.go @@ -36,6 +36,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger) CheckLaunchCache, ValidateImageRefs, ValidateTargetsAreSameRegistry, + CheckParallelExport, ) case Build: // nop @@ -47,6 +48,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger) CheckLaunchCache, ValidateImageRefs, ValidateTargetsAreSameRegistry, + CheckParallelExport, ) case Detect: // nop @@ -226,6 +228,14 @@ func ValidateRebaseRunImage(i *LifecycleInputs, _ log.Logger) error { } } +// CheckParallelExport will warn when parallel export is enabled without a cache. +func CheckParallelExport(i *LifecycleInputs, logger log.Logger) error { + if i.ParallelExport && (i.CacheImageRef == "" && i.CacheDir == "") { + logger.Warn("Parallel export has been enabled, but it has not taken effect because no cache has been specified.") + } + return nil +} + // ValidateTargetsAreSameRegistry ensures all output images are on the same registry. func ValidateTargetsAreSameRegistry(i *LifecycleInputs, _ log.Logger) error { if i.UseDaemon {