Skip to content

Commit

Permalink
Add cancellation to IScheduler
Browse files Browse the repository at this point in the history
At the same time, the synchronous Sleep method has been removed from
IScheduler, and implemented as an extension method (with a
cancellation token).

Fixes #84.
  • Loading branch information
jskeet committed Nov 25, 2016
1 parent 0d99c41 commit b3dad61
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/Google.Api.Gax.Grpc/ApiCallRetryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal static Func<TRequest, CallSettings, Task<TResponse>> WithRetry<TRequest
{
throw;
}
await scheduler.Delay(actualDelay);
await scheduler.Delay(actualDelay, callSettings.CancellationToken.GetValueOrDefault());
retryDelay = retrySettings.RetryBackoff.NextDelay(retryDelay);
callTimeout = retrySettings.TimeoutBackoff.NextDelay(callTimeout);
}
Expand Down Expand Up @@ -86,7 +86,7 @@ internal static Func<TRequest, CallSettings, TResponse> WithRetry<TRequest, TRes
{
throw;
}
scheduler.Sleep(actualDelay);
scheduler.Sleep(actualDelay, callSettings.CancellationToken.GetValueOrDefault());
retryDelay = retrySettings.RetryBackoff.NextDelay(retryDelay);
callTimeout = retrySettings.TimeoutBackoff.NextDelay(callTimeout);
}
Expand Down
14 changes: 2 additions & 12 deletions src/Google.Api.Gax.Grpc/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,8 @@ public Task ShutdownChannelsAsync()
public Channel GetChannel(ServiceEndpoint endpoint)
{
GaxPreconditions.CheckNotNull(endpoint, nameof(endpoint));
try
{
var credentials = _lazyScopedDefaultChannelCredentials.Value.Result;
return GetChannel(endpoint, credentials);
}
catch (AggregateException e)
{
// Unwrap the first exception, a bit like await would.
// It's very unlikely that we'd ever see an AggregateException without an inner exceptions,
// but let's handle it relatively gracefully.
throw e.InnerExceptions.FirstOrDefault() ?? e;
}
var credentials = _lazyScopedDefaultChannelCredentials.Value.ResultWithUnwrappedExceptions();
return GetChannel(endpoint, credentials);
}

/// <summary>
Expand Down
21 changes: 18 additions & 3 deletions src/Google.Api.Gax/IScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Api.Gax
Expand All @@ -25,13 +26,27 @@ public interface IScheduler
/// (A test implementation may capture the current context to enable reliable testing.)
/// </summary>
/// <param name="delay">Time to delay for. Must not be negative.</param>
/// <param name="cancellationToken">The cancellation token that will be checked prior to completing the returned task.</param>
/// <returns>A task which will complete after the given delay.</returns>
Task Delay(TimeSpan delay);
Task Delay(TimeSpan delay, CancellationToken cancellationToken);
}

/// <summary>
/// Extension methods for <see cref="IScheduler"/>.
/// </summary>
public static class SchedulerExtensions
{
/// <summary>
/// Synchronously sleeps for the given delay.
/// Simulates a synchronous delay by calling <see cref="IScheduler.Delay(TimeSpan, CancellationToken)"/> on
/// <paramref name="scheduler"/>, and unwrapping any exceptions generated (typically cancellation).
/// </summary>
/// <param name="scheduler">The scheduler to use for the sleep operation.</param>
/// <param name="delay">Time to sleep for. Must not be negative.</param>
void Sleep(TimeSpan delay);
/// <param name="cancellationToken">The cancellation token that will be watched during the sleep operation.</param>
/// <exception cref="OperationCanceledException">The cancellation token was cancelled during the sleep.</exception>
public static void Sleep(this IScheduler scheduler, TimeSpan delay, CancellationToken cancellationToken)
=> GaxPreconditions.CheckNotNull(scheduler, nameof(scheduler))
.Delay(delay, cancellationToken)
.WaitWithUnwrappedExceptions();
}
}
16 changes: 9 additions & 7 deletions src/Google.Api.Gax/Polling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Api.Gax
Expand All @@ -28,14 +29,16 @@ public static class Polling
/// <param name="clock">The clock to use for determining deadlines. Must not be null.</param>
/// <param name="scheduler">The scheduler to use for delaying between calls. Must not be null.</param>
/// <param name="pollSettings">The poll settings, controlling timeouts, call settings and delays.</param>
/// <param name="cancellationToken">The cancellation token used to cancel delays, if any.</param>
/// <returns>The completed response.</returns>
/// <exception cref="TimeoutException">The timeout specified in the poll settings expired.</exception>
public static TResponse PollRepeatedly<TResponse>(
Func<DateTime?, TResponse> pollAction,
Predicate<TResponse> completionPredicate,
IClock clock,
IScheduler scheduler,
PollSettings pollSettings)
PollSettings pollSettings,
CancellationToken cancellationToken)
{
GaxPreconditions.CheckNotNull(pollAction, nameof(pollAction));
GaxPreconditions.CheckNotNull(completionPredicate, nameof(completionPredicate));
Expand All @@ -56,7 +59,7 @@ public static TResponse PollRepeatedly<TResponse>(
// TODO: Could return null instead. Unclear what's better here.
throw new TimeoutException("Operation did not complete within the specified expiry time");
}
scheduler.Sleep(pollSettings.Delay);
scheduler.Sleep(pollSettings.Delay, cancellationToken);
}
}

