Skip to content

Commit

Permalink
Add new LambdaRuntime (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
aryan-25 authored Sep 5, 2024
1 parent 06763b8 commit 85b938e
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 25 deletions.
53 changes: 53 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,56 @@ extension LambdaCodableAdapter {
)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. ``JSONEncoder()`` used as default.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default.
package convenience init<Event: Decodable, Output>(
body: @escaping (Event, NewLambdaContext) async throws -> Output,
encoder: JSONEncoder = JSONEncoder(),
decoder: JSONDecoder = JSONDecoder()
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
JSONDecoder,
LambdaJSONOutputEncoder<Output>
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default.
package convenience init<Event: Decodable>(
body: @escaping (Event, NewLambdaContext) async throws -> Void,
decoder: JSONDecoder = JSONDecoder()
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
JSONDecoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}
}
4 changes: 4 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Dispatch
import Logging
import NIOCore

extension Lambda {
package static func runLoop<RuntimeClient: LambdaRuntimeClientProtocol, Handler>(
Expand Down Expand Up @@ -44,4 +45,7 @@ extension Lambda {
}
}
}

/// The default EventLoop the Lambda is scheduled on.
package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}
67 changes: 67 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,70 @@ package struct ClosureHandler<Event: Decodable, Output>: NewLambdaHandler {
try await self.body(event, context)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure.
/// - Parameter body: The handler in the form of a closure.
package convenience init(
body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void
) where Handler == StreamingClosureHandler {
self.init(handler: StreamingClosureHandler(body: body))
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<
Event: Decodable,
Output: Encodable,
Encoder: LambdaOutputEncoder,
Decoder: LambdaEventDecoder
>(
encoder: Encoder,
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
Decoder,
Encoder
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
Decoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}
}
70 changes: 70 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import Logging
import NIOCore
import NIOConcurrencyHelpers

// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today.
// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this
// sadly crashes the compiler today.
package final class NewLambdaRuntime<Handler>: @unchecked Sendable where Handler: StreamingLambdaHandler {
// TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore
let handlerMutex: NIOLockedValueBox<Optional<Handler>>
let logger: Logger
let eventLoop: EventLoop

package init(
handler: sending Handler,
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
self.handlerMutex = NIOLockedValueBox(handler)
self.eventLoop = eventLoop
self.logger = logger
}

package func run() async throws {
guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else {
throw NewLambdaRuntimeError(code: .missingLambdaRuntimeAPIEnvironmentVariable)
}

let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw NewLambdaRuntimeError(code: .invalidPort) }

let handler = self.handlerMutex.withLockedValue { handler in
let result = handler
handler = nil
return result
}

guard let handler else {
throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}

try await NewLambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
}
}
55 changes: 30 additions & 25 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
)
try channel.pipeline.syncOperations.addHandler(
LambdaChannelHandler(delegate: self, logger: self.logger)
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
)
return channel.eventLoop.makeSucceededFuture(())
} catch {
Expand Down Expand Up @@ -433,10 +433,33 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
private var reusableErrorBuffer: ByteBuffer?
private let logger: Logger
private let delegate: Delegate
private let configuration: NewLambdaRuntimeClient.Configuration

init(delegate: Delegate, logger: Logger) {
/// These are the default headers that must be sent along an invocation
let defaultHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
let errorHeaders: HTTPHeaders
/// These headers must be sent when streaming a response
let streamingHeaders: HTTPHeaders

init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) {
self.delegate = delegate
self.logger = logger
self.configuration = configuration
self.defaultHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
]
self.errorHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
self.streamingHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "chunked",
]
}

func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
Expand Down Expand Up @@ -578,7 +601,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.streamingHeaders
headers: self.streamingHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -604,11 +627,12 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
let headers: HTTPHeaders =
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
NewLambdaRuntimeClient.streamingHeaders
self.streamingHeaders
}

let httpRequest = HTTPRequestHead(
Expand All @@ -634,7 +658,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: NewLambdaRuntimeClient.defaultHeaders
headers: self.defaultHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -650,7 +674,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.errorHeaders
headers: self.errorHeaders
)

if self.reusableErrorBuffer == nil {
Expand Down Expand Up @@ -797,22 +821,3 @@ extension LambdaChannelHandler: ChannelInboundHandler {
context.fireChannelInactive()
}
}

extension NewLambdaRuntimeClient {
static let defaultHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown"
]

/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]

/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]

}
3 changes: 3 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ package struct NewLambdaRuntimeError: Error {
case nextInvocationMissingHeaderDeadline
case nextInvocationMissingHeaderInvokeFuctionARN

case missingLambdaRuntimeAPIEnvironmentVariable
case runtimeCanOnlyBeStartedOnce
case invalidPort
}

package init(code: Code, underlying: (any Error)? = nil) {
Expand Down

0 comments on commit 85b938e

Please sign in to comment.