diff --git a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs index d777fd8..71a4381 100644 --- a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs +++ b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs @@ -195,6 +195,41 @@ public void SendThenErrorDoesRetry() Assert.Equal(obj, receivedObject); } + [Fact] + public void MultipleErrorsAttachedToCorrectItems() + { + var cts = new CancellationTokenSource(10000); + + var obj1 = FakeData.New(); + var obj2 = FakeData.New(); + + bool gotCorrectExceptionForItem1 = false; + bool gotCorrectExceptionForItem2 = false; + + processor.Error += (exception, item) => + { + Assert.NotNull(exception); + Assert.Equal(exception, item.Exception); + + gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1"; + gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2"; + }; + + processor.PushToQueue(new[] { obj1, obj2 }); + + processor.Received += o => + { + if (Equals(o.Data, obj1.Data)) throw new Exception("1"); + if (Equals(o.Data, obj2.Data)) throw new Exception("2"); + }; + + processor.Run(cts.Token); + + Assert.Equal(0, processor.GetQueueSize()); + Assert.True(gotCorrectExceptionForItem1); + Assert.True(gotCorrectExceptionForItem2); + } + [Fact] public void SendThenErrorForeverDoesDrop() { diff --git a/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs b/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs index 63d5cc9..db9c80b 100644 --- a/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs +++ b/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs @@ -21,7 +21,14 @@ protected override void ProcessResults(IEnumerable items) { foreach (var item in items) { - Received?.Invoke(item); + try + { + Received?.Invoke(item); + } + catch (Exception e) + { + item.Exception = e; + } } } diff --git a/osu.Server.QueueProcessor/QueueItem.cs b/osu.Server.QueueProcessor/QueueItem.cs index 9e622fb..823a28e 100644 --- a/osu.Server.QueueProcessor/QueueItem.cs +++ b/osu.Server.QueueProcessor/QueueItem.cs @@ -12,11 +12,21 @@ namespace osu.Server.QueueProcessor [Serializable] public abstract class QueueItem { + [IgnoreDataMember] + private bool failed; + /// /// Set to true to mark this item is failed. This will cause it to be retried. /// [IgnoreDataMember] - public bool Failed { get; set; } + public bool Failed + { + get => failed || Exception != null; + set => failed = value; + } + + [IgnoreDataMember] + public Exception? Exception { get; set; } /// /// The number of times processing this item has been retried. Handled internally by . diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index 8fd2df0..4cfe21c 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -132,11 +132,11 @@ public void Run(CancellationToken cancellation = default) // individual processing should not be cancelled as we have already grabbed from the queue. Task.Factory.StartNew(() => { ProcessResults(items); }, CancellationToken.None, TaskCreationOptions.HideScheduler, threadPool) - .ContinueWith(t => + .ContinueWith(_ => { foreach (var item in items) { - if (t.Exception != null || item.Failed) + if (item.Failed) { Interlocked.Increment(ref totalErrors); @@ -145,12 +145,18 @@ public void Run(CancellationToken cancellation = default) Interlocked.Increment(ref consecutiveErrors); - Error?.Invoke(t.Exception, item); + try + { + Error?.Invoke(item.Exception, item); + } + catch + { + } - if (t.Exception != null) - SentrySdk.CaptureException(t.Exception); + if (item.Exception != null) + SentrySdk.CaptureException(item.Exception); - Console.WriteLine($"Error processing {item}: {t.Exception}"); + Console.WriteLine($"Error processing {item}: {item.Exception}"); attemptRetry(item); } else @@ -197,8 +203,6 @@ private void setupSentry(SentryOptions options) private void attemptRetry(T item) { - item.Failed = false; - if (item.TotalRetries++ < config.MaxRetries) { Console.WriteLine($"Re-queueing for attempt {item.TotalRetries} / {config.MaxRetries}"); @@ -274,11 +278,26 @@ protected virtual void ProcessResult(T item) /// /// Implement to process batches of items from the queue. /// + /// + /// In most cases, you should only need to override and implement . + /// Only override this if you need more efficient batch processing. + /// + /// If overriding this method, you should try-catch for exceptions, and set any exception against + /// the relevant . If this is not done, failures will not be handled correctly. /// The items to process. protected virtual void ProcessResults(IEnumerable items) { foreach (var item in items) - ProcessResult(item); + { + try + { + ProcessResult(item); + } + catch (Exception e) + { + item.Exception = e; + } + } } } }