From 7876573da2fa15334b4b500a86e5689f0376db34 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Thu, 26 Dec 2024 09:04:14 +0100 Subject: [PATCH] Improved semaphore stores resiliency --- .../RFunctionTests/SemaphoreTests.cs | 25 ++++++--- .../MariaDbSemaphoreStore.cs | 56 ++++++++++++------- ...reStore.cs => PostgreSqlSemaphoreStore.cs} | 48 ++++++++++------ .../SqlServerSemaphoreStore.cs | 42 ++++++++++---- 4 files changed, 114 insertions(+), 57 deletions(-) rename Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/{PostgreSqlCemaphoreStore.cs => PostgreSqlSemaphoreStore.cs} (79%) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/SemaphoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/SemaphoreTests.cs index 38e387f1..5a5be888 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/SemaphoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/SemaphoreTests.cs @@ -1,3 +1,4 @@ +using System; using System.Linq; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; @@ -164,11 +165,21 @@ async Task(string param, Workflow workflow) => await @lock.DisposeAsync(); }); - var scheduled1 = await firstFlow.Schedule(flowId1.Instance, "hello"); - var scheduled2 = await secondFlow.Schedule(flowId2.Instance, "hello"); - - await firstFlowAcquiredSemaphore.WaitForRaised(); - await secondFlowAcquiredSemaphore.WaitForRaised(); + var firstFlowTask = Task.Run(() => firstFlow.Invoke(flowId1.Instance, "hello")); + var secondFlowTask = Task.Run(() => secondFlow.Invoke(flowId2.Instance, "hello")); + + try + { + await firstFlowAcquiredSemaphore.WaitForRaised(); + await secondFlowAcquiredSemaphore.WaitForRaised(); + } + catch (Exception) + { + await firstFlowTask; + await secondFlowTask; + + throw; + } var scheduled3 = await thirdFlow.Schedule(flowId3.Instance, "hello"); @@ -185,9 +196,7 @@ async Task(string param, Workflow workflow) => queued.Any(s => s == storedId3).ShouldBeTrue(); continueFlowFlag.Raise(); - - await scheduled1.Completion(); - await scheduled2.Completion(); + await scheduled3.Completion(); queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs index 68771a70..b8cd1e66 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs @@ -31,11 +31,12 @@ public async Task Truncate() await command.ExecuteNonQueryAsync(); } - private string? _takeSql; public async Task Acquire(string group, string instance, StoredId storedId, int maximumCount) + => await Acquire(group, instance, storedId, maximumCount, depth: 0); + + private string? _takeSql; + private async Task Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth) { - await using var conn = await CreateConnection(); - _takeSql ??= @$" IF (NOT EXISTS (SELECT 1 FROM {tablePrefix}_semaphores WHERE type = ? AND instance = ? AND owner = ?)) THEN @@ -48,28 +49,41 @@ INSERT INTO {tablePrefix}_semaphores SELECT -1; END IF;"; - var command = new MySqlCommand(_takeSql, conn) + await using var conn = await CreateConnection(); + try { - Parameters = + var command = new MySqlCommand(_takeSql, conn) { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() }, - - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() } + Parameters = + { + new() { Value = group }, + new() { Value = instance }, + new() { Value = storedId.Serialize() }, + + new() { Value = group }, + new() { Value = instance }, + new() { Value = storedId.Serialize() } + } + }; + + var position = (int?)await command.ExecuteScalarAsync(); + if (position == null || position == -1) + { + var queued = await GetQueued(group, instance, maximumCount); + return queued.Any(id => id == storedId); } - }; - var position = (int?) await command.ExecuteScalarAsync(); - if (position == null || position == -1) + return position < maximumCount; + } + catch (MySqlException e) when (e.Number == 1213) //deadlock found when trying to get lock; try restarting transaction { - var queued = await GetQueued(group, instance, maximumCount); - return queued.Any(id => id == storedId); + // ReSharper disable once DisposeOnUsingVariable + await conn.DisposeAsync(); //eagerly free taken connection + if (depth == 10) throw; + + await Task.Delay(Random.Shared.Next(10, 250)); + return await Acquire(group, instance, storedId, maximumCount, depth + 1); } - - return position < maximumCount; } private string? _releaseSql; @@ -78,8 +92,8 @@ public async Task> Release(string group, string instance await using var conn = await CreateConnection(); _releaseSql ??= @$" - DELETE FROM {tablePrefix}_semaphores - WHERE type = ? AND instance = ? AND owner = ?; + DELETE FROM {tablePrefix}_semaphores + WHERE type = ? AND instance = ? AND owner = ?; SELECT owner FROM {tablePrefix}_semaphores diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCemaphoreStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs similarity index 79% rename from Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCemaphoreStore.cs rename to Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs index 55273865..7d8beef0 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCemaphoreStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -33,12 +34,13 @@ public async Task Truncate() var command = new NpgsqlCommand(_truncateSql, conn); await command.ExecuteNonQueryAsync(); } - - private string? _takeSql; public async Task Acquire(string group, string instance, StoredId storedId, int maximumCount) + => await Acquire(group, instance, storedId, maximumCount, depth: 0); + + private string? _takeSql; + private async Task Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth) { - await using var conn = await CreateConnection(); _takeSql ??= @$" INSERT INTO {tablePrefix}_semaphores SELECT @@ -48,24 +50,38 @@ INSERT INTO {tablePrefix}_semaphores $3 WHERE NOT EXISTS (SELECT 1 FROM {tablePrefix}_semaphores WHERE type = $1 AND instance = $2 AND owner = $3) RETURNING position;"; - var command = new NpgsqlCommand(_takeSql, conn) + + await using var conn = await CreateConnection(); + try { - Parameters = + var command = new NpgsqlCommand(_takeSql, conn) { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() } + Parameters = + { + new() { Value = group }, + new() { Value = instance }, + new() { Value = storedId.Serialize() } + } + }; + + var position = (int?)await command.ExecuteScalarAsync(); + if (position == null) + { + var queued = await GetQueued(group, instance, maximumCount); + return queued.Any(id => id == storedId); } - }; - - var position = (int?) await command.ExecuteScalarAsync(); - if (position == null) + + return position < maximumCount; + } + catch (PostgresException e) when (e.SqlState == "23505") { - var queued = await GetQueued(group, instance, maximumCount); - return queued.Any(id => id == storedId); + // ReSharper disable once DisposeOnUsingVariable + await conn.DisposeAsync(); //eagerly free taken connection + if (depth == 10) throw; + + await Task.Delay(Random.Shared.Next(10, 250)); + return await Acquire(group, instance, storedId, maximumCount, depth + 1); } - - return position < maximumCount; } private string? _releaseSql; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs index bfae53da..186b1892 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -36,13 +37,16 @@ public async Task Truncate() var command = new SqlCommand(_truncateSql, conn); await command.ExecuteNonQueryAsync(); } - - private string? _takeSql; + public async Task Acquire(string group, string instance, StoredId storedId, int maximumCount) + => await Acquire(group, instance, storedId, maximumCount, depth: 0); + + private string? _acquireSql; + private async Task Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth) { await using var conn = await CreateConnection(); - _takeSql ??= @$" + _acquireSql ??= @$" INSERT INTO {tablePrefix}_Semaphores OUTPUT INSERTED.Position SELECT @Type, @@ -56,17 +60,31 @@ SELECT 1 FROM {tablePrefix}_Semaphores WHERE Type = @Type AND Instance = @Instance AND Owner = @Owner );"; - var command = new SqlCommand(_takeSql, conn); - command.Parameters.AddWithValue("@Type", group); - command.Parameters.AddWithValue("@Instance", instance); - command.Parameters.AddWithValue("@Owner", storedId.Serialize()); - - var position = await command.ExecuteScalarAsync(); - if (position is null) - return (await GetQueued(group, instance, maximumCount)).Any(id => id == storedId); + try + { + var command = new SqlCommand(_acquireSql, conn); + command.Parameters.AddWithValue("@Type", group); + command.Parameters.AddWithValue("@Instance", instance); + command.Parameters.AddWithValue("@Owner", storedId.Serialize()); - return (int) position < maximumCount; + var position = await command.ExecuteScalarAsync(); + + if (position is null) + return (await GetQueued(group, instance, maximumCount)).Any(id => id == storedId); + + return (int) position < maximumCount; + } + catch (SqlException e) + { + if (depth == 10 || (e.Number != SqlError.DEADLOCK_VICTIM && e.Number != SqlError.UNIQUENESS_VIOLATION)) + throw; + + // ReSharper disable once DisposeOnUsingVariable + await conn.DisposeAsync(); + await Task.Delay(Random.Shared.Next(50, 250)); + return await Acquire(group, instance, storedId, maximumCount, depth + 1); + } } private string? _releaseSql;