diff --git a/ReleaseNotes/3.6.0.md b/ReleaseNotes/3.6.0.md new file mode 100644 index 000000000..697976a46 --- /dev/null +++ b/ReleaseNotes/3.6.0.md @@ -0,0 +1,8 @@ +# Workflow Core 3.6.0 + +## Scheduled Commands + +Introduces the ability to schedule delayed commands to process a workflow or event, by persisting them to storage. +This is the first step toward removing constant polling of the DB. It also filters out duplicate work items on the queue which is the current problem the greylist tries to solve. +Initial implementation is supported by MongoDb, SQL Server, PostgeSQL, MySQL and SQLite. +Additional support from the other persistence providers to follow. diff --git a/WorkflowCore.sln b/WorkflowCore.sln index 5893af37d..26bc01cf6 100644 --- a/WorkflowCore.sln +++ b/WorkflowCore.sln @@ -104,6 +104,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote ReleaseNotes\3.0.0.md = ReleaseNotes\3.0.0.md ReleaseNotes\3.1.0.md = ReleaseNotes\3.1.0.md ReleaseNotes\3.3.0.md = ReleaseNotes\3.3.0.md + ReleaseNotes\3.4.0.md = ReleaseNotes\3.4.0.md + ReleaseNotes\3.6.0.md = ReleaseNotes\3.6.0.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}" diff --git a/src/Directory.Build.props b/src/Directory.Build.props index e276b701b..7c162693d 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -4,10 +4,10 @@ https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md git https://github.com/danielgerlag/workflow-core.git - 3.5.7 - 3.5.7.0 - 3.5.7.0 + 3.6.0 + 3.6.0.0 + 3.6.0.0 https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png - 3.5.7 + 3.6.0 diff --git a/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs b/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs index 7bff56345..4b83f5920 100644 --- a/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs +++ b/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs @@ -6,7 +6,7 @@ namespace WorkflowCore.Interface { - public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository + public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository { Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default); diff --git a/src/WorkflowCore/Interface/Persistence/IScheduledCommandRepository.cs b/src/WorkflowCore/Interface/Persistence/IScheduledCommandRepository.cs new file mode 100644 index 000000000..a86bccc42 --- /dev/null +++ b/src/WorkflowCore/Interface/Persistence/IScheduledCommandRepository.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using WorkflowCore.Models; + +namespace WorkflowCore.Interface +{ + public interface IScheduledCommandRepository + { + bool SupportsScheduledCommands { get; } + + Task ScheduleCommand(ScheduledCommand command); + + Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default); + } +} diff --git a/src/WorkflowCore/Models/ScheduledCommand.cs b/src/WorkflowCore/Models/ScheduledCommand.cs new file mode 100644 index 000000000..de3dcc5dd --- /dev/null +++ b/src/WorkflowCore/Models/ScheduledCommand.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkflowCore.Models +{ + public class ScheduledCommand + { + public const string ProcessWorkflow = "ProcessWorkflow"; + public const string ProcessEvent = "ProcessEvent"; + + public string CommandName { get; set; } + public string Data { get; set; } + public long ExecuteTime { get; set; } + } +} diff --git a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs index 9d5edc575..b40a7a208 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -48,6 +49,13 @@ public void Stop() /// Poll the persistence store for stashed unpublished events /// private async void PollRunnables(object target) + { + await PollWorkflows(); + await PollEvents(); + await PollCommands(); + } + + private async Task PollWorkflows() { try { @@ -55,10 +63,27 @@ private async void PollRunnables(object target) { try { - _logger.LogInformation("Polling for runnable workflows"); + _logger.LogInformation("Polling for runnable workflows"); var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now); foreach (var item in runnables) { + if (_persistenceStore.SupportsScheduledCommands) + { + try + { + await _persistenceStore.ScheduleCommand(new ScheduledCommand() + { + CommandName = ScheduledCommand.ProcessWorkflow, + Data = item, + ExecuteTime = _dateTimeProvider.UtcNow.Ticks + }); + continue; + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + } + } if (_greylist.Contains($"wf:{item}")) { _logger.LogDebug($"Got greylisted workflow {item}"); @@ -79,17 +104,37 @@ private async void PollRunnables(object target) { _logger.LogError(ex, ex.Message); } + } + private async Task PollEvents() + { try { if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken())) { try { - _logger.LogInformation("Polling for unprocessed events"); + _logger.LogInformation("Polling for unprocessed events"); var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now); foreach (var item in events.ToList()) { + if (_persistenceStore.SupportsScheduledCommands) + { + try + { + await _persistenceStore.ScheduleCommand(new ScheduledCommand() + { + CommandName = ScheduledCommand.ProcessEvent, + Data = item, + ExecuteTime = _dateTimeProvider.UtcNow.Ticks + }); + continue; + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + } + } if (_greylist.Contains($"evt:{item}")) { _logger.LogDebug($"Got greylisted event {item}"); @@ -111,5 +156,39 @@ private async void PollRunnables(object target) _logger.LogError(ex, ex.Message); } } + + private async Task PollCommands() + { + try + { + if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken())) + { + try + { + _logger.LogInformation("Polling for scheduled commands"); + await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) => + { + switch (command.CommandName) + { + case ScheduledCommand.ProcessWorkflow: + await _queueProvider.QueueWork(command.Data, QueueType.Workflow); + break; + case ScheduledCommand.ProcessEvent: + await _queueProvider.QueueWork(command.Data, QueueType.Event); + break; + } + }); + } + finally + { + await _lockProvider.ReleaseLock("poll-commands"); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + } + } } } diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index 6e5c1d3c1..a387848b6 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -69,11 +69,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance await _persistenceStore.PersistErrors(result.Errors, cancellationToken); - var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks; - - if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks) + if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue) { - new Task(() => FutureQueue(workflow, cancellationToken)).Start(); + var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks; + if (workflow.NextExecution.Value < readAheadTicks) + { + new Task(() => FutureQueue(workflow, cancellationToken)).Start(); + } + else + { + if (_persistenceStore.SupportsScheduledCommands) + { + await _persistenceStore.ScheduleCommand(new ScheduledCommand() + { + CommandName = ScheduledCommand.ProcessWorkflow, + Data = workflow.Id, + ExecuteTime = workflow.NextExecution.Value + }); + } + } } } } diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index 96ea772af..e983f8fd5 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -24,6 +24,8 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider private readonly List _events = new List(); private readonly List _errors = new List(); + public bool SupportsScheduledCommands => false; + public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) { lock (_instances) @@ -255,6 +257,16 @@ public async Task PersistErrors(IEnumerable errors, Cancellation _errors.AddRange(errors); } } + + public Task ScheduleCommand(ScheduledCommand command) + { + throw new NotImplementedException(); + } + + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs index c2fab66e1..0a94c6f8e 100644 --- a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs @@ -11,6 +11,8 @@ public class TransientMemoryPersistenceProvider : IPersistenceProvider { private readonly ISingletonMemoryProvider _innerService; + public bool SupportsScheduledCommands => false; + public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) { _innerService = innerService; @@ -56,5 +58,15 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry); public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token); + + public Task ScheduleCommand(ScheduledCommand command) + { + throw new NotImplementedException(); + } + + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } } diff --git a/src/WorkflowCore/WorkflowCore.csproj b/src/WorkflowCore/WorkflowCore.csproj index 1457092bd..800ef9c08 100644 --- a/src/WorkflowCore/WorkflowCore.csproj +++ b/src/WorkflowCore/WorkflowCore.csproj @@ -26,6 +26,7 @@ + diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs index c978131d1..b0b2b1d99 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs @@ -123,6 +123,16 @@ internal static PersistedEvent ToPersistable(this Event instance) return result; } + internal static PersistedScheduledCommand ToPersistable(this ScheduledCommand instance) + { + var result = new PersistedScheduledCommand(); + result.CommandName = instance.CommandName; + result.Data = instance.Data; + result.ExecuteTime = instance.ExecuteTime; + + return result; + } + internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow instance) { WorkflowInstance result = new WorkflowInstance(); @@ -219,5 +229,15 @@ internal static Event ToEvent(this PersistedEvent instance) return result; } + + internal static ScheduledCommand ToScheduledCommand(this PersistedScheduledCommand instance) + { + var result = new ScheduledCommand(); + result.CommandName = instance.CommandName; + result.Data = instance.Data; + result.ExecuteTime = instance.ExecuteTime; + + return result; + } } } diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedScheduledCommand.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedScheduledCommand.cs new file mode 100644 index 000000000..7dad156a3 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Models/PersistedScheduledCommand.cs @@ -0,0 +1,20 @@ +using System; +using System.ComponentModel.DataAnnotations; +using System.Linq; + +namespace WorkflowCore.Persistence.EntityFramework.Models +{ + public class PersistedScheduledCommand + { + [Key] + public long PersistenceId { get; set; } + + [MaxLength(200)] + public string CommandName { get; set; } + + [MaxLength(500)] + public string Data { get; set; } + + public long ExecuteTime { get; set; } + } +} diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index e0c7082fd..44798a0c9 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -17,6 +17,8 @@ public class EntityFrameworkPersistenceProvider : IPersistenceProvider private readonly bool _canMigrateDB; private readonly IWorkflowDbContextFactory _contextFactory; + public bool SupportsScheduledCommands => true; + public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFactory, bool canCreateDB, bool canMigrateDB) { _contextFactory = contextFactory; @@ -365,5 +367,47 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke await db.SaveChangesAsync(cancellationToken); } } + + public async Task ScheduleCommand(ScheduledCommand command) + { + try + { + using (var db = ConstructDbContext()) + { + var persistable = command.ToPersistable(); + var result = db.Set().Add(persistable); + await db.SaveChangesAsync(); + } + } + catch (DbUpdateException) + { + //log + } + } + + public async Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + using (var db = ConstructDbContext()) + { + var cursor = db.Set() + .Where(x => x.ExecuteTime < asOf.UtcDateTime.Ticks) + .AsAsyncEnumerable(); + + await foreach (var command in cursor) + { + try + { + await action(command.ToScheduledCommand()); + using var db2 = ConstructDbContext(); + db2.Set().Remove(command); + await db2.SaveChangesAsync(); + } + catch (Exception) + { + //TODO: add logger + } + } + } + } } } diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowDbContext.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowDbContext.cs index 7b80db3da..53e0967e7 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowDbContext.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowDbContext.cs @@ -14,6 +14,7 @@ public abstract class WorkflowDbContext : DbContext protected abstract void ConfigureExetensionAttributeStorage(EntityTypeBuilder builder); protected abstract void ConfigureSubscriptionStorage(EntityTypeBuilder builder); protected abstract void ConfigureEventStorage(EntityTypeBuilder builder); + protected abstract void ConfigureScheduledCommandStorage(EntityTypeBuilder builder); protected override void OnModelCreating(ModelBuilder modelBuilder) { @@ -38,12 +39,17 @@ protected override void OnModelCreating(ModelBuilder modelBuilder) events.HasIndex(x => x.EventTime); events.HasIndex(x => x.IsProcessed); + var commands = modelBuilder.Entity(); + commands.HasIndex(x => x.ExecuteTime); + commands.HasIndex(x => new { x.CommandName, x.Data}).IsUnique(); + ConfigureWorkflowStorage(workflows); ConfigureExecutionPointerStorage(executionPointers); ConfigureExecutionErrorStorage(executionErrors); ConfigureExetensionAttributeStorage(extensionAttributes); ConfigureSubscriptionStorage(subscriptions); ConfigureEventStorage(events); + ConfigureScheduledCommandStorage(commands); } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj b/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj index 39627c7c9..6e1206632 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj @@ -3,7 +3,7 @@ Workflow Core EntityFramework Core Persistence Provider Daniel Gerlag - netstandard2.0 + netstandard2.1 WorkflowCore.Persistence.EntityFramework WorkflowCore.Persistence.EntityFramework workflow;.NET;Core;state machine;WorkflowCore;EntityFramework;EntityFrameworkCore diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index 79e9fe5b7..ac65eb39b 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -83,6 +83,8 @@ static MongoPersistenceProvider() BsonClassMap.RegisterClassMap(x => x.AutoMap()); BsonClassMap.RegisterClassMap(x => x.AutoMap()); BsonClassMap.RegisterClassMap(x => x.AutoMap()); + BsonClassMap.RegisterClassMap(x => x.AutoMap()) + .SetIgnoreExtraElements(true); } static bool indexesCreated = false; @@ -111,6 +113,17 @@ static void CreateIndexes(MongoPersistenceProvider instance) .Ascending(x => x.EventKey), new CreateIndexOptions { Background = true, Name = "idx_namekey" })); + instance.ScheduledCommands.Indexes.CreateOne(new CreateIndexModel( + Builders.IndexKeys + .Descending(x => x.ExecuteTime), + new CreateIndexOptions { Background = true, Name = "idx_exectime" })); + + instance.ScheduledCommands.Indexes.CreateOne(new CreateIndexModel( + Builders.IndexKeys + .Ascending(x => x.CommandName) + .Ascending(x => x.Data), + new CreateIndexOptions { Background = true, Unique = true, Name = "idx_key" })); + indexesCreated = true; } } @@ -123,6 +136,8 @@ static void CreateIndexes(MongoPersistenceProvider instance) private IMongoCollection ExecutionErrors => _database.GetCollection("wfc.execution_errors"); + private IMongoCollection ScheduledCommands => _database.GetCollection("wfc.scheduled_commands"); + public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) { await WorkflowInstances.InsertOneAsync(workflow, cancellationToken: cancellationToken); @@ -291,5 +306,41 @@ public async Task PersistErrors(IEnumerable errors, Cancellation if (errors.Any()) await ExecutionErrors.InsertManyAsync(errors, cancellationToken: cancellationToken); } + + public bool SupportsScheduledCommands => true; + + public async Task ScheduleCommand(ScheduledCommand command) + { + try + { + await ScheduledCommands.InsertOneAsync(command); + } + catch (MongoBulkWriteException ex) + { + if (ex.WriteErrors.All(x => x.Category == ServerErrorCategory.DuplicateKey)) + return; + throw; + } + } + + public async Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + var cursor = await ScheduledCommands.FindAsync(x => x.ExecuteTime < asOf.UtcDateTime.Ticks); + while (await cursor.MoveNextAsync(cancellationToken)) + { + foreach (var command in cursor.Current) + { + try + { + await action(command); + await ScheduledCommands.DeleteOneAsync(x => x.CommandName == command.CommandName && x.Data == command.Data); + } + catch (Exception) + { + //TODO: add logger + } + } + } + } } } diff --git a/src/providers/WorkflowCore.Persistence.MySQL/Migrations/20211023161949_scheduled-commands.Designer.cs b/src/providers/WorkflowCore.Persistence.MySQL/Migrations/20211023161949_scheduled-commands.Designer.cs new file mode 100644 index 000000000..94222e8d5 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.MySQL/Migrations/20211023161949_scheduled-commands.Designer.cs @@ -0,0 +1,357 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using WorkflowCore.Persistence.MySQL; + +namespace WorkflowCore.Persistence.MySQL.Migrations +{ + [DbContext(typeof(MysqlContext))] + [Migration("20211023161949_scheduled-commands")] + partial class scheduledcommands + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("Relational:MaxIdentifierLength", 64) + .HasAnnotation("ProductVersion", "5.0.8"); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("EventData") + .HasColumnType("longtext"); + + b.Property("EventId") + .HasColumnType("char(36)"); + + b.Property("EventKey") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("EventName") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("EventTime") + .HasColumnType("datetime(6)"); + + b.Property("IsProcessed") + .HasColumnType("tinyint(1)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventId") + .IsUnique(); + + b.HasIndex("EventTime"); + + b.HasIndex("IsProcessed"); + + b.HasIndex("EventName", "EventKey"); + + b.ToTable("Event"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("ErrorTime") + .HasColumnType("datetime(6)"); + + b.Property("ExecutionPointerId") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.Property("Message") + .HasColumnType("longtext"); + + b.Property("WorkflowId") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.HasKey("PersistenceId"); + + b.ToTable("ExecutionError"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("Active") + .HasColumnType("tinyint(1)"); + + b.Property("Children") + .HasColumnType("longtext"); + + b.Property("ContextItem") + .HasColumnType("longtext"); + + b.Property("EndTime") + .HasColumnType("datetime(6)"); + + b.Property("EventData") + .HasColumnType("longtext"); + + b.Property("EventKey") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.Property("EventName") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.Property("EventPublished") + .HasColumnType("tinyint(1)"); + + b.Property("Id") + .HasMaxLength(50) + .HasColumnType("varchar(50)"); + + b.Property("Outcome") + .HasColumnType("longtext"); + + b.Property("PersistenceData") + .HasColumnType("longtext"); + + b.Property("PredecessorId") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.Property("RetryCount") + .HasColumnType("int"); + + b.Property("Scope") + .HasColumnType("longtext"); + + b.Property("SleepUntil") + .HasColumnType("datetime(6)"); + + b.Property("StartTime") + .HasColumnType("datetime(6)"); + + b.Property("Status") + .HasColumnType("int"); + + b.Property("StepId") + .HasColumnType("int"); + + b.Property("StepName") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.Property("WorkflowId") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("WorkflowId"); + + b.ToTable("ExecutionPointer"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("AttributeKey") + .HasMaxLength(100) + .HasColumnType("varchar(100)"); + + b.Property("AttributeValue") + .HasColumnType("longtext"); + + b.Property("ExecutionPointerId") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecutionPointerId"); + + b.ToTable("ExtensionAttribute"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedScheduledCommand", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("CommandName") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("Data") + .HasMaxLength(500) + .HasColumnType("varchar(500)"); + + b.Property("ExecuteTime") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecuteTime"); + + b.HasIndex("CommandName", "Data") + .IsUnique(); + + b.ToTable("ScheduledCommand"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("EventKey") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("EventName") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("ExecutionPointerId") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("ExternalToken") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("ExternalTokenExpiry") + .HasColumnType("datetime(6)"); + + b.Property("ExternalWorkerId") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("StepId") + .HasColumnType("int"); + + b.Property("SubscribeAsOf") + .HasColumnType("datetime(6)"); + + b.Property("SubscriptionData") + .HasColumnType("longtext"); + + b.Property("SubscriptionId") + .HasMaxLength(200) + .HasColumnType("char(200)"); + + b.Property("WorkflowId") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventKey"); + + b.HasIndex("EventName"); + + b.HasIndex("SubscriptionId") + .IsUnique(); + + b.ToTable("Subscription"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("CompleteTime") + .HasColumnType("datetime(6)"); + + b.Property("CreateTime") + .HasColumnType("datetime(6)"); + + b.Property("Data") + .HasColumnType("longtext"); + + b.Property("Description") + .HasMaxLength(500) + .HasColumnType("varchar(500)"); + + b.Property("InstanceId") + .HasMaxLength(200) + .HasColumnType("char(200)"); + + b.Property("NextExecution") + .HasColumnType("bigint"); + + b.Property("Reference") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("Status") + .HasColumnType("int"); + + b.Property("Version") + .HasColumnType("int"); + + b.Property("WorkflowDefinitionId") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("InstanceId") + .IsUnique(); + + b.HasIndex("NextExecution"); + + b.ToTable("Workflow"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", "Workflow") + .WithMany("ExecutionPointers") + .HasForeignKey("WorkflowId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Workflow"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", "ExecutionPointer") + .WithMany("ExtensionAttributes") + .HasForeignKey("ExecutionPointerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("ExecutionPointer"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Navigation("ExtensionAttributes"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Navigation("ExecutionPointers"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.MySQL/Migrations/20211023161949_scheduled-commands.cs b/src/providers/WorkflowCore.Persistence.MySQL/Migrations/20211023161949_scheduled-commands.cs new file mode 100644 index 000000000..b06303070 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.MySQL/Migrations/20211023161949_scheduled-commands.cs @@ -0,0 +1,337 @@ +using System; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; + +namespace WorkflowCore.Persistence.MySQL.Migrations +{ + public partial class scheduledcommands : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AlterColumn( + name: "InstanceId", + table: "Workflow", + type: "char(200)", + maxLength: 200, + nullable: false, + collation: "ascii_general_ci", + oldClrType: typeof(string), + oldType: "char(200)", + oldMaxLength: 200) + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Data", + table: "Workflow", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "SubscriptionId", + table: "Subscription", + type: "char(200)", + maxLength: 200, + nullable: false, + collation: "ascii_general_ci", + oldClrType: typeof(string), + oldType: "char(200)", + oldMaxLength: 200) + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "SubscriptionData", + table: "Subscription", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "AttributeValue", + table: "ExtensionAttribute", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Scope", + table: "ExecutionPointer", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "PersistenceData", + table: "ExecutionPointer", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Outcome", + table: "ExecutionPointer", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "EventData", + table: "ExecutionPointer", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "ContextItem", + table: "ExecutionPointer", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Children", + table: "ExecutionPointer", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Message", + table: "ExecutionError", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "EventData", + table: "Event", + type: "longtext", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext CHARACTER SET utf8mb4", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.CreateTable( + name: "ScheduledCommand", + columns: table => new + { + PersistenceId = table.Column(type: "bigint", nullable: false) + .Annotation("MySql:ValueGenerationStrategy", MySqlValueGenerationStrategy.IdentityColumn), + CommandName = table.Column(type: "varchar(200)", maxLength: 200, nullable: true) + .Annotation("MySql:CharSet", "utf8mb4"), + Data = table.Column(type: "varchar(500)", maxLength: 500, nullable: true) + .Annotation("MySql:CharSet", "utf8mb4"), + ExecuteTime = table.Column(type: "bigint", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_ScheduledCommand", x => x.PersistenceId); + }) + .Annotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.CreateIndex( + name: "IX_ScheduledCommand_CommandName_Data", + table: "ScheduledCommand", + columns: new[] { "CommandName", "Data" }, + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_ScheduledCommand_ExecuteTime", + table: "ScheduledCommand", + column: "ExecuteTime"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "ScheduledCommand"); + + migrationBuilder.AlterColumn( + name: "InstanceId", + table: "Workflow", + type: "char(200)", + maxLength: 200, + nullable: false, + oldClrType: typeof(Guid), + oldType: "char(200)", + oldMaxLength: 200) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("Relational:Collation", "ascii_general_ci"); + + migrationBuilder.AlterColumn( + name: "Data", + table: "Workflow", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "SubscriptionId", + table: "Subscription", + type: "char(200)", + maxLength: 200, + nullable: false, + oldClrType: typeof(Guid), + oldType: "char(200)", + oldMaxLength: 200) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("Relational:Collation", "ascii_general_ci"); + + migrationBuilder.AlterColumn( + name: "SubscriptionData", + table: "Subscription", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "AttributeValue", + table: "ExtensionAttribute", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Scope", + table: "ExecutionPointer", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "PersistenceData", + table: "ExecutionPointer", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Outcome", + table: "ExecutionPointer", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "EventData", + table: "ExecutionPointer", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "ContextItem", + table: "ExecutionPointer", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Children", + table: "ExecutionPointer", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "Message", + table: "ExecutionError", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + + migrationBuilder.AlterColumn( + name: "EventData", + table: "Event", + type: "longtext CHARACTER SET utf8mb4", + nullable: true, + oldClrType: typeof(string), + oldType: "longtext", + oldNullable: true) + .Annotation("MySql:CharSet", "utf8mb4") + .OldAnnotation("MySql:CharSet", "utf8mb4"); + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.MySQL/Migrations/MysqlPersistenceProviderModelSnapshot.cs b/src/providers/WorkflowCore.Persistence.MySQL/Migrations/MysqlPersistenceProviderModelSnapshot.cs index c72c16199..8a3e02e9b 100644 --- a/src/providers/WorkflowCore.Persistence.MySQL/Migrations/MysqlPersistenceProviderModelSnapshot.cs +++ b/src/providers/WorkflowCore.Persistence.MySQL/Migrations/MysqlPersistenceProviderModelSnapshot.cs @@ -2,6 +2,8 @@ using System; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using WorkflowCore.Persistence.MySQL; namespace WorkflowCore.Persistence.MySQL.Migrations { @@ -12,8 +14,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "3.1.2") - .HasAnnotation("Relational:MaxIdentifierLength", 64); + .HasAnnotation("Relational:MaxIdentifierLength", 64) + .HasAnnotation("ProductVersion", "5.0.8"); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => { @@ -22,18 +24,18 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("bigint"); b.Property("EventData") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("EventId") .HasColumnType("char(36)"); b.Property("EventKey") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("EventName") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("EventTime") .HasColumnType("datetime(6)"); @@ -65,15 +67,15 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("datetime(6)"); b.Property("ExecutionPointerId") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.Property("Message") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("WorkflowId") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.HasKey("PersistenceId"); @@ -90,47 +92,47 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("tinyint(1)"); b.Property("Children") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("ContextItem") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("EndTime") .HasColumnType("datetime(6)"); b.Property("EventData") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("EventKey") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.Property("EventName") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.Property("EventPublished") .HasColumnType("tinyint(1)"); b.Property("Id") - .HasColumnType("varchar(50) CHARACTER SET utf8mb4") - .HasMaxLength(50); + .HasMaxLength(50) + .HasColumnType("varchar(50)"); b.Property("Outcome") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("PersistenceData") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("PredecessorId") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.Property("RetryCount") .HasColumnType("int"); b.Property("Scope") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("SleepUntil") .HasColumnType("datetime(6)"); @@ -145,8 +147,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("int"); b.Property("StepName") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.Property("WorkflowId") .HasColumnType("bigint"); @@ -165,11 +167,11 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("bigint"); b.Property("AttributeKey") - .HasColumnType("varchar(100) CHARACTER SET utf8mb4") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("varchar(100)"); b.Property("AttributeValue") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("ExecutionPointerId") .HasColumnType("bigint"); @@ -181,6 +183,33 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.ToTable("ExtensionAttribute"); }); + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedScheduledCommand", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + b.Property("CommandName") + .HasMaxLength(200) + .HasColumnType("varchar(200)"); + + b.Property("Data") + .HasMaxLength(500) + .HasColumnType("varchar(500)"); + + b.Property("ExecuteTime") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecuteTime"); + + b.HasIndex("CommandName", "Data") + .IsUnique(); + + b.ToTable("ScheduledCommand"); + }); + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => { b.Property("PersistenceId") @@ -188,27 +217,27 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("bigint"); b.Property("EventKey") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("EventName") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("ExecutionPointerId") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("ExternalToken") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("ExternalTokenExpiry") .HasColumnType("datetime(6)"); b.Property("ExternalWorkerId") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("StepId") .HasColumnType("int"); @@ -217,15 +246,15 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("datetime(6)"); b.Property("SubscriptionData") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("SubscriptionId") - .HasColumnType("char(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("char(200)"); b.Property("WorkflowId") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.HasKey("PersistenceId"); @@ -252,22 +281,22 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("datetime(6)"); b.Property("Data") - .HasColumnType("longtext CHARACTER SET utf8mb4"); + .HasColumnType("longtext"); b.Property("Description") - .HasColumnType("varchar(500) CHARACTER SET utf8mb4") - .HasMaxLength(500); + .HasMaxLength(500) + .HasColumnType("varchar(500)"); b.Property("InstanceId") - .HasColumnType("char(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("char(200)"); b.Property("NextExecution") .HasColumnType("bigint"); b.Property("Reference") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.Property("Status") .HasColumnType("int"); @@ -276,8 +305,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("int"); b.Property("WorkflowDefinitionId") - .HasColumnType("varchar(200) CHARACTER SET utf8mb4") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("varchar(200)"); b.HasKey("PersistenceId"); @@ -296,6 +325,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasForeignKey("WorkflowId") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); + + b.Navigation("Workflow"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => @@ -305,6 +336,18 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasForeignKey("ExecutionPointerId") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); + + b.Navigation("ExecutionPointer"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Navigation("ExtensionAttributes"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Navigation("ExecutionPointers"); }); #pragma warning restore 612, 618 } diff --git a/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs b/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs index a6ebba9c2..dc49617fd 100644 --- a/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs +++ b/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs @@ -63,5 +63,11 @@ protected override void ConfigureEventStorage(EntityTypeBuilder builder.ToTable("Event"); builder.Property(x => x.PersistenceId).ValueGeneratedOnAdd(); } + + protected override void ConfigureScheduledCommandStorage(EntityTypeBuilder builder) + { + builder.ToTable("ScheduledCommand"); + builder.Property(x => x.PersistenceId).ValueGeneratedOnAdd(); + } } } diff --git a/src/providers/WorkflowCore.Persistence.MySQL/WorkflowCore.Persistence.MySQL.csproj b/src/providers/WorkflowCore.Persistence.MySQL/WorkflowCore.Persistence.MySQL.csproj index e0e83315b..1b2fa9f6b 100644 --- a/src/providers/WorkflowCore.Persistence.MySQL/WorkflowCore.Persistence.MySQL.csproj +++ b/src/providers/WorkflowCore.Persistence.MySQL/WorkflowCore.Persistence.MySQL.csproj @@ -4,7 +4,7 @@ Workflow Core MySQL Persistence Provider 1.0.0 Daniel Gerlag - netstandard2.0;netstandard2.1 + netstandard2.1 WorkflowCore.Persistence.MySQL WorkflowCore.Persistence.MySQL workflow;.NET;Core;state machine;WorkflowCore;MySQL diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20211023161649_scheduled-commands.Designer.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20211023161649_scheduled-commands.Designer.cs new file mode 100644 index 000000000..0608d5e41 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20211023161649_scheduled-commands.Designer.cs @@ -0,0 +1,366 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using WorkflowCore.Persistence.PostgreSQL; + +namespace WorkflowCore.Persistence.PostgreSQL.Migrations +{ + [DbContext(typeof(PostgresContext))] + [Migration("20211023161649_scheduled-commands")] + partial class scheduledcommands + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("Relational:MaxIdentifierLength", 63) + .HasAnnotation("ProductVersion", "5.0.8") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("EventData") + .HasColumnType("text"); + + b.Property("EventId") + .HasColumnType("uuid"); + + b.Property("EventKey") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("EventName") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("EventTime") + .HasColumnType("timestamp without time zone"); + + b.Property("IsProcessed") + .HasColumnType("boolean"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventId") + .IsUnique(); + + b.HasIndex("EventTime"); + + b.HasIndex("IsProcessed"); + + b.HasIndex("EventName", "EventKey"); + + b.ToTable("Event", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("ErrorTime") + .HasColumnType("timestamp without time zone"); + + b.Property("ExecutionPointerId") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("Message") + .HasColumnType("text"); + + b.Property("WorkflowId") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.HasKey("PersistenceId"); + + b.ToTable("ExecutionError", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("Active") + .HasColumnType("boolean"); + + b.Property("Children") + .HasColumnType("text"); + + b.Property("ContextItem") + .HasColumnType("text"); + + b.Property("EndTime") + .HasColumnType("timestamp without time zone"); + + b.Property("EventData") + .HasColumnType("text"); + + b.Property("EventKey") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("EventName") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("EventPublished") + .HasColumnType("boolean"); + + b.Property("Id") + .HasMaxLength(50) + .HasColumnType("character varying(50)"); + + b.Property("Outcome") + .HasColumnType("text"); + + b.Property("PersistenceData") + .HasColumnType("text"); + + b.Property("PredecessorId") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("RetryCount") + .HasColumnType("integer"); + + b.Property("Scope") + .HasColumnType("text"); + + b.Property("SleepUntil") + .HasColumnType("timestamp without time zone"); + + b.Property("StartTime") + .HasColumnType("timestamp without time zone"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("StepId") + .HasColumnType("integer"); + + b.Property("StepName") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("WorkflowId") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("WorkflowId"); + + b.ToTable("ExecutionPointer", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("AttributeKey") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("AttributeValue") + .HasColumnType("text"); + + b.Property("ExecutionPointerId") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecutionPointerId"); + + b.ToTable("ExtensionAttribute", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedScheduledCommand", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("CommandName") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("Data") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("ExecuteTime") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecuteTime"); + + b.HasIndex("CommandName", "Data") + .IsUnique(); + + b.ToTable("ScheduledCommand", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("EventKey") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("EventName") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("ExecutionPointerId") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("ExternalToken") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("ExternalTokenExpiry") + .HasColumnType("timestamp without time zone"); + + b.Property("ExternalWorkerId") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("StepId") + .HasColumnType("integer"); + + b.Property("SubscribeAsOf") + .HasColumnType("timestamp without time zone"); + + b.Property("SubscriptionData") + .HasColumnType("text"); + + b.Property("SubscriptionId") + .HasMaxLength(200) + .HasColumnType("uuid"); + + b.Property("WorkflowId") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventKey"); + + b.HasIndex("EventName"); + + b.HasIndex("SubscriptionId") + .IsUnique(); + + b.ToTable("Subscription", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("CompleteTime") + .HasColumnType("timestamp without time zone"); + + b.Property("CreateTime") + .HasColumnType("timestamp without time zone"); + + b.Property("Data") + .HasColumnType("text"); + + b.Property("Description") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("InstanceId") + .HasMaxLength(200) + .HasColumnType("uuid"); + + b.Property("NextExecution") + .HasColumnType("bigint"); + + b.Property("Reference") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("Version") + .HasColumnType("integer"); + + b.Property("WorkflowDefinitionId") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("InstanceId") + .IsUnique(); + + b.HasIndex("NextExecution"); + + b.ToTable("Workflow", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", "Workflow") + .WithMany("ExecutionPointers") + .HasForeignKey("WorkflowId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Workflow"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", "ExecutionPointer") + .WithMany("ExtensionAttributes") + .HasForeignKey("ExecutionPointerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("ExecutionPointer"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Navigation("ExtensionAttributes"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Navigation("ExecutionPointers"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20211023161649_scheduled-commands.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20211023161649_scheduled-commands.cs new file mode 100644 index 000000000..0f1302181 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/20211023161649_scheduled-commands.cs @@ -0,0 +1,47 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +namespace WorkflowCore.Persistence.PostgreSQL.Migrations +{ + public partial class scheduledcommands : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "ScheduledCommand", + schema: "wfc", + columns: table => new + { + PersistenceId = table.Column(type: "bigint", nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn), + CommandName = table.Column(type: "character varying(200)", maxLength: 200, nullable: true), + Data = table.Column(type: "character varying(500)", maxLength: 500, nullable: true), + ExecuteTime = table.Column(type: "bigint", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_ScheduledCommand", x => x.PersistenceId); + }); + + migrationBuilder.CreateIndex( + name: "IX_ScheduledCommand_CommandName_Data", + schema: "wfc", + table: "ScheduledCommand", + columns: new[] { "CommandName", "Data" }, + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_ScheduledCommand_ExecuteTime", + schema: "wfc", + table: "ScheduledCommand", + column: "ExecuteTime"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "ScheduledCommand", + schema: "wfc"); + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs index 6def22497..d0278e0c8 100644 --- a/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/Migrations/PostgresPersistenceProviderModelSnapshot.cs @@ -2,7 +2,9 @@ using System; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using WorkflowCore.Persistence.PostgreSQL; namespace WorkflowCore.Persistence.PostgreSQL.Migrations { @@ -13,9 +15,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn) - .HasAnnotation("ProductVersion", "3.1.0") - .HasAnnotation("Relational:MaxIdentifierLength", 63); + .HasAnnotation("Relational:MaxIdentifierLength", 63) + .HasAnnotation("ProductVersion", "5.0.8") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => { @@ -31,12 +33,12 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("uuid"); b.Property("EventKey") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("EventName") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("EventTime") .HasColumnType("timestamp without time zone"); @@ -55,7 +57,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("EventName", "EventKey"); - b.ToTable("Event","wfc"); + b.ToTable("Event", "wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => @@ -69,19 +71,19 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("timestamp without time zone"); b.Property("ExecutionPointerId") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.Property("Message") .HasColumnType("text"); b.Property("WorkflowId") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.HasKey("PersistenceId"); - b.ToTable("ExecutionError","wfc"); + b.ToTable("ExecutionError", "wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => @@ -107,19 +109,19 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("text"); b.Property("EventKey") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.Property("EventName") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.Property("EventPublished") .HasColumnType("boolean"); b.Property("Id") - .HasColumnType("character varying(50)") - .HasMaxLength(50); + .HasMaxLength(50) + .HasColumnType("character varying(50)"); b.Property("Outcome") .HasColumnType("text"); @@ -128,8 +130,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("text"); b.Property("PredecessorId") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.Property("RetryCount") .HasColumnType("integer"); @@ -150,8 +152,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("integer"); b.Property("StepName") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.Property("WorkflowId") .HasColumnType("bigint"); @@ -160,7 +162,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("WorkflowId"); - b.ToTable("ExecutionPointer","wfc"); + b.ToTable("ExecutionPointer", "wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => @@ -171,8 +173,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); b.Property("AttributeKey") - .HasColumnType("character varying(100)") - .HasMaxLength(100); + .HasMaxLength(100) + .HasColumnType("character varying(100)"); b.Property("AttributeValue") .HasColumnType("text"); @@ -184,7 +186,35 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("ExecutionPointerId"); - b.ToTable("ExtensionAttribute","wfc"); + b.ToTable("ExtensionAttribute", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedScheduledCommand", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); + + b.Property("CommandName") + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("Data") + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("ExecuteTime") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecuteTime"); + + b.HasIndex("CommandName", "Data") + .IsUnique(); + + b.ToTable("ScheduledCommand", "wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => @@ -195,27 +225,27 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasAnnotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn); b.Property("EventKey") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("EventName") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("ExecutionPointerId") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("ExternalToken") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("ExternalTokenExpiry") .HasColumnType("timestamp without time zone"); b.Property("ExternalWorkerId") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("StepId") .HasColumnType("integer"); @@ -227,12 +257,12 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("text"); b.Property("SubscriptionId") - .HasColumnType("uuid") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("uuid"); b.Property("WorkflowId") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.HasKey("PersistenceId"); @@ -243,7 +273,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("SubscriptionId") .IsUnique(); - b.ToTable("Subscription","wfc"); + b.ToTable("Subscription", "wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => @@ -263,19 +293,19 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("text"); b.Property("Description") - .HasColumnType("character varying(500)") - .HasMaxLength(500); + .HasMaxLength(500) + .HasColumnType("character varying(500)"); b.Property("InstanceId") - .HasColumnType("uuid") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("uuid"); b.Property("NextExecution") .HasColumnType("bigint"); b.Property("Reference") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.Property("Status") .HasColumnType("integer"); @@ -284,8 +314,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("integer"); b.Property("WorkflowDefinitionId") - .HasColumnType("character varying(200)") - .HasMaxLength(200); + .HasMaxLength(200) + .HasColumnType("character varying(200)"); b.HasKey("PersistenceId"); @@ -294,7 +324,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("NextExecution"); - b.ToTable("Workflow","wfc"); + b.ToTable("Workflow", "wfc"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => @@ -304,6 +334,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasForeignKey("WorkflowId") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); + + b.Navigation("Workflow"); }); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => @@ -313,6 +345,18 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasForeignKey("ExecutionPointerId") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); + + b.Navigation("ExecutionPointer"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Navigation("ExtensionAttributes"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Navigation("ExecutionPointers"); }); #pragma warning restore 612, 618 } diff --git a/src/providers/WorkflowCore.Persistence.PostgreSQL/PostgresContext.cs b/src/providers/WorkflowCore.Persistence.PostgreSQL/PostgresContext.cs index 17e83e958..3eb80b448 100644 --- a/src/providers/WorkflowCore.Persistence.PostgreSQL/PostgresContext.cs +++ b/src/providers/WorkflowCore.Persistence.PostgreSQL/PostgresContext.cs @@ -60,6 +60,12 @@ protected override void ConfigureEventStorage(EntityTypeBuilder builder.ToTable("Event", _schemaName); builder.Property(x => x.PersistenceId).ValueGeneratedOnAdd(); } + + protected override void ConfigureScheduledCommandStorage(EntityTypeBuilder builder) + { + builder.ToTable("ScheduledCommand", _schemaName); + builder.Property(x => x.PersistenceId).ValueGeneratedOnAdd(); + } } } diff --git a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs index d115f1eb2..695b8e31f 100644 --- a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs @@ -18,7 +18,9 @@ public class RavendbPersistenceProvider : IPersistenceProvider private readonly IDocumentStore _database; static bool indexesCreated = false; - public RavendbPersistenceProvider(IDocumentStore database) + public bool SupportsScheduledCommands => false; + + public RavendbPersistenceProvider(IDocumentStore database) { _database = database; CreateIndexes(this); @@ -316,5 +318,14 @@ public async Task PersistErrors(IEnumerable errors, Cancellation } } - } + public Task ScheduleCommand(ScheduledCommand command) + { + throw new NotImplementedException(); + } + + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + } } diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20211023161544_scheduled-commands.Designer.cs b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20211023161544_scheduled-commands.Designer.cs new file mode 100644 index 000000000..84bed0be0 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20211023161544_scheduled-commands.Designer.cs @@ -0,0 +1,381 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using WorkflowCore.Persistence.SqlServer; + +namespace WorkflowCore.Persistence.SqlServer.Migrations +{ + [DbContext(typeof(SqlServerContext))] + [Migration("20211023161544_scheduled-commands")] + partial class scheduledcommands + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("Relational:MaxIdentifierLength", 128) + .HasAnnotation("ProductVersion", "5.0.8") + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("EventData") + .HasColumnType("nvarchar(max)"); + + b.Property("EventId") + .HasColumnType("uniqueidentifier"); + + b.Property("EventKey") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("EventName") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("EventTime") + .HasColumnType("datetime2"); + + b.Property("IsProcessed") + .HasColumnType("bit"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventId") + .IsUnique(); + + b.HasIndex("EventTime"); + + b.HasIndex("IsProcessed"); + + b.HasIndex("EventName", "EventKey"); + + b.ToTable("Event", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionError", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("ErrorTime") + .HasColumnType("datetime2"); + + b.Property("ExecutionPointerId") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("Message") + .HasColumnType("nvarchar(max)"); + + b.Property("WorkflowId") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.HasKey("PersistenceId"); + + b.ToTable("ExecutionError", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("Active") + .HasColumnType("bit"); + + b.Property("Children") + .HasColumnType("nvarchar(max)"); + + b.Property("ContextItem") + .HasColumnType("nvarchar(max)"); + + b.Property("EndTime") + .HasColumnType("datetime2"); + + b.Property("EventData") + .HasColumnType("nvarchar(max)"); + + b.Property("EventKey") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("EventName") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("EventPublished") + .HasColumnType("bit"); + + b.Property("Id") + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("Outcome") + .HasColumnType("nvarchar(max)"); + + b.Property("PersistenceData") + .HasColumnType("nvarchar(max)"); + + b.Property("PredecessorId") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("RetryCount") + .HasColumnType("int"); + + b.Property("Scope") + .HasColumnType("nvarchar(max)"); + + b.Property("SleepUntil") + .HasColumnType("datetime2"); + + b.Property("StartTime") + .HasColumnType("datetime2"); + + b.Property("Status") + .HasColumnType("int"); + + b.Property("StepId") + .HasColumnType("int"); + + b.Property("StepName") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("WorkflowId") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("WorkflowId"); + + b.ToTable("ExecutionPointer", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("AttributeKey") + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("AttributeValue") + .HasColumnType("nvarchar(max)"); + + b.Property("ExecutionPointerId") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecutionPointerId"); + + b.ToTable("ExtensionAttribute", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedScheduledCommand", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("CommandName") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("Data") + .HasMaxLength(500) + .HasColumnType("nvarchar(500)"); + + b.Property("ExecuteTime") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecuteTime"); + + b.HasIndex("CommandName", "Data") + .IsUnique() + .HasFilter("[CommandName] IS NOT NULL AND [Data] IS NOT NULL"); + + b.ToTable("ScheduledCommand", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("EventKey") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("EventName") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("ExecutionPointerId") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("ExternalToken") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("ExternalTokenExpiry") + .HasColumnType("datetime2"); + + b.Property("ExternalWorkerId") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("StepId") + .HasColumnType("int"); + + b.Property("SubscribeAsOf") + .HasColumnType("datetime2"); + + b.Property("SubscriptionData") + .HasColumnType("nvarchar(max)"); + + b.Property("SubscriptionId") + .HasMaxLength(200) + .HasColumnType("uniqueidentifier"); + + b.Property("WorkflowId") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("EventKey"); + + b.HasIndex("EventName"); + + b.HasIndex("SubscriptionId") + .IsUnique(); + + b.ToTable("Subscription", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("CompleteTime") + .HasColumnType("datetime2"); + + b.Property("CreateTime") + .HasColumnType("datetime2"); + + b.Property("Data") + .HasColumnType("nvarchar(max)"); + + b.Property("Description") + .HasMaxLength(500) + .HasColumnType("nvarchar(500)"); + + b.Property("InstanceId") + .HasMaxLength(200) + .HasColumnType("uniqueidentifier"); + + b.Property("NextExecution") + .HasColumnType("bigint"); + + b.Property("Reference") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("Status") + .HasColumnType("int"); + + b.Property("Version") + .HasColumnType("int"); + + b.Property("WorkflowDefinitionId") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.HasKey("PersistenceId"); + + b.HasIndex("InstanceId") + .IsUnique(); + + b.HasIndex("NextExecution"); + + b.ToTable("Workflow", "wfc"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", "Workflow") + .WithMany("ExecutionPointers") + .HasForeignKey("WorkflowId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Workflow"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExtensionAttribute", b => + { + b.HasOne("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", "ExecutionPointer") + .WithMany("ExtensionAttributes") + .HasForeignKey("ExecutionPointerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("ExecutionPointer"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedExecutionPointer", b => + { + b.Navigation("ExtensionAttributes"); + }); + + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedWorkflow", b => + { + b.Navigation("ExecutionPointers"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20211023161544_scheduled-commands.cs b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20211023161544_scheduled-commands.cs new file mode 100644 index 000000000..2d5ec1572 --- /dev/null +++ b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/20211023161544_scheduled-commands.cs @@ -0,0 +1,47 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +namespace WorkflowCore.Persistence.SqlServer.Migrations +{ + public partial class scheduledcommands : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "ScheduledCommand", + schema: "wfc", + columns: table => new + { + PersistenceId = table.Column(type: "bigint", nullable: false) + .Annotation("SqlServer:Identity", "1, 1"), + CommandName = table.Column(type: "nvarchar(200)", maxLength: 200, nullable: true), + Data = table.Column(type: "nvarchar(500)", maxLength: 500, nullable: true), + ExecuteTime = table.Column(type: "bigint", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_ScheduledCommand", x => x.PersistenceId); + }); + + migrationBuilder.CreateIndex( + name: "IX_ScheduledCommand_CommandName_Data", + schema: "wfc", + table: "ScheduledCommand", + columns: new[] { "CommandName", "Data" }, + unique: true, + filter: "[CommandName] IS NOT NULL AND [Data] IS NOT NULL"); + + migrationBuilder.CreateIndex( + name: "IX_ScheduledCommand_ExecuteTime", + schema: "wfc", + table: "ScheduledCommand", + column: "ExecuteTime"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "ScheduledCommand", + schema: "wfc"); + } + } +} diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs index 2e32d838d..39da38276 100644 --- a/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs +++ b/src/providers/WorkflowCore.Persistence.SqlServer/Migrations/SqlServerPersistenceProviderModelSnapshot.cs @@ -2,6 +2,9 @@ using System; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using WorkflowCore.Persistence.SqlServer; namespace WorkflowCore.Persistence.SqlServer.Migrations { @@ -12,16 +15,18 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .UseIdentityColumns() .HasAnnotation("Relational:MaxIdentifierLength", 128) - .HasAnnotation("ProductVersion", "5.0.1"); + .HasAnnotation("ProductVersion", "5.0.8") + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedEvent", b => { b.Property("PersistenceId") .ValueGeneratedOnAdd() .HasColumnType("bigint") - .UseIdentityColumn(); + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); b.Property("EventData") .HasColumnType("nvarchar(max)"); @@ -62,7 +67,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("PersistenceId") .ValueGeneratedOnAdd() .HasColumnType("bigint") - .UseIdentityColumn(); + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); b.Property("ErrorTime") .HasColumnType("datetime2"); @@ -88,7 +95,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("PersistenceId") .ValueGeneratedOnAdd() .HasColumnType("bigint") - .UseIdentityColumn(); + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); b.Property("Active") .HasColumnType("bit"); @@ -167,7 +176,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("PersistenceId") .ValueGeneratedOnAdd() .HasColumnType("bigint") - .UseIdentityColumn(); + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); b.Property("AttributeKey") .HasMaxLength(100) @@ -186,12 +197,45 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.ToTable("ExtensionAttribute", "wfc"); }); + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedScheduledCommand", b => + { + b.Property("PersistenceId") + .ValueGeneratedOnAdd() + .HasColumnType("bigint") + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + b.Property("CommandName") + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("Data") + .HasMaxLength(500) + .HasColumnType("nvarchar(500)"); + + b.Property("ExecuteTime") + .HasColumnType("bigint"); + + b.HasKey("PersistenceId"); + + b.HasIndex("ExecuteTime"); + + b.HasIndex("CommandName", "Data") + .IsUnique() + .HasFilter("[CommandName] IS NOT NULL AND [Data] IS NOT NULL"); + + b.ToTable("ScheduledCommand", "wfc"); + }); + modelBuilder.Entity("WorkflowCore.Persistence.EntityFramework.Models.PersistedSubscription", b => { b.Property("PersistenceId") .ValueGeneratedOnAdd() .HasColumnType("bigint") - .UseIdentityColumn(); + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); b.Property("EventKey") .HasMaxLength(200) @@ -250,7 +294,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("PersistenceId") .ValueGeneratedOnAdd() .HasColumnType("bigint") - .UseIdentityColumn(); + .HasAnnotation("SqlServer:IdentityIncrement", 1) + .HasAnnotation("SqlServer:IdentitySeed", 1) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); b.Property("CompleteTime") .HasColumnType("datetime2"); diff --git a/src/providers/WorkflowCore.Persistence.SqlServer/SqlServerContext.cs b/src/providers/WorkflowCore.Persistence.SqlServer/SqlServerContext.cs index 6e2656a5b..eb03f3647 100644 --- a/src/providers/WorkflowCore.Persistence.SqlServer/SqlServerContext.cs +++ b/src/providers/WorkflowCore.Persistence.SqlServer/SqlServerContext.cs @@ -58,5 +58,11 @@ protected override void ConfigureEventStorage(EntityTypeBuilder builder.ToTable("Event", "wfc"); builder.Property(x => x.PersistenceId).UseIdentityColumn(); } + + protected override void ConfigureScheduledCommandStorage(EntityTypeBuilder builder) + { + builder.ToTable("ScheduledCommand", "wfc"); + builder.Property(x => x.PersistenceId).UseIdentityColumn(); + } } } diff --git a/src/providers/WorkflowCore.Persistence.Sqlite/SqliteContext.cs b/src/providers/WorkflowCore.Persistence.Sqlite/SqliteContext.cs index 1b5a6c469..0f2083933 100644 --- a/src/providers/WorkflowCore.Persistence.Sqlite/SqliteContext.cs +++ b/src/providers/WorkflowCore.Persistence.Sqlite/SqliteContext.cs @@ -52,5 +52,10 @@ protected override void ConfigureEventStorage(EntityTypeBuilder { builder.ToTable("Event"); } + + protected override void ConfigureScheduledCommandStorage(EntityTypeBuilder builder) + { + builder.ToTable("ScheduledCommand"); + } } } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index d3828389a..1126b6c20 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -24,6 +24,8 @@ public class DynamoPersistenceProvider : IPersistenceProvider public const string SUBCRIPTION_TABLE = "subscriptions"; public const string EVENT_TABLE = "events"; + public bool SupportsScheduledCommands => false; + public DynamoPersistenceProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory) { _logger = logFactory.CreateLogger(); @@ -469,5 +471,15 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke await _client.UpdateItemAsync(request, cancellationToken); } + + public Task ScheduleCommand(ScheduledCommand command) + { + throw new NotImplementedException(); + } + + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } } \ No newline at end of file diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs index 36e51b86c..644009df2 100644 --- a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs @@ -35,6 +35,8 @@ public CosmosDbPersistenceProvider( _subscriptionContainer = new Lazy(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(cosmosDbStorageOptions.SubscriptionContainerName)); } + public bool SupportsScheduledCommands => false; + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default) { var existing = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId)); @@ -225,6 +227,16 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken); } + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public Task ScheduleCommand(ScheduledCommand command) + { + throw new NotImplementedException(); + } + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken) { var sub = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId), cancellationToken: cancellationToken); diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs index dcd932af0..cde65e2e5 100644 --- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs @@ -28,6 +28,8 @@ public class RedisPersistenceProvider : IPersistenceProvider private readonly JsonSerializerSettings _serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }; private readonly bool _removeComplete; + public bool SupportsScheduledCommands => false; + public RedisPersistenceProvider(string connectionString, string prefix, bool removeComplete, ILoggerFactory logFactory) { _connectionString = connectionString; @@ -232,5 +234,15 @@ public Task PersistErrors(IEnumerable errors, CancellationToken public void EnsureStoreExists() { } + + public Task ScheduleCommand(ScheduledCommand command) + { + throw new NotImplementedException(); + } + + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } } diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs new file mode 100644 index 000000000..704b27d79 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs @@ -0,0 +1,57 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using WorkflowCore.Testing; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class DelayWorkflow : IWorkflow + { + internal static int Step1Ticker = 0; + internal static int Step2Ticker = 0; + + public string Id => "DelayWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(context => Step1Ticker++) + .Delay(data => data.WaitTime) + .Then(context => Step2Ticker++); + + } + + public class MyDataClass + { + public TimeSpan WaitTime { get; set; } + } + } + + public class DelayScenario : WorkflowTest + { + public DelayScenario() + { + Setup(); + } + + [Fact] + public void Scenario() + { + DelayWorkflow.Step1Ticker = 0; + DelayWorkflow.Step2Ticker = 0; + + var workflowId = StartWorkflow(new DelayWorkflow.MyDataClass() + { + WaitTime = Host.Options.PollInterval.Add(TimeSpan.FromSeconds(1)) + }); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + DelayWorkflow.Step1Ticker.Should().Be(1); + DelayWorkflow.Step2Ticker.Should().Be(1); + } + } +} diff --git a/test/WorkflowCore.Testing/WorkflowTest.cs b/test/WorkflowCore.Testing/WorkflowTest.cs index 46a6d0ce8..bf0eb97ab 100644 --- a/test/WorkflowCore.Testing/WorkflowTest.cs +++ b/test/WorkflowCore.Testing/WorkflowTest.cs @@ -46,7 +46,7 @@ protected void Host_OnStepError(WorkflowInstance workflow, WorkflowStep step, Ex protected virtual void ConfigureServices(IServiceCollection services) { - services.AddWorkflow(); + services.AddWorkflow(options => options.UsePollInterval(TimeSpan.FromSeconds(3))); } public string StartWorkflow(TData data) diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs new file mode 100644 index 000000000..511aa79fb --- /dev/null +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs @@ -0,0 +1,26 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Bson.Serialization; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MongoDB.Scenarios +{ + [Collection("Mongo collection")] + public class MongoDelayScenario : DelayScenario + { + public MongoDelayScenario() : base() + { + BsonClassMap.RegisterClassMap(map => map.AutoMap()); + } + + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(cfg => + { + cfg.UseMongoDB(MongoDockerSetup.ConnectionString, "integration-tests"); + cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + }); + } + } +} diff --git a/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs new file mode 100644 index 000000000..deed80f2b --- /dev/null +++ b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MySQL.Scenarios +{ + [Collection("Mysql collection")] + public class MysqlDelayScenario : DelayScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(cfg => + { + cfg.UseMySQL(MysqlDockerSetup.ScenarioConnectionString, true, true); + cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + }); + } + } +} diff --git a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs new file mode 100644 index 000000000..649632ac5 --- /dev/null +++ b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs @@ -0,0 +1,20 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.PostgreSQL.Scenarios +{ + [Collection("Postgres collection")] + public class PostgresDelayScenario : DelayScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(cfg => + { + cfg.UsePostgreSQL(PostgresDockerSetup.ScenarioConnectionString, true, true); + cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + }); + } + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs new file mode 100644 index 000000000..5e623c1db --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs @@ -0,0 +1,20 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerDelayScenario : DelayScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(cfg => + { + cfg.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true); + cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + }); + } + } +}