Skip to content

Commit

Permalink
Improved SqlServerMessageStore append message deadlock handling and r…
Browse files Browse the repository at this point in the history
…efactored class
  • Loading branch information
stidsborg committed Dec 26, 2024
1 parent 17afbdb commit 9097683
Showing 1 changed file with 17 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,21 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Domain.Exceptions;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Storage;
using Microsoft.Data.SqlClient;

namespace Cleipnir.ResilientFunctions.SqlServer;

public class SqlServerMessageStore : IMessageStore
public class SqlServerMessageStore(string connectionString, string tablePrefix = "") : IMessageStore
{
private readonly string _connectionString;
private readonly string _tablePrefix;

public SqlServerMessageStore(string connectionString, string tablePrefix = "")
{
_connectionString = connectionString;
_tablePrefix = tablePrefix;
}

private string? _initializeSql;
public async Task Initialize()
{
await using var conn = await CreateConnection();

_initializeSql ??= @$"
CREATE TABLE {_tablePrefix}_Messages (
CREATE TABLE {tablePrefix}_Messages (
FlowType INT,
FlowInstance UNIQUEIDENTIFIER,
Position INT NOT NULL,
Expand All @@ -46,23 +36,26 @@ PRIMARY KEY (FlowType, FlowInstance, Position)
public async Task TruncateTable()
{
await using var conn = await CreateConnection();
_truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_Messages;";
_truncateTableSql ??= $"TRUNCATE TABLE {tablePrefix}_Messages;";
var command = new SqlCommand(_truncateTableSql, conn);
await command.ExecuteNonQueryAsync();
}

private string? _appendMessageSql;
public async Task<FunctionStatus?> AppendMessage(StoredId storedId, StoredMessage storedMessage)
=> await AppendMessage(storedId, storedMessage, depth: 0);

private string? _appendMessageSql;
private async Task<FunctionStatus?> AppendMessage(StoredId storedId, StoredMessage storedMessage, int depth)
{
await using var conn = await CreateConnection();

_appendMessageSql ??= @$"
INSERT INTO {_tablePrefix}_Messages
INSERT INTO {tablePrefix}_Messages
(FlowType, FlowInstance, Position, MessageJson, MessageType, IdempotencyKey)
VALUES (
@FlowType,
@FlowInstance,
(SELECT COALESCE(MAX(position), -1) + 1 FROM {_tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance),
(SELECT COALESCE(MAX(position), -1) + 1 FROM {tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance),
@MessageJson, @MessageType, @IdempotencyKey
);";

Expand All @@ -78,14 +71,13 @@ INSERT INTO {_tablePrefix}_Messages
}
catch (SqlException e)
{
if (e.Number == SqlError.UNIQUENESS_INDEX_VIOLATION) //idempotency key already exists
return await GetSuspensionStatus(storedId, conn);
if (e.Number != SqlError.DEADLOCK_VICTIM && e.Number != SqlError.UNIQUENESS_VIOLATION)
if (depth == 10 || e.Number != SqlError.DEADLOCK_VICTIM)
throw;

// ReSharper disable once DisposeOnUsingVariable
await conn.DisposeAsync();
await Task.Delay(Random.Shared.Next(50, 250));
return await AppendMessage(storedId, storedMessage);
return await AppendMessage(storedId, storedMessage, depth + 1);
}

return await GetSuspensionStatus(storedId, conn);
Expand All @@ -97,7 +89,7 @@ public async Task<bool> ReplaceMessage(StoredId storedId, int position, StoredMe
await using var conn = await CreateConnection();

_replaceMessageSql ??= @$"
UPDATE {_tablePrefix}_Messages
UPDATE {tablePrefix}_Messages
SET MessageJson = @MessageJson, MessageType = @MessageType, IdempotencyKey = @IdempotencyKey
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND Position = @Position";

Expand All @@ -118,7 +110,7 @@ public async Task Truncate(StoredId storedId)
{
await using var conn = await CreateConnection();
_truncateSql ??= @$"
DELETE FROM {_tablePrefix}_Messages
DELETE FROM {tablePrefix}_Messages
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance";

await using var command = new SqlCommand(_truncateSql, conn);
Expand All @@ -134,7 +126,7 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, i
await using var conn = await CreateConnection();
_getMessagesSql ??= @$"
SELECT MessageJson, MessageType, IdempotencyKey
FROM {_tablePrefix}_Messages
FROM {tablePrefix}_Messages
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND Position >= @Position
ORDER BY Position ASC;";

Expand Down Expand Up @@ -168,7 +160,7 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, i

private async Task<SqlConnection> CreateConnection()
{
var conn = new SqlConnection(_connectionString);
var conn = new SqlConnection(connectionString);
await conn.OpenAsync();
return conn;
}
Expand All @@ -178,7 +170,7 @@ private async Task<SqlConnection> CreateConnection()
{
_getSuspensionStatusSql ??= @$"
SELECT Epoch, Status
FROM {_tablePrefix}
FROM {tablePrefix}
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance";
await using var command = new SqlCommand(_getSuspensionStatusSql, connection);
command.Parameters.AddWithValue("@FlowType", storedId.Type.Value);
Expand Down

0 comments on commit 9097683

Please sign in to comment.