diff --git a/ReleaseNotes/3.4.0.md b/ReleaseNotes/3.4.0.md new file mode 100644 index 000000000..ca724c4b2 --- /dev/null +++ b/ReleaseNotes/3.4.0.md @@ -0,0 +1,55 @@ +# Workflow Core 3.4.0 + +## Execute Workflow Middleware + +These middleware get run after each workflow execution and can be used to perform additional actions or build metrics/statistics for all workflows in your app. + +The following example illustrates how you can use a execute workflow middleware to build [prometheus](https://prometheus.io/) metrics. + +Note that you use `WorkflowMiddlewarePhase.ExecuteWorkflow` to specify that it runs after each workflow execution. + +**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs. + +```cs +public class MetricsMiddleware : IWorkflowMiddleware +{ + private readonly ConcurrentHashSet() _suspendedWorkflows = + new ConcurrentHashSet(); + + private readonly Counter _completed; + private readonly Counter _suspended; + + public MetricsMiddleware() + { + _completed = Prometheus.Metrics.CreateCounter( + "workflow_completed", "Workflow completed"); + + _suspended = Prometheus.Metrics.CreateCounter( + "workflow_suspended", "Workflow suspended"); + } + + public WorkflowMiddlewarePhase Phase => + WorkflowMiddlewarePhase.ExecuteWorkflow; + + public Task HandleAsync( + WorkflowInstance workflow, + WorkflowDelegate next) + { + switch (workflow.Status) + { + case WorkflowStatus.Complete: + if (_suspendedWorkflows.TryRemove(workflow.Id)) + { + _suspended.Dec(); + } + _completed.Inc(); + break; + case WorkflowStatus.Suspended: + _suspended.Inc(); + break; + } + + return next(); + } +} +``` \ No newline at end of file diff --git a/src/WorkflowCore/Interface/IWorkflowMiddleware.cs b/src/WorkflowCore/Interface/IWorkflowMiddleware.cs index 71781b30d..ede4ca8ec 100644 --- a/src/WorkflowCore/Interface/IWorkflowMiddleware.cs +++ b/src/WorkflowCore/Interface/IWorkflowMiddleware.cs @@ -16,7 +16,12 @@ public enum WorkflowMiddlewarePhase /// /// The middleware should run after a workflow completes. /// - PostWorkflow + PostWorkflow, + + /// + /// The middleware should run after each workflow execution. + /// + ExecuteWorkflow } /// diff --git a/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs b/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs index 96aab8b74..6c47899f5 100644 --- a/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs +++ b/src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs @@ -4,7 +4,7 @@ namespace WorkflowCore.Interface { /// - /// Runs workflow pre/post middleware. + /// Runs workflow pre/post and execute middleware. /// public interface IWorkflowMiddlewareRunner { @@ -29,5 +29,16 @@ public interface IWorkflowMiddlewareRunner /// The definition. /// A task that will complete when all middleware has run. Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def); + + /// + /// Runs workflow-level middleware that is set to run at the + /// phase. Middleware will be run in the + /// order in which they were registered with DI with middleware declared earlier starting earlier and + /// completing later. + /// + /// The to run for. + /// The definition. + /// A task that will complete when all middleware has run. + Task RunExecuteMiddleware(WorkflowInstance workflow, WorkflowDefinition def); } } diff --git a/src/WorkflowCore/Models/WorkflowDefinition.cs b/src/WorkflowCore/Models/WorkflowDefinition.cs index 0cea6cc40..d40fc5a2e 100644 --- a/src/WorkflowCore/Models/WorkflowDefinition.cs +++ b/src/WorkflowCore/Models/WorkflowDefinition.cs @@ -18,6 +18,7 @@ public class WorkflowDefinition public WorkflowErrorHandling DefaultErrorBehavior { get; set; } public Type OnPostMiddlewareError { get; set; } + public Type OnExecuteMiddlewareError { get; set; } public TimeSpan? DefaultErrorRetryInterval { get; set; } diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index cb186f59d..ebdc818fe 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -99,6 +99,12 @@ public async Task Execute(WorkflowInstance workflow, Can ProcessAfterExecutionIteration(workflow, def, wfResult); await DetermineNextExecutionTime(workflow, def); + using (var scope = _serviceProvider.CreateScope()) + { + var middlewareRunner = scope.ServiceProvider.GetRequiredService(); + await middlewareRunner.RunExecuteMiddleware(workflow, def); + } + return wfResult; } @@ -213,7 +219,9 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo workflow.NextExecution = null; if (workflow.Status == WorkflowStatus.Complete) + { return; + } foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count == 0)) { @@ -243,7 +251,9 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo } if ((workflow.NextExecution != null) || (workflow.ExecutionPointers.Any(x => x.EndTime == null))) + { return; + } workflow.Status = WorkflowStatus.Complete; workflow.CompleteTime = _datetimeProvider.UtcNow; diff --git a/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs b/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs index 6eb6968b8..d904c5aa5 100644 --- a/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs +++ b/src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs @@ -8,7 +8,7 @@ namespace WorkflowCore.Services { - /// + /// public class WorkflowMiddlewareRunner : IWorkflowMiddlewareRunner { private static readonly WorkflowDelegate NoopWorkflowDelegate = () => Task.CompletedTask; @@ -17,59 +17,58 @@ public class WorkflowMiddlewareRunner : IWorkflowMiddlewareRunner public WorkflowMiddlewareRunner( IEnumerable middleware, - IServiceProvider serviceProvider - ) + IServiceProvider serviceProvider) { _middleware = middleware; _serviceProvider = serviceProvider; } - - /// - /// Runs workflow-level middleware that is set to run at the - /// phase. Middleware will be run in the - /// order in which they were registered with DI with middleware declared earlier starting earlier and - /// completing later. - /// - /// The to run for. - /// The definition. - /// A task that will complete when all middleware has run. + /// public async Task RunPreMiddleware(WorkflowInstance workflow, WorkflowDefinition def) { var preMiddleware = _middleware - .Where(m => m.Phase == WorkflowMiddlewarePhase.PreWorkflow) - .ToArray(); + .Where(m => m.Phase == WorkflowMiddlewarePhase.PreWorkflow); await RunWorkflowMiddleware(workflow, preMiddleware); } - /// - /// Runs workflow-level middleware that is set to run at the - /// phase. Middleware will be run in the - /// order in which they were registered with DI with middleware declared earlier starting earlier and - /// completing later. - /// - /// The to run for. - /// The definition. - /// A task that will complete when all middleware has run. - public async Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def) + /// + public Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def) { - var postMiddleware = _middleware - .Where(m => m.Phase == WorkflowMiddlewarePhase.PostWorkflow) - .ToArray(); + return RunWorkflowMiddlewareWithErrorHandling( + workflow, + WorkflowMiddlewarePhase.PostWorkflow, + def.OnPostMiddlewareError); + } + + /// + public Task RunExecuteMiddleware(WorkflowInstance workflow, WorkflowDefinition def) + { + return RunWorkflowMiddlewareWithErrorHandling( + workflow, + WorkflowMiddlewarePhase.ExecuteWorkflow, + def.OnExecuteMiddlewareError); + } + + public async Task RunWorkflowMiddlewareWithErrorHandling( + WorkflowInstance workflow, + WorkflowMiddlewarePhase phase, + Type middlewareErrorType) + { + var middleware = _middleware.Where(m => m.Phase == phase); try { - await RunWorkflowMiddleware(workflow, postMiddleware); + await RunWorkflowMiddleware(workflow, middleware); } catch (Exception exception) { - // On error, determine which error handler to run and then run it - var errorHandlerType = def.OnPostMiddlewareError ?? typeof(IWorkflowMiddlewareErrorHandler); + var errorHandlerType = middlewareErrorType ?? typeof(IWorkflowMiddlewareErrorHandler); + using (var scope = _serviceProvider.CreateScope()) { var typeInstance = scope.ServiceProvider.GetService(errorHandlerType); - if (typeInstance != null && typeInstance is IWorkflowMiddlewareErrorHandler handler) + if (typeInstance is IWorkflowMiddlewareErrorHandler handler) { await handler.HandleAsync(exception); } @@ -77,20 +76,15 @@ public async Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinitio } } - private static async Task RunWorkflowMiddleware( + private static Task RunWorkflowMiddleware( WorkflowInstance workflow, - IEnumerable middlewareCollection - ) + IEnumerable middlewareCollection) { - // Build the middleware chain - var middlewareChain = middlewareCollection + return middlewareCollection .Reverse() .Aggregate( NoopWorkflowDelegate, - (previous, middleware) => () => middleware.HandleAsync(workflow, previous) - ); - - await middlewareChain(); + (previous, middleware) => () => middleware.HandleAsync(workflow, previous))(); } } } diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index 2ef5a4a1c..a439d3bac 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -15,12 +15,12 @@ false false Workflow Core is a light weight workflow engine targeting .NET Standard. - 3.3.6 - 3.3.6.0 - 3.3.6.0 + 3.4.0 + 3.4.0.0 + 3.4.0.0 https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png - 3.3.6 + 3.4.0 diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs index 037dd0e33..e548b8e88 100644 --- a/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs @@ -44,8 +44,14 @@ public WorkflowExecutorFixture() Options = new WorkflowOptions(A.Fake()); + var stepExecutionScope = A.Fake(); + A.CallTo(() => ScopeProvider.CreateScope(A._)).Returns(stepExecutionScope); + A.CallTo(() => stepExecutionScope.ServiceProvider).Returns(ServiceProvider); + var scope = A.Fake(); - A.CallTo(() => ScopeProvider.CreateScope(A._)).Returns(scope); + var scopeFactory = A.Fake(); + A.CallTo(() => ServiceProvider.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory); + A.CallTo(() => scopeFactory.CreateScope()).Returns(scope); A.CallTo(() => scope.ServiceProvider).Returns(ServiceProvider); A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now); @@ -63,6 +69,10 @@ public WorkflowExecutorFixture() .RunPostMiddleware(A._, A._)) .Returns(Task.CompletedTask); + A.CallTo(() => MiddlewareRunner + .RunExecuteMiddleware(A._, A._)) + .Returns(Task.CompletedTask); + A.CallTo(() => StepExecutor.ExecuteStep(A._, A._)) .ReturnsLazily(call => call.Arguments[1].As().RunAsync( @@ -105,6 +115,64 @@ public void should_execute_active_step() A.CallTo(() => ResultProcesser.ProcessExecutionResult(instance, A.Ignored, A.Ignored, step1, A.Ignored, A.Ignored)).MustHaveHappened(); } + [Fact(DisplayName = "Should call execute middleware when not completed")] + public void should_call_execute_middleware_when_not_completed() + { + //arrange + var step1Body = A.Fake(); + A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Next()); + WorkflowStep step1 = BuildFakeStep(step1Body); + Given1StepWorkflow(step1, "Workflow", 1); + + var instance = new WorkflowInstance + { + WorkflowDefinitionId = "Workflow", + Version = 1, + Status = WorkflowStatus.Runnable, + NextExecution = 0, + Id = "001", + ExecutionPointers = new ExecutionPointerCollection(new List + { + new ExecutionPointer { Id = "1", Active = true, StepId = 0 } + }) + }; + + //act + Subject.Execute(instance); + + //assert + A.CallTo(() => MiddlewareRunner.RunExecuteMiddleware(instance, A.Ignored)).MustHaveHappened(); + } + + [Fact(DisplayName = "Should not call post middleware when not completed")] + public void should_not_call_post_middleware_when_not_completed() + { + //arrange + var step1Body = A.Fake(); + A.CallTo(() => step1Body.RunAsync(A.Ignored)).Returns(ExecutionResult.Next()); + WorkflowStep step1 = BuildFakeStep(step1Body); + Given1StepWorkflow(step1, "Workflow", 1); + + var instance = new WorkflowInstance + { + WorkflowDefinitionId = "Workflow", + Version = 1, + Status = WorkflowStatus.Runnable, + NextExecution = 0, + Id = "001", + ExecutionPointers = new ExecutionPointerCollection(new List + { + new ExecutionPointer { Id = "1", Active = true, StepId = 0 } + }) + }; + + //act + Subject.Execute(instance); + + //assert + A.CallTo(() => MiddlewareRunner.RunPostMiddleware(instance, A.Ignored)).MustNotHaveHappened(); + } + [Fact(DisplayName = "Should trigger step hooks")] public void should_trigger_step_hooks() { diff --git a/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs b/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs index 991acfb23..4b58d965f 100644 --- a/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs +++ b/test/WorkflowCore.UnitTests/Services/WorkflowMiddlewareRunnerTests.cs @@ -238,6 +238,117 @@ public async Task .MustHaveHappenedOnceExactly(); } + [Fact(DisplayName = "RunExecuteMiddleware should run nothing when no middleware")] + public void RunExecuteMiddleware_should_run_nothing_when_no_middleware() + { + // Act + Func action = async () => await Runner.RunExecuteMiddleware(Workflow, Definition); + + // Assert + action.ShouldNotThrow(); + } + + [Fact(DisplayName = "RunExecuteMiddleware should run middleware when one middleware")] + public async Task RunExecuteMiddleware_should_run_middleware_when_one_middleware() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow); + Middleware.Add(middleware); + + // Act + await Runner.RunExecuteMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(middleware)) + .MustHaveHappenedOnceExactly(); + } + + [Fact(DisplayName = "RunExecuteMiddleware should run all middleware when multiple middleware")] + public async Task RunExecuteMiddleware_should_run_all_middleware_when_multiple_middleware() + { + // Arrange + var middleware1 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow, 1); + var middleware2 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow, 2); + var middleware3 = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow, 3); + Middleware.AddRange(new[] { middleware1, middleware2, middleware3 }); + + // Act + await Runner.RunExecuteMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(middleware3)) + .MustHaveHappenedOnceExactly() + .Then(A + .CallTo(HandleMethodFor(middleware2)) + .MustHaveHappenedOnceExactly()) + .Then(A + .CallTo(HandleMethodFor(middleware1)) + .MustHaveHappenedOnceExactly()); + } + + [Fact(DisplayName = "RunExecuteMiddleware should only run middleware in ExecuteWorkflow phase")] + public async Task RunExecuteMiddleware_should_only_run_middleware_in_ExecuteWorkflow_phase() + { + // Arrange + var executeMiddleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow, 1); + var postMiddleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PostWorkflow, 2); + var preMiddleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.PreWorkflow, 3); + Middleware.AddRange(new[] { preMiddleware, postMiddleware, executeMiddleware }); + + // Act + await Runner.RunExecuteMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(executeMiddleware)) + .MustHaveHappenedOnceExactly(); + + A.CallTo(HandleMethodFor(preMiddleware)).MustNotHaveHappened(); + A.CallTo(HandleMethodFor(postMiddleware)).MustNotHaveHappened(); + } + + [Fact(DisplayName = "RunExecuteMiddleware should call top level error handler when middleware throws")] + public async Task RunExecuteMiddleware_should_call_top_level_error_handler_when_middleware_throws() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow, 1); + A.CallTo(HandleMethodFor(middleware)).ThrowsAsync(new ApplicationException("Something went wrong")); + Middleware.AddRange(new[] { middleware }); + + // Act + await Runner.RunExecuteMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(TopLevelErrorHandler)) + .MustHaveHappenedOnceExactly(); + } + + [Fact(DisplayName = + "RunExecuteMiddleware should call error handler on workflow def when middleware throws and def has handler defined")] + public async Task + RunExecuteMiddleware_should_call_error_handler_on_workflow_def_when_middleware_throws_and_def_has_handler() + { + // Arrange + var middleware = BuildWorkflowMiddleware(WorkflowMiddlewarePhase.ExecuteWorkflow, 1); + A.CallTo(HandleMethodFor(middleware)).ThrowsAsync(new ApplicationException("Something went wrong")); + Middleware.AddRange(new[] { middleware }); + Definition.OnExecuteMiddlewareError = typeof(IDefLevelErrorHandler); + + // Act + await Runner.RunExecuteMiddleware(Workflow, Definition); + + // Assert + A + .CallTo(HandleMethodFor(TopLevelErrorHandler)) + .MustNotHaveHappened(); + A + .CallTo(HandleMethodFor(DefLevelErrorHandler)) + .MustHaveHappenedOnceExactly(); + } + #region Helpers private IWorkflowMiddleware BuildWorkflowMiddleware(