Skip to content

Commit

Permalink
Improved semaphore stores resiliency
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Dec 26, 2024
1 parent 9097683 commit 7876573
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
Expand Down Expand Up @@ -164,11 +165,21 @@ async Task(string param, Workflow workflow) =>
await @lock.DisposeAsync();
});

var scheduled1 = await firstFlow.Schedule(flowId1.Instance, "hello");
var scheduled2 = await secondFlow.Schedule(flowId2.Instance, "hello");

await firstFlowAcquiredSemaphore.WaitForRaised();
await secondFlowAcquiredSemaphore.WaitForRaised();
var firstFlowTask = Task.Run(() => firstFlow.Invoke(flowId1.Instance, "hello"));
var secondFlowTask = Task.Run(() => secondFlow.Invoke(flowId2.Instance, "hello"));

try
{
await firstFlowAcquiredSemaphore.WaitForRaised();
await secondFlowAcquiredSemaphore.WaitForRaised();
}
catch (Exception)
{
await firstFlowTask;
await secondFlowTask;

throw;
}

var scheduled3 = await thirdFlow.Schedule(flowId3.Instance, "hello");

Expand All @@ -185,9 +196,7 @@ async Task(string param, Workflow workflow) =>
queued.Any(s => s == storedId3).ShouldBeTrue();

continueFlowFlag.Raise();

await scheduled1.Completion();
await scheduled2.Completion();

await scheduled3.Completion();

queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public async Task Truncate()
await command.ExecuteNonQueryAsync();
}

private string? _takeSql;
public async Task<bool> Acquire(string group, string instance, StoredId storedId, int maximumCount)
=> await Acquire(group, instance, storedId, maximumCount, depth: 0);

private string? _takeSql;
private async Task<bool> Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth)
{
await using var conn = await CreateConnection();

_takeSql ??= @$"
IF (NOT EXISTS (SELECT 1 FROM {tablePrefix}_semaphores WHERE type = ? AND instance = ? AND owner = ?))
THEN
Expand All @@ -48,28 +49,41 @@ INSERT INTO {tablePrefix}_semaphores
SELECT -1;
END IF;";

var command = new MySqlCommand(_takeSql, conn)
await using var conn = await CreateConnection();
try
{
Parameters =
var command = new MySqlCommand(_takeSql, conn)
{
new() { Value = group },
new() { Value = instance },
new() { Value = storedId.Serialize() },

new() { Value = group },
new() { Value = instance },
new() { Value = storedId.Serialize() }
Parameters =
{
new() { Value = group },
new() { Value = instance },
new() { Value = storedId.Serialize() },

new() { Value = group },
new() { Value = instance },
new() { Value = storedId.Serialize() }
}
};

var position = (int?)await command.ExecuteScalarAsync();
if (position == null || position == -1)
{
var queued = await GetQueued(group, instance, maximumCount);
return queued.Any(id => id == storedId);
}
};

var position = (int?) await command.ExecuteScalarAsync();
if (position == null || position == -1)
return position < maximumCount;
}
catch (MySqlException e) when (e.Number == 1213) //deadlock found when trying to get lock; try restarting transaction
{
var queued = await GetQueued(group, instance, maximumCount);
return queued.Any(id => id == storedId);
// ReSharper disable once DisposeOnUsingVariable
await conn.DisposeAsync(); //eagerly free taken connection
if (depth == 10) throw;

await Task.Delay(Random.Shared.Next(10, 250));
return await Acquire(group, instance, storedId, maximumCount, depth + 1);
}

return position < maximumCount;
}

private string? _releaseSql;
Expand All @@ -78,8 +92,8 @@ public async Task<IReadOnlyList<StoredId>> Release(string group, string instance
await using var conn = await CreateConnection();

_releaseSql ??= @$"
DELETE FROM {tablePrefix}_semaphores
WHERE type = ? AND instance = ? AND owner = ?;
DELETE FROM {tablePrefix}_semaphores
WHERE type = ? AND instance = ? AND owner = ?;
SELECT owner
FROM {tablePrefix}_semaphores
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -33,12 +34,13 @@ public async Task Truncate()
var command = new NpgsqlCommand(_truncateSql, conn);
await command.ExecuteNonQueryAsync();
}

