Export cache image and app image in parallel #1167

Closed - wants to merge 2 commits
3 changes: 3 additions & 0 deletions acceptance/exporter_test.go
@@ -314,8 +314,11 @@ func testExporterFunc(platformAPI string) func(t *testing.T, when spec.G, it spe
)
h.AssertStringContains(t, output, "Saving "+exportedImageName)

// Pull both images to verify that the app image and the cache image were exported successfully
h.Run(t, exec.Command("docker", "pull", exportedImageName))
assertImageOSAndArchAndCreatedAt(t, exportedImageName, exportTest, imgutil.NormalizedDateTime)

h.Run(t, exec.Command("docker", "pull", cacheImageName))
})

it("is created with empty layer", func() {
5 changes: 5 additions & 0 deletions cmd/lifecycle/cli/flags.go
@@ -108,6 +108,11 @@ func FlagPlanPath(planPath *string) {
flagSet.StringVar(planPath, "plan", *planPath, "path to plan.toml")
}

// FlagParallelExport parses the `parallel` flag
func FlagParallelExport(parallelExport *bool) {
flagSet.BoolVar(parallelExport, "parallel", *parallelExport, "export app image and cache image in parallel")
}

func FlagPlatformDir(platformDir *string) {
flagSet.StringVar(platformDir, "platform", *platformDir, "path to platform directory")
}
6 changes: 6 additions & 0 deletions cmd/lifecycle/creator.go
@@ -48,6 +48,7 @@ func (c *createCmd) DefineFlags() {
cli.FlagLauncherPath(&c.LauncherPath)
cli.FlagLayersDir(&c.LayersDir)
cli.FlagOrderPath(&c.OrderPath)
cli.FlagParallelExport(&c.ParallelExport)
cli.FlagPlatformDir(&c.PlatformDir)
cli.FlagPreviousImage(&c.PreviousImageRef)
cli.FlagProcessType(&c.DefaultProcessType)
@@ -75,6 +76,11 @@ func (c *createCmd) Args(nargs int, args []string) error {
return err
}
}
if c.ParallelExport {
if c.CacheImageRef == "" {
cmd.DefaultLogger.Warn("parallel export has been enabled, but it has not taken effect because cache image (-cache-image) has not been specified.")
}
}
Comment on lines +79 to +83 (Member):
Could we put this validation in the platform package? Somewhere in ResolveInputs? Our eventual aim is to move all such validations there. That would also have the nice side effect of printing the warning when cmd/lifecycle/exporter is invoked with this configuration.
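A minimal sketch of how that validation might look in the platform package (the helper name, and exactly where ResolveInputs would invoke it, are assumptions rather than part of this PR):

// Hypothetical helper in the platform package; ResolveInputs (or one of the
// per-phase operations it runs) could call this for the create and export phases.
func warnIfParallelExportIneffective(i *LifecycleInputs, logger log.Logger) {
	if i.ParallelExport && i.CacheImageRef == "" {
		logger.Warn("parallel export is enabled, but it has no effect because no cache image (-cache-image) was specified")
	}
}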

return nil
}

63 changes: 43 additions & 20 deletions cmd/lifecycle/exporter.go
@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/BurntSushi/toml"
@@ -69,6 +70,7 @@ func (e *exportCmd) DefineFlags() {
cli.FlagLaunchCacheDir(&e.LaunchCacheDir)
cli.FlagLauncherPath(&e.LauncherPath)
cli.FlagLayersDir(&e.LayersDir)
cli.FlagParallelExport(&e.ParallelExport)
cli.FlagProcessType(&e.DefaultProcessType)
cli.FlagProjectMetadataPath(&e.ProjectMetadataPath)
cli.FlagReportPath(&e.ReportPath)
@@ -205,31 +207,52 @@ func (e *exportCmd) export(group buildpack.Group, cacheStore lifecycle.Cache, an
return err
}

report, err := exporter.Export(lifecycle.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
})
if err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "export")
var (
	exportWaitGroup sync.WaitGroup
	report          files.Report
	appErr          error
)

exportWaitGroup.Add(1)
go func() {
defer exportWaitGroup.Done()
report, appErr = exporter.Export(lifecycle.ExportOptions{
AdditionalNames: e.AdditionalTags,
AppDir: e.AppDir,
DefaultProcessType: e.DefaultProcessType,
ExtendedDir: e.ExtendedDir,
LauncherConfig: launcherConfig(e.LauncherPath, e.LauncherSBOMDir),
LayersDir: e.LayersDir,
OrigMetadata: analyzedMD.LayersMetadata,
Project: projectMD,
RunImageRef: runImageID,
RunImageForExport: runImageForExport,
WorkingImage: appImage,
})
Member:
Could we move encoding.WriteTOML(e.ReportPath, &report) into this func as well? That would allow report.toml to be written before the cache has finished, which would allow platforms to use the presence of this file as a signal that the image is ready.

Contributor:
That makes sense @natalieparellano - working on this change. One quick question: when parallel export is enabled and the goroutine exporting the app image fails, should we cancel the goroutine exporting the cache image, or wait for it to complete? What do you think?

}()

// If parallel export is not enabled, wait for the app image export to finish before starting the cache export.
if !e.ParallelExport {
exportWaitGroup.Wait()
}

exportWaitGroup.Add(1)
go func() {
defer exportWaitGroup.Done()
if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
}
}
}()

exportWaitGroup.Wait()
if appErr != nil {
return cmd.FailErrCode(appErr, e.CodeFor(platform.ExportError), "export")
}
if err = encoding.WriteTOML(e.ReportPath, &report); err != nil {
return cmd.FailErrCode(err, e.CodeFor(platform.ExportError), "write export report")
}

if cacheStore != nil {
if cacheErr := exporter.Cache(e.LayersDir, cacheStore); cacheErr != nil {
cmd.DefaultLogger.Warnf("Failed to export cache: %v\n", cacheErr)
}
}
return nil
}
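Regarding the suggestion above to write report.toml before the cache finishes: a sketch of the app-image goroutine with encoding.WriteTOML moved inside it (error handling is simplified, the WriteTOML call after exportWaitGroup.Wait() would be dropped, and whether a failed app export should cancel the in-flight cache export is left open, as discussed in the thread above):

exportWaitGroup.Add(1)
go func() {
	defer exportWaitGroup.Done()
	report, appErr = exporter.Export(lifecycle.ExportOptions{
		// ... same options as in the diff above ...
	})
	if appErr != nil {
		return // skip writing a report for a failed export
	}
	// Write report.toml as soon as the app image is saved, so platforms can treat
	// its presence as a signal that the image is ready, without waiting for the cache.
	if writeErr := encoding.WriteTOML(e.ReportPath, &report); writeErr != nil {
		appErr = writeErr
	}
}()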

16 changes: 8 additions & 8 deletions layers/factory.go
@@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"

v1 "github.com/google/go-containerregistry/pkg/v1"

@@ -27,7 +28,8 @@ type Factory struct {
UID, GID int // UID and GID are used to normalize layer entries
Logger log.Logger

tarHashes map[string]string // tarHases Stores hashes of layer tarballs for reuse between the export and cache steps.
// tarHashes stores hashes of layer tarballs for reuse between the export and cache steps.
tarHashes sync.Map
}

type Layer struct {
@@ -39,15 +41,13 @@ type Layer struct {

func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.NormalizingTarWriter) error) (layer Layer, err error) {
tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar")
if f.tarHashes == nil {
f.tarHashes = make(map[string]string)
}
if sha, ok := f.tarHashes[tarPath]; ok {
f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, sha)
if sha, ok := f.tarHashes.Load(tarPath); ok {

Member:
I've been thinking about it, and I think we could still have a race condition if this load returns !ok. We could end up processing the same tar path in parallel - exporter and cacher each reading all the bits in the layer before one of them stores the result. What about something like:

const processing = "processing"

func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.NormalizingTarWriter) error) (layer Layer, err error) {
	tarPath := filepath.Join(f.ArtifactsDir, escape(id)+".tar")
	var (
		tries int
		sha any
		loaded bool
	)
	for {
		sha, loaded = f.tarHashes.LoadOrStore(tarPath, processing)
		if loaded {
			shaString := sha.(string)
			if shaString == processing {
				// another goroutine is processing this layer, wait and try again
				time.Sleep(time.Duration(tries) * 500 * time.Millisecond)
				tries++
				continue
			}
			f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString)
			return Layer{
				ID:      id,
				TarPath: tarPath,
				Digest:  shaString,
				History: v1.History{CreatedBy: createdBy},
			}, nil
		}
		break
	}
	// function continues...

We could debate about the manner of the backoff but this seems preferable to potentially reading all the bits twice, especially for large layers which tend to take a lot of time anyway. @jabrown85 do you have any thoughts?

Contributor:
@jabrown85 what do you think? Backing off and retrying this way should be at least as fast as reprocessing the same layer, if not faster. I will implement exponential backoff and retry. @natalieparellano @jabrown85 is that okay?

Contributor:
On second thought, could "exponential" backoff make things slower in some cases? The point of parallelism is to make export faster, and exponential backoff might be counter-productive in the worst case. I think a fixed delay would be better here - how about 500ms, or even 1 second? What do you think?

Contributor:
I think starting with what @natalieparellano suggested seems reasonable - we can always circle back and adjust the delay timing as we get feedback from real-world cases.
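For reference, a sketch of the fixed-delay variant discussed here, reusing the processing sentinel and the tarHashes sync.Map from the suggestion above (the 500ms value is just the number floated in this thread, and a "time" import would be needed):

for {
	sha, loaded := f.tarHashes.LoadOrStore(tarPath, processing)
	if !loaded {
		break // this call won the race and is responsible for writing the layer
	}
	shaString := sha.(string)
	if shaString == processing {
		// another goroutine is writing this layer; poll again after a fixed delay
		time.Sleep(500 * time.Millisecond)
		continue
	}
	f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString)
	return Layer{
		ID:      id,
		TarPath: tarPath,
		Digest:  shaString,
		History: v1.History{CreatedBy: createdBy},
	}, nil
}
// writeLayer continues: write the tarball, then f.tarHashes.Store(tarPath, digest)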

shaString := sha.(string)
f.Logger.Debugf("Reusing tarball for layer %q with SHA: %s\n", id, shaString)
return Layer{
ID: id,
TarPath: tarPath,
Digest: sha,
Digest: shaString,
History: v1.History{CreatedBy: createdBy},
}, nil
}
@@ -69,7 +69,7 @@ func (f *Factory) writeLayer(id, createdBy string, addEntries func(tw *archive.N
return Layer{}, err
}
digest := lw.Digest()
f.tarHashes[tarPath] = digest
f.tarHashes.Store(tarPath, digest)
return Layer{
ID: id,
Digest: digest,
3 changes: 3 additions & 0 deletions platform/defaults.go
@@ -186,6 +186,9 @@ const (

// EnvKanikoCacheTTL is the amount of time to persist layers cached by kaniko during the `extend` phase.
EnvKanikoCacheTTL = "CNB_KANIKO_CACHE_TTL"

// EnvParallelExport, if true, instructs the lifecycle to export the application image and the cache image in parallel.
EnvParallelExport = "CNB_PARALLEL_EXPORT"
)

// DefaultKanikoCacheTTL is the default kaniko cache TTL (2 weeks).
3 changes: 3 additions & 0 deletions platform/lifecycle_inputs.go
@@ -54,6 +54,7 @@ type LifecycleInputs struct {
UID int
GID int
ForceRebase bool
ParallelExport bool
SkipLayers bool
UseDaemon bool
UseLayout bool
@@ -129,6 +130,7 @@ func NewLifecycleInputs(platformAPI *api.Version) *LifecycleInputs {
KanikoDir: "/kaniko",
LaunchCacheDir: os.Getenv(EnvLaunchCacheDir),
SkipLayers: skipLayers,
ParallelExport: boolEnv(EnvParallelExport),

// Images used by the lifecycle during the build

@@ -147,6 +149,7 @@ func NewLifecycleInputs(platformAPI *api.Version) *LifecycleInputs {
ProjectMetadataPath: envOrDefault(EnvProjectMetadataPath, filepath.Join(PlaceholderLayers, DefaultProjectMetadataFile)),

// Configuration options for rebasing

ForceRebase: boolEnv(EnvForceRebase),
}
