Skip to content

Commit

Permalink
Revert back to await from continue with
Browse files Browse the repository at this point in the history
Remove extension for continueWith
Update test to ignore maxbuffer tests until we return functionality
  • Loading branch information
Jroland committed May 28, 2015
1 parent 0f03ea0 commit 5bce3c3
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 20 deletions.
13 changes: 0 additions & 13 deletions src/kafka-net/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,5 @@ public static Exception ExtractException(this Task task)

return new ApplicationException("Unknown exception occured.");
}

/// <summary>
/// Extracts a concrete exception out of a Continue with result.
/// </summary>
public static void ThrowOnFault(this Task task)
{
if (task.IsFaulted == false) return;
if (task.Exception != null)
throw task.Exception.Flatten();

throw new ApplicationException("Unknown exception occured.");
}

}
}
10 changes: 3 additions & 7 deletions src/kafka-net/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Producer(IBrokerRouter brokerRouter, int maximumAsyncRequests = MaximumAs
/// <param name="timeout">Interal kafka timeout to wait for the requested level of ack to occur before returning. Defaults to 1000ms.</param>
/// <param name="codec">The codec to apply to the message collection. Defaults to none.</param>
/// <returns>List of ProduceResponses from each partition sent to or empty list if acks = 0.</returns>
public Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumerable<Message> messages, Int16 acks = 1,
public async Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumerable<Message> messages, Int16 acks = 1,
TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone)
{
if (_stopToken.IsCancellationRequested)
Expand All @@ -126,15 +126,11 @@ public Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumerable<Me

_asyncCollection.AddRange(batch);

return Task.WhenAll(batch.Select(x => x.Tcs.Task))
.ContinueWith(t =>
{
t.ThrowOnFault();
await Task.WhenAll(batch.Select(x => x.Tcs.Task));

return batch.Select(topicMessage => topicMessage.Tcs.Task.Result)
return batch.Select(topicMessage => topicMessage.Tcs.Task.Result)
.Distinct()
.ToList();
});
}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions src/kafka-tests/Unit/ProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public void ProducerShouldBlockWhenFullBufferReached()
}

[Test]
[Ignore("Removed the max message limit. Caused performance problems. Will find a better way.")]
public void ProducerShouldBlockEvenOnMessagesInTransit()
{
//with max buffer set below the batch size, this should cause the producer to block until batch delay time.
Expand Down

0 comments on commit 5bce3c3

Please sign in to comment.