Export app image and cache image in parallel (#1247)
* Export app image and cache image in parallel

Signed-off-by: kritka sahni <[email protected]>

* Update platform/resolve_inputs.go

Co-authored-by: Natalie Arellano <[email protected]>
Signed-off-by: Kritka Sahni <[email protected]>

* Fix warning message for parallel export without cache image.

Signed-off-by: kritka sahni <[email protected]>

* Add test for parallel export enabled.

Signed-off-by: kritka sahni <[email protected]>

* Fix test for parallel export without cache image.

Signed-off-by: kritka sahni <[email protected]>

* Fix test message for parallel export without cache image.

Signed-off-by: kritka sahni <[email protected]>

---------

Signed-off-by: kritka sahni <[email protected]>
Signed-off-by: Kritka Sahni <[email protected]>
Co-authored-by: Natalie Arellano <[email protected]>
kritkasahni-google and natalieparellano authored Dec 14, 2023
1 parent b8a92ac commit f8b8669
Showing 9 changed files with 163 additions and 53 deletions.
24 changes: 24 additions & 0 deletions acceptance/exporter_test.go
@@ -309,6 +309,29 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe
exportedImageName = exportTest.RegRepoName("some-exported-image-" + h.RandString(10))
exportArgs = append(exportArgs, exportedImageName)

output := h.DockerRun(t,
exportImage,
h.WithFlags(
"--env", "CNB_PLATFORM_API="+platformAPI,
"--env", "CNB_REGISTRY_AUTH="+exportRegAuthConfig,
"--network", exportRegNetwork,
),
h.WithArgs(exportArgs...),
)
h.AssertStringContains(t, output, "Saving "+exportedImageName)
// pull both images to verify that the exported image and the cache image were exported successfully
h.Run(t, exec.Command("docker", "pull", exportedImageName))
assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime)
h.Run(t, exec.Command("docker", "pull", cacheImageName))
})

it("is created with parallel export enabled", func() {
cacheImageName := exportTest.RegRepoName("some-cache-image-" + h.RandString(10))
exportFlags := []string{"-cache-image", cacheImageName, "-parallel"}
exportArgs := append([]string{ctrPath(exporterPath)}, exportFlags...)
exportedImageName = exportTest.RegRepoName("some-exported-image-" + h.RandString(10))
exportArgs = append(exportArgs, exportedImageName)

output := h.DockerRun(t,
exportImage,
h.WithFlags(
@@ -322,6 +345,7 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe

h.Run(t, exec.Command("docker", "pull", exportedImageName))
assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime)
h.Run(t, exec.Command("docker", "pull", cacheImageName))
})

it("is created with empty layer", func() {
5 changes: 5 additions & 0 deletions cmd/lifecycle/cli/flags.go
@@ -112,6 +112,11 @@ func FlagPlatformDir(platformDir *string) {
flagSet.StringVar(platformDir, "platform", *platformDir, "path to platform directory")
}

// FlagParallelExport parses the `-parallel` flag
func FlagParallelExport(parallelExport *bool) {
flagSet.BoolVar(parallelExport, "parallel", *parallelExport, "export app image and cache image in parallel")
}

func FlagPreviousImage(previousImage *string) {
flagSet.StringVar(previousImage, "previous-image", *previousImage, "reference to previous image")
}
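Note on the flag wiring above: `flagSet.BoolVar` receives the current value of the bound variable as its default, so a value already populated from the environment survives unless `-parallel` is passed explicitly. Below is a minimal, self-contained sketch of that pattern using the standard `flag` package; the flag set and variable names are illustrative, not the lifecycle's `cli` package.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// pretend this was already set from CNB_PARALLEL_EXPORT while resolving inputs
	parallelExport := true

	fs := flag.NewFlagSet("exporter", flag.ExitOnError)
	// the current value is passed as the default, so the env-derived value
	// is only overridden when -parallel is given on the command line
	fs.BoolVar(&parallelExport, "parallel", parallelExport, "export app image and cache image in parallel")

	_ = fs.Parse([]string{})    // no -parallel flag provided
	fmt.Println(parallelExport) // prints: true
}
```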
1 change: 1 addition & 0 deletions cmd/lifecycle/creator.go
@@ -52,6 +52,7 @@ func (c *createCmd) DefineFlags() {
cli.FlagLauncherPath(&c.LauncherPath)
cli.FlagLayersDir(&c.LayersDir)
cli.FlagOrderPath(&c.OrderPath)
cli.FlagParallelExport(&c.ParallelExport)
cli.FlagPlatformDir(&c.PlatformDir)
cli.FlagPreviousImage(&c.PreviousImageRef)
cli.FlagProcessType(&c.DefaultProcessType)
67 changes: 47 additions & 20 deletions cmd/lifecycle/exporter.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"
"path/filepath"
@@ -18,6 +19,7 @@ import (
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/buildpacks/lifecycle/auth"
"github.com/buildpacks/lifecycle/buildpack"
@@ -72,6 +74,7 @@ func (e *exportCmd) DefineFlags() {
cli.FlagLaunchCacheDir(&e.LaunchCacheDir)
cli.FlagLauncherPath(&e.LauncherPath)
cli.FlagLayersDir(&e.LayersDir)
cli.FlagParallelExport(&e.ParallelExport)
cli.FlagProcessType(&e.DefaultProcessType)
cli.FlagProjectMetadataPath(&e.ProjectMetadataPath)
cli.FlagReportPath(&e.ReportPath)
@@ -170,13 +173,20 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz
return err
}

g := new(errgroup.Group)
var ctx context.Context

if e.ParallelExport {
g, ctx = errgroup.WithContext(context.Background())
}
exporter := &phase.Exporter{
Buildpacks: group.Group,
LayerFactory: &layers.Factory{
ArtifactsDir: artifactsDir,
UID: e.UID,
GID: e.GID,
Logger: cmd.DefaultLogger,
Ctx: ctx,
},
Logger: cmd.DefaultLogger,
PlatformAPI: e.PlatformAPI,
@@ -203,31 +213,48 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore phase.Cache, analyz
return err
}

report, err := exporter.Export(phase.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
g.Go(func() error {
report, err := exporter.Export(phase.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
}
if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")
}
return nil
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
}
if err = files.Handler.WriteReport(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")

if !e.ParallelExport {
if err := g.Wait(); err != nil {
return err
}
}

if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
g.Go(func() error {
if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
}
}
return nil
})

if err = g.Wait(); err != nil {
return err
}

return nil
}

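For context on the hunk above: the exporter now coordinates the two pushes with an errgroup. The app-image export always runs in a goroutine; without `-parallel` the code waits for it before starting the cache export, and with `-parallel` both run before a single final wait. The sketch below is a minimal illustration of that control flow only — `exportApp`, `exportCache`, and the `parallel` variable are hypothetical stand-ins, not the lifecycle's API.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

// exportApp and exportCache are stand-ins for the real export and cache steps.
func exportApp(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		fmt.Println("app image exported")
		return nil
	}
}

func exportCache(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		fmt.Println("cache image exported")
		return nil
	}
}

func main() {
	parallel := true // would come from -parallel / CNB_PARALLEL_EXPORT

	g := new(errgroup.Group)
	ctx := context.Background()
	if parallel {
		// a shared context lets a failure in one goroutine cancel the other
		g, ctx = errgroup.WithContext(ctx)
	}

	g.Go(func() error { return exportApp(ctx) })
	if !parallel {
		// sequential behavior: finish the app export before starting the cache export
		if err := g.Wait(); err != nil {
			log.Fatal(err)
		}
	}
	g.Go(func() error { return exportCache(ctx) })

	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}
```

Using `errgroup.WithContext` only in the parallel case mirrors the diff: the shared context allows cancellation across goroutines when both exports run, while the sequential path keeps the original ordering.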
92 changes: 59 additions & 33 deletions layers/factory.go
@@ -1,9 +1,12 @@
package layers

import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"time"

v1 "github.com/google/go-containerregistry/pkg/v1"

@@ -20,14 +23,15 @@ const (
ProcessTypesLayerName = "Buildpacks Process Types"
SBOMLayerName = "Software Bill-of-Materials"
SliceLayerName = "Application Slice: %d"
processing = "processing"
)

type Factory struct {
ArtifactsDir string // ArtifactsDir is the directory where layer files are written
UID, GID int // UID and GID are used to normalize layer entries
Logger log.Logger

tarHashes map[string]string // tarHashes stores hashes of layer tarballs for reuse between the export and cache steps.
Ctx context.Context
tarHashes sync.Map // tarHashes stores hashes of layer tarballs for reuse between the export and cache steps.
}

type Layer struct {
@@ -38,44 +42,66 @@ type Layer struct {
}

func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.NormalizingTarWriter) error) (layer Layer, err error) {
if f.Ctx == nil {
f.Ctx = context.TODO()
}
tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar")
if f.tarHashes == nil {
f.tarHashes = make(map[string]string)
for {
sha, loaded := f.tarHashes.LoadOrStore(tarPath, processing)
if loaded {
select {
case <-f.Ctx.Done():
return Layer{}, f.Ctx.Err()
default:
shaString := sha.(string)
if shaString == processing {
// another goroutine is processing this layer, wait and try again
time.Sleep(500 * time.Millisecond)
continue
}

f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString)
return Layer{
ID: id,
TarPath: tarPath,
Digest: shaString,
History: v1.History{CreatedBy: createdBy},
}, nil
}
}
break
}
if sha, ok := f.tarHashes[tarPath]; ok {
f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, sha)

select {
case <-f.Ctx.Done():
return Layer{}, f.Ctx.Err()
default:
lw, err := newFileLayerWriter(tarPath)
if err != nil {
return Layer{}, err
}
defer func() {
if closeErr := lw.Close(); err == nil {
err = closeErr
}
}()
tw := tarWriter(lw)
if err := addEntries(tw); err != nil {
return Layer{}, err
}

if err := tw.Close(); err != nil {
return Layer{}, err
}
digest := lw.Digest()
f.tarHashes.Store(tarPath, digest)
return Layer{
ID: id,
Digest: digest,
TarPath: tarPath,
Digest: sha,
History: v1.History{CreatedBy: createdBy},
}, nil
}
lw, err := newFileLayerWriter(tarPath)
if err != nil {
return Layer{}, err
}
defer func() {
if closeErr := lw.Close(); err == nil {
err = closeErr
}
}()
tw := tarWriter(lw)
if err := addEntries(tw); err != nil {
return Layer{}, err
}

if err := tw.Close(); err != nil {
return Layer{}, err
}, err
}
digest := lw.Digest()
f.tarHashes[tarPath] = digest
return Layer{
ID: id,
Digest: digest,
TarPath: tarPath,
History: v1.History{CreatedBy: createdBy},
}, err
}

func escape(id string) string {
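Why the factory change: with parallel export, the export and cache goroutines may request the same layer tarball concurrently, so the plain `map[string]string` is replaced by a `sync.Map` plus a `processing` sentinel. The first caller claims a path via `LoadOrStore` and does the work; later callers poll until the digest is published. A minimal sketch of that pattern under assumed names (`computeOnce`, `compute`) and a string digest:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const processing = "processing" // sentinel meaning "another goroutine is writing this layer"

var tarHashes sync.Map // tar path -> digest string, or the processing sentinel

// computeOnce returns the digest for path, doing the work at most once even when
// called from several goroutines; losers of the race poll until the winner publishes.
func computeOnce(path string, compute func() string) string {
	for {
		v, loaded := tarHashes.LoadOrStore(path, processing)
		if !loaded {
			digest := compute() // we claimed the slot, so do the work
			tarHashes.Store(path, digest)
			return digest
		}
		if s := v.(string); s != processing {
			return s // another goroutine already finished this layer
		}
		time.Sleep(10 * time.Millisecond) // still in progress; try again shortly
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(computeOnce("some-layer.tar", func() string {
				time.Sleep(50 * time.Millisecond)
				return "sha256:abc123"
			}))
		}()
	}
	wg.Wait()
}
```

The real `writeLayer` additionally selects on `f.Ctx.Done()` on each iteration, so a failure in the sibling goroutine aborts the wait instead of polling forever.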
3 changes: 3 additions & 0 deletions platform/defaults.go
@@ -189,6 +189,9 @@ const (

// EnvKanikoCacheTTL is the amount of time to persist layers cached by kaniko during the `extend` phase.
EnvKanikoCacheTTL = "CNB_KANIKO_CACHE_TTL"

// EnvParallelExport, when true, instructs the lifecycle to export the application image and the cache image in parallel.
EnvParallelExport = "CNB_PARALLEL_EXPORT"
)

// DefaultKanikoCacheTTL is the default kaniko cache TTL (2 weeks).
2 changes: 2 additions & 0 deletions platform/lifecycle_inputs.go
@@ -55,6 +55,7 @@ type LifecycleInputs struct {
GID int
ForceRebase bool
SkipLayers bool
ParallelExport bool
UseDaemon bool
UseLayout bool
AdditionalTags str.Slice // str.Slice satisfies the `Value` interface required by the `flag` package
Expand Down Expand Up @@ -131,6 +132,7 @@ func NewLifecycleInputs(platformAPI *api.Version) *LifecycleInputs {
KanikoDir: "/kaniko",
LaunchCacheDir: os.Getenv(EnvLaunchCacheDir),
SkipLayers: skipLayers,
ParallelExport: boolEnv(EnvParallelExport),

// Images used by the lifecycle during the build

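`ParallelExport` is seeded from `CNB_PARALLEL_EXPORT` via the package's `boolEnv` helper. A rough sketch of how such a helper is commonly written — the lifecycle's actual `boolEnv` may handle unset or malformed values differently:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// boolEnv is a hypothetical re-implementation: an unset or unparsable value
// is treated as false; otherwise the parsed boolean is returned.
func boolEnv(key string) bool {
	v, ok := os.LookupEnv(key)
	if !ok {
		return false
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return false
	}
	return b
}

func main() {
	os.Setenv("CNB_PARALLEL_EXPORT", "true")
	fmt.Println(boolEnv("CNB_PARALLEL_EXPORT")) // true
}
```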
12 changes: 12 additions & 0 deletions platform/resolve_create_inputs_test.go
@@ -45,6 +45,18 @@ func testResolveCreateInputs(platformAPI string) func(t *testing.T, when spec.G,
h.SkipIf(t, api.MustParse(platformAPI).LessThan("0.12"), "")
})

when("parallel export is enabled and cache image ref is blank", func() {
it("warns", func() {
inputs.ParallelExport = true
inputs.CacheImageRef = ""
inputs.CacheDir = ""
err := platform.ResolveInputs(platform.Create, inputs, logger)
h.AssertNil(t, err)
expected := "Parallel export has been enabled, but it has not taken effect because no cache has been specified."
h.AssertLogEntry(t, logHandler, expected)
})
})

when("run image", func() {
when("not provided", func() {
it.Before(func() {
10 changes: 10 additions & 0 deletions platform/resolve_inputs.go
@@ -36,6 +36,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger)
CheckLaunchCache,
ValidateImageRefs,
ValidateTargetsAreSameRegistry,
CheckParallelExport,
)
case Build:
// nop
@@ -47,6 +48,7 @@ func ResolveInputs(phase LifecyclePhase, i *LifecycleInputs, logger log.Logger)
CheckLaunchCache,
ValidateImageRefs,
ValidateTargetsAreSameRegistry,
CheckParallelExport,
)
case Detect:
// nop
@@ -226,6 +228,14 @@ func ValidateRebaseRunImage(i *LifecycleInputs, _ log.Logger) error {
}
}

// CheckParallelExport will warn when parallel export is enabled without a cache.
func CheckParallelExport(i *LifecycleInputs, logger log.Logger) error {
if i.ParallelExport && (i.CacheImageRef == "" && i.CacheDir == "") {
logger.Warn("Parallel export has been enabled, but it has not taken effect because no cache has been specified.")
}
return nil
}

// ValidateTargetsAreSameRegistry ensures all output images are on the same registry.
func ValidateTargetsAreSameRegistry(i *LifecycleInputs, _ log.Logger) error {
if i.UseDaemon {
