remotetool: add command to upload a tree into the CAS #287

Open — wants to merge 1 commit into base: master
20 changes: 14 additions & 6 deletions go/cmd/remotetool/main.go
@@ -43,6 +43,7 @@ const (
 	reexecuteAction  OpType = "reexecute_action"
 	checkDeterminism OpType = "check_determinism"
 	uploadBlob       OpType = "upload_blob"
+	uploadTree       OpType = "upload_tree"
 )
 
 var supportedOps = []OpType{
@@ -53,14 +54,16 @@ var supportedOps = []OpType{
 	reexecuteAction,
 	checkDeterminism,
 	uploadBlob,
+	uploadTree,
 }
 
 var (
-	operation    = flag.String("operation", "", fmt.Sprintf("Specifies the operation to perform. Supported values: %v", supportedOps))
-	digest       = flag.String("digest", "", "Digest in <digest/size_bytes> format.")
-	pathPrefix   = flag.String("path", "", "Path to which outputs should be downloaded to.")
-	inputRoot    = flag.String("input_root", "", "For reexecute_action: if specified, override the action inputs with the specified input root.")
-	execAttempts = flag.Int("exec_attempts", 10, "For check_determinism: the number of times to remotely execute the action and check for mismatches.")
+	operation         = flag.String("operation", "", fmt.Sprintf("Specifies the operation to perform. Supported values: %v", supportedOps))
+	digest            = flag.String("digest", "", "Digest in <digest/size_bytes> format.")
+	pathPrefix        = flag.String("path", "", "Path to which outputs should be downloaded to.")

Collaborator: I think this documentation should be updated?
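
For illustration, an editor's sketch of what the reviewer may have in mind (hypothetical wording, not a change in this PR): the flag's description currently mentions only downloads, even though --path now also names the file or tree to upload.

	// Hypothetical updated description covering the new upload operations:
	pathPrefix = flag.String("path", "", "Path to download outputs to, or to read the blob or tree from for upload_blob and upload_tree.")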

+	inputRoot         = flag.String("input_root", "", "For reexecute_action: if specified, override the action inputs with the specified input root.")
+	execAttempts      = flag.Int("exec_attempts", 10, "For check_determinism: the number of times to remotely execute the action and check for mismatches.")
+	uploadConcurrency = flag.Uint64("upload_concurrency", 1, "The number of concurrent uploads.")
 )

 func main() {
@@ -121,7 +124,12 @@ func main() {
 
 	case uploadBlob:
 		if err := c.UploadBlob(ctx, getPathFlag()); err != nil {
-			log.Exitf("error uploading blob for digest %v: %v", getDigestFlag(), err)
+			log.Exitf("error uploading blob from path '%v': %v", getPathFlag(), err)
 		}
 
+	case uploadTree:
+		if err := c.UploadTree(ctx, *uploadConcurrency, getPathFlag()); err != nil {
+			log.Exitf("error uploading tree from path '%v': %v", getPathFlag(), err)
+		}
+
 	default:
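
For reference, a hypothetical invocation of the new operation (the --operation, --path, and --upload_concurrency flags are the ones defined above; whatever connection flags the tool requires, such as the remote service address, are assumed and omitted here):

	remotetool --operation upload_tree --path /path/to/tree --upload_concurrency 8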
5 changes: 4 additions & 1 deletion go/pkg/tool/BUILD.bazel
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

 go_library(
     name = "go_default_library",
-    srcs = ["tool.go"],
+    srcs = [
+        "parallel_uploader.go",
+        "tool.go",
+    ],
     importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/tool",
     visibility = ["//visibility:public"],
     deps = [
85 changes: 85 additions & 0 deletions go/pkg/tool/parallel_uploader.go
@@ -0,0 +1,85 @@
package tool

import (
	"context"
	"sync"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
	"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"

	rc "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
	log "github.com/golang/glog"
)

type parallelUploader struct {
	queue   chan *uploadinfo.Entry
	closeFn func() error
}

// Enqueue adds a new file to the upload queue and returns any error occurring
// during enqueueing. It does NOT return errors that happen during upload.
func (u *parallelUploader) Enqueue(path string) error {
	dg, err := digest.NewFromFile(path)
	if err != nil {
		return err
	}

	log.Infof("Enqueued blob of '%v' from '%v' for uploading", dg, path)
	u.queue <- uploadinfo.EntryFromFile(dg, path)

	return nil
}

// CloseAndWait closes the uploader and waits for all uploads to finish.
func (u *parallelUploader) CloseAndWait() error {
	return u.closeFn()
}

func newParallelUploader(ctx context.Context, grpcClient *rc.Client, concurrency uint64) *parallelUploader {
	queue := make(chan *uploadinfo.Entry, 10*concurrency)
	var wg sync.WaitGroup

	for i := uint64(0); i < concurrency; i++ {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()

			log.Infof("Starting uploader %v", id)
			defer log.Infof("Stopping uploader %v", id)

			for {
				select {
				case <-ctx.Done():
					return

				case ue, ok := <-queue:
					if !ok {
						return
					}

					if ue.IsFile() {
						log.Infof("Uploader %v: Uploading blob of '%v' from '%v'", id, ue.Digest, ue.Path)
					} else {
						log.Infof("Uploader %v: Uploading blob of '%v'", id, ue.Digest)
					}
					if _, _, err := grpcClient.UploadIfMissing(ctx, ue); err != nil {
						if ue.IsFile() {
							log.Errorf("Uploader %v: Error uploading blob of '%v' from '%v': %v", id, ue.Digest, ue.Path, err)
						} else {
							log.Errorf("Uploader %v: Error uploading blob of '%v': %v", id, ue.Digest, err)
						}
					}
				}
			}
		}(i)
	}

	return &parallelUploader{
		queue: queue,
		closeFn: func() error {
			close(queue)
			wg.Wait()
			return nil
		},
	}
}
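
A minimal usage sketch of this helper (an editor's illustration, not part of the diff; uploadPaths is a hypothetical caller inside the same package, since newParallelUploader is unexported). Note that Enqueue only returns digesting/enqueueing errors; upload failures are logged by the workers rather than returned:

	package tool

	import (
		"context"

		rc "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
	)

	// uploadPaths digests and enqueues each path, then blocks until the
	// worker goroutines have drained the queue.
	func uploadPaths(ctx context.Context, grpcClient *rc.Client, paths []string) error {
		uploader := newParallelUploader(ctx, grpcClient, 4)
		for _, p := range paths {
			// Enqueue hashes the file and pushes an entry onto the buffered
			// queue; it blocks when the queue is full.
			if err := uploader.Enqueue(p); err != nil {
				return err
			}
		}
		// Closing the queue lets each worker exit once it is drained.
		return uploader.CloseAndWait()
	}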
46 changes: 37 additions & 9 deletions go/pkg/tool/tool.go
@@ -18,7 +18,6 @@ import (
"github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/rexec"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
"github.com/golang/protobuf/ptypes"

rc "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
@@ -274,18 +273,47 @@ func (c *Client) DownloadBlob(ctx context.Context, blobDigest, path string) (str
 }
 
 // UploadBlob uploads a blob from the specified path into the remote cache.
-func (c *Client) UploadBlob(ctx context.Context, path string) error {
-	dg, err := digest.NewFromFile(path)
-	if err != nil {
-		return err
-	}
-
-	log.Infof("Uploading blob of %v from %v.", dg, path)
-	ue := uploadinfo.EntryFromFile(dg, path)
-	if _, _, err := c.GrpcClient.UploadIfMissing(ctx, ue); err != nil {
-		return err
-	}
-	return nil
+func (c *Client) UploadBlob(ctx context.Context, paths ...string) error {
+	return c.UploadBlobs(ctx, 1, paths)
+}
+
+// UploadBlobs uploads blobs from the specified paths into the remote cache.
+func (c *Client) UploadBlobs(ctx context.Context, concurrency uint64, paths []string) error {
+	uploader := newParallelUploader(ctx, c.GrpcClient, concurrency)
+	for _, path := range paths {
+		if err := uploader.Enqueue(path); err != nil {
+			return err
+		}
+	}
+	return uploader.CloseAndWait()
+}
+
+// UploadTree uploads a tree from the specified path into the remote cache.
+func (c *Client) UploadTree(ctx context.Context, concurrency uint64, path string) error {
+	uploader := newParallelUploader(ctx, c.GrpcClient, concurrency)

Contributor: I wonder why you couldn't use UploadIfMissing directly instead of making a parallel uploader? UploadIfMissing should already have parallel support, controlled by CASConcurrency.

Collaborator: +1 to this.

Contributor (author): I'm using this tool to upload very large trees with 10,000s of files. Just iterating over all of them takes significant time, so it's more efficient to start uploading while iterating over the files in the tree. I'll have a look at whether I can achieve this with the existing parallel uploading.
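
For illustration, an editor's sketch of the reviewer's alternative, assuming UploadIfMissing accepts a variadic list of entries (as its single-entry call sites in this diff suggest): collect every entry up front, then let the client parallelize internally via its CASConcurrency setting. The trade-off the author describes is that walking tens of thousands of files before the first upload starts adds latency, which the streaming parallelUploader avoids.

	package tool

	import (
		"context"
		"os"
		"path/filepath"

		"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
		"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
	)

	// uploadTreeEager is a hypothetical, eager variant of UploadTree: walk the
	// whole tree first, then upload everything in one UploadIfMissing call.
	func (c *Client) uploadTreeEager(ctx context.Context, root string) error {
		var entries []*uploadinfo.Entry
		walkFn := func(p string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				// Directories and symlinks are skipped, as in the PR's version.
				return nil
			}
			dg, err := digest.NewFromFile(p)
			if err != nil {
				return err
			}
			entries = append(entries, uploadinfo.EntryFromFile(dg, p))
			return nil
		}
		if err := filepath.Walk(root, walkFn); err != nil {
			return err
		}
		// UploadIfMissing batches, deduplicates, and parallelizes internally.
		_, _, err := c.GrpcClient.UploadIfMissing(ctx, entries...)
		return err
	}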


+	enqueueFn := func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+
+		switch mode := info.Mode(); {
+		case mode.IsRegular():
+			if err := uploader.Enqueue(path); err != nil {
+				return err
+			}
+
+		default:
+			// TODO(yannic): Handle uploading directory entries and symlinks.
+			break
+		}
+		return nil
+	}
+	if err := filepath.Walk(path, enqueueFn); err != nil {
+		return err
+	}
+
+	return uploader.CloseAndWait()
 }

 // DownloadDirectory downloads an input root from the remote cache into the specified path.
53 changes: 52 additions & 1 deletion go/pkg/tool/tool_test.go
@@ -3,6 +3,7 @@ package tool
 import (
 	"context"
 	"io/ioutil"
+	"os"
 	"path"
 	"path/filepath"
 	"testing"
@@ -244,11 +245,14 @@ func TestTool_DownloadBlob(t *testing.T) {
 }
 
 func TestTool_UploadBlob(t *testing.T) {
+	tempDir := path.Join(t.TempDir(), "TestTool_UploadBlob")
+	os.MkdirAll(tempDir, os.ModePerm)
+
 	e, cleanup := fakes.NewTestEnv(t)
 	defer cleanup()
 	cas := e.Server.CAS
 
-	tmpFile := path.Join(t.TempDir(), "blob")
+	tmpFile := path.Join(tempDir, "blob")
 	if err := ioutil.WriteFile(tmpFile, []byte("Hello, World!"), 0777); err != nil {
 		t.Fatalf("Could not create temp blob: %v", err)
 	}
@@ -276,3 +280,50 @@
t.Fatalf("Expected 1 write for blob '%v', got %v", dg.String(), cas.BlobWrites(dg))
}
}

+func TestTool_UploadTree(t *testing.T) {
+	tempDir := path.Join(t.TempDir(), "TestTool_UploadTree")
+	os.MkdirAll(tempDir, os.ModePerm)
+
+	e, cleanup := fakes.NewTestEnv(t)
+	defer cleanup()
+	cas := e.Server.CAS
+
+	files := []string{
+		"blob",
+		"foo/blob",
+		"foo/bar/blob",
+		"bar/blob",
+	}
+
+	for _, file := range files {
+		path := path.Join(tempDir, file)
+		os.MkdirAll(filepath.Dir(path), os.ModePerm)
+		if err := ioutil.WriteFile(path, []byte(file), 0777); err != nil {
+			t.Fatalf("Could not create temp blob '%v': %v", file, err)
+		}
+	}
+
+	tmpFile := path.Join(tempDir, "blob")
+	if err := ioutil.WriteFile(tmpFile, []byte("Hello, World!"), 0777); err != nil {
+		t.Fatalf("Could not create temp blob: %v", err)
+	}
+
+	toolClient := &Client{GrpcClient: e.Client.GrpcClient}
+	if err := toolClient.UploadTree(context.Background(), 1, tempDir); err != nil {
+		t.Fatalf("UploadTree('%v') failed: %v", tempDir, err)
+	}
+
+	// The first call should upload all four blobs.
+	if cas.WriteReqs() != 4 {
+		t.Fatalf("Expected 4 writes, got %v", cas.WriteReqs())
+	}
+
+	// A second call should check whether the blobs already exist and skip uploading them.
+	if err := toolClient.UploadTree(context.Background(), 1, tempDir); err != nil {
+		t.Fatalf("UploadTree('%v') failed: %v", tempDir, err)
+	}
+	if cas.WriteReqs() != 4 {

Collaborator: If we are skipping the blob upload during the second call to the same tempDir path, then shouldn't we have 0 write requests?

Contributor (author): The stats aren't reset, so they stay at 4 from the initial upload. If the second call uploaded anything, we'd see more than 4 here.

t.Fatalf("Expected 4 writes, got %v", cas.WriteReqs())
}
}