Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a variant of configureAsyncHTTPServerPipeline which takes a stream delegate #471

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -705,13 +705,63 @@ extension Channel {
(HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP2StreamOutput>)
>
>
> {
self.configureAsyncHTTPServerPipeline(
streamDelegate: nil,
http2Configuration: http2Configuration,
http1ConnectionInitializer: http1ConnectionInitializer,
http2ConnectionInitializer: http2ConnectionInitializer,
http2StreamInitializer: http2StreamInitializer
)
}

/// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client.
///
/// This helper takes care of configuring the server pipeline such that it negotiates whether to
/// use HTTP/1.1 or HTTP/2.
///
/// This function doesn't configure the TLS handler. Callers of this function need to add a TLS
/// handler appropriately configured to perform protocol negotiation.
///
/// - Parameters:
/// - streamDelegate: A delegate which is called when streams are created and closed.
/// - http2Configuration: The settings that will be used when establishing the HTTP/2 connections and new HTTP/2 streams.
/// - http1ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol
/// is HTTP/1.1 to configure the connection channel.
/// - http2ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol
/// is HTTP/2 to configure the connection channel. The channel has an `ChannelOutboundHandler/OutboundIn` type of ``HTTP2Frame``.
/// - http2StreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// The output of this closure is the element type of the returned multiplexer
/// - Returns: An `EventLoopFuture` containing a `NIOTypedApplicationProtocolNegotiationHandler` that completes when the channel
/// is ready to negotiate. This can then be used to access the `NIOProtocolNegotiationResult` which may itself
/// be waited on to retrieve the result of the negotiation.
@inlinable
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public func configureAsyncHTTPServerPipeline<
HTTP1ConnectionOutput: Sendable,
HTTP2ConnectionOutput: Sendable,
HTTP2StreamOutput: Sendable
>(
streamDelegate: NIOHTTP2StreamDelegate?,
http2Configuration: NIOHTTP2Handler.Configuration = .init(),
http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP1ConnectionOutput>,
http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2ConnectionOutput>,
http2StreamInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2StreamOutput>
) -> EventLoopFuture<
EventLoopFuture<
NIONegotiatedHTTPVersion<
HTTP1ConnectionOutput,
(HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP2StreamOutput>)
>
>
> {
let http2ConnectionInitializer:
NIOChannelInitializerWithOutput<
(HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP2StreamOutput>)
> = { channel in
channel.configureAsyncHTTP2Pipeline(
mode: .server,
streamDelegate: streamDelegate,
configuration: http2Configuration,
streamInitializer: http2StreamInitializer
).flatMap { multiplexer in
Expand Down
106 changes: 106 additions & 0 deletions Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,112 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
try await assertNoThrow(try await self.serverChannel.finish())
}

// `testNegotiatedHTTP2BasicPipelineStreamDelegate` ensures that a client-server system set up to use async stream abstractions
// with a NIOHTTP2StreamDelegate calls the delegate methods .
func testNegotiatedHTTP2BasicPipelineStreamDelegate() async throws {
final class TestStreamDelegate: NIOHTTP2StreamDelegate {
let streamCount: NIOLockedValueBox<Int>
let streamsCreated: NIOLockedValueBox<Int>

init() {
self.streamCount = .init(0)
self.streamsCreated = .init(0)
}

func streamCreated(_ id: NIOHTTP2.HTTP2StreamID, channel: any NIOCore.Channel) {
self.streamCount.withLockedValue { $0 += 1 }
self.streamsCreated.withLockedValue { $0 += 1 }
}

func streamClosed(_ id: NIOHTTP2.HTTP2StreamID, channel: any NIOCore.Channel) {
self.streamCount.withLockedValue { $0 -= 1 }
}
}
let requestCount = 100

let streamDelegate = TestStreamDelegate()

let clientMultiplexer = try await assertNoThrowWithValue(
try await self.clientChannel.configureAsyncHTTP2Pipeline(mode: .client) {
channel -> EventLoopFuture<Channel> in
channel.eventLoop.makeSucceededFuture(channel)
}.get()
)

let negotiationResultFuture = try await self.serverChannel.configureAsyncHTTPServerPipeline(
streamDelegate: streamDelegate
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
} http2ConnectionInitializer: { channel in
channel.eventLoop.makeSucceededVoidFuture()
} http2StreamInitializer: { channel -> EventLoopFuture<Channel> in
channel.pipeline.addHandlers([OKResponder()]).map { _ in channel }
}.get()

// Let's pretend the TLS handler did protocol negotiation for us
self.serverChannel.pipeline.fireUserInboundEventTriggered(
TLSUserEvent.handshakeCompleted(negotiatedProtocol: "h2")
)

try await assertNoThrow(
try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)
)

try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in
// server
group.addTask {
let negotiationResult = try await negotiationResultFuture.get()
let serverMultiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer<Channel>
switch negotiationResult {
case .http1_1:
preconditionFailure("Negotiation result must be HTTP/2")
case .http2(let (_, multiplexer)):
serverMultiplexer = multiplexer
}

var serverInboundChannelCount = 0
for try await _ in serverMultiplexer.inbound {
serverInboundChannelCount += 1
}
return serverInboundChannelCount
}

// client
for _ in 0..<requestCount {
// Let's try sending some requests
let streamChannel = try await clientMultiplexer.openStream { channel -> EventLoopFuture<Channel> in
channel.pipeline.addHandlers([SimpleRequest(), InboundFramePayloadRecorder()]).map {
channel
}
}

let clientRecorder = try await streamChannel.pipeline.handler(type: InboundFramePayloadRecorder.self)
.get()

try await Self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel)
try await Self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel)

clientRecorder.receivedFrames.assertFramePayloadsMatch([
ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload
])
try await streamChannel.closeFuture.get()
}

try await assertNoThrow(try await self.clientChannel.finish())
try await assertNoThrow(try await self.serverChannel.finish())

let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!)
XCTAssertEqual(
serverInboundChannelCount,
requestCount,
"We should have created one server-side channel as a result of the each HTTP/2 stream used."
)
}

XCTAssertEqual(streamDelegate.streamCount.withLockedValue { $0 }, 0)
XCTAssertEqual(streamDelegate.streamsCreated.withLockedValue { $0 }, requestCount)
}

// Simple handler which maps client request parts to remove references to `IOData` which isn't Sendable
internal final class HTTP1ClientSendability: ChannelOutboundHandler {
public typealias RequestPart = HTTPPart<HTTPRequestHead, ByteBuffer>
Expand Down
Loading