diff --git a/src/providers/WorkflowCore.Providers.AWS/README.md b/src/providers/WorkflowCore.Providers.AWS/README.md index f9c000662..0a9286f37 100644 --- a/src/providers/WorkflowCore.Providers.AWS/README.md +++ b/src/providers/WorkflowCore.Providers.AWS/README.md @@ -34,6 +34,18 @@ services.AddWorkflow(cfg => If any AWS resources do not exists, they will be automatcially created. By default, all DynamoDB tables and indexes will be provisioned with a throughput of 1, you can modify these values from the AWS console. You may also specify a prefix for the dynamo table names. +If you have a preconfigured dynamoClient, you can pass this in instead of the credentials and config +```C# +var client = new AmazonDynamoDBClient(); +var sqsClient = new AmazonSQSClient(); +services.AddWorkflow(cfg => +{ + cfg.UseAwsDynamoPersistenceWithProvisionedClient(client, "table-prefix"); + cfg.UseAwsDynamoLockingWithProvisionedClient(client, "workflow-core-locks"); + cfg.UseAwsSimpleQueueServiceWithProvisionedClient(sqsClient, "queues-prefix"); +}); +``` + ## Usage (Kinesis) diff --git a/src/providers/WorkflowCore.Providers.AWS/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.Providers.AWS/ServiceCollectionExtensions.cs index 57b6f6bc8..c3c545f80 100644 --- a/src/providers/WorkflowCore.Providers.AWS/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.Providers.AWS/ServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ using System; using Amazon; using Amazon.DynamoDBv2; +using Amazon.Kinesis; using Amazon.Runtime; using Amazon.SQS; using Microsoft.Extensions.Logging; @@ -15,28 +16,55 @@ public static class ServiceCollectionExtensions { public static WorkflowOptions UseAwsSimpleQueueService(this WorkflowOptions options, AWSCredentials credentials, AmazonSQSConfig config, string queuesPrefix = "workflowcore") { - options.UseQueueProvider(sp => new SQSQueueProvider(credentials, config, sp.GetService(), queuesPrefix)); + var sqsClient = new AmazonSQSClient(credentials, config); + return options.UseAwsSimpleQueueServiceWithProvisionedClient(sqsClient, queuesPrefix); + } + + public static WorkflowOptions UseAwsSimpleQueueServiceWithProvisionedClient(this WorkflowOptions options, AmazonSQSClient sqsClient, string queuesPrefix = "workflowcore") + { + options.UseQueueProvider(sp => new SQSQueueProvider(sqsClient, sp.GetService(), queuesPrefix)); return options; } public static WorkflowOptions UseAwsDynamoLocking(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName) { - options.UseDistributedLockManager(sp => new DynamoLockProvider(credentials, config, tableName, sp.GetService(), sp.GetService())); + var dbClient = new AmazonDynamoDBClient(credentials, config); + return options.UseAwsDynamoLockingWithProvisionedClient(dbClient, tableName); + } + + public static WorkflowOptions UseAwsDynamoLockingWithProvisionedClient (this WorkflowOptions options, AmazonDynamoDBClient dynamoClient, string tableName) + { + options.UseDistributedLockManager(sp => new DynamoLockProvider(dynamoClient, tableName, sp.GetService(), sp.GetService())); return options; } public static WorkflowOptions UseAwsDynamoPersistence(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix) { - options.Services.AddTransient(sp => new DynamoDbProvisioner(credentials, config, tablePrefix, sp.GetService())); - options.UsePersistence(sp => new DynamoPersistenceProvider(credentials, config, sp.GetService(), tablePrefix, sp.GetService())); + var dbClient = new AmazonDynamoDBClient(credentials, config); + return options.UseAwsDynamoPersistenceWithProvisionedClient(dbClient, tablePrefix); + } + + public static WorkflowOptions UseAwsDynamoPersistenceWithProvisionedClient(this WorkflowOptions options, AmazonDynamoDBClient dynamoClient, string tablePrefix) + { + options.Services.AddTransient(sp => new DynamoDbProvisioner(dynamoClient, tablePrefix, sp.GetService())); + options.UsePersistence(sp => new DynamoPersistenceProvider(dynamoClient, sp.GetService(), tablePrefix, sp.GetService())); return options; } public static WorkflowOptions UseAwsKinesis(this WorkflowOptions options, AWSCredentials credentials, RegionEndpoint region, string appName, string streamName) { - options.Services.AddTransient(sp => new KinesisTracker(credentials, region, "workflowcore_kinesis", sp.GetService())); - options.Services.AddTransient(sp => new KinesisStreamConsumer(credentials, region, sp.GetService(), sp.GetService(), sp.GetService(), sp.GetService())); - options.UseEventHub(sp => new KinesisProvider(credentials, region, appName, streamName, sp.GetService(), sp.GetService())); + var kinesisClient = new AmazonKinesisClient(credentials, region); + var dynamoClient = new AmazonDynamoDBClient(credentials, region); + + return options.UseAwsKinesisWithProvisionedClients(kinesisClient, dynamoClient,appName, streamName); + + } + + public static WorkflowOptions UseAwsKinesisWithProvisionedClients(this WorkflowOptions options, AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDbClient, string appName, string streamName) + { + options.Services.AddTransient(sp => new KinesisTracker(dynamoDbClient, "workflowcore_kinesis", sp.GetService())); + options.Services.AddTransient(sp => new KinesisStreamConsumer(kinesisClient, sp.GetService(), sp.GetService(), sp.GetService(), sp.GetService())); + options.UseEventHub(sp => new KinesisProvider(kinesisClient, appName, streamName, sp.GetService(), sp.GetService())); return options; } } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoDbProvisioner.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoDbProvisioner.cs index 3f381c8c3..887d11a7a 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoDbProvisioner.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoDbProvisioner.cs @@ -15,10 +15,10 @@ public class DynamoDbProvisioner : IDynamoDbProvisioner private readonly IAmazonDynamoDB _client; private readonly string _tablePrefix; - public DynamoDbProvisioner(AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix, ILoggerFactory logFactory) + public DynamoDbProvisioner(AmazonDynamoDBClient dynamoDBClient, string tablePrefix, ILoggerFactory logFactory) { _logger = logFactory.CreateLogger(); - _client = new AmazonDynamoDBClient(credentials, config); + _client = dynamoDBClient; _tablePrefix = tablePrefix; } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs index 6f4aca0e8..0863f1393 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs @@ -25,10 +25,10 @@ public class DynamoLockProvider : IDistributedLockProvider private readonly AutoResetEvent _mutex = new AutoResetEvent(true); private readonly IDateTimeProvider _dateTimeProvider; - public DynamoLockProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider) + public DynamoLockProvider(AmazonDynamoDBClient dynamoDBClient, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider) { _logger = logFactory.CreateLogger(); - _client = new AmazonDynamoDBClient(credentials, config); + _client = dynamoDBClient; _localLocks = new List(); _tableName = tableName; _nodeId = Guid.NewGuid().ToString(); diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index 09f1dbc4c..01beaaabe 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -26,10 +26,10 @@ public class DynamoPersistenceProvider : IPersistenceProvider public bool SupportsScheduledCommands => false; - public DynamoPersistenceProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory) + public DynamoPersistenceProvider(AmazonDynamoDBClient dynamoDBClient, IDynamoDbProvisioner provisioner, string tablePrefix, ILoggerFactory logFactory) { _logger = logFactory.CreateLogger(); - _client = new AmazonDynamoDBClient(credentials, config); + _client = dynamoDBClient; _tablePrefix = tablePrefix; _provisioner = provisioner; } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisProvider.cs index 99d43d94f..d8aa519bd 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisProvider.cs @@ -26,7 +26,7 @@ public class KinesisProvider : ILifeCycleEventHub private readonly int _defaultShardCount = 1; private bool _started = false; - public KinesisProvider(AWSCredentials credentials, RegionEndpoint region, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory) + public KinesisProvider(AmazonKinesisClient kinesisClient, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory) { _logger = logFactory.CreateLogger(GetType()); _appName = appName; @@ -34,7 +34,7 @@ public KinesisProvider(AWSCredentials credentials, RegionEndpoint region, string _consumer = consumer; _serializer = new JsonSerializer(); _serializer.TypeNameHandling = TypeNameHandling.All; - _client = new AmazonKinesisClient(credentials, region); + _client = kinesisClient; } public async Task PublishNotification(LifeCycleEvent evt) diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs index 799125a0d..5c89f7837 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs @@ -25,12 +25,12 @@ public class KinesisStreamConsumer : IKinesisStreamConsumer, IDisposable private ICollection _subscribers = new HashSet(); private readonly IDateTimeProvider _dateTimeProvider; - public KinesisStreamConsumer(AWSCredentials credentials, RegionEndpoint region, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider) + public KinesisStreamConsumer(AmazonKinesisClient kinesisClient, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider) { _logger = logFactory.CreateLogger(GetType()); _tracker = tracker; _lockManager = lockManager; - _client = new AmazonKinesisClient(credentials, region); + _client = kinesisClient; _processTask = new Task(Process); _processTask.Start(); _dateTimeProvider = dateTimeProvider; diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisTracker.cs b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisTracker.cs index 9c5548420..d7c028c46 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/KinesisTracker.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/KinesisTracker.cs @@ -17,10 +17,10 @@ public class KinesisTracker : IKinesisTracker private readonly string _tableName; private bool _tableConfirmed = false; - public KinesisTracker(AWSCredentials credentials, RegionEndpoint region, string tableName, ILoggerFactory logFactory) + public KinesisTracker(AmazonDynamoDBClient client, string tableName, ILoggerFactory logFactory) { _logger = logFactory.CreateLogger(GetType()); - _client = new AmazonDynamoDBClient(credentials, region); + _client = client; _tableName = tableName; } diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs index dd1c15e14..c15fb02af 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/SQSQueueProvider.cs @@ -21,10 +21,10 @@ public class SQSQueueProvider : IQueueProvider public bool IsDequeueBlocking => true; - public SQSQueueProvider(AWSCredentials credentials, AmazonSQSConfig config, ILoggerFactory logFactory, string queuesPrefix) + public SQSQueueProvider(AmazonSQSClient sqsClient, ILoggerFactory logFactory, string queuesPrefix) { _logger = logFactory.CreateLogger(); - _client = new AmazonSQSClient(credentials, config); + _client = sqsClient; _queuesPrefix = queuesPrefix; } diff --git a/test/WorkflowCore.Tests.DynamoDB/DynamoPersistenceProviderFixture.cs b/test/WorkflowCore.Tests.DynamoDB/DynamoPersistenceProviderFixture.cs index 6fb9c1ea9..7d215f4ee 100644 --- a/test/WorkflowCore.Tests.DynamoDB/DynamoPersistenceProviderFixture.cs +++ b/test/WorkflowCore.Tests.DynamoDB/DynamoPersistenceProviderFixture.cs @@ -26,8 +26,9 @@ protected override IPersistenceProvider Subject if (_subject == null) { var cfg = new AmazonDynamoDBConfig { ServiceURL = DynamoDbDockerSetup.ConnectionString }; - var provisioner = new DynamoDbProvisioner(DynamoDbDockerSetup.Credentials, cfg, "unittests", new LoggerFactory()); - var client = new DynamoPersistenceProvider(DynamoDbDockerSetup.Credentials, cfg, provisioner, "unittests", new LoggerFactory()); + var dbClient = new AmazonDynamoDBClient(DynamoDbDockerSetup.Credentials, cfg); + var provisioner = new DynamoDbProvisioner(dbClient, "unittests", new LoggerFactory()); + var client = new DynamoPersistenceProvider(dbClient, provisioner, "unittests", new LoggerFactory()); client.EnsureStoreExists(); _subject = client; }