diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index e1a3a88d..7216de10 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -2,31 +2,21 @@ using System.Collections.Generic; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Domain.Exceptions; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; using Microsoft.Data.SqlClient; namespace Cleipnir.ResilientFunctions.SqlServer; -public class SqlServerMessageStore : IMessageStore +public class SqlServerMessageStore(string connectionString, string tablePrefix = "") : IMessageStore { - private readonly string _connectionString; - private readonly string _tablePrefix; - - public SqlServerMessageStore(string connectionString, string tablePrefix = "") - { - _connectionString = connectionString; - _tablePrefix = tablePrefix; - } - private string? _initializeSql; public async Task Initialize() { await using var conn = await CreateConnection(); _initializeSql ??= @$" - CREATE TABLE {_tablePrefix}_Messages ( + CREATE TABLE {tablePrefix}_Messages ( FlowType INT, FlowInstance UNIQUEIDENTIFIER, Position INT NOT NULL, @@ -46,23 +36,26 @@ PRIMARY KEY (FlowType, FlowInstance, Position) public async Task TruncateTable() { await using var conn = await CreateConnection(); - _truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_Messages;"; + _truncateTableSql ??= $"TRUNCATE TABLE {tablePrefix}_Messages;"; var command = new SqlCommand(_truncateTableSql, conn); await command.ExecuteNonQueryAsync(); } - private string? _appendMessageSql; public async Task AppendMessage(StoredId storedId, StoredMessage storedMessage) + => await AppendMessage(storedId, storedMessage, depth: 0); + + private string? _appendMessageSql; + private async Task AppendMessage(StoredId storedId, StoredMessage storedMessage, int depth) { await using var conn = await CreateConnection(); _appendMessageSql ??= @$" - INSERT INTO {_tablePrefix}_Messages + INSERT INTO {tablePrefix}_Messages (FlowType, FlowInstance, Position, MessageJson, MessageType, IdempotencyKey) VALUES ( @FlowType, @FlowInstance, - (SELECT COALESCE(MAX(position), -1) + 1 FROM {_tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance), + (SELECT COALESCE(MAX(position), -1) + 1 FROM {tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance), @MessageJson, @MessageType, @IdempotencyKey );"; @@ -78,14 +71,13 @@ INSERT INTO {_tablePrefix}_Messages } catch (SqlException e) { - if (e.Number == SqlError.UNIQUENESS_INDEX_VIOLATION) //idempotency key already exists - return await GetSuspensionStatus(storedId, conn); - if (e.Number != SqlError.DEADLOCK_VICTIM && e.Number != SqlError.UNIQUENESS_VIOLATION) + if (depth == 10 || e.Number != SqlError.DEADLOCK_VICTIM) throw; + // ReSharper disable once DisposeOnUsingVariable await conn.DisposeAsync(); await Task.Delay(Random.Shared.Next(50, 250)); - return await AppendMessage(storedId, storedMessage); + return await AppendMessage(storedId, storedMessage, depth + 1); } return await GetSuspensionStatus(storedId, conn); @@ -97,7 +89,7 @@ public async Task ReplaceMessage(StoredId storedId, int position, StoredMe await using var conn = await CreateConnection(); _replaceMessageSql ??= @$" - UPDATE {_tablePrefix}_Messages + UPDATE {tablePrefix}_Messages SET MessageJson = @MessageJson, MessageType = @MessageType, IdempotencyKey = @IdempotencyKey WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND Position = @Position"; @@ -118,7 +110,7 @@ public async Task Truncate(StoredId storedId) { await using var conn = await CreateConnection(); _truncateSql ??= @$" - DELETE FROM {_tablePrefix}_Messages + DELETE FROM {tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance"; await using var command = new SqlCommand(_truncateSql, conn); @@ -134,7 +126,7 @@ public async Task> GetMessages(StoredId storedId, i await using var conn = await CreateConnection(); _getMessagesSql ??= @$" SELECT MessageJson, MessageType, IdempotencyKey - FROM {_tablePrefix}_Messages + FROM {tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND Position >= @Position ORDER BY Position ASC;"; @@ -168,7 +160,7 @@ public async Task> GetMessages(StoredId storedId, i private async Task CreateConnection() { - var conn = new SqlConnection(_connectionString); + var conn = new SqlConnection(connectionString); await conn.OpenAsync(); return conn; } @@ -178,7 +170,7 @@ private async Task CreateConnection() { _getSuspensionStatusSql ??= @$" SELECT Epoch, Status - FROM {_tablePrefix} + FROM {tablePrefix} WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance"; await using var command = new SqlCommand(_getSuspensionStatusSql, connection); command.Parameters.AddWithValue("@FlowType", storedId.Type.Value);