Skip to content

Commit

Permalink
Merge pull request #775 from glucaci/postMiddleware
Browse files Browse the repository at this point in the history
Add execute middleware phase
  • Loading branch information
danielgerlag authored Apr 19, 2021
2 parents 674929f + 045cb49 commit db42f91
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 48 deletions.
55 changes: 55 additions & 0 deletions ReleaseNotes/3.4.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Workflow Core 3.4.0

## Execute Workflow Middleware

These middleware get run after each workflow execution and can be used to perform additional actions or build metrics/statistics for all workflows in your app.

The following example illustrates how you can use a execute workflow middleware to build [prometheus](https://prometheus.io/) metrics.

Note that you use `WorkflowMiddlewarePhase.ExecuteWorkflow` to specify that it runs after each workflow execution.

**Important:** You should call `next` as part of the workflow middleware to ensure that the next workflow in the chain runs.

```cs
public class MetricsMiddleware : IWorkflowMiddleware
{
private readonly ConcurrentHashSet<string>() _suspendedWorkflows =
new ConcurrentHashSet<string>();

private readonly Counter _completed;
private readonly Counter _suspended;

public MetricsMiddleware()
{
_completed = Prometheus.Metrics.CreateCounter(
"workflow_completed", "Workflow completed");

_suspended = Prometheus.Metrics.CreateCounter(
"workflow_suspended", "Workflow suspended");
}

public WorkflowMiddlewarePhase Phase =>
WorkflowMiddlewarePhase.ExecuteWorkflow;

public Task HandleAsync(
WorkflowInstance workflow,
WorkflowDelegate next)
{
switch (workflow.Status)
{
case WorkflowStatus.Complete:
if (_suspendedWorkflows.TryRemove(workflow.Id))
{
_suspended.Dec();
}
_completed.Inc();
break;
case WorkflowStatus.Suspended:
_suspended.Inc();
break;
}

return next();
}
}
```
7 changes: 6 additions & 1 deletion src/WorkflowCore/Interface/IWorkflowMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ public enum WorkflowMiddlewarePhase
/// <summary>
/// The middleware should run after a workflow completes.
/// </summary>
PostWorkflow
PostWorkflow,

/// <summary>
/// The middleware should run after each workflow execution.
/// </summary>
ExecuteWorkflow
}

/// <summary>
Expand Down
13 changes: 12 additions & 1 deletion src/WorkflowCore/Interface/IWorkflowMiddlewareRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace WorkflowCore.Interface
{
/// <summary>
/// Runs workflow pre/post middleware.
/// Runs workflow pre/post and execute middleware.
/// </summary>
public interface IWorkflowMiddlewareRunner
{
Expand All @@ -29,5 +29,16 @@ public interface IWorkflowMiddlewareRunner
/// <param name="def">The <see cref="WorkflowDefinition"/> definition.</param>
/// <returns>A task that will complete when all middleware has run.</returns>
Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def);

/// <summary>
/// Runs workflow-level middleware that is set to run at the
/// <see cref="WorkflowMiddlewarePhase.ExecuteWorkflow"/> phase. Middleware will be run in the
/// order in which they were registered with DI with middleware declared earlier starting earlier and
/// completing later.
/// </summary>
/// <param name="workflow">The <see cref="WorkflowInstance"/> to run for.</param>
/// <param name="def">The <see cref="WorkflowDefinition"/> definition.</param>
/// <returns>A task that will complete when all middleware has run.</returns>
Task RunExecuteMiddleware(WorkflowInstance workflow, WorkflowDefinition def);
}
}
1 change: 1 addition & 0 deletions src/WorkflowCore/Models/WorkflowDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class WorkflowDefinition
public WorkflowErrorHandling DefaultErrorBehavior { get; set; }

public Type OnPostMiddlewareError { get; set; }
public Type OnExecuteMiddlewareError { get; set; }

public TimeSpan? DefaultErrorRetryInterval { get; set; }

Expand Down
10 changes: 10 additions & 0 deletions src/WorkflowCore/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Can
ProcessAfterExecutionIteration(workflow, def, wfResult);
await DetermineNextExecutionTime(workflow, def);

using (var scope = _serviceProvider.CreateScope())
{
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
await middlewareRunner.RunExecuteMiddleware(workflow, def);
}

return wfResult;
}

Expand Down Expand Up @@ -213,7 +219,9 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
workflow.NextExecution = null;

if (workflow.Status == WorkflowStatus.Complete)
{
return;
}

foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List<string>()).Count == 0))
{
Expand Down Expand Up @@ -243,7 +251,9 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
}

if ((workflow.NextExecution != null) || (workflow.ExecutionPointers.Any(x => x.EndTime == null)))
{
return;
}

workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.UtcNow;
Expand Down
76 changes: 35 additions & 41 deletions src/WorkflowCore/Services/WorkflowMiddlewareRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace WorkflowCore.Services
{
/// <inheritdoc />
/// <inheritdoc cref="IWorkflowMiddlewareRunner" />
public class WorkflowMiddlewareRunner : IWorkflowMiddlewareRunner
{
private static readonly WorkflowDelegate NoopWorkflowDelegate = () => Task.CompletedTask;
Expand All @@ -17,80 +17,74 @@ public class WorkflowMiddlewareRunner : IWorkflowMiddlewareRunner

public WorkflowMiddlewareRunner(
IEnumerable<IWorkflowMiddleware> middleware,
IServiceProvider serviceProvider
)
IServiceProvider serviceProvider)
{
_middleware = middleware;
_serviceProvider = serviceProvider;
}


/// <summary>
/// Runs workflow-level middleware that is set to run at the
/// <see cref="WorkflowMiddlewarePhase.PreWorkflow"/> phase. Middleware will be run in the
/// order in which they were registered with DI with middleware declared earlier starting earlier and
/// completing later.
/// </summary>
/// <param name="workflow">The <see cref="WorkflowInstance"/> to run for.</param>
/// <param name="def">The <see cref="WorkflowDefinition"/> definition.</param>
/// <returns>A task that will complete when all middleware has run.</returns>
/// <inheritdoc cref="IWorkflowMiddlewareRunner.RunPreMiddleware"/>
public async Task RunPreMiddleware(WorkflowInstance workflow, WorkflowDefinition def)
{
var preMiddleware = _middleware
.Where(m => m.Phase == WorkflowMiddlewarePhase.PreWorkflow)
.ToArray();
.Where(m => m.Phase == WorkflowMiddlewarePhase.PreWorkflow);

await RunWorkflowMiddleware(workflow, preMiddleware);
}

/// <summary>
/// Runs workflow-level middleware that is set to run at the
/// <see cref="WorkflowMiddlewarePhase.PostWorkflow"/> phase. Middleware will be run in the
/// order in which they were registered with DI with middleware declared earlier starting earlier and
/// completing later.
/// </summary>
/// <param name="workflow">The <see cref="WorkflowInstance"/> to run for.</param>
/// <param name="def">The <see cref="WorkflowDefinition"/> definition.</param>
/// <returns>A task that will complete when all middleware has run.</returns>
public async Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def)
/// <inheritdoc cref="IWorkflowMiddlewareRunner.RunPostMiddleware"/>
public Task RunPostMiddleware(WorkflowInstance workflow, WorkflowDefinition def)
{
var postMiddleware = _middleware
.Where(m => m.Phase == WorkflowMiddlewarePhase.PostWorkflow)
.ToArray();
return RunWorkflowMiddlewareWithErrorHandling(
workflow,
WorkflowMiddlewarePhase.PostWorkflow,
def.OnPostMiddlewareError);
}

/// <inheritdoc cref="IWorkflowMiddlewareRunner.RunExecuteMiddleware"/>
public Task RunExecuteMiddleware(WorkflowInstance workflow, WorkflowDefinition def)
{
return RunWorkflowMiddlewareWithErrorHandling(
workflow,
WorkflowMiddlewarePhase.ExecuteWorkflow,
def.OnExecuteMiddlewareError);
}

public async Task RunWorkflowMiddlewareWithErrorHandling(
WorkflowInstance workflow,
WorkflowMiddlewarePhase phase,
Type middlewareErrorType)
{
var middleware = _middleware.Where(m => m.Phase == phase);

try
{
await RunWorkflowMiddleware(workflow, postMiddleware);
await RunWorkflowMiddleware(workflow, middleware);
}
catch (Exception exception)
{
// On error, determine which error handler to run and then run it
var errorHandlerType = def.OnPostMiddlewareError ?? typeof(IWorkflowMiddlewareErrorHandler);
var errorHandlerType = middlewareErrorType ?? typeof(IWorkflowMiddlewareErrorHandler);

using (var scope = _serviceProvider.CreateScope())
{
var typeInstance = scope.ServiceProvider.GetService(errorHandlerType);
if (typeInstance != null && typeInstance is IWorkflowMiddlewareErrorHandler handler)
if (typeInstance is IWorkflowMiddlewareErrorHandler handler)
{
await handler.HandleAsync(exception);
}
}
}
}

