Skip to content

Commit

Permalink
Ability to add a Func to inject an Exception Logger to catch inner ex…
Browse files Browse the repository at this point in the history
…ceptions within UPS
  • Loading branch information
juanpgarces committed Apr 29, 2021
1 parent ffb7562 commit 679f029
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 46 deletions.
68 changes: 48 additions & 20 deletions src/UPS/FuncMultipleManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public static class FuncMultipleManager
// Regular Operations
private static long currentCount = 0;

// Error Handling
private static Func<Exception, Task> funcExceptionLogger = null;

/// <summary>
/// Amount of Threads used to Run Tasks
/// </summary>
Expand Down Expand Up @@ -63,6 +66,14 @@ public static async Task<Guid> EnqueueAsync(Func<Task<object>> func, Func<Task<b
return referencedTask.guid;
}

public static async Task SetErrorLoggingFunction(Func<Exception, Task> myFuncExceptionLogger)
{
if (myFuncExceptionLogger != null)
{
funcExceptionLogger = myFuncExceptionLogger;
}
}

//public static async Task<ReferencedResult> GetResultIfExistsAsync(Guid guid)
//{
// // maybe have some logic tha when there is no result found then it looks for that in the exceptions?
Expand Down Expand Up @@ -109,34 +120,51 @@ private static async Task StartProcessing(string queueName)
{
await Task.Factory.StartNew(async () =>
{
Interlocked.Increment(ref currentCount);
while (!queue.IsEmpty)
try
{
queue.TryPeek(out ReferencedFunc<object> referencedTask);
if (referencedTask != null)
Interlocked.Increment(ref currentCount);
while (!queue.IsEmpty)
{
// Accounts for Tasks Not specifying a non-required Checkpoint
if (referencedTask.checkpoint == null)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else if(await (referencedTask.checkpoint?.Invoke()) == true)
try
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
queue.TryPeek(out ReferencedFunc<object> referencedTask);
if (referencedTask != null)
{
// Accounts for Tasks Not specifying a non-required Checkpoint
if (referencedTask.checkpoint == null)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else if(await (referencedTask.checkpoint?.Invoke()) == true)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else
{
break;
}
}
else
{
queue.TryDequeue(out referencedTask);
}
}
else
catch (Exception ex)
{
break;
await (funcExceptionLogger?.Invoke(ex));
}
}
else
{
queue.TryDequeue(out referencedTask);
}
}
Interlocked.Decrement(ref currentCount);
catch (Exception ex)
{
await (funcExceptionLogger?.Invoke(ex));
}
finally
{
Interlocked.Decrement(ref currentCount);
}
}, TaskCreationOptions.LongRunning);
}
}
Expand Down
75 changes: 50 additions & 25 deletions src/UPS/FuncPriorityManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ public static class FuncPriorityManager
// Timer
private static Timer checkQueueTimer;

// Error Handling
private static Func<Exception, Task> funcExceptionLogger = null;

/// <summary>
///
/// </summary>
/// <param name="extraQueueLevels"></param>
/// <param name="maxThreads"></param>
/// <param name="period"></param>
public static void Initialize(int extraQueueLevels, int maxThreads, int period)
/// <param name="myFuncExceptionLogger"></param>
public static void Initialize(int extraQueueLevels, int maxThreads, int period, Func<Exception, Task> myFuncExceptionLogger = null)
{
if(myFuncExceptionLogger != null)
{
funcExceptionLogger = myFuncExceptionLogger;
}

if (Interlocked.CompareExchange(ref isInitiated, 0, 0) == 0)
{
FuncPriorityManager.maxThreads = maxThreads == 0 ? FuncPriorityManager.maxThreads : maxThreads;
Expand Down Expand Up @@ -141,48 +150,64 @@ private static async Task StartProcessing()
{
await Task.Factory.StartNew(async () =>
{
Interlocked.Exchange(ref isProcessing, 1);
foreach (var queue in concurrentQueues)
try
{
if (Interlocked.CompareExchange(ref currentCount, 0, 0) < maxThreads)
Interlocked.Exchange(ref isProcessing, 1);
foreach (var queue in concurrentQueues)
{
while (!queue.IsEmpty)
if (Interlocked.CompareExchange(ref currentCount, 0, 0) < maxThreads)
{
queue.TryPeek(out ReferencedFunc<object> referencedTask);
if (referencedTask != null)
while (!queue.IsEmpty)
{
// Accounts for Tasks Not specifying a non-required Checkpoint
if (referencedTask.checkpoint == null)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else if(await (referencedTask.checkpoint?.Invoke()) == true)
try
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
queue.TryPeek(out ReferencedFunc<object> referencedTask);
if (referencedTask != null)
{
// Accounts for Tasks Not specifying a non-required Checkpoint
if (referencedTask.checkpoint == null)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else if(await (referencedTask.checkpoint?.Invoke()) == true)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else
{
break;
}
}
else
{
queue.TryDequeue(out referencedTask);
}
}
else
catch (Exception ex)
{
break;
await (funcExceptionLogger?.Invoke(ex));
}
}
else
{
queue.TryDequeue(out referencedTask);
}
}
}
}
}
catch (Exception ex)
{
await (funcExceptionLogger?.Invoke(ex));
}
finally
{
Interlocked.Exchange(ref isProcessing, 0);
}
Interlocked.Exchange(ref isProcessing, 0);
}, TaskCreationOptions.LongRunning);
}
}

private static async Task ExecuteAsync(ReferencedFunc<object> referencedTask)
{
await (referencedTask?.func?.Invoke());
//AddReferencedResult(new ReferencedResult() { guid = referencedTask.guid, result = result });
}

private static void AddReferencedResult(ReferencedResult referencedResult)
Expand Down
2 changes: 1 addition & 1 deletion src/UPS/UPS.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RepositoryUrl>https://github.com/juanp3053/UPS</RepositoryUrl>
<RepositoryType>Git</RepositoryType>
<Description>Asynchronous and Resiliiant library to execute Functions safely</Description>
<Version>0.0.12</Version>
<Version>0.1.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<PackageId>juanp3053.FuncManager</PackageId>
Expand Down

0 comments on commit 679f029

Please sign in to comment.