Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't fail closeFuture when an error occurs on closing #487

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 262550,
"stream_teardown_100_concurrent_inline": 261650
"stream_teardown_100_concurrent": 252400,
"stream_teardown_100_concurrent_inline": 252400
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 262550,
"stream_teardown_100_concurrent_inline": 261650
"stream_teardown_100_concurrent": 252400,
"stream_teardown_100_concurrent_inline": 252400
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 262450,
"stream_teardown_100_concurrent_inline": 261750
"stream_teardown_100_concurrent": 252450,
"stream_teardown_100_concurrent_inline": 251550
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 262450,
"stream_teardown_100_concurrent_inline": 261750
"stream_teardown_100_concurrent": 252450,
"stream_teardown_100_concurrent_inline": 251550
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 262450,
"stream_teardown_100_concurrent_inline": 261750
"stream_teardown_100_concurrent": 252450,
"stream_teardown_100_concurrent_inline": 251550
}
6 changes: 3 additions & 3 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore, @unchecked Sendable {

self.eventLoop.execute {
self.removeHandlers(pipeline: self.pipeline)
self.closePromise.succeed(())
self.closePromise.succeed()
if let streamID = self.streamID {
self.multiplexer.streamClosed(id: streamID)
} else {
Expand All @@ -694,14 +694,14 @@ final class HTTP2StreamChannel: Channel, ChannelCore, @unchecked Sendable {
self.failPendingWrites(error: error)
if let promise = self.pendingClosePromise {
self.pendingClosePromise = nil
promise.fail(error)
promise.succeed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is what was agreed: IIRC the promise from close(mode:promise:) (i.e. pendingClosePromise) can fail to provide diagnostic information. It's the closeFuture (i.e. part of the Channel API) which mustn't fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with what @glbrntt said here. This means we need to change the implementation of executeThenClose to discard the error from close and instead add a call to closeFuture.get()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And update the docs for Channel.closeFuture

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh okay, I misunderstood this. The one thing I found weird when implementing this (and why I decided to not fail this promise either) is that calling channel.close() will return a failed promise (since it creates one and passes it to channel.close(promise: newPromise) under the hood), which I personally find slightly confusing. But if you're okay with this behaviour I can just be clear in the docs about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's expected. Roughly speaking:

  • channel.close(promise:) (and variants of, i.e. channel.close()) are "close the channel and tell me how it closed" (cleanly/uncleanly)
  • channel.closeFuture is "tell me when the channel is closed"

}
self.pipeline.fireErrorCaught(error)
self.pipeline.fireChannelInactive()

self.eventLoop.execute {
self.removeHandlers(pipeline: self.pipeline)
self.closePromise.fail(error)
self.closePromise.succeed()
if let streamID = self.streamID {
self.multiplexer.streamClosed(id: streamID)
} else {
Expand Down
50 changes: 34 additions & 16 deletions Tests/NIOHTTP2Tests/ConfiguringPipelineInlineMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
)
)

clientMultiplexer.createStreamChannel(promise: nil) { channel in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientMultiplexer.createStreamChannel(promise: streamChannelPromise) { channel in
try? channel.pipeline.addHandler(errorHandler).wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stream channel initializer will be executed on the event-loop; you can't wait() while on an EL. You need to use an alternative API, like the sync operations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one got marked as resolved but wasn't actually resolved.

FWIW I also don't think it's necessary for all of these tests to check this behaviour (i.e. the stream closed error gets fired down the pipeline); we end up with a much larger diff than necessary and harder to maintain tests. I'd prefer we had a separate test which asserts the specific behaviour.

Copy link
Contributor Author

@gjcairo gjcairo Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one got marked as resolved but wasn't actually resolved.

Ah, sorry I fixed it in a bunch of places but missed a few others.

FWIW I also don't think it's necessary for all of these tests to check this behaviour [..] I'd prefer we had a separate test which asserts the specific behaviour.

We were already performing assertions on the close promise everywhere - you could use the same argument there. I thought that it was more consistent if we are going to assert something on the close promise that we also check the right behaviour is seen for the closeFuture and the errors being thrown downstream. But I can revert those changes if you disagree and add a single test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, I saw the first one and assumed they'd all been missed.

I'm okay not reverting it, the work's been done now. I get the following the existing pattern argument; I just think we should be mindful when we make changes like this to keep tests maintainable. This package is bad enough as it is as most tests have about three different versions because of the different multiplexing APIs...

channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand All @@ -82,9 +85,10 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()

XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}
let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

// We should have received a HEADERS and a RST_STREAM frame.
// The RST_STREAM frame is from closing an incomplete stream on the client side.
Expand Down Expand Up @@ -133,7 +137,10 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
)
)

clientMultiplexer.createStreamChannel(promise: nil) { channel in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientMultiplexer.createStreamChannel(promise: streamChannelPromise) { channel in
try? channel.pipeline.addHandler(errorHandler).wait()
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand All @@ -144,9 +151,10 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()

XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}
let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

// We should have received a HEADERS and a RST_STREAM frame.
// The RST_STREAM frame is from closing an incomplete stream on the client side.
Expand Down Expand Up @@ -466,7 +474,10 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
)
)

clientMultiplexer.createStreamChannel(promise: nil) { channel in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientMultiplexer.createStreamChannel(promise: streamChannelPromise) { channel in
try? channel.pipeline.addHandler(errorHandler).wait()
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand All @@ -476,9 +487,11 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}

let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

// Assert that the user-provided handler received the
// HTTP1 parts corresponding to the H2 message sent
Expand Down Expand Up @@ -556,7 +569,10 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
)
)

clientMultiplexer.createStreamChannel(promise: nil) { channel in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientMultiplexer.createStreamChannel(promise: streamChannelPromise) { channel in
try? channel.pipeline.addHandler(errorHandler).wait()
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand All @@ -566,9 +582,11 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}

let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

// Assert that the user-provided handler received the
// HTTP1 parts corresponding to the H2 message sent
Expand Down
50 changes: 34 additions & 16 deletions Tests/NIOHTTP2Tests/ConfiguringPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ class ConfiguringPipelineTests: XCTestCase {
)
)

clientHandler.createStreamChannel(promise: nil) { channel, streamID in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientHandler.createStreamChannel(promise: streamChannelPromise) { channel, streamID in
try? channel.pipeline.addHandler(errorHandler).wait()
XCTAssertEqual(streamID, HTTP2StreamID(1))
channel.writeAndFlush(reqFrame).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
Expand All @@ -73,9 +76,10 @@ class ConfiguringPipelineTests: XCTestCase {
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()

XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}
let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

// We should have received a HEADERS and a RST_STREAM frame.
// The RST_STREAM frame is from closing an incomplete stream on the client side.
Expand Down Expand Up @@ -116,7 +120,10 @@ class ConfiguringPipelineTests: XCTestCase {
)
)

clientHandler.createStreamChannel(promise: nil) { channel, streamID in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientHandler.createStreamChannel(promise: streamChannelPromise) { channel, streamID in
try? channel.pipeline.addHandler(errorHandler).wait()
XCTAssertEqual(streamID, HTTP2StreamID(1))
channel.writeAndFlush(reqFrame).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
Expand All @@ -128,9 +135,10 @@ class ConfiguringPipelineTests: XCTestCase {
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()

XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}
let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

// We should have received a HEADERS and a RST_STREAM frame.
// The RST_STREAM frame is from closing an incomplete stream on the client side.
Expand Down Expand Up @@ -403,7 +411,10 @@ class ConfiguringPipelineTests: XCTestCase {
)
)

clientHandler.createStreamChannel(promise: nil) { channel, streamID in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientHandler.createStreamChannel(promise: streamChannelPromise) { channel, streamID in
try? channel.pipeline.addHandler(errorHandler).wait()
XCTAssertEqual(streamID, HTTP2StreamID(1))
channel.writeAndFlush(reqFrame).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
Expand All @@ -414,9 +425,11 @@ class ConfiguringPipelineTests: XCTestCase {
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}

let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

let serverChildChannel = try serverChildChannelPromise.futureResult.wait()
try serverChildChannel.pipeline.handler(type: HTTP1ServerRequestRecorderHandler.self).map { serverRecorder in
Expand Down Expand Up @@ -492,7 +505,10 @@ class ConfiguringPipelineTests: XCTestCase {
)
)

clientHandler.createStreamChannel(promise: nil) { channel, streamID in
let errorHandler = ErrorEncounteredHandler()
let streamChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
clientHandler.createStreamChannel(promise: streamChannelPromise) { channel, streamID in
try? channel.pipeline.addHandler(errorHandler).wait()
XCTAssertEqual(streamID, HTTP2StreamID(1))
channel.writeAndFlush(reqFrame).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
Expand All @@ -503,9 +519,11 @@ class ConfiguringPipelineTests: XCTestCase {
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
self.interactInMemory(self.clientChannel, self.serverChannel)
(self.clientChannel.eventLoop as! EmbeddedEventLoop).run()
XCTAssertThrowsError(try requestPromise.futureResult.wait()) { error in
XCTAssertTrue(error is NIOHTTP2Errors.StreamClosed)
}

let streamChannel = try XCTUnwrap(streamChannelPromise.futureResult.wait())
XCTAssertNoThrow(try streamChannel.closeFuture.wait())
XCTAssertNoThrow(try requestPromise.futureResult.wait())
XCTAssertTrue(errorHandler.encounteredError is NIOHTTP2Errors.StreamClosed)

let serverChildChannel = try serverChildChannelPromise.futureResult.wait()
try serverChildChannel.pipeline.handler(type: HTTP1ServerRequestRecorderHandler.self).map { serverRecorder in
Expand Down
Loading
Loading