From 611b505c5de00608ee710659c91b17edcc8a8e27 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 15 Mar 2023 11:21:55 -0500 Subject: [PATCH 1/4] Added reproduction for issue #208 --- .../Akka.Hosting.Tests.csproj | 1 + src/Akka.Hosting.Tests/Bugfix208Specs.cs | 87 +++++++++++++++++++ .../Logging/LogMessageFormatterSpec.cs | 2 +- .../Logging/LoggerConfigEnd2EndSpecs.cs | 2 +- 4 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 src/Akka.Hosting.Tests/Bugfix208Specs.cs diff --git a/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj b/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj index 288bf80f..68c54c8c 100644 --- a/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj +++ b/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj @@ -17,6 +17,7 @@ + diff --git a/src/Akka.Hosting.Tests/Bugfix208Specs.cs b/src/Akka.Hosting.Tests/Bugfix208Specs.cs new file mode 100644 index 00000000..681a7e58 --- /dev/null +++ b/src/Akka.Hosting.Tests/Bugfix208Specs.cs @@ -0,0 +1,87 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Actor.Dsl; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Xunit; + +namespace Akka.Hosting.Tests; + +public class Bugfix208Specs : TestKit.TestKit +{ + private class MyTestActor : ReceiveActor + { + public record SetData(string Data); + + public record GetData(); + + private string _data = string.Empty; + + public MyTestActor() + { + Receive(s => + { + _data = s.Data; + }); + + Receive(g => + { + Sender.Tell(_data); + }); + } + } + + private class TestActorKey{} + + private class MyBackgroundService : BackgroundService + { + private readonly IActorRef _testActor; + + public MyBackgroundService(IRequiredActor requiredActor) + { + _testActor = requiredActor.ActorRef; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + _testActor.Tell("BackgroundService started"); + return Task.CompletedTask; + } + } + + protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services) + { + services.AddHostedService(); + base.ConfigureServices(context, services); + } + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder.WithActors((system, registry, arg3) => + { + registry.Register(system.ActorOf(Props.Create(() => new MyTestActor()), "test-actor")); + }); + } + + /// + /// Reproduction for https://github.com/akkadotnet/Akka.Hosting/issues/208 + /// + [Fact] + public async Task ShouldStartHostedServiceThatDependsOnActor() + { + // arrange + var testActorRef = ActorRegistry.Get(); + + // act + + // assert + await AwaitAssertAsync(async () => + { + var r = await testActorRef.Ask(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100)); + r.Should().Be("BackgroundService started"); + }); + } +} \ No newline at end of file diff --git a/src/Akka.Hosting.Tests/Logging/LogMessageFormatterSpec.cs b/src/Akka.Hosting.Tests/Logging/LogMessageFormatterSpec.cs index 804a1940..18b760f2 100644 --- a/src/Akka.Hosting.Tests/Logging/LogMessageFormatterSpec.cs +++ b/src/Akka.Hosting.Tests/Logging/LogMessageFormatterSpec.cs @@ -40,7 +40,7 @@ public async Task TransformMessagesTest() try { var sys = host.Services.GetRequiredService(); - var testKit = new TestKit.Xunit2.TestKit(sys); + var testKit = new Akka.TestKit.Xunit2.TestKit(sys); var probe = testKit.CreateTestProbe(); sys.EventStream.Subscribe(probe, typeof(Error)); diff --git a/src/Akka.Hosting.Tests/Logging/LoggerConfigEnd2EndSpecs.cs b/src/Akka.Hosting.Tests/Logging/LoggerConfigEnd2EndSpecs.cs index 61729e2d..d290629a 100644 --- a/src/Akka.Hosting.Tests/Logging/LoggerConfigEnd2EndSpecs.cs +++ b/src/Akka.Hosting.Tests/Logging/LoggerConfigEnd2EndSpecs.cs @@ -13,7 +13,7 @@ namespace Akka.Hosting.Tests.Logging; -public class LoggerConfigEnd2EndSpecs : TestKit.Xunit2.TestKit +public class LoggerConfigEnd2EndSpecs : Akka.TestKit.Xunit2.TestKit { private class CustomLoggingProvider : ILoggerProvider { From 4ff45b42497dc7035be8767dfe8dfb0299e68d8d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 15 Mar 2023 11:42:06 -0500 Subject: [PATCH 2/4] attempted fix for #208 --- src/Akka.Hosting/ActorRegistry.cs | 50 ++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/src/Akka.Hosting/ActorRegistry.cs b/src/Akka.Hosting/ActorRegistry.cs index cf991e6f..389f7fbc 100644 --- a/src/Akka.Hosting/ActorRegistry.cs +++ b/src/Akka.Hosting/ActorRegistry.cs @@ -25,19 +25,62 @@ public interface IRequiredActor IActorRef ActorRef { get; } } + /// + /// INTERNAL API + /// + internal static class RequiredActorDefaults + { + /// + /// Used to timeout sync-over-async operations used when retrieving actors from the registry. + /// + public static readonly TimeSpan ActorFetchTimeout = TimeSpan.FromSeconds(5); + } + /// /// INTERNAL API /// /// The type key of the actor - corresponds to a matching entry inside the . public sealed class RequiredActor : IRequiredActor { + private readonly IReadOnlyActorRegistry _registry; + public RequiredActor(IReadOnlyActorRegistry registry) { - ActorRef = registry.Get(); + _registry = registry; } + private IActorRef? _internalRef = null; + /// - public IActorRef ActorRef { get; } + public IActorRef ActorRef + { + get + { + // attempt 1 - used cached value + if (_internalRef != null) + return _internalRef; + + // attempt 2 - synchronously check the registry (fast path) + if (_registry.TryGet(out _internalRef)) + { + return _internalRef; + } + + // attempt 3 - last resort - sync-over-async wait for someone + // to populate entry into registry. This is the fix for https://github.com/akkadotnet/Akka.Hosting/issues/208 + try + { + using var cts = new CancellationTokenSource(RequiredActorDefaults.ActorFetchTimeout); + var fetchTask = _registry.GetAsync(cts.Token); + _internalRef = fetchTask.Result; // sync, wait for result + return _internalRef; + } + catch (Exception ex) + { + throw new MissingActorRegistryEntryException($"Unable to resolve actor type [{typeof(TActor)})] from the registry within [{RequiredActorDefaults.ActorFetchTimeout}]", ex); + } + } + } } /// @@ -176,8 +219,7 @@ public void Register(IActorRef actor, bool overwrite = false) /// /// Have to store a collection of s here so each waiter gets its own cancellation token. /// - private readonly ConcurrentDictionary> _actorWaiters = - new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> _actorWaiters = new(); /// /// Attempts to register an actor with the registry. From a616dbe86818a4b0441fb4522a965fe38bb35349 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 15 Mar 2023 12:16:38 -0500 Subject: [PATCH 3/4] fixed 208 --- src/Akka.Hosting.Tests/Bugfix208Specs.cs | 39 ++++++++++++++---- src/Akka.Hosting/ActorRegistry.cs | 52 ++++++++++++++++-------- 2 files changed, 66 insertions(+), 25 deletions(-) diff --git a/src/Akka.Hosting.Tests/Bugfix208Specs.cs b/src/Akka.Hosting.Tests/Bugfix208Specs.cs index 681a7e58..7674ed6b 100644 --- a/src/Akka.Hosting.Tests/Bugfix208Specs.cs +++ b/src/Akka.Hosting.Tests/Bugfix208Specs.cs @@ -38,17 +38,17 @@ private class TestActorKey{} private class MyBackgroundService : BackgroundService { - private readonly IActorRef _testActor; + private readonly IRequiredActor _testActor; public MyBackgroundService(IRequiredActor requiredActor) { - _testActor = requiredActor.ActorRef; + _testActor = requiredActor; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _testActor.Tell("BackgroundService started"); - return Task.CompletedTask; + var myRef = await _testActor.GetAsync(stoppingToken); + myRef.Tell("BackgroundService started"); } } @@ -78,10 +78,31 @@ public async Task ShouldStartHostedServiceThatDependsOnActor() // act // assert - await AwaitAssertAsync(async () => + + // workaround for https://github.com/akkadotnet/Akka.Hosting/issues/265 + var attempts = 5; + do { - var r = await testActorRef.Ask(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100)); - r.Should().Be("BackgroundService started"); - }); + attempts--; + try + { + var r = await testActorRef.Ask(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100)); + r.Should().Be("BackgroundService started"); + } + catch (Exception e) + { + attempts--; + if (attempts == 0) + { + throw; + } + } + } while (attempts > 0); + + // await AwaitAssertAsync(async () => + // { + // var r = await testActorRef.Ask(new MyTestActor.GetData(), TimeSpan.FromMilliseconds(100)); + // r.Should().Be("BackgroundService started"); + // }, RemainingOrDefault, TimeSpan.FromMilliseconds(150)); } } \ No newline at end of file diff --git a/src/Akka.Hosting/ActorRegistry.cs b/src/Akka.Hosting/ActorRegistry.cs index 389f7fbc..192c3df1 100644 --- a/src/Akka.Hosting/ActorRegistry.cs +++ b/src/Akka.Hosting/ActorRegistry.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Util; +using Microsoft.Extensions.Hosting; namespace Akka.Hosting { @@ -23,6 +24,17 @@ public interface IRequiredActor /// The underlying actor resolved via using the given key. /// IActorRef ActorRef { get; } + + /// + /// When calling from inside another , actor registrations may not be + /// available at startup (due to Akka.NET itself being started asynchronously in another hosted service). + /// + /// Instead - you should call the GetAsync method to wait for that actor to be populated into the + /// by the AkkaService at startup. + /// + /// Optional cancellation token. + /// A Task that will return the using the given key. + Task GetAsync(CancellationToken cancellationToken = default); } /// @@ -59,27 +71,35 @@ public IActorRef ActorRef // attempt 1 - used cached value if (_internalRef != null) return _internalRef; - + // attempt 2 - synchronously check the registry (fast path) if (_registry.TryGet(out _internalRef)) { return _internalRef; } - - // attempt 3 - last resort - sync-over-async wait for someone - // to populate entry into registry. This is the fix for https://github.com/akkadotnet/Akka.Hosting/issues/208 - try - { - using var cts = new CancellationTokenSource(RequiredActorDefaults.ActorFetchTimeout); - var fetchTask = _registry.GetAsync(cts.Token); - _internalRef = fetchTask.Result; // sync, wait for result - return _internalRef; - } - catch (Exception ex) - { - throw new MissingActorRegistryEntryException($"Unable to resolve actor type [{typeof(TActor)})] from the registry within [{RequiredActorDefaults.ActorFetchTimeout}]", ex); - } + + + throw new MissingActorRegistryEntryException( + $"Unable to resolve actor type [{typeof(TActor)})] - if you're using IRequiredActor inside the constructor" + + $"of an IHostedService, consider using the GetAsync method instead so you can wait for the actor to be populated by the AkkaService (which runs in parallel.)"); + } + } + + /// + public async Task GetAsync(CancellationToken cancellationToken = default) + { + // attempt 1 - used cached value + if (_internalRef != null) + return _internalRef; + + // attempt 2 - synchronously check the registry (fast path) + if (_registry.TryGet(out _internalRef)) + { + return _internalRef; } + + // attempt 3 - wait for the actor to be registered + return await _registry.GetAsync(cancellationToken).ConfigureAwait(false); } } @@ -287,7 +307,7 @@ public bool TryGet(Type key, out IActorRef actor) /// public async Task GetAsync(CancellationToken ct = default) { - return await GetAsync(typeof(TKey), ct); + return await GetAsync(typeof(TKey), ct).ConfigureAwait(false); } /// From 102a5662e7473ed9224259620dfeb8427a6f463d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 15 Mar 2023 12:34:53 -0500 Subject: [PATCH 4/4] API approvals --- .../verify/CoreApiSpec.ApproveCore.verified.txt | 2 ++ src/Akka.Hosting/ActorRegistry.cs | 13 +------------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt index 3b804939..20627f74 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt @@ -124,6 +124,7 @@ namespace Akka.Hosting public interface IRequiredActor { Akka.Actor.IActorRef ActorRef { get; } + System.Threading.Tasks.Task GetAsync(System.Threading.CancellationToken cancellationToken = default); } public sealed class LoggerConfigBuilder { @@ -152,6 +153,7 @@ namespace Akka.Hosting { public RequiredActor(Akka.Hosting.IReadOnlyActorRegistry registry) { } public Akka.Actor.IActorRef ActorRef { get; } + public System.Threading.Tasks.Task GetAsync(System.Threading.CancellationToken cancellationToken = default) { } } public delegate System.Threading.Tasks.Task StartupTask(Akka.Actor.ActorSystem system, Akka.Hosting.IActorRegistry registry); public enum TriStateValue diff --git a/src/Akka.Hosting/ActorRegistry.cs b/src/Akka.Hosting/ActorRegistry.cs index 192c3df1..4290263b 100644 --- a/src/Akka.Hosting/ActorRegistry.cs +++ b/src/Akka.Hosting/ActorRegistry.cs @@ -36,18 +36,7 @@ public interface IRequiredActor /// A Task that will return the using the given key. Task GetAsync(CancellationToken cancellationToken = default); } - - /// - /// INTERNAL API - /// - internal static class RequiredActorDefaults - { - /// - /// Used to timeout sync-over-async operations used when retrieving actors from the registry. - /// - public static readonly TimeSpan ActorFetchTimeout = TimeSpan.FromSeconds(5); - } - + /// /// INTERNAL API ///