Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testing functionality for WaitExecution #393

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ type Client struct {
casDownloadRequests chan *downloadRequest
rpcTimeouts RPCTimeouts
creds credentials.PerRPCCredentials
// ForceEarlyWaitCalls specifies whether ExecuteAndWait should call WaitExecution immedately
// after receiving an Operation from Execute. It is primarily useful for testing
// WaitExecution.
ForceEarlyWaitCalls bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not inheretly opposed to this... But I think the most appropriate place for this behavior would be on the fake server: https://github.com/bazelbuild/remote-apis-sdks/blob/af1232ee0d79920ab8390fb855f95dca18a69cf0/go/pkg/fakes/exec.go. Wdyt?

}

const (
Expand Down
21 changes: 5 additions & 16 deletions go/pkg/client/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,6 @@ func buildCommand(ac *Action) *repb.Command {
// 1) If an error occurs before the first operation is returned, or after the final operation is
// returned (i.e. the one with op.Done==true), retry by calling Execute again.
// 2) Otherwise, retry by calling WaitExecution with the last operation name.
// In addition, we want the retrier to trigger based on certain operation statuses as well as on
// explicit errors. (The shouldRetry function knows which statuses.) We do this by mapping statuses,
// if present, to errors inside the closure and then throwing away such "fake" errors outside the
// closure (if we ran out of retries or if there was never a retrier enabled). The exception is
// deadline-exceeded statuses, which we never give to the retrier (and hence will always propagate
// directly to the caller).
func (c *Client) ExecuteAndWait(ctx context.Context, req *repb.ExecuteRequest) (op *oppb.Operation, err error) {
return c.ExecuteAndWaitProgress(ctx, req, nil)
}
Expand All @@ -233,8 +227,7 @@ func (c *Client) ExecuteAndWait(ctx context.Context, req *repb.ExecuteRequest) (
// The supplied callback function is called for each message received to update the state of
// the remote action.
func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRequest, progress func(metadata *repb.ExecuteOperationMetadata)) (op *oppb.Operation, err error) {
wait := false // Should we retry by calling WaitExecution instead of Execute?
opError := false // Are we propagating an Operation status as an error for the retrier's benefit?
wait := false // Should we retry by calling WaitExecution instead of Execute?
lastOp := &oppb.Operation{}
closure := func(ctx context.Context) (e error) {
var res regrpc.Execution_ExecuteClient
Expand All @@ -254,6 +247,7 @@ func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRe
if e != nil {
return e
}
returnEarly := !wait && c.ForceEarlyWaitCalls && !op.Done
wait = !op.Done
lastOp = op
if progress != nil {
Expand All @@ -262,19 +256,14 @@ func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRe
progress(metadata)
}
}
}
st := OperationStatus(lastOp)
if st != nil {
opError = true
if st.Code() == codes.DeadlineExceeded {
return nil
if returnEarly {
return status.Error(codes.Internal, "fake error to for wait call")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "(...) error to for wait (...)", shouldn't it be, "(...) error to trigger the wait (...)"

}
return st.Err()
}
return nil
}
err = c.Retrier.Do(ctx, func() error { return c.CallWithTimeout(ctx, "Execute", closure) })
if err != nil && !opError {
if err != nil {
if st, ok := status.FromError(err); ok {
err = StatusDetailedError(st)
}
Expand Down
40 changes: 38 additions & 2 deletions go/pkg/client/retries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type flakyServer struct {
retriableForever bool // Set to true to make the flaky server return a retriable error forever, rather than eventually a non-retriable error.
sleepDelay time.Duration // How long to sleep on each RPC.
useBSCompression bool // Whether to use/expect compression on ByteStream calls.
executeNotFlakey bool // Whether calls to Execute should return flakey errors.
}

func (f *flakyServer) incNumCalls(method string) int {
Expand Down Expand Up @@ -171,10 +172,17 @@ func (f *flakyServer) GetTree(req *repb.GetTreeRequest, stream regrpc.ContentAdd

func (f *flakyServer) Execute(req *repb.ExecuteRequest, stream regrpc.Execution_ExecuteServer) error {
numCalls := f.incNumCalls("Execute")
if numCalls < 2 {
if !f.executeNotFlakey && numCalls < 2 {
return status.Error(codes.Canceled, "transient error!")
}
stream.Send(&oppb.Operation{Done: false, Name: "dummy"})
for {
stream.Send(&oppb.Operation{Done: false, Name: "dummy"})
// Flakey execution returns a single unfinished operation, then cancels the stream.
// Non-flakey execution just returns a stream of unfinished operations.
if !f.executeNotFlakey {
break
}
}
// After this error, retries should to go the WaitExecution method.
return status.Error(codes.Internal, "another transient error!")
}
Expand Down Expand Up @@ -423,6 +431,34 @@ func TestExecuteAndWaitRetries(t *testing.T) {
}
}

func TestExecuteAndWaitEarlyRetries(t *testing.T) {
t.Parallel()
f := setup(t)
f.fake.executeNotFlakey = true
f.client.ForceEarlyWaitCalls = true
defer f.shutDown()

op, err := f.client.ExecuteAndWait(f.ctx, &repb.ExecuteRequest{})
if err != nil {
t.Fatalf("client.WaitExecution(ctx, {}) = %v", err)
}
st := client.OperationStatus(op)
if st == nil {
t.Errorf("client.WaitExecution(ctx, {}) returned no status, expected Aborted")
}
if st != nil && st.Code() != codes.Aborted {
t.Errorf("client.WaitExecution(ctx, {}) returned unexpected status code %s", st.Code())
}
// 1 separate transient Execute errors.
if f.fake.numCalls["Execute"] != 1 {
t.Errorf("Expected 1 Execute calls, got %v", f.fake.numCalls["Execute"])
}
// 3 separate transient WaitExecution errors + the final successful call.
if f.fake.numCalls["WaitExecution"] != 4 {
t.Errorf("Expected 4 WaitExecution calls, got %v", f.fake.numCalls["WaitExecution"])
}
}

func TestNonStreamingRpcRetries(t *testing.T) {
t.Parallel()
testcases := []struct {
Expand Down