-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add GetAync
method to IRequiredActor<TActor>
to resolve issues where actor is not available upon injection (i.e. BackgroundService
s)
#264
Changes from 4 commits
611b505
42391ba
4ff45b4
a616dbe
102a566
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Actor.Dsl; | ||
using FluentAssertions; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Hosting; | ||
using Xunit; | ||
|
||
namespace Akka.Hosting.Tests; | ||
|
||
public class Bugfix208Specs : TestKit.TestKit | ||
{ | ||
private class MyTestActor : ReceiveActor | ||
{ | ||
public record SetData(string Data); | ||
|
||
public record GetData(); | ||
|
||
private string _data = string.Empty; | ||
|
||
public MyTestActor() | ||
{ | ||
Receive<SetData>(s => | ||
{ | ||
_data = s.Data; | ||
}); | ||
|
||
Receive<GetData>(g => | ||
{ | ||
Sender.Tell(_data); | ||
}); | ||
} | ||
} | ||
|
||
private class TestActorKey{} | ||
|
||
private class MyBackgroundService : BackgroundService | ||
{ | ||
private readonly IRequiredActor<TestActorKey> _testActor; | ||
|
||
public MyBackgroundService(IRequiredActor<TestActorKey> requiredActor) | ||
{ | ||
_testActor = requiredActor; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we tried to resolve the |
||
} | ||
|
||
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
{ | ||
var myRef = await _testActor.GetAsync(stoppingToken); | ||
myRef.Tell("BackgroundService started"); | ||
} | ||
} | ||
|
||
protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services) | ||
{ | ||
services.AddHostedService<MyBackgroundService>(); | ||
base.ConfigureServices(context, services); | ||
} | ||
|
||
protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) | ||
{ | ||
builder.WithActors((system, registry, arg3) => | ||
{ | ||
registry.Register<TestActorKey>(system.ActorOf(Props.Create(() => new MyTestActor()), "test-actor")); | ||
}); | ||
} | ||
|
||
/// <summary> | ||
/// Reproduction for https://github.com/akkadotnet/Akka.Hosting/issues/208 | ||
/// </summary> | ||
[Fact] | ||
public async Task ShouldStartHostedServiceThatDependsOnActor() | ||
{ | ||
// arrange | ||
var testActorRef = ActorRegistry.Get<TestActorKey>(); | ||
|
||
// act | ||
|
||
// assert | ||
|
||
// workaround for https://github.com/akkadotnet/Akka.Hosting/issues/265 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the reproduction for #265 |
||
var attempts = 5; | ||
do | ||
{ | ||
attempts--; | ||
try | ||
{ | ||
var r = await testActorRef.Ask<string>(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100)); | ||
r.Should().Be("BackgroundService started"); | ||
} | ||
catch (Exception e) | ||
{ | ||
attempts--; | ||
if (attempts == 0) | ||
{ | ||
throw; | ||
} | ||
} | ||
} while (attempts > 0); | ||
|
||
// await AwaitAssertAsync(async () => | ||
// { | ||
// var r = await testActorRef.Ask<string>(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100)); | ||
// r.Should().Be("BackgroundService started"); | ||
// }, RemainingOrDefault, TimeSpan.FromMilliseconds(150)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Util; | ||
using Microsoft.Extensions.Hosting; | ||
|
||
namespace Akka.Hosting | ||
{ | ||
|
@@ -23,6 +24,28 @@ public interface IRequiredActor<TActor> | |
/// The underlying actor resolved via <see cref="ActorRegistry"/> using the given <see cref="TActor"/> key. | ||
/// </summary> | ||
IActorRef ActorRef { get; } | ||
|
||
/// <summary> | ||
/// When calling from inside another <see cref="IHostedService"/>, actor registrations may not be | ||
/// available at startup (due to Akka.NET itself being started asynchronously in another hosted service). | ||
/// | ||
/// Instead - you should call the GetAsync method to wait for that actor to be populated into the <see cref="ActorRegistry"/> | ||
/// by the AkkaService at startup. | ||
/// </summary> | ||
/// <param name="cancellationToken">Optional cancellation token.</param> | ||
/// <returns>A Task that will return the <see cref="IActorRef"/> using the given key.</returns> | ||
Task<IActorRef> GetAsync(CancellationToken cancellationToken = default); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the correct fix - offer a method on the |
||
} | ||
|
||
/// <summary> | ||
/// INTERNAL API | ||
/// </summary> | ||
internal static class RequiredActorDefaults | ||
{ | ||
/// <summary> | ||
/// Used to timeout sync-over-async operations used when retrieving actors from the registry. | ||
/// </summary> | ||
public static readonly TimeSpan ActorFetchTimeout = TimeSpan.FromSeconds(5); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -31,13 +54,53 @@ public interface IRequiredActor<TActor> | |
/// <typeparam name="TActor">The type key of the actor - corresponds to a matching entry inside the <see cref="IActorRegistry"/>.</typeparam> | ||
public sealed class RequiredActor<TActor> : IRequiredActor<TActor> | ||
{ | ||
private readonly IReadOnlyActorRegistry _registry; | ||
|
||
public RequiredActor(IReadOnlyActorRegistry registry) | ||
{ | ||
ActorRef = registry.Get<TActor>(); | ||
_registry = registry; | ||
} | ||
|
||
private IActorRef? _internalRef = null; | ||
|
||
/// <inheritdoc cref="IRequiredActor{TActor}.ActorRef"/> | ||
public IActorRef ActorRef { get; } | ||
public IActorRef ActorRef | ||
{ | ||
get | ||
{ | ||
// attempt 1 - used cached value | ||
if (_internalRef != null) | ||
return _internalRef; | ||
|
||
// attempt 2 - synchronously check the registry (fast path) | ||
if (_registry.TryGet<TActor>(out _internalRef)) | ||
{ | ||
return _internalRef; | ||
} | ||
|
||
|
||
throw new MissingActorRegistryEntryException( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Try our best to give the users a clear signal as to why this failed and what they should do instead. We can't really solve the problem for them and the normal, sync |
||
$"Unable to resolve actor type [{typeof(TActor)})] - if you're using IRequiredActor<T> inside the constructor" + | ||
$"of an IHostedService, consider using the GetAsync method instead so you can wait for the actor to be populated by the AkkaService (which runs in parallel.)"); | ||
} | ||
} | ||
|
||
/// <inheritdoc cref="IRequiredActor{TActor}.GetAsync"/> | ||
public async Task<IActorRef> GetAsync(CancellationToken cancellationToken = default) | ||
{ | ||
// attempt 1 - used cached value | ||
if (_internalRef != null) | ||
return _internalRef; | ||
|
||
// attempt 2 - synchronously check the registry (fast path) | ||
if (_registry.TryGet<TActor>(out _internalRef)) | ||
{ | ||
return _internalRef; | ||
} | ||
|
||
// attempt 3 - wait for the actor to be registered | ||
return await _registry.GetAsync<TActor>(cancellationToken).ConfigureAwait(false); | ||
} | ||
} | ||
|
||
/// <summary> | ||
|
@@ -176,8 +239,7 @@ public void Register<TKey>(IActorRef actor, bool overwrite = false) | |
/// <remarks> | ||
/// Have to store a collection of <see cref="WaitForActorRegistration"/>s here so each waiter gets its own cancellation token. | ||
/// </remarks> | ||
private readonly ConcurrentDictionary<Type, ImmutableHashSet<WaitForActorRegistration>> _actorWaiters = | ||
new ConcurrentDictionary<Type, ImmutableHashSet<WaitForActorRegistration>>(); | ||
private readonly ConcurrentDictionary<Type, ImmutableHashSet<WaitForActorRegistration>> _actorWaiters = new(); | ||
|
||
/// <summary> | ||
/// Attempts to register an actor with the registry. | ||
|
@@ -245,7 +307,7 @@ public bool TryGet(Type key, out IActorRef actor) | |
/// <inheritdoc cref="IReadOnlyActorRegistry.GetAsync{TKey}"/> | ||
public async Task<IActorRef> GetAsync<TKey>(CancellationToken ct = default) | ||
{ | ||
return await GetAsync(typeof(TKey), ct); | ||
return await GetAsync(typeof(TKey), ct).ConfigureAwait(false); | ||
} | ||
|
||
/// <inheritdoc cref="IReadOnlyActorRegistry.GetAsync"/> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to add the Hosting.TestKit in order to test the
BackgroundService
issue with #208