Skip to content

Commit

Permalink
Merge pull request #33 from peppy/fix-exception-handling
Browse files Browse the repository at this point in the history
Fix exception handling when multiple items are involved
  • Loading branch information
smoogipoo authored Oct 31, 2024
2 parents 5280840 + b4e8485 commit b01f281
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 11 deletions.
35 changes: 35 additions & 0 deletions osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,41 @@ public void SendThenErrorDoesRetry()
Assert.Equal(obj, receivedObject);
}

[Fact]
public void MultipleErrorsAttachedToCorrectItems()
{
var cts = new CancellationTokenSource(10000);

var obj1 = FakeData.New();
var obj2 = FakeData.New();

bool gotCorrectExceptionForItem1 = false;
bool gotCorrectExceptionForItem2 = false;

processor.Error += (exception, item) =>
{
Assert.NotNull(exception);
Assert.Equal(exception, item.Exception);

gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1";
gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2";
};

processor.PushToQueue(new[] { obj1, obj2 });

processor.Received += o =>
{
if (Equals(o.Data, obj1.Data)) throw new Exception("1");
if (Equals(o.Data, obj2.Data)) throw new Exception("2");
};

processor.Run(cts.Token);

Assert.Equal(0, processor.GetQueueSize());
Assert.True(gotCorrectExceptionForItem1);
Assert.True(gotCorrectExceptionForItem2);
}

[Fact]
public void SendThenErrorForeverDoesDrop()
{
Expand Down
9 changes: 8 additions & 1 deletion osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ protected override void ProcessResults(IEnumerable<FakeData> items)
{
foreach (var item in items)
{
Received?.Invoke(item);
try
{
Received?.Invoke(item);
}
catch (Exception e)
{
item.Exception = e;
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion osu.Server.QueueProcessor/QueueItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ namespace osu.Server.QueueProcessor
[Serializable]
public abstract class QueueItem
{
[IgnoreDataMember]
private bool failed;

/// <summary>
/// Set to <c>true</c> to mark this item is failed. This will cause it to be retried.
/// </summary>
[IgnoreDataMember]
public bool Failed { get; set; }
public bool Failed
{
get => failed || Exception != null;
set => failed = value;
}

[IgnoreDataMember]
public Exception? Exception { get; set; }

/// <summary>
/// The number of times processing this item has been retried. Handled internally by <see cref="QueueProcessor{T}"/>.
Expand Down
37 changes: 28 additions & 9 deletions osu.Server.QueueProcessor/QueueProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ public void Run(CancellationToken cancellation = default)

// individual processing should not be cancelled as we have already grabbed from the queue.
Task.Factory.StartNew(() => { ProcessResults(items); }, CancellationToken.None, TaskCreationOptions.HideScheduler, threadPool)
.ContinueWith(t =>
.ContinueWith(_ =>
{
foreach (var item in items)
{
if (t.Exception != null || item.Failed)
if (item.Failed)
{
Interlocked.Increment(ref totalErrors);

Expand All @@ -145,12 +145,18 @@ public void Run(CancellationToken cancellation = default)

Interlocked.Increment(ref consecutiveErrors);

Error?.Invoke(t.Exception, item);
try
{
Error?.Invoke(item.Exception, item);
}
catch
{
}

if (t.Exception != null)
SentrySdk.CaptureException(t.Exception);
if (item.Exception != null)
SentrySdk.CaptureException(item.Exception);

Console.WriteLine($"Error processing {item}: {t.Exception}");
Console.WriteLine($"Error processing {item}: {item.Exception}");
attemptRetry(item);
}
else
Expand Down Expand Up @@ -197,8 +203,6 @@ private void setupSentry(SentryOptions options)

private void attemptRetry(T item)
{
item.Failed = false;

if (item.TotalRetries++ < config.MaxRetries)
{
Console.WriteLine($"Re-queueing for attempt {item.TotalRetries} / {config.MaxRetries}");
Expand Down Expand Up @@ -274,11 +278,26 @@ protected virtual void ProcessResult(T item)
/// <summary>
/// Implement to process batches of items from the queue.
/// </summary>
/// <remarks>
/// In most cases, you should only need to override and implement <see cref="ProcessResult"/>.
/// Only override this if you need more efficient batch processing.
///
/// If overriding this method, you should try-catch for exceptions, and set any exception against
/// the relevant <see cref="QueueItem"/>. If this is not done, failures will not be handled correctly.</remarks>
/// <param name="items">The items to process.</param>
protected virtual void ProcessResults(IEnumerable<T> items)
{
foreach (var item in items)
ProcessResult(item);
{
try
{
ProcessResult(item);
}
catch (Exception e)
{
item.Exception = e;
}
}
}
}
}

0 comments on commit b01f281

Please sign in to comment.