private static async Task RunWorkflowMiddleware(
private static Task RunWorkflowMiddleware(
WorkflowInstance workflow,
IEnumerable<IWorkflowMiddleware> middlewareCollection
)
IEnumerable<IWorkflowMiddleware> middlewareCollection)
{
// Build the middleware chain
var middlewareChain = middlewareCollection
return middlewareCollection
.Reverse()
.Aggregate(
NoopWorkflowDelegate,
(previous, middleware) => () => middleware.HandleAsync(workflow, previous)
);

await middlewareChain();
(previous, middleware) => () => middleware.HandleAsync(workflow, previous))();
}
}
}
8 changes: 4 additions & 4 deletions src/WorkflowCore/WorkflowCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<Description>Workflow Core is a light weight workflow engine targeting .NET Standard.</Description>
<Version>3.3.6</Version>
<AssemblyVersion>3.3.6.0</AssemblyVersion>
<FileVersion>3.3.6.0</FileVersion>
<Version>3.4.0</Version>
<AssemblyVersion>3.4.0.0</AssemblyVersion>
<FileVersion>3.4.0.0</FileVersion>
<PackageReleaseNotes></PackageReleaseNotes>
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
<PackageVersion>3.3.6</PackageVersion>
<PackageVersion>3.4.0</PackageVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
70 changes: 69 additions & 1 deletion test/WorkflowCore.UnitTests/Services/WorkflowExecutorFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ public WorkflowExecutorFixture()

Options = new WorkflowOptions(A.Fake<IServiceCollection>());

var stepExecutionScope = A.Fake<IServiceScope>();
A.CallTo(() => ScopeProvider.CreateScope(A<IStepExecutionContext>._)).Returns(stepExecutionScope);
A.CallTo(() => stepExecutionScope.ServiceProvider).Returns(ServiceProvider);

var scope = A.Fake<IServiceScope>();
A.CallTo(() => ScopeProvider.CreateScope(A<IStepExecutionContext>._)).Returns(scope);
var scopeFactory = A.Fake<IServiceScopeFactory>();
A.CallTo(() => ServiceProvider.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory);
A.CallTo(() => scopeFactory.CreateScope()).Returns(scope);
A.CallTo(() => scope.ServiceProvider).Returns(ServiceProvider);

A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now);
Expand All @@ -63,6 +69,10 @@ public WorkflowExecutorFixture()
.RunPostMiddleware(A<WorkflowInstance>._, A<WorkflowDefinition>._))
.Returns(Task.CompletedTask);

A.CallTo(() => MiddlewareRunner
.RunExecuteMiddleware(A<WorkflowInstance>._, A<WorkflowDefinition>._))
.Returns(Task.CompletedTask);

A.CallTo(() => StepExecutor.ExecuteStep(A<IStepExecutionContext>._, A<IStepBody>._))
.ReturnsLazily(call =>
call.Arguments[1].As<IStepBody>().RunAsync(
Expand Down Expand Up @@ -105,6 +115,64 @@ public void should_execute_active_step()
A.CallTo(() => ResultProcesser.ProcessExecutionResult(instance, A<WorkflowDefinition>.Ignored, A<ExecutionPointer>.Ignored, step1, A<ExecutionResult>.Ignored, A<WorkflowExecutorResult>.Ignored)).MustHaveHappened();
}

[Fact(DisplayName = "Should call execute middleware when not completed")]
public void should_call_execute_middleware_when_not_completed()
{
//arrange
var step1Body = A.Fake<IStepBody>();
A.CallTo(() => step1Body.RunAsync(A<IStepExecutionContext>.Ignored)).Returns(ExecutionResult.Next());
WorkflowStep step1 = BuildFakeStep(step1Body);
Given1StepWorkflow(step1, "Workflow", 1);

var instance = new WorkflowInstance
{
WorkflowDefinitionId = "Workflow",
Version = 1,
Status = WorkflowStatus.Runnable,
NextExecution = 0,
Id = "001",
ExecutionPointers = new ExecutionPointerCollection(new List<ExecutionPointer>
{
new ExecutionPointer { Id = "1", Active = true, StepId = 0 }
})
};

//act
Subject.Execute(instance);

//assert
A.CallTo(() => MiddlewareRunner.RunExecuteMiddleware(instance, A<WorkflowDefinition>.Ignored)).MustHaveHappened();
}

[Fact(DisplayName = "Should not call post middleware when not completed")]
public void should_not_call_post_middleware_when_not_completed()
{
//arrange
var step1Body = A.Fake<IStepBody>();
A.CallTo(() => step1Body.RunAsync(A<IStepExecutionContext>.Ignored)).Returns(ExecutionResult.Next());
WorkflowStep step1 = BuildFakeStep(step1Body);
Given1StepWorkflow(step1, "Workflow", 1);

var instance = new WorkflowInstance
{
WorkflowDefinitionId = "Workflow",
Version = 1,
Status = WorkflowStatus.Runnable,
NextExecution = 0,
Id = "001",
ExecutionPointers = new ExecutionPointerCollection(new List<ExecutionPointer>
{
new ExecutionPointer { Id = "1", Active = true, StepId = 0 }
})
};

//act
Subject.Execute(instance);

//assert
A.CallTo(() => MiddlewareRunner.RunPostMiddleware(instance, A<WorkflowDefinition>.Ignored)).MustNotHaveHappened();
}

[Fact(DisplayName = "Should trigger step hooks")]
public void should_trigger_step_hooks()
{
Expand Down
Loading

0 comments on commit db42f91

Please sign in to comment.