private string? _takeSql;

public async Task<bool> Acquire(string group, string instance, StoredId storedId, int maximumCount)
=> await Acquire(group, instance, storedId, maximumCount, depth: 0);

private string? _takeSql;
private async Task<bool> Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth)
{
await using var conn = await CreateConnection();
_takeSql ??= @$"
INSERT INTO {tablePrefix}_semaphores
SELECT
Expand All @@ -48,24 +50,38 @@ INSERT INTO {tablePrefix}_semaphores
$3
WHERE NOT EXISTS (SELECT 1 FROM {tablePrefix}_semaphores WHERE type = $1 AND instance = $2 AND owner = $3)
RETURNING position;";
var command = new NpgsqlCommand(_takeSql, conn)

await using var conn = await CreateConnection();
try
{
Parameters =
var command = new NpgsqlCommand(_takeSql, conn)
{
new() { Value = group },
new() { Value = instance },
new() { Value = storedId.Serialize() }
Parameters =
{
new() { Value = group },
new() { Value = instance },
new() { Value = storedId.Serialize() }
}
};

var position = (int?)await command.ExecuteScalarAsync();
if (position == null)
{
var queued = await GetQueued(group, instance, maximumCount);
return queued.Any(id => id == storedId);
}
};

var position = (int?) await command.ExecuteScalarAsync();
if (position == null)

return position < maximumCount;
}
catch (PostgresException e) when (e.SqlState == "23505")
{
var queued = await GetQueued(group, instance, maximumCount);
return queued.Any(id => id == storedId);
// ReSharper disable once DisposeOnUsingVariable
await conn.DisposeAsync(); //eagerly free taken connection
if (depth == 10) throw;

await Task.Delay(Random.Shared.Next(10, 250));
return await Acquire(group, instance, storedId, maximumCount, depth + 1);
}

return position < maximumCount;
}

private string? _releaseSql;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -36,13 +37,16 @@ public async Task Truncate()
var command = new SqlCommand(_truncateSql, conn);
await command.ExecuteNonQueryAsync();
}

private string? _takeSql;

public async Task<bool> Acquire(string group, string instance, StoredId storedId, int maximumCount)
=> await Acquire(group, instance, storedId, maximumCount, depth: 0);

private string? _acquireSql;
private async Task<bool> Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth)
{
await using var conn = await CreateConnection();

_takeSql ??= @$"
_acquireSql ??= @$"
INSERT INTO {tablePrefix}_Semaphores
OUTPUT INSERTED.Position
SELECT @Type,
Expand All @@ -56,17 +60,31 @@ SELECT 1
FROM {tablePrefix}_Semaphores
WHERE Type = @Type AND Instance = @Instance AND Owner = @Owner
);";
var command = new SqlCommand(_takeSql, conn);
command.Parameters.AddWithValue("@Type", group);
command.Parameters.AddWithValue("@Instance", instance);
command.Parameters.AddWithValue("@Owner", storedId.Serialize());

var position = await command.ExecuteScalarAsync();

if (position is null)
return (await GetQueued(group, instance, maximumCount)).Any(id => id == storedId);
try
{
var command = new SqlCommand(_acquireSql, conn);
command.Parameters.AddWithValue("@Type", group);
command.Parameters.AddWithValue("@Instance", instance);
command.Parameters.AddWithValue("@Owner", storedId.Serialize());

return (int) position < maximumCount;
var position = await command.ExecuteScalarAsync();

if (position is null)
return (await GetQueued(group, instance, maximumCount)).Any(id => id == storedId);

return (int) position < maximumCount;
}
catch (SqlException e)
{
if (depth == 10 || (e.Number != SqlError.DEADLOCK_VICTIM && e.Number != SqlError.UNIQUENESS_VIOLATION))
throw;

// ReSharper disable once DisposeOnUsingVariable
await conn.DisposeAsync();
await Task.Delay(Random.Shared.Next(50, 250));
return await Acquire(group, instance, storedId, maximumCount, depth + 1);
}
}

private string? _releaseSql;
Expand Down

0 comments on commit 7876573

Please sign in to comment.