Skip to content

Commit

Permalink
sql locking
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgerlag committed Jun 9, 2017
1 parent 0d28331 commit cfe8dfb
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 34 deletions.
137 changes: 104 additions & 33 deletions src/providers/WorkflowCore.LockProviders.SqlServer/SqlLockProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,142 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using System.Data;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;

namespace WorkflowCore.LockProviders.SqlServer
{
public class SqlLockProvider : IDistributedLockProvider
{
private const string Prefix = "wfc";

private readonly SqlConnection _connection;
private readonly string _connectionString;
private readonly ILogger _logger;
private readonly Dictionary<string, SqlConnection> _locks = new Dictionary<string, SqlConnection>();
private readonly AutoResetEvent _mutex = new AutoResetEvent(true);

public SqlLockProvider(string connectionString, ILoggerFactory logFactory)
{
_logger = logFactory.CreateLogger<SqlLockProvider>();
var csb = new SqlConnectionStringBuilder(connectionString);
csb.Pooling = false;
csb.Pooling = true;
csb.ApplicationName = "Workflow Core Lock Manager";

_connection = new SqlConnection(csb.ToString());
_connectionString = csb.ToString();
}


public async Task<bool> AcquireLock(string Id)
{
var cmd = _connection.CreateCommand();
cmd.CommandText = "EXEC @result = sp_getapplock @Resource = @id, @LockMode = 'Exclusive', @LockOwner = 'Session'";
cmd.Parameters.AddWithValue("id", $"{Prefix}:{Id}");
var result = Convert.ToInt32(await cmd.ExecuteScalarAsync());

switch (result)
if (_mutex.WaitOne())
{
case -1:
_logger.LogDebug($"The lock request timed out for {Id}");
break;
case -2:
_logger.LogDebug($"The lock request was canceled for {Id}");
break;
case -3:
_logger.LogDebug($"The lock request was chosen as a deadlock victim for {Id}");
break;
case -999:
_logger.LogError($"Lock provider error for {Id}");
break;
}
try
{
var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
try
{
var cmd = connection.CreateCommand();
cmd.CommandText = "sp_getapplock";
cmd.CommandType = CommandType.StoredProcedure;
cmd.Parameters.AddWithValue("@Resource", $"{Prefix}:{Id}");
cmd.Parameters.AddWithValue("@LockOwner", $"Session");
cmd.Parameters.AddWithValue("@LockMode", $"Exclusive");
cmd.Parameters.AddWithValue("@LockTimeout", 0);
var returnParameter = cmd.Parameters.Add("RetVal", SqlDbType.Int);
returnParameter.Direction = ParameterDirection.ReturnValue;
await cmd.ExecuteNonQueryAsync();
var result = Convert.ToInt32(returnParameter.Value);

return (result >= 0);
switch (result)
{
case -1:
_logger.LogDebug($"The lock request timed out for {Id}");
break;
case -2:
_logger.LogDebug($"The lock request was canceled for {Id}");
break;
case -3:
_logger.LogDebug($"The lock request was chosen as a deadlock victim for {Id}");
break;
case -999:
_logger.LogError($"Lock provider error for {Id}");
break;
}
if (result >= 0)
{
_locks[Id] = connection;
return true;
}
else
{
connection.Close();
return false;
}
}
catch (Exception ex)
{
connection.Close();
throw ex;
}
}
finally
{
_mutex.Set();
}
}
return false;
}

public async Task ReleaseLock(string Id)
{
var cmd = _connection.CreateCommand();
cmd.CommandText = "EXEC @result = sp_releaseapplock @Resource = @id, @LockOwner = 'Session'";
cmd.Parameters.AddWithValue("id", $"{Prefix}:{Id}");
var result = Convert.ToInt32(await cmd.ExecuteScalarAsync());

if (result < 0)
_logger.LogError($"Unable to release lock for {Id}");
if (_mutex.WaitOne())
{
try
{
SqlConnection connection = null;
connection = _locks[Id];

if (connection == null)
return;

try
{
var cmd = connection.CreateCommand();
cmd.CommandText = "sp_releaseapplock";
cmd.CommandType = CommandType.StoredProcedure;
cmd.Parameters.AddWithValue("@Resource", $"{Prefix}:{Id}");
cmd.Parameters.AddWithValue("@LockOwner", $"Session");
var returnParameter = cmd.Parameters.Add("RetVal", SqlDbType.Int);
returnParameter.Direction = ParameterDirection.ReturnValue;

await cmd.ExecuteNonQueryAsync();
var result = Convert.ToInt32(returnParameter.Value);

if (result < 0)
_logger.LogError($"Unable to release lock for {Id}");
}
finally
{
connection.Close();
_locks.Remove(Id);
}
}
finally
{
_mutex.Set();
}
}
}

public async Task Start()
{
await _connection.OpenAsync();
{
}

public async Task Stop()
{
_connection.Close();
}
}
}
2 changes: 1 addition & 1 deletion src/samples/WorkflowCore.Sample04/EventSampleWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void Build(IWorkflowBuilder<MyDataClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", data => "0")
.WaitFor("MyEvent", data => "0", data => DateTime.Now)
.Output(data => data.StrValue, step => step.EventData)
.Then<CustomMessage>()
.Name("Print custom message")
Expand Down
6 changes: 6 additions & 0 deletions src/samples/WorkflowCore.Sample04/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ private static IServiceProvider ConfigureServices()
// x.UseMongoDB(@"mongodb://localhost:27017", "workflow9999");
//});

//services.AddWorkflow(x =>
//{
// x.UseSqlServer(@"Server=.\SQLEXPRESS;Database=WorkflowCore;Trusted_Connection=True;", true, true);
// x.UseSqlServerLocking(@"Server=.\SQLEXPRESS;Database=WorkflowCore;Trusted_Connection=True;");
//});

//redis = ConnectionMultiplexer.Connect("127.0.0.1");
//services.AddWorkflow(x =>
//{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\providers\WorkflowCore.LockProviders.SqlServer\WorkflowCore.LockProviders.SqlServer.csproj" />
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.PostgreSQL\WorkflowCore.Persistence.PostgreSQL.csproj" />
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.Sqlite\WorkflowCore.Persistence.Sqlite.csproj" />
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.SqlServer\WorkflowCore.Persistence.SqlServer.csproj" />
Expand Down
6 changes: 6 additions & 0 deletions src/samples/WorkflowCore.Sample13/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ private static IServiceProvider ConfigureServices()
// x.UseMongoDB(@"mongodb://localhost:27017", "workflow-test002");
//});

//services.AddWorkflow(x =>
//{
// x.UseSqlServer(@"Server=.\SQLEXPRESS;Database=WorkflowCore3;Trusted_Connection=True;", true, true);
// x.UseSqlServerLocking(@"Server=.\SQLEXPRESS;Database=WorkflowCore3;Trusted_Connection=True;");
//});

var serviceProvider = services.BuildServiceProvider();

//config logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\providers\WorkflowCore.LockProviders.SqlServer\WorkflowCore.LockProviders.SqlServer.csproj" />
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.MongoDB\WorkflowCore.Persistence.MongoDB.csproj" />
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.SqlServer\WorkflowCore.Persistence.SqlServer.csproj" />
<ProjectReference Include="..\..\providers\WorkflowCore.Providers.Azure\WorkflowCore.Providers.Azure.csproj" />
<ProjectReference Include="..\..\WorkflowCore\WorkflowCore.csproj" />
</ItemGroup>
Expand Down

0 comments on commit cfe8dfb

Please sign in to comment.