From 1d8d4b1b650e37377cb0414f833ef729b7a54297 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Mon, 16 Sep 2024 14:19:00 +0300 Subject: [PATCH 1/5] Allow `with`-style streaming APIs for communicating with a running program --- Sources/Citadel/Errors.swift | 2 +- Sources/Citadel/TTY/Client/TTY.swift | 82 ++++++++++++++++++++++---- Tests/CitadelTests/Citadel2Tests.swift | 52 ++++++++++++++++ 3 files changed, 125 insertions(+), 11 deletions(-) diff --git a/Sources/Citadel/Errors.swift b/Sources/Citadel/Errors.swift index 1e554ad..6604a9c 100644 --- a/Sources/Citadel/Errors.swift +++ b/Sources/Citadel/Errors.swift @@ -42,4 +42,4 @@ public enum CitadelError: Error { case channelFailure } -public struct AuthenticationFailed: Error, Equatable {} \ No newline at end of file +public struct AuthenticationFailed: Error, Equatable {} diff --git a/Sources/Citadel/TTY/Client/TTY.swift b/Sources/Citadel/TTY/Client/TTY.swift index c05619a..5904386 100644 --- a/Sources/Citadel/TTY/Client/TTY.swift +++ b/Sources/Citadel/TTY/Client/TTY.swift @@ -42,6 +42,34 @@ public enum ExecCommandOutput { case stderr(ByteBuffer) } +struct EmptySequence: Sendable, AsyncSequence { + struct AsyncIterator: AsyncIteratorProtocol { + func next() async throws -> Element? { + nil + } + } + + func makeAsyncIterator() -> AsyncIterator { + AsyncIterator() + } +} + +@available(macOS 15.0, *) +public struct TTYOutput: AsyncSequence { + internal let sequence: AsyncThrowingStream + + public func makeAsyncIterator() -> some AsyncIteratorProtocol { + sequence.makeAsyncIterator() + } +} + +public struct TTYStdinWriter { + internal let channel: Channel + + public func write(_ buffer: ByteBuffer) async throws { + try await channel.writeAndFlush(SSHChannelData(type: .channel, data: .byteBuffer(buffer))) + } +} final class ExecCommandHandler: ChannelDuplexHandler { enum Output { @@ -126,7 +154,12 @@ extension SSHClient { /// - maxResponseSize: The maximum size of the response. If the response is larger, the command will fail. /// - mergeStreams: If the answer should also include stderr. /// - inShell: Whether to request the remote server to start a shell before executing the command. - public func executeCommand(_ command: String, maxResponseSize: Int = .max, mergeStreams: Bool = false, inShell: Bool = false) async throws -> ByteBuffer { + public func executeCommand( + _ command: String, + maxResponseSize: Int = .max, + mergeStreams: Bool = false, + inShell: Bool = false + ) async throws -> ByteBuffer { var result = ByteBuffer() let stream = try await executeCommandStream(command, inShell: inShell) @@ -157,11 +190,15 @@ extension SSHClient { /// - command: The command to execute. /// - inShell: Whether to request the remote server to start a shell before executing the command. public func executeCommandStream(_ command: String, inShell: Bool = false) async throws -> AsyncThrowingStream { - var streamContinuation: AsyncThrowingStream.Continuation! - let stream = AsyncThrowingStream(bufferingPolicy: .unbounded) { continuation in - streamContinuation = continuation - } - + try await _executeCommandStream(command, inShell: inShell).output + } + + internal func _executeCommandStream( + _ command: String, + inShell: Bool = false + ) async throws -> (channel: Channel, output: AsyncThrowingStream) { + let (stream, streamContinuation) = AsyncThrowingStream.makeStream() + var hasReceivedChannelSuccess = false var exitCode: Int? @@ -180,9 +217,12 @@ extension SSHClient { streamContinuation.finish() } case .channelSuccess: - if inShell, !hasReceivedChannelSuccess { - let commandData = SSHChannelData(type: .channel, - data: .byteBuffer(ByteBuffer(string: command + ";exit\n"))) + if inShell, + !hasReceivedChannelSuccess { + let commandData = SSHChannelData( + type: .channel, + data: .byteBuffer(ByteBuffer(string: command + ";exit\n")) + ) channel.writeAndFlush(commandData, promise: nil) hasReceivedChannelSuccess = true } @@ -215,7 +255,29 @@ extension SSHClient { )) } - return stream + return (channel, stream) + } + + @available(macOS 15.0, *) + public func withExecutingCommand( + _ command: String, + inShell: Bool, + perform: (_ inbound: TTYOutput, _ outbound: TTYStdinWriter) async throws -> Void + ) async throws { + let (channel, output) = try await _executeCommandStream(command, inShell: inShell) + + func close() async throws { + try await channel.close() + } + + do { + let inbound = TTYOutput(sequence: output) + try await perform(inbound, TTYStdinWriter(channel: channel)) + try await close() + } catch { + try await close() + throw error + } } /// Executes a command on the remote server. This will return the pair of streams stdout and stderr of the command. If the command fails, the error will be thrown. diff --git a/Tests/CitadelTests/Citadel2Tests.swift b/Tests/CitadelTests/Citadel2Tests.swift index 0621324..098aafc 100644 --- a/Tests/CitadelTests/Citadel2Tests.swift +++ b/Tests/CitadelTests/Citadel2Tests.swift @@ -280,4 +280,56 @@ final class Citadel2Tests: XCTestCase { try await client.close() } + + @available(macOS 15.0, *) + func testStdinStream() async throws { + guard + let host = ProcessInfo.processInfo.environment["SSH_HOST"], + let _port = ProcessInfo.processInfo.environment["SSH_PORT"], + let port = Int(_port), + let username = ProcessInfo.processInfo.environment["SSH_USERNAME"], + let password = ProcessInfo.processInfo.environment["SSH_PASSWORD"] + else { + throw XCTSkip() + } + + let client = try await SSHClient.connect( + host: host, + port: port, + authenticationMethod: .passwordBased(username: username, password: password), + hostKeyValidator: .acceptAnything(), + reconnect: .never + ) + + try await client.withExecutingCommand("cat", inShell: false) { inbound, outbound in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + var i = UInt8.min + for try await value in inbound { + switch value { + case .stdout(let value): + for byte in value.readableBytesView { + XCTAssertEqual(byte, i) + i = i &+ 1 + } + case .stderr: + XCTFail("Unexpected stderr") + } + } + } + + group.addTask { + for i: UInt8 in .min ... .max { + let value = ByteBufferAllocator().buffer(integer: i) + try await outbound.write(value) + } + } + + try await group.next() + group.cancelAll() + } + } + + try await client.close() + } } From 89f570c4aa926de375da0afe24286bf2ae1c9060 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Mon, 16 Sep 2024 14:29:25 +0300 Subject: [PATCH 2/5] Wrap the async iterator --- Sources/Citadel/TTY/Client/TTY.swift | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/Sources/Citadel/TTY/Client/TTY.swift b/Sources/Citadel/TTY/Client/TTY.swift index 5904386..9e6e2f1 100644 --- a/Sources/Citadel/TTY/Client/TTY.swift +++ b/Sources/Citadel/TTY/Client/TTY.swift @@ -57,9 +57,19 @@ struct EmptySequence: Sendable, AsyncSequence { @available(macOS 15.0, *) public struct TTYOutput: AsyncSequence { internal let sequence: AsyncThrowingStream + public typealias Element = ExecCommandOutput - public func makeAsyncIterator() -> some AsyncIteratorProtocol { - sequence.makeAsyncIterator() + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = ExecCommandOutput + var iterator: AsyncThrowingStream.AsyncIterator + + public mutating func next() async throws -> ExecCommandOutput? { + try await iterator.next() + } + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(iterator: sequence.makeAsyncIterator()) } } From 1885fb4ba08864b9bb3710108e669a332ef3dd1c Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Thu, 19 Sep 2024 16:29:00 +0300 Subject: [PATCH 3/5] Change to raw TTY access --- Sources/Citadel/TTY/Client/TTY.swift | 25 ++++++++++++------------- Tests/CitadelTests/Citadel2Tests.swift | 3 ++- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Sources/Citadel/TTY/Client/TTY.swift b/Sources/Citadel/TTY/Client/TTY.swift index 9e6e2f1..6d6dc9e 100644 --- a/Sources/Citadel/TTY/Client/TTY.swift +++ b/Sources/Citadel/TTY/Client/TTY.swift @@ -204,7 +204,7 @@ extension SSHClient { } internal func _executeCommandStream( - _ command: String, + _ command: String?, inShell: Bool = false ) async throws -> (channel: Channel, output: AsyncThrowingStream) { let (stream, streamContinuation) = AsyncThrowingStream.makeStream() @@ -227,13 +227,14 @@ extension SSHClient { streamContinuation.finish() } case .channelSuccess: - if inShell, - !hasReceivedChannelSuccess { - let commandData = SSHChannelData( - type: .channel, - data: .byteBuffer(ByteBuffer(string: command + ";exit\n")) - ) - channel.writeAndFlush(commandData, promise: nil) + if inShell, !hasReceivedChannelSuccess { + if let command { + let commandData = SSHChannelData( + type: .channel, + data: .byteBuffer(ByteBuffer(string: command + ";exit\n")) + ) + channel.writeAndFlush(commandData, promise: nil) + } hasReceivedChannelSuccess = true } case .exit(let status): @@ -258,7 +259,7 @@ extension SSHClient { try await channel.triggerUserOutboundEvent(SSHChannelRequestEvent.ShellRequest( wantReply: true )) - } else { + } else if let command { try await channel.triggerUserOutboundEvent(SSHChannelRequestEvent.ExecRequest( command: command, wantReply: true @@ -269,12 +270,10 @@ extension SSHClient { } @available(macOS 15.0, *) - public func withExecutingCommand( - _ command: String, - inShell: Bool, + public func withTTY( perform: (_ inbound: TTYOutput, _ outbound: TTYStdinWriter) async throws -> Void ) async throws { - let (channel, output) = try await _executeCommandStream(command, inShell: inShell) + let (channel, output) = try await _executeCommandStream(nil, inShell: true) func close() async throws { try await channel.close() diff --git a/Tests/CitadelTests/Citadel2Tests.swift b/Tests/CitadelTests/Citadel2Tests.swift index 098aafc..8c077ec 100644 --- a/Tests/CitadelTests/Citadel2Tests.swift +++ b/Tests/CitadelTests/Citadel2Tests.swift @@ -301,7 +301,8 @@ final class Citadel2Tests: XCTestCase { reconnect: .never ) - try await client.withExecutingCommand("cat", inShell: false) { inbound, outbound in + try await client.withTTY { inbound, outbound in + try await outbound.write(ByteBuffer(string: "cat")) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { var i = UInt8.min From 2d8e8283c0ebf2216cc759564321b267186f7754 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Fri, 8 Nov 2024 09:38:58 +0100 Subject: [PATCH 4/5] Support PTYs for terminal emulators --- Sources/Citadel/TTY/Client/TTY.swift | 67 ++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 14 deletions(-) diff --git a/Sources/Citadel/TTY/Client/TTY.swift b/Sources/Citadel/TTY/Client/TTY.swift index 6d6dc9e..dc6edc2 100644 --- a/Sources/Citadel/TTY/Client/TTY.swift +++ b/Sources/Citadel/TTY/Client/TTY.swift @@ -79,6 +79,17 @@ public struct TTYStdinWriter { public func write(_ buffer: ByteBuffer) async throws { try await channel.writeAndFlush(SSHChannelData(type: .channel, data: .byteBuffer(buffer))) } + + public func changeSize(cols: Int, rows: Int) async throws { + try await channel.triggerUserOutboundEvent( + SSHChannelRequestEvent.WindowChangeRequest( + terminalCharacterWidth: 0, + terminalRowHeight: 0, + terminalPixelWidth: 0, + terminalPixelHeight: 0 + ) + ) + } } final class ExecCommandHandler: ChannelDuplexHandler { @@ -200,12 +211,17 @@ extension SSHClient { /// - command: The command to execute. /// - inShell: Whether to request the remote server to start a shell before executing the command. public func executeCommandStream(_ command: String, inShell: Bool = false) async throws -> AsyncThrowingStream { - try await _executeCommandStream(command, inShell: inShell).output + try await _executeCommandStream( + mode: inShell ? .tty(command: command) : .command(command) + ).output + } + + enum CommandMode { + case pty(SSHChannelRequestEvent.PseudoTerminalRequest), tty(command: String?), command(String) } internal func _executeCommandStream( - _ command: String?, - inShell: Bool = false + mode: CommandMode ) async throws -> (channel: Channel, output: AsyncThrowingStream) { let (stream, streamContinuation) = AsyncThrowingStream.makeStream() @@ -227,14 +243,12 @@ extension SSHClient { streamContinuation.finish() } case .channelSuccess: - if inShell, !hasReceivedChannelSuccess { - if let command { - let commandData = SSHChannelData( - type: .channel, - data: .byteBuffer(ByteBuffer(string: command + ";exit\n")) - ) - channel.writeAndFlush(commandData, promise: nil) - } + if case .tty(.some(let command)) = mode, !hasReceivedChannelSuccess { + let commandData = SSHChannelData( + type: .channel, + data: .byteBuffer(ByteBuffer(string: command + ";exit\n")) + ) + channel.writeAndFlush(commandData, promise: nil) hasReceivedChannelSuccess = true } case .exit(let status): @@ -255,11 +269,15 @@ extension SSHClient { return createChannel.futureResult }.get() - if inShell { + switch mode { + case .pty(let request): + try await channel.triggerUserOutboundEvent(request) + fallthrough + case .tty: try await channel.triggerUserOutboundEvent(SSHChannelRequestEvent.ShellRequest( wantReply: true )) - } else if let command { + case .command(let command): try await channel.triggerUserOutboundEvent(SSHChannelRequestEvent.ExecRequest( command: command, wantReply: true @@ -269,11 +287,32 @@ extension SSHClient { return (channel, stream) } + @available(macOS 15.0, *) + public func withPTY( + _ request: SSHChannelRequestEvent.PseudoTerminalRequest, + perform: (_ inbound: TTYOutput, _ outbound: TTYStdinWriter) async throws -> Void + ) async throws { + let (channel, output) = try await _executeCommandStream(mode: .pty(request)) + + func close() async throws { + try await channel.close() + } + + do { + let inbound = TTYOutput(sequence: output) + try await perform(inbound, TTYStdinWriter(channel: channel)) + try await close() + } catch { + try await close() + throw error + } + } + @available(macOS 15.0, *) public func withTTY( perform: (_ inbound: TTYOutput, _ outbound: TTYStdinWriter) async throws -> Void ) async throws { - let (channel, output) = try await _executeCommandStream(nil, inShell: true) + let (channel, output) = try await _executeCommandStream(mode: .tty(command: nil)) func close() async throws { try await channel.close() From 5aa51c513ecc9e60894df28dee0251581bebf9b5 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Fri, 8 Nov 2024 09:42:06 +0100 Subject: [PATCH 5/5] Add support for passing in environment requests --- Sources/Citadel/TTY/Client/TTY.swift | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/Sources/Citadel/TTY/Client/TTY.swift b/Sources/Citadel/TTY/Client/TTY.swift index dc6edc2..0b73abd 100644 --- a/Sources/Citadel/TTY/Client/TTY.swift +++ b/Sources/Citadel/TTY/Client/TTY.swift @@ -210,8 +210,13 @@ extension SSHClient { /// - Parameters: /// - command: The command to execute. /// - inShell: Whether to request the remote server to start a shell before executing the command. - public func executeCommandStream(_ command: String, inShell: Bool = false) async throws -> AsyncThrowingStream { + public func executeCommandStream( + _ command: String, + environment: [SSHChannelRequestEvent.EnvironmentRequest] = [], + inShell: Bool = false + ) async throws -> AsyncThrowingStream { try await _executeCommandStream( + environment: environment, mode: inShell ? .tty(command: command) : .command(command) ).output } @@ -221,6 +226,7 @@ extension SSHClient { } internal func _executeCommandStream( + environment: [SSHChannelRequestEvent.EnvironmentRequest] = [], mode: CommandMode ) async throws -> (channel: Channel, output: AsyncThrowingStream) { let (stream, streamContinuation) = AsyncThrowingStream.makeStream() @@ -269,6 +275,10 @@ extension SSHClient { return createChannel.futureResult }.get() + for env in environment { + try await channel.triggerUserOutboundEvent(env) + } + switch mode { case .pty(let request): try await channel.triggerUserOutboundEvent(request) @@ -290,9 +300,13 @@ extension SSHClient { @available(macOS 15.0, *) public func withPTY( _ request: SSHChannelRequestEvent.PseudoTerminalRequest, + environment: [SSHChannelRequestEvent.EnvironmentRequest] = [], perform: (_ inbound: TTYOutput, _ outbound: TTYStdinWriter) async throws -> Void ) async throws { - let (channel, output) = try await _executeCommandStream(mode: .pty(request)) + let (channel, output) = try await _executeCommandStream( + environment: environment, + mode: .pty(request) + ) func close() async throws { try await channel.close() @@ -310,9 +324,13 @@ extension SSHClient { @available(macOS 15.0, *) public func withTTY( + environment: [SSHChannelRequestEvent.EnvironmentRequest] = [], perform: (_ inbound: TTYOutput, _ outbound: TTYStdinWriter) async throws -> Void ) async throws { - let (channel, output) = try await _executeCommandStream(mode: .tty(command: nil)) + let (channel, output) = try await _executeCommandStream( + environment: environment, + mode: .tty(command: nil) + ) func close() async throws { try await channel.close()