Expand All @@ -73,13 +76,15 @@ public static TResponse PollRepeatedly<TResponse>(
/// <param name="clock">The clock to use for determining deadlines. Must not be null.</param>
/// <param name="scheduler">The scheduler to use for delaying between calls. Must not be null.</param>
/// <param name="pollSettings">The poll settings, controlling timeouts, call settings and delays.</param>
/// <param name="cancellationToken">The cancellation token used to cancel delays, if any.</param>
/// <returns>A task representing the asynchronous operation. The result of the task will be the completed response.</returns>
public static async Task<TResponse> PollRepeatedlyAsync<TResponse>(
Func<DateTime?, Task<TResponse>> pollAction,
Predicate<TResponse> completionPredicate,
IClock clock,
IScheduler scheduler,
PollSettings pollSettings)
PollSettings pollSettings,
CancellationToken cancellationToken)
{
GaxPreconditions.CheckNotNull(pollAction, nameof(pollAction));
GaxPreconditions.CheckNotNull(completionPredicate, nameof(completionPredicate));
Expand All @@ -100,10 +105,7 @@ public static async Task<TResponse> PollRepeatedlyAsync<TResponse>(
// TODO: Could return null instead. Unclear what's better here.
throw new TimeoutException("Operation did not complete within the specified expiry time");
}
// TODO: Use a cancellation token if we have one. Not clear where we'd get it from though,
// as it could be in the underlying call settings which we don't have access to. Maybe
// only use a cancellation token if it's specified in pollSettings.CallSettings?
await scheduler.Delay(pollSettings.Delay).ConfigureAwait(false);
await scheduler.Delay(pollSettings.Delay, cancellationToken).ConfigureAwait(false);
}
}
}
Expand Down
22 changes: 4 additions & 18 deletions src/Google.Api.Gax/ScopedCredentialProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,11 @@ public ScopedCredentialProvider(IEnumerable<string> scopes)
/// in which case the default application credentials will be used.</param>
/// <returns>A task representing the asynchronous operation. The result of the task
/// is the scoped credentials.</returns>
public GoogleCredential GetCredentials(GoogleCredential credentials)
{
if (credentials != null)
{
return ApplyScopes(credentials);
}
try
{
public GoogleCredential GetCredentials(GoogleCredential credentials) =>
credentials == null
// No need to apply scopes here - they're already applied.
return _lazyScopedDefaultCredentials.Value.Result;
}
catch (AggregateException e)
{
// Unwrap the first exception, a bit like await would.
// It's very unlikely that we'd ever see an AggregateException without an inner exceptions,
// but let's handle it relatively gracefully.
throw e.InnerExceptions.FirstOrDefault() ?? e;
}
}
? _lazyScopedDefaultCredentials.Value.ResultWithUnwrappedExceptions()
: ApplyScopes(credentials);

