diff --git a/src/UPS/FuncMultipleManager.cs b/src/UPS/FuncMultipleManager.cs index 4fa46a8..3a51e9f 100644 --- a/src/UPS/FuncMultipleManager.cs +++ b/src/UPS/FuncMultipleManager.cs @@ -25,6 +25,9 @@ public static class FuncMultipleManager // Regular Operations private static long currentCount = 0; + // Error Handling + private static Func funcExceptionLogger = null; + /// /// Amount of Threads used to Run Tasks /// @@ -63,6 +66,14 @@ public static async Task EnqueueAsync(Func> func, Func myFuncExceptionLogger) + { + if (myFuncExceptionLogger != null) + { + funcExceptionLogger = myFuncExceptionLogger; + } + } + //public static async Task GetResultIfExistsAsync(Guid guid) //{ // // maybe have some logic tha when there is no result found then it looks for that in the exceptions? @@ -109,34 +120,51 @@ private static async Task StartProcessing(string queueName) { await Task.Factory.StartNew(async () => { - Interlocked.Increment(ref currentCount); - while (!queue.IsEmpty) + try { - queue.TryPeek(out ReferencedFunc referencedTask); - if (referencedTask != null) + Interlocked.Increment(ref currentCount); + while (!queue.IsEmpty) { - // Accounts for Tasks Not specifying a non-required Checkpoint - if (referencedTask.checkpoint == null) - { - if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) - await ExecuteAsync(dequeuedReferencedTask); - } - else if(await (referencedTask.checkpoint?.Invoke()) == true) + try { - if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) - await ExecuteAsync(dequeuedReferencedTask); + queue.TryPeek(out ReferencedFunc referencedTask); + if (referencedTask != null) + { + // Accounts for Tasks Not specifying a non-required Checkpoint + if (referencedTask.checkpoint == null) + { + if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) + await ExecuteAsync(dequeuedReferencedTask); + } + else if(await (referencedTask.checkpoint?.Invoke()) == true) + { + if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) + await ExecuteAsync(dequeuedReferencedTask); + } + else + { + break; + } + } + else + { + queue.TryDequeue(out referencedTask); + } } - else + catch (Exception ex) { - break; + await (funcExceptionLogger?.Invoke(ex)); } } - else - { - queue.TryDequeue(out referencedTask); - } } - Interlocked.Decrement(ref currentCount); + catch (Exception ex) + { + await (funcExceptionLogger?.Invoke(ex)); + } + finally + { + Interlocked.Decrement(ref currentCount); + } }, TaskCreationOptions.LongRunning); } } diff --git a/src/UPS/FuncPriorityManager.cs b/src/UPS/FuncPriorityManager.cs index 0f616b5..ccfa8c3 100644 --- a/src/UPS/FuncPriorityManager.cs +++ b/src/UPS/FuncPriorityManager.cs @@ -35,14 +35,23 @@ public static class FuncPriorityManager // Timer private static Timer checkQueueTimer; + // Error Handling + private static Func funcExceptionLogger = null; + /// /// /// /// /// /// - public static void Initialize(int extraQueueLevels, int maxThreads, int period) + /// + public static void Initialize(int extraQueueLevels, int maxThreads, int period, Func myFuncExceptionLogger = null) { + if(myFuncExceptionLogger != null) + { + funcExceptionLogger = myFuncExceptionLogger; + } + if (Interlocked.CompareExchange(ref isInitiated, 0, 0) == 0) { FuncPriorityManager.maxThreads = maxThreads == 0 ? FuncPriorityManager.maxThreads : maxThreads; @@ -141,40 +150,57 @@ private static async Task StartProcessing() { await Task.Factory.StartNew(async () => { - Interlocked.Exchange(ref isProcessing, 1); - foreach (var queue in concurrentQueues) + try { - if (Interlocked.CompareExchange(ref currentCount, 0, 0) < maxThreads) + Interlocked.Exchange(ref isProcessing, 1); + foreach (var queue in concurrentQueues) { - while (!queue.IsEmpty) + if (Interlocked.CompareExchange(ref currentCount, 0, 0) < maxThreads) { - queue.TryPeek(out ReferencedFunc referencedTask); - if (referencedTask != null) + while (!queue.IsEmpty) { - // Accounts for Tasks Not specifying a non-required Checkpoint - if (referencedTask.checkpoint == null) - { - if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) - await ExecuteAsync(dequeuedReferencedTask); - } - else if(await (referencedTask.checkpoint?.Invoke()) == true) + try { - if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) - await ExecuteAsync(dequeuedReferencedTask); + queue.TryPeek(out ReferencedFunc referencedTask); + if (referencedTask != null) + { + // Accounts for Tasks Not specifying a non-required Checkpoint + if (referencedTask.checkpoint == null) + { + if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) + await ExecuteAsync(dequeuedReferencedTask); + } + else if(await (referencedTask.checkpoint?.Invoke()) == true) + { + if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) + await ExecuteAsync(dequeuedReferencedTask); + } + else + { + break; + } + } + else + { + queue.TryDequeue(out referencedTask); + } } - else + catch (Exception ex) { - break; + await (funcExceptionLogger?.Invoke(ex)); } } - else - { - queue.TryDequeue(out referencedTask); - } } - } + } + } + catch (Exception ex) + { + await (funcExceptionLogger?.Invoke(ex)); + } + finally + { + Interlocked.Exchange(ref isProcessing, 0); } - Interlocked.Exchange(ref isProcessing, 0); }, TaskCreationOptions.LongRunning); } } @@ -182,7 +208,6 @@ await Task.Factory.StartNew(async () => private static async Task ExecuteAsync(ReferencedFunc referencedTask) { await (referencedTask?.func?.Invoke()); - //AddReferencedResult(new ReferencedResult() { guid = referencedTask.guid, result = result }); } private static void AddReferencedResult(ReferencedResult referencedResult) diff --git a/src/UPS/UPS.csproj b/src/UPS/UPS.csproj index f0f3994..9e17396 100644 --- a/src/UPS/UPS.csproj +++ b/src/UPS/UPS.csproj @@ -8,7 +8,7 @@ https://github.com/juanp3053/UPS Git Asynchronous and Resiliiant library to execute Functions safely - 0.0.12 + 0.1.0 $(Version) $(Version) juanp3053.FuncManager