Skip to content

Commit

Permalink
Merge pull request #936 from danielgerlag/scheduled-commands
Browse files Browse the repository at this point in the history
Scheduled commands
  • Loading branch information
danielgerlag authored Oct 31, 2021
2 parents 5fbaa59 + eb28727 commit b45525a
Show file tree
Hide file tree
Showing 41 changed files with 2,314 additions and 130 deletions.
8 changes: 8 additions & 0 deletions ReleaseNotes/3.6.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Workflow Core 3.6.0

## Scheduled Commands

Introduces the ability to schedule delayed commands to process a workflow or event, by persisting them to storage.
This is the first step toward removing constant polling of the DB. It also filters out duplicate work items on the queue which is the current problem the greylist tries to solve.
Initial implementation is supported by MongoDb, SQL Server, PostgeSQL, MySQL and SQLite.
Additional support from the other persistence providers to follow.
2 changes: 2 additions & 0 deletions WorkflowCore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
ReleaseNotes\3.0.0.md = ReleaseNotes\3.0.0.md
ReleaseNotes\3.1.0.md = ReleaseNotes\3.1.0.md
ReleaseNotes\3.3.0.md = ReleaseNotes\3.3.0.md
ReleaseNotes\3.4.0.md = ReleaseNotes\3.4.0.md
ReleaseNotes\3.6.0.md = ReleaseNotes\3.6.0.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"
Expand Down
8 changes: 4 additions & 4 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
<Version>3.5.7</Version>
<AssemblyVersion>3.5.7.0</AssemblyVersion>
<FileVersion>3.5.7.0</FileVersion>
<Version>3.6.0</Version>
<AssemblyVersion>3.6.0.0</AssemblyVersion>
<FileVersion>3.6.0.0</FileVersion>
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
<PackageVersion>3.5.7</PackageVersion>
<PackageVersion>3.6.0</PackageVersion>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace WorkflowCore.Interface
{
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
{

Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
public interface IScheduledCommandRepository
{
bool SupportsScheduledCommands { get; }

Task ScheduleCommand(ScheduledCommand command);

Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default);
}
}
16 changes: 16 additions & 0 deletions src/WorkflowCore/Models/ScheduledCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models
{
public class ScheduledCommand
{
public const string ProcessWorkflow = "ProcessWorkflow";
public const string ProcessEvent = "ProcessEvent";

public string CommandName { get; set; }
public string Data { get; set; }
public long ExecuteTime { get; set; }
}
}
83 changes: 81 additions & 2 deletions src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models;
Expand Down Expand Up @@ -48,17 +49,41 @@ public void Stop()
/// Poll the persistence store for stashed unpublished events
/// </summary>
private async void PollRunnables(object target)
{
await PollWorkflows();
await PollEvents();
await PollCommands();
}

private async Task PollWorkflows()
{
try
{
if (await _lockProvider.AcquireLock("poll runnables", new CancellationToken()))
{
try
{
_logger.LogInformation("Polling for runnable workflows");
_logger.LogInformation("Polling for runnable workflows");
var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now);
foreach (var item in runnables)
{
if (_persistenceStore.SupportsScheduledCommands)
{
try
{
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
{
CommandName = ScheduledCommand.ProcessWorkflow,
Data = item,
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
});
continue;
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
}
}
if (_greylist.Contains($"wf:{item}"))
{
_logger.LogDebug($"Got greylisted workflow {item}");
Expand All @@ -79,17 +104,37 @@ private async void PollRunnables(object target)
{
_logger.LogError(ex, ex.Message);
}
}

private async Task PollEvents()
{
try
{
if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken()))
{
try
{
_logger.LogInformation("Polling for unprocessed events");
_logger.LogInformation("Polling for unprocessed events");
var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now);
foreach (var item in events.ToList())
{
if (_persistenceStore.SupportsScheduledCommands)
{
try
{
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
{
CommandName = ScheduledCommand.ProcessEvent,
Data = item,
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
});
continue;
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
}
}
if (_greylist.Contains($"evt:{item}"))
{
_logger.LogDebug($"Got greylisted event {item}");
Expand All @@ -111,5 +156,39 @@ private async void PollRunnables(object target)
_logger.LogError(ex, ex.Message);
}
}

private async Task PollCommands()
{
try
{
if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken()))
{
try
{
_logger.LogInformation("Polling for scheduled commands");
await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) =>
{
switch (command.CommandName)
{
case ScheduledCommand.ProcessWorkflow:
await _queueProvider.QueueWork(command.Data, QueueType.Workflow);
break;
case ScheduledCommand.ProcessEvent:
await _queueProvider.QueueWork(command.Data, QueueType.Event);
break;
}
});
}
finally
{
await _lockProvider.ReleaseLock("poll-commands");
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
}
}
}
}
22 changes: 18 additions & 4 deletions src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance

await _persistenceStore.PersistErrors(result.Errors, cancellationToken);

var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;

if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
{
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
if (workflow.NextExecution.Value < readAheadTicks)
{
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
}
else
{
if (_persistenceStore.SupportsScheduledCommands)
{
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
{
CommandName = ScheduledCommand.ProcessWorkflow,
Data = workflow.Id,
ExecuteTime = workflow.NextExecution.Value
});
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider
private readonly List<Event> _events = new List<Event>();
private readonly List<ExecutionError> _errors = new List<ExecutionError>();

public bool SupportsScheduledCommands => false;

public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
{
lock (_instances)
Expand Down Expand Up @@ -255,6 +257,16 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors, Cancellation
_errors.AddRange(errors);
}
}

public Task ScheduleCommand(ScheduledCommand command)
{
throw new NotImplementedException();
}

public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}

#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public class TransientMemoryPersistenceProvider : IPersistenceProvider
{
private readonly ISingletonMemoryProvider _innerService;

public bool SupportsScheduledCommands => false;

public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
{
_innerService = innerService;
Expand Down Expand Up @@ -56,5 +58,15 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);

public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);

public Task ScheduleCommand(ScheduledCommand command)
{
throw new NotImplementedException();
}

public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
}
1 change: 1 addition & 0 deletions src/WorkflowCore/WorkflowCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.7.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
<InternalsVisibleTo Include="WorkflowCore.IntegrationTests" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ internal static PersistedEvent ToPersistable(this Event instance)
return result;
}

internal static PersistedScheduledCommand ToPersistable(this ScheduledCommand instance)
{
var result = new PersistedScheduledCommand();
result.CommandName = instance.CommandName;
result.Data = instance.Data;
result.ExecuteTime = instance.ExecuteTime;

return result;
}

internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow instance)
{
WorkflowInstance result = new WorkflowInstance();
Expand Down Expand Up @@ -219,5 +229,15 @@ internal static Event ToEvent(this PersistedEvent instance)

return result;
}

internal static ScheduledCommand ToScheduledCommand(this PersistedScheduledCommand instance)
{
var result = new ScheduledCommand();
result.CommandName = instance.CommandName;
result.Data = instance.Data;
result.ExecuteTime = instance.ExecuteTime;

return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Linq;

namespace WorkflowCore.Persistence.EntityFramework.Models
{
public class PersistedScheduledCommand
{
[Key]
public long PersistenceId { get; set; }

[MaxLength(200)]
public string CommandName { get; set; }

[MaxLength(500)]
public string Data { get; set; }

public long ExecuteTime { get; set; }
}
}
Loading

0 comments on commit b45525a

Please sign in to comment.