/// <summary>
/// Asynchronously returns credentials with the scopes applied if required.
Expand Down
17 changes: 10 additions & 7 deletions src/Google.Api.Gax/SystemScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
using System;
/*
* Copyright 2015 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Api.Gax
{
/// <summary>
/// Singleton implementation of <see cref="IScheduler"/> which uses <see cref="Task.Delay(TimeSpan)"/>
/// and <see cref="Task.Run(Action)"/> internally.
/// Singleton implementation of <see cref="IScheduler"/> which uses <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
/// </summary>
public sealed class SystemScheduler : IScheduler
{
Expand All @@ -18,9 +24,6 @@ public sealed class SystemScheduler : IScheduler
private SystemScheduler() {}

/// <inheritdoc />
public Task Delay(TimeSpan timeSpan) => Task.Delay(timeSpan);

/// <inheritdoc />
public void Sleep(TimeSpan timeSpan) => Thread.Sleep(timeSpan);
public Task Delay(TimeSpan timeSpan, CancellationToken cancellationToken) => Task.Delay(timeSpan, cancellationToken);
}
}
54 changes: 54 additions & 0 deletions src/Google.Api.Gax/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Google.Api.Gax
{
/// <summary>
/// Extension methods for tasks.
/// </summary>
internal static class TaskExtensions
{
/// <summary>
/// Synchronously waits for the given task to complete, and returns the result.
/// Any <see cref="AggregateException"/> thrown is unwrapped to the first inner exception.
/// </summary>
/// <typeparam name="T">The result type of the task</typeparam>
/// <param name="task">The task to wait for.</param>
/// <returns>The result of the completed task.</returns>
internal static T ResultWithUnwrappedExceptions<T>(this Task<T> task)
{
task.WaitWithUnwrappedExceptions();
return task.Result;
}

/// <summary>
/// Synchronously waits for the given task to complete.
/// Any <see cref="AggregateException"/> thrown is unwrapped to the first inner exception.
/// </summary>
/// <param name="task">The task to wait for.</param>
internal static void WaitWithUnwrappedExceptions(this Task task)
{
try
{
task.Wait();
}
catch (AggregateException e)
{
// Unwrap the first exception, a bit like await would.
// It's very unlikely that we'd ever see an AggregateException without an inner exceptions,
// but let's handle it relatively gracefully.
throw e.InnerExceptions.FirstOrDefault() ?? e;
}

}
}
}
2 changes: 1 addition & 1 deletion test/Google.Api.Gax.Grpc.Tests/RetryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public SimpleResponse MethodSync(SimpleRequest request, CallSettings callSetting
{
CallTimes.Add(scheduler.Clock.GetCurrentDateTimeUtc());
CallSettingsReceived.Add(callSettings);
scheduler.Sleep(callDuration);
scheduler.Delay(callDuration).Wait();
if (failuresToReturn > 0)
{
failuresToReturn--;
Expand Down
19 changes: 17 additions & 2 deletions test/Google.Api.Gax.Tests/FakeSchedulerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using Google.Api.Gax.Testing;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -15,11 +16,25 @@ namespace Google.Api.Gax.Tests
public class FakeSchedulerTest
{
[Fact]
public void SynchronousSleep()
public async Task SimpleDelay()
{
var scheduler = new FakeScheduler();
scheduler.Run(() => scheduler.Sleep(TimeSpan.FromTicks(1000)));
await scheduler.RunAsync(() => scheduler.Delay(TimeSpan.FromTicks(1000)));
Assert.Equal(1000, scheduler.Clock.GetCurrentDateTimeUtc().Ticks);
}

[Fact]
public async Task Cancellation()
{
var scheduler = new FakeScheduler();
await scheduler.RunAsync(async () =>
{
var cts = new CancellationTokenSource();
scheduler.ScheduleCancellation(TimeSpan.FromTicks(500), cts);
var task = scheduler.Delay(TimeSpan.FromTicks(1000), cts.Token);
await Assert.ThrowsAsync<TaskCanceledException>(() => scheduler.Delay(TimeSpan.FromTicks(1000), cts.Token));
});
Assert.Equal(500, scheduler.Clock.GetCurrentDateTimeUtc().Ticks);
}
}
}
Loading

0 comments on commit b3dad61

Please sign in to comment.