Skip to content

Commit

Permalink
Change how exceptions are handled to better relate them to individual…
Browse files Browse the repository at this point in the history
… items
  • Loading branch information
peppy committed Oct 31, 2024
1 parent 81fc6e4 commit 563a5ad
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
1 change: 1 addition & 0 deletions osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public void MultipleErrorsAttachedToCorrectItems()
processor.Error += (exception, item) =>
{
Assert.NotNull(exception);
Assert.Equal(exception, item.Exception);

gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1";
gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2";
Expand Down
9 changes: 8 additions & 1 deletion osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ protected override void ProcessResults(IEnumerable<FakeData> items)
{
foreach (var item in items)
{
Received?.Invoke(item);
try
{
Received?.Invoke(item);
}
catch (Exception e)
{
item.Exception = e;
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion osu.Server.QueueProcessor/QueueItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ namespace osu.Server.QueueProcessor
[Serializable]
public abstract class QueueItem
{
[IgnoreDataMember]
private bool failed;

/// <summary>
/// Set to <c>true</c> to mark this item is failed. This will cause it to be retried.
/// </summary>
[IgnoreDataMember]
public bool Failed { get; set; }
public bool Failed
{
get => failed || Exception != null;
set => failed = value;
}

[IgnoreDataMember]
public Exception? Exception { get; set; }

/// <summary>
/// The number of times processing this item has been retried. Handled internally by <see cref="QueueProcessor{T}"/>.
Expand Down
31 changes: 22 additions & 9 deletions osu.Server.QueueProcessor/QueueProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ public void Run(CancellationToken cancellation = default)

// individual processing should not be cancelled as we have already grabbed from the queue.
Task.Factory.StartNew(() => { ProcessResults(items); }, CancellationToken.None, TaskCreationOptions.HideScheduler, threadPool)
.ContinueWith(t =>
.ContinueWith(_ =>
{
foreach (var item in items)
{
if (t.Exception != null || item.Failed)
if (item.Failed)
{
Interlocked.Increment(ref totalErrors);

Expand All @@ -145,12 +145,12 @@ public void Run(CancellationToken cancellation = default)

Interlocked.Increment(ref consecutiveErrors);

Error?.Invoke(t.Exception, item);
Error?.Invoke(item.Exception, item);

if (t.Exception != null)
SentrySdk.CaptureException(t.Exception);
if (item.Exception != null)
SentrySdk.CaptureException(item.Exception);

Console.WriteLine($"Error processing {item}: {t.Exception}");
Console.WriteLine($"Error processing {item}: {item.Exception}");
attemptRetry(item);
}
else
Expand Down Expand Up @@ -197,8 +197,6 @@ private void setupSentry(SentryOptions options)

private void attemptRetry(T item)
{
item.Failed = false;

if (item.TotalRetries++ < config.MaxRetries)
{
Console.WriteLine($"Re-queueing for attempt {item.TotalRetries} / {config.MaxRetries}");
Expand Down Expand Up @@ -274,11 +272,26 @@ protected virtual void ProcessResult(T item)
/// <summary>
/// Implement to process batches of items from the queue.
/// </summary>
/// <remarks>
/// In most cases, you should only need to override and implement <see cref="ProcessResult"/>.
/// Only override this if you need more efficient batch processing.
///
/// If overriding this method, you should try-catch for exceptions, and set any exception against
/// the relevant <see cref="QueueItem"/>. If this is not done, failures will not be handled correctly.</remarks>
/// <param name="items">The items to process.</param>
protected virtual void ProcessResults(IEnumerable<T> items)
{
foreach (var item in items)
ProcessResult(item);
{
try
{
ProcessResult(item);
}
catch (Exception e)
{
item.Exception = e;
}
}
}
}
}

0 comments on commit 563a5ad

Please sign in to comment.