From 504223e222f235a64218c6b5e89325840382c274 Mon Sep 17 00:00:00 2001 From: Yannic Bonenberger Date: Mon, 1 Mar 2021 11:21:02 +0100 Subject: [PATCH] remotetool: add command to upload a tree into the CAS --- go/cmd/remotetool/main.go | 20 +++++--- go/pkg/tool/BUILD.bazel | 5 +- go/pkg/tool/parallel_uploader.go | 85 ++++++++++++++++++++++++++++++++ go/pkg/tool/tool.go | 46 +++++++++++++---- go/pkg/tool/tool_test.go | 53 +++++++++++++++++++- 5 files changed, 192 insertions(+), 17 deletions(-) create mode 100644 go/pkg/tool/parallel_uploader.go diff --git a/go/cmd/remotetool/main.go b/go/cmd/remotetool/main.go index 4f417f8ab..349b6886c 100644 --- a/go/cmd/remotetool/main.go +++ b/go/cmd/remotetool/main.go @@ -43,6 +43,7 @@ const ( reexecuteAction OpType = "reexecute_action" checkDeterminism OpType = "check_determinism" uploadBlob OpType = "upload_blob" + uploadTree OpType = "upload_tree" ) var supportedOps = []OpType{ @@ -53,14 +54,16 @@ var supportedOps = []OpType{ reexecuteAction, checkDeterminism, uploadBlob, + uploadTree, } var ( - operation = flag.String("operation", "", fmt.Sprintf("Specifies the operation to perform. Supported values: %v", supportedOps)) - digest = flag.String("digest", "", "Digest in format.") - pathPrefix = flag.String("path", "", "Path to which outputs should be downloaded to.") - inputRoot = flag.String("input_root", "", "For reexecute_action: if specified, override the action inputs with the specified input root.") - execAttempts = flag.Int("exec_attempts", 10, "For check_determinism: the number of times to remotely execute the action and check for mismatches.") + operation = flag.String("operation", "", fmt.Sprintf("Specifies the operation to perform. 
Supported values: %v", supportedOps)) + digest = flag.String("digest", "", "Digest in format.") + pathPrefix = flag.String("path", "", "Path to which outputs should be downloaded to.") + inputRoot = flag.String("input_root", "", "For reexecute_action: if specified, override the action inputs with the specified input root.") + execAttempts = flag.Int("exec_attempts", 10, "For check_determinism: the number of times to remotely execute the action and check for mismatches.") + uploadConcurrency = flag.Uint64("upload_concurrency", 1, "The number of concurrent uploads.") ) func main() { @@ -121,7 +124,12 @@ func main() { case uploadBlob: if err := c.UploadBlob(ctx, getPathFlag()); err != nil { - log.Exitf("error uploading blob for digest %v: %v", getDigestFlag(), err) + log.Exitf("error uploading blob from path '%v': %v", getPathFlag(), err) + } + + case uploadTree: + if err := c.UploadTree(ctx, *uploadConcurrency, getPathFlag()); err != nil { + log.Exitf("error uploading tree from path '%v': %v", getPathFlag(), err) } default: diff --git a/go/pkg/tool/BUILD.bazel b/go/pkg/tool/BUILD.bazel index d36c7b913..edda93973 100644 --- a/go/pkg/tool/BUILD.bazel +++ b/go/pkg/tool/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["tool.go"], + srcs = [ + "parallel_uploader.go", + "tool.go", + ], importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/tool", visibility = ["//visibility:public"], deps = [ diff --git a/go/pkg/tool/parallel_uploader.go b/go/pkg/tool/parallel_uploader.go new file mode 100644 index 000000000..6e0458ad1 --- /dev/null +++ b/go/pkg/tool/parallel_uploader.go @@ -0,0 +1,85 @@ +package tool + +import ( + "context" + "sync" + + "github.com/bazelbuild/remote-apis-sdks/go/pkg/digest" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo" + + rc "github.com/bazelbuild/remote-apis-sdks/go/pkg/client" + log "github.com/golang/glog" +) + +type parallelUploader 
struct { + queue chan *uploadinfo.Entry + closeFn func() error +} + +// Enqueue adds a new file to the upload queue and returns any error occurring +// during enqueueing. It does NOT return errors that happen during upload. +func (u *parallelUploader) Enqueue(path string) error { + dg, err := digest.NewFromFile(path) + if err != nil { + return err + } + + log.Infof("Enqueued blob of '%v' from '%v' for uploading", dg, path) + u.queue <- uploadinfo.EntryFromFile(dg, path) + + return nil +} + +// CloseAndWait closes the uploader and waits for all uploads to finish. +func (u *parallelUploader) CloseAndWait() error { + return u.closeFn() +} + +func newParallelUploader(ctx context.Context, grpcClient *rc.Client, concurrency uint64) *parallelUploader { + queue := make(chan *uploadinfo.Entry, 10*concurrency) + var wg sync.WaitGroup + + for i := uint64(0); i < concurrency; i++ { + wg.Add(1) + go func(id uint64) { + defer wg.Done() + + log.Infof("Starting uploader %v", id) + defer log.Infof("Stopping uploader %v", id) + + for { + select { + case <-ctx.Done(): + return + + case ue, ok := <-queue: + if !ok { + return + } + + if ue.IsFile() { + log.Infof("Uploader %v: Uploading blob of '%v' from '%v'", id, ue.Digest, ue.Path) + } else { + log.Infof("Uploader %v: Uploading blob of '%v'", id, ue.Digest) + } + if _, _, err := grpcClient.UploadIfMissing(ctx, ue); err != nil { + if ue.IsFile() { + log.Errorf("Uploader %v: Error uploading blob of '%v' from '%v': %v", id, ue.Digest, ue.Path, err) + } else { + log.Errorf("Uploader %v: Error uploading blob of '%v': %v", id, ue.Digest, err) + } + } + } + } + }(i) + } + + return &parallelUploader{ + queue: queue, + closeFn: func() error { + close(queue) + wg.Wait() + return nil + }, + } +} diff --git a/go/pkg/tool/tool.go b/go/pkg/tool/tool.go index c2499ad06..e87911b79 100644 --- a/go/pkg/tool/tool.go +++ b/go/pkg/tool/tool.go @@ -18,7 +18,6 @@ import ( "github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata" 
"github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr" "github.com/bazelbuild/remote-apis-sdks/go/pkg/rexec" - "github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo" "github.com/golang/protobuf/ptypes" rc "github.com/bazelbuild/remote-apis-sdks/go/pkg/client" @@ -274,18 +273,47 @@ func (c *Client) DownloadBlob(ctx context.Context, blobDigest, path string) (str } // UploadBlob uploads a blob from the specified path into the remote cache. -func (c *Client) UploadBlob(ctx context.Context, path string) error { - dg, err := digest.NewFromFile(path) - if err != nil { - return err +func (c *Client) UploadBlob(ctx context.Context, paths ...string) error { + return c.UploadBlobs(ctx, 1, paths) +} + +// UploadBlobs uploads blobs from the specified paths into the remote cache. +func (c *Client) UploadBlobs(ctx context.Context, concurrency uint64, paths []string) error { + uploader := newParallelUploader(ctx, c.GrpcClient, concurrency) + for _, path := range paths { + if err := uploader.Enqueue(path); err != nil { + return err + } } + return uploader.CloseAndWait() +} - log.Infof("Uploading blob of %v from %v.", dg, path) - ue := uploadinfo.EntryFromFile(dg, path) - if _, _, err := c.GrpcClient.UploadIfMissing(ctx, ue); err != nil { +// UploadTree uploads a tree from the specified path into the remote cache. +func (c *Client) UploadTree(ctx context.Context, concurrency uint64, path string) error { + uploader := newParallelUploader(ctx, c.GrpcClient, concurrency) + + enqueueFn := func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + switch mode := info.Mode(); { + case mode.IsRegular(): + if err := uploader.Enqueue(path); err != nil { + return err + } + + default: + // TODO(yannic): Handle uploading directory entries and symlinks. 
+ break + } + return nil + } + if err := filepath.Walk(path, enqueueFn); err != nil { return err } - return nil + + return uploader.CloseAndWait() } // DownloadDirectory downloads a an input root from the remote cache into the specified path. diff --git a/go/pkg/tool/tool_test.go b/go/pkg/tool/tool_test.go index 57d8994c6..0cd1a0c98 100644 --- a/go/pkg/tool/tool_test.go +++ b/go/pkg/tool/tool_test.go @@ -3,6 +3,7 @@ package tool import ( "context" "io/ioutil" + "os" "path" "path/filepath" "testing" @@ -244,11 +245,14 @@ func TestTool_DownloadBlob(t *testing.T) { } func TestTool_UploadBlob(t *testing.T) { + tempDir := path.Join(t.TempDir(), "TestTool_UploadBlob") + os.MkdirAll(tempDir, os.ModePerm) + e, cleanup := fakes.NewTestEnv(t) defer cleanup() cas := e.Server.CAS - tmpFile := path.Join(t.TempDir(), "blob") + tmpFile := path.Join(tempDir, "blob") if err := ioutil.WriteFile(tmpFile, []byte("Hello, World!"), 0777); err != nil { t.Fatalf("Could not create temp blob: %v", err) } @@ -276,3 +280,50 @@ func TestTool_UploadBlob(t *testing.T) { t.Fatalf("Expected 1 write for blob '%v', got %v", dg.String(), cas.BlobWrites(dg)) } } + +func TestTool_UploadTree(t *testing.T) { + tempDir := path.Join(t.TempDir(), "TestTool_UploadTree") + os.MkdirAll(tempDir, os.ModePerm) + + e, cleanup := fakes.NewTestEnv(t) + defer cleanup() + cas := e.Server.CAS + + files := []string{ + "blob", + "foo/blob", + "foo/bar/blob", + "bar/blob", + } + + for _, file := range files { + path := path.Join(tempDir, file) + os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err := ioutil.WriteFile(path, []byte(file), 0777); err != nil { + t.Fatalf("Could not create temp blob '%v': %v", file, err) + } + } + + tmpFile := path.Join(tempDir, "blob") + if err := ioutil.WriteFile(tmpFile, []byte("Hello, World!"), 0777); err != nil { + t.Fatalf("Could not create temp blob: %v", err) + } + + toolClient := &Client{GrpcClient: e.Client.GrpcClient} + if err := toolClient.UploadTree(context.Background(), 
tempDir); err != nil { + t.Fatalf("UploadTree('%v') failed: %v", tmpFile, err) + } + + // First request should upload the blob. + if cas.WriteReqs() != 4 { + t.Fatalf("Expected 4 writes, got %v", cas.WriteReqs()) + } + + // Retries should check whether the blob already exists and skip uploading if it does. + if err := toolClient.UploadTree(context.Background(), tempDir); err != nil { + t.Fatalf("UploadTree('%v') failed: %v", tmpFile, err) + } + if cas.WriteReqs() != 4 { + t.Fatalf("Expected 4 writes, got %v", cas.WriteReqs()) + } +}