Remove old API (#355)

This commit is contained in:
Fabian Fett
2024-09-04 18:28:53 +02:00
committed by GitHub
parent 7a8c0f22c0
commit ddb703946c
31 changed files with 87 additions and 4689 deletions
+5 -4
View File
@@ -84,13 +84,14 @@ let package = Package(
dependencies: [
.byName(name: "AWSLambdaRuntime"),
.product(name: "NIO", package: "swift-nio"),
],
swiftSettings: [.swiftLanguageMode(.v5)]
]
),
.testTarget(
name: "AWSLambdaTestingTests",
dependencies: ["AWSLambdaTesting"],
swiftSettings: [.swiftLanguageMode(.v5)]
dependencies: [
.byName(name: "AWSLambdaTesting"),
.product(name: "Testing", package: "swift-testing"),
]
),
// for perf testing
.executableTarget(
@@ -16,7 +16,7 @@ import AWSLambdaRuntimeCore
import struct Foundation.Date
extension LambdaContext {
extension NewLambdaContext {
var deadlineDate: Date {
let secondsSinceEpoch = Double(Int64(bitPattern: self.deadline.rawValue)) / -1_000_000_000
return Date(timeIntervalSince1970: secondsSinceEpoch)
@@ -24,125 +24,6 @@ import class Foundation.JSONDecoder
import class Foundation.JSONEncoder
#endif
// MARK: - SimpleLambdaHandler Codable support
/// Implementation of `ByteBuffer` to `Event` decoding.
extension SimpleLambdaHandler where Event: Decodable {
@inlinable
public func decode(buffer: ByteBuffer) throws -> Event {
try self.decoder.decode(Event.self, from: buffer)
}
}
/// Implementation of `Output` to `ByteBuffer` encoding.
extension SimpleLambdaHandler where Output: Encodable {
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {
try self.encoder.encode(value, into: &buffer)
}
}
/// Default `ByteBuffer` to `Event` decoder using Foundation's `JSONDecoder`.
/// Advanced users who want to inject their own codec can do it by overriding these functions.
extension SimpleLambdaHandler where Event: Decodable {
public var decoder: LambdaCodableDecoder {
Lambda.defaultJSONDecoder
}
}
/// Default `Output` to `ByteBuffer` encoder using Foundation's `JSONEncoder`.
/// Advanced users who want to inject their own codec can do it by overriding these functions.
extension SimpleLambdaHandler where Output: Encodable {
public var encoder: LambdaCodableEncoder {
Lambda.defaultJSONEncoder
}
}
// MARK: - LambdaHandler Codable support
/// Implementation of `ByteBuffer` to `Event` decoding.
extension LambdaHandler where Event: Decodable {
@inlinable
public func decode(buffer: ByteBuffer) throws -> Event {
try self.decoder.decode(Event.self, from: buffer)
}
}
/// Implementation of `Output` to `ByteBuffer` encoding.
extension LambdaHandler where Output: Encodable {
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {
try self.encoder.encode(value, into: &buffer)
}
}
/// Default `ByteBuffer` to `Event` decoder using Foundation's `JSONDecoder`.
/// Advanced users who want to inject their own codec can do it by overriding these functions.
extension LambdaHandler where Event: Decodable {
public var decoder: LambdaCodableDecoder {
Lambda.defaultJSONDecoder
}
}
/// Default `Output` to `ByteBuffer` encoder using Foundation's `JSONEncoder`.
/// Advanced users who want to inject their own codec can do it by overriding these functions.
extension LambdaHandler where Output: Encodable {
public var encoder: LambdaCodableEncoder {
Lambda.defaultJSONEncoder
}
}
// MARK: - EventLoopLambdaHandler Codable support
/// Implementation of `ByteBuffer` to `Event` decoding.
extension EventLoopLambdaHandler where Event: Decodable {
@inlinable
public func decode(buffer: ByteBuffer) throws -> Event {
try self.decoder.decode(Event.self, from: buffer)
}
}
/// Implementation of `Output` to `ByteBuffer` encoding.
extension EventLoopLambdaHandler where Output: Encodable {
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {
try self.encoder.encode(value, into: &buffer)
}
}
/// Default `ByteBuffer` to `Event` decoder using Foundation's `JSONDecoder`.
/// Advanced users that want to inject their own codec can do it by overriding these functions.
extension EventLoopLambdaHandler where Event: Decodable {
public var decoder: LambdaCodableDecoder {
Lambda.defaultJSONDecoder
}
}
/// Default `Output` to `ByteBuffer` encoder using Foundation's `JSONEncoder`.
/// Advanced users that want to inject their own codec can do it by overriding these functions.
extension EventLoopLambdaHandler where Output: Encodable {
public var encoder: LambdaCodableEncoder {
Lambda.defaultJSONEncoder
}
}
public protocol LambdaCodableDecoder {
func decode<T: Decodable>(_ type: T.Type, from buffer: ByteBuffer) throws -> T
}
public protocol LambdaCodableEncoder {
func encode<T: Encodable>(_ value: T, into buffer: inout ByteBuffer) throws
}
extension Lambda {
fileprivate static let defaultJSONDecoder = JSONDecoder()
fileprivate static let defaultJSONEncoder = JSONEncoder()
}
extension JSONDecoder: LambdaCodableDecoder {}
extension JSONEncoder: LambdaCodableEncoder {}
extension JSONDecoder: AWSLambdaRuntimeCore.LambdaEventDecoder {}
@usableFromInline
@@ -36,19 +36,19 @@ package struct InvocationMetadata: Hashable {
package let clientContext: String?
package let cognitoIdentity: String?
package init(headers: HTTPHeaders) throws {
package init(headers: HTTPHeaders) throws(NewLambdaRuntimeError) {
guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else {
throw LambdaRuntimeError.invocationMissingHeader(AmazonHeaders.requestID)
throw NewLambdaRuntimeError(code: .nextInvocationMissingHeaderRequestID)
}
guard let deadline = headers.first(name: AmazonHeaders.deadline),
let unixTimeInMilliseconds = Int64(deadline)
else {
throw LambdaRuntimeError.invocationMissingHeader(AmazonHeaders.deadline)
throw NewLambdaRuntimeError(code: .nextInvocationMissingHeaderDeadline)
}
guard let invokedFunctionARN = headers.first(name: AmazonHeaders.invokedFunctionARN) else {
throw LambdaRuntimeError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN)
throw NewLambdaRuntimeError(code: .nextInvocationMissingHeaderInvokeFuctionARN)
}
self.requestID = requestID
@@ -1,94 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Foundation
import Logging
import NIOConcurrencyHelpers
import NIOCore
/// A container that allows tasks to finish after a synchronous invocation
/// has produced its response.
actor DetachedTasksContainer: Sendable {
struct Context: Sendable {
let eventLoop: EventLoop
let logger: Logger
}
private var context: Context
private var storage: [RegistrationKey: EventLoopFuture<Void>] = [:]
init(context: Context) {
self.context = context
}
/// Adds a detached async task.
///
/// - Parameters:
/// - name: The name of the task.
/// - task: The async task to execute.
/// - Returns: A `RegistrationKey` for the registered task.
func detached(task: @Sendable @escaping () async -> Void) {
let key = RegistrationKey()
let promise = self.context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask(task)
let task = promise.futureResult.always { [weak self] _ in
guard let self else { return }
Task {
await self.removeTask(forKey: key)
}
}
self.storage[key] = task
}
func removeTask(forKey key: RegistrationKey) {
self.storage.removeValue(forKey: key)
}
/// Awaits all registered tasks to complete.
///
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished.
func awaitAll() -> EventLoopFuture<Void> {
let tasks = self.storage.values
if tasks.isEmpty {
return self.context.eventLoop.makeSucceededVoidFuture()
} else {
let context = context
return EventLoopFuture.andAllComplete(Array(tasks), on: context.eventLoop).flatMap { [weak self] in
guard let self else {
return context.eventLoop.makeSucceededFuture(())
}
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self.awaitAll().get()
}
return promise.futureResult
}
}
}
}
extension DetachedTasksContainer {
/// Lambda detached task registration key.
struct RegistrationKey: Hashable, CustomStringConvertible, Sendable {
var value: String
init() {
// UUID basically
self.value = UUID().uuidString
}
var description: String {
self.value
}
}
}
@@ -1,342 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP1
import NIOPosix
/// A barebone HTTP client to interact with AWS Runtime Engine which is an HTTP server.
/// Note that Lambda Runtime API dictate that only one requests runs at a time.
/// This means we can avoid locks and other concurrency concern we would otherwise need to build into the client
final class HTTPClient {
private let eventLoop: EventLoop
private let configuration: LambdaConfiguration.RuntimeEngine
private let targetHost: String
private var state = State.disconnected
private var executing = false
init(eventLoop: EventLoop, configuration: LambdaConfiguration.RuntimeEngine) {
self.eventLoop = eventLoop
self.configuration = configuration
self.targetHost = "\(self.configuration.ip):\(self.configuration.port)"
}
func get(url: String, headers: HTTPHeaders, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
self.execute(
Request(
targetHost: self.targetHost,
url: url,
method: .GET,
headers: headers,
timeout: timeout ?? self.configuration.requestTimeout
)
)
}
func post(
url: String,
headers: HTTPHeaders,
body: ByteBuffer?,
timeout: TimeAmount? = nil
) -> EventLoopFuture<Response> {
self.execute(
Request(
targetHost: self.targetHost,
url: url,
method: .POST,
headers: headers,
body: body,
timeout: timeout ?? self.configuration.requestTimeout
)
)
}
/// cancels the current request if there is one
func cancel() {
guard self.executing else {
// there is no request running. nothing to cancel
return
}
guard case .connected(let channel) = self.state else {
preconditionFailure("if we are executing, we expect to have an open channel")
}
channel.triggerUserOutboundEvent(RequestCancelEvent(), promise: nil)
}
// TODO: cap reconnect attempt
private func execute(_ request: Request, validate: Bool = true) -> EventLoopFuture<Response> {
if validate {
precondition(self.executing == false, "expecting single request at a time")
self.executing = true
}
switch self.state {
case .disconnected:
return self.connect().flatMap { channel -> EventLoopFuture<Response> in
self.state = .connected(channel)
return self.execute(request, validate: false)
}
case .connected(let channel):
guard channel.isActive else {
self.state = .disconnected
return self.execute(request, validate: false)
}
let promise = channel.eventLoop.makePromise(of: Response.self)
promise.futureResult.whenComplete { _ in
precondition(self.executing == true, "invalid execution state")
self.executing = false
}
let wrapper = HTTPRequestWrapper(request: request, promise: promise)
channel.writeAndFlush(wrapper).cascadeFailure(to: promise)
return promise.futureResult
}
}
private func connect() -> EventLoopFuture<Channel> {
let bootstrap = ClientBootstrap(group: self.eventLoop)
.channelInitializer { channel in
do {
try channel.pipeline.syncOperations.addHTTPClientHandlers()
// Lambda quotas... An invocation payload is maximal 6MB in size:
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
try channel.pipeline.syncOperations.addHandler(
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
)
try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler())
return channel.eventLoop.makeSucceededFuture(())
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
}
do {
// connect directly via socket address to avoid happy eyeballs (perf)
let address = try SocketAddress(ipAddress: self.configuration.ip, port: self.configuration.port)
return bootstrap.connect(to: address)
} catch {
return self.eventLoop.makeFailedFuture(error)
}
}
struct Request: Equatable {
let url: String
let method: HTTPMethod
let targetHost: String
let headers: HTTPHeaders
let body: ByteBuffer?
let timeout: TimeAmount?
init(
targetHost: String,
url: String,
method: HTTPMethod = .GET,
headers: HTTPHeaders = HTTPHeaders(),
body: ByteBuffer? = nil,
timeout: TimeAmount?
) {
self.targetHost = targetHost
self.url = url
self.method = method
self.headers = headers
self.body = body
self.timeout = timeout
}
}
struct Response: Equatable {
var version: HTTPVersion
var status: HTTPResponseStatus
var headers: HTTPHeaders
var body: ByteBuffer?
}
enum Errors: Error {
case connectionResetByPeer
case timeout
case cancelled
}
private enum State {
case disconnected
case connected(Channel)
}
}
// no need in locks since we validate only one request can run at a time
private final class LambdaChannelHandler: ChannelDuplexHandler {
typealias InboundIn = NIOHTTPClientResponseFull
typealias OutboundIn = HTTPRequestWrapper
typealias OutboundOut = HTTPClientRequestPart
enum State {
case idle
case running(promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)
case waitForConnectionClose(HTTPClient.Response, EventLoopPromise<HTTPClient.Response>)
}
private var state: State = .idle
private var lastError: Error?
init() {}
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard case .idle = self.state else {
preconditionFailure("invalid state, outstanding request")
}
let wrapper = unwrapOutboundIn(data)
var head = HTTPRequestHead(
version: .http1_1,
method: wrapper.request.method,
uri: wrapper.request.url,
headers: wrapper.request.headers
)
head.headers.add(name: "host", value: wrapper.request.targetHost)
switch head.method {
case .POST, .PUT:
head.headers.add(name: "content-length", value: String(wrapper.request.body?.readableBytes ?? 0))
default:
break
}
let timeoutTask = wrapper.request.timeout.map {
context.eventLoop.scheduleTask(in: $0) {
guard case .running = self.state else {
preconditionFailure("invalid state")
}
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
}
}
self.state = .running(promise: wrapper.promise, timeout: timeoutTask)
context.write(wrapOutboundOut(.head(head)), promise: nil)
if let body = wrapper.request.body {
context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil)
}
context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: promise)
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
guard case .running(let promise, let timeout) = self.state else {
preconditionFailure("invalid state, no pending request")
}
let response = unwrapInboundIn(data)
let httpResponse = HTTPClient.Response(
version: response.head.version,
status: response.head.status,
headers: response.head.headers,
body: response.body
)
timeout?.cancel()
// As defined in RFC 7230 Section 6.3:
// HTTP/1.1 defaults to the use of "persistent connections", allowing
// multiple requests and responses to be carried over a single
// connection. The "close" connection option is used to signal that a
// connection will not persist after the current request/response. HTTP
// implementations SHOULD support persistent connections.
//
// That's why we only assume the connection shall be closed if we receive
// a "connection = close" header.
let serverCloseConnection =
response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
let closeConnection = serverCloseConnection || response.head.version != .http1_1
if closeConnection {
// If we were succeeding the request promise here directly and closing the connection
// after succeeding the promise we may run into a race condition:
//
// The lambda runtime will ask for the next work item directly after a succeeded post
// response request. The desire for the next work item might be faster than the attempt
// to close the connection. This will lead to a situation where we try to the connection
// but the next request has already been scheduled on the connection that we want to
// close. For this reason we postpone succeeding the promise until the connection has
// been closed. This codepath will only be hit in the very, very unlikely event of the
// Lambda control plane demanding to close connection. (It's more or less only
// implemented to support http1.1 correctly.) This behavior is ensured with the test
// `LambdaTest.testNoKeepAliveServer`.
self.state = .waitForConnectionClose(httpResponse, promise)
_ = context.channel.close()
return
} else {
self.state = .idle
promise.succeed(httpResponse)
}
}
func errorCaught(context: ChannelHandlerContext, error: Error) {
// pending responses will fail with lastError in channelInactive since we are calling context.close
self.lastError = error
context.channel.close(promise: nil)
}
func channelInactive(context: ChannelHandlerContext) {
// fail any pending responses with last error or assume peer disconnected
context.fireChannelInactive()
switch self.state {
case .idle:
break
case .running(let promise, let timeout):
self.state = .idle
timeout?.cancel()
promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
case .waitForConnectionClose(let response, let promise):
self.state = .idle
promise.succeed(response)
}
}
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case is RequestCancelEvent:
switch self.state {
case .idle:
break
case .running(let promise, let timeout):
self.state = .idle
timeout?.cancel()
promise.fail(HTTPClient.Errors.cancelled)
// after the cancel error has been send, we want to close the connection so
// that no more packets can be read on this connection.
_ = context.channel.close()
case .waitForConnectionClose(_, let promise):
self.state = .idle
promise.fail(HTTPClient.Errors.cancelled)
}
default:
context.triggerUserOutboundEvent(event, promise: promise)
}
}
}
private struct HTTPRequestWrapper {
let request: HTTPClient.Request
let promise: EventLoopPromise<HTTPClient.Response>
}
private struct RequestCancelEvent {}
@@ -55,14 +55,13 @@ private enum LocalLambda {
private let port: Int
private let invocationEndpoint: String
public init(invocationEndpoint: String?) {
let configuration = LambdaConfiguration()
init(invocationEndpoint: String?) {
var logger = Logger(label: "LocalLambdaServer")
logger.logLevel = configuration.general.logLevel
logger.logLevel = .info
self.logger = logger
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.host = configuration.runtimeEngine.ip
self.port = configuration.runtimeEngine.port
self.host = "127.0.0.1"
self.port = 0
self.invocationEndpoint = invocationEndpoint ?? "/invoke"
}
@@ -1,77 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
// MARK: - SimpleLambdaHandler String support
extension SimpleLambdaHandler where Event == String {
/// Implementation of a `ByteBuffer` to `String` decoding.
@inlinable
public func decode(buffer: ByteBuffer) throws -> Event {
guard let value = buffer.getString(at: buffer.readerIndex, length: buffer.readableBytes) else {
throw CodecError.invalidString
}
return value
}
}
extension SimpleLambdaHandler where Output == String {
/// Implementation of `String` to `ByteBuffer` encoding.
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {
buffer.writeString(value)
}
}
// MARK: - LambdaHandler String support
extension LambdaHandler where Event == String {
/// Implementation of a `ByteBuffer` to `String` decoding.
@inlinable
public func decode(buffer: ByteBuffer) throws -> Event {
guard let value = buffer.getString(at: buffer.readerIndex, length: buffer.readableBytes) else {
throw CodecError.invalidString
}
return value
}
}
extension LambdaHandler where Output == String {
/// Implementation of `String` to `ByteBuffer` encoding.
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {
buffer.writeString(value)
}
}
// MARK: - EventLoopLambdaHandler String support
extension EventLoopLambdaHandler where Event == String {
/// Implementation of `String` to `ByteBuffer` encoding.
@inlinable
public func decode(buffer: ByteBuffer) throws -> Event {
guard let value = buffer.getString(at: buffer.readerIndex, length: buffer.readableBytes) else {
throw CodecError.invalidString
}
return value
}
}
extension EventLoopLambdaHandler where Output == String {
/// Implementation of a `ByteBuffer` to `String` decoding.
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {
buffer.writeString(value)
}
}
+1 -144
View File
@@ -28,150 +28,7 @@ import ucrt
#error("Unsupported platform")
#endif
#if swift(<5.9)
import Backtrace
#endif
public enum Lambda {
/// Run a Lambda defined by implementing the ``SimpleLambdaHandler`` protocol.
/// The Runtime will manage the Lambdas application lifecycle automatically.
///
/// - parameters:
/// - configuration: A Lambda runtime configuration object
/// - handlerType: The Handler to create and invoke.
///
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
static func run<Handler: SimpleLambdaHandler>(
configuration: LambdaConfiguration = .init(),
handlerType: Handler.Type
) -> Result<Int, Error> {
self.run(
configuration: configuration,
handlerProvider: CodableSimpleLambdaHandler<Handler>.makeHandler(context:)
)
}
/// Run a Lambda defined by implementing the ``LambdaHandler`` protocol.
/// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the
/// ``LambdaHandler/makeHandler(context:)`` to create a new Handler.
///
/// - parameters:
/// - configuration: A Lambda runtime configuration object
/// - handlerType: The Handler to create and invoke.
///
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
static func run<Handler: LambdaHandler>(
configuration: LambdaConfiguration = .init(),
handlerType: Handler.Type
) -> Result<Int, Error> {
self.run(configuration: configuration, handlerProvider: CodableLambdaHandler<Handler>.makeHandler(context:))
}
/// Run a Lambda defined by implementing the ``EventLoopLambdaHandler`` protocol.
/// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the
/// ``EventLoopLambdaHandler/makeHandler(context:)`` to create a new Handler.
///
/// - parameters:
/// - configuration: A Lambda runtime configuration object
/// - handlerType: The Handler to create and invoke.
///
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
static func run<Handler: EventLoopLambdaHandler>(
configuration: LambdaConfiguration = .init(),
handlerType: Handler.Type
) -> Result<Int, Error> {
self.run(
configuration: configuration,
handlerProvider: CodableEventLoopLambdaHandler<Handler>.makeHandler(context:)
)
}
/// Run a Lambda defined by implementing the ``ByteBufferLambdaHandler`` protocol.
/// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the
/// ``ByteBufferLambdaHandler/makeHandler(context:)`` to create a new Handler.
///
/// - parameters:
/// - configuration: A Lambda runtime configuration object
/// - handlerType: The Handler to create and invoke.
///
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
static func run(
configuration: LambdaConfiguration = .init(),
handlerType: (some ByteBufferLambdaHandler).Type
) -> Result<Int, Error> {
self.run(configuration: configuration, handlerProvider: handlerType.makeHandler(context:))
}
/// Run a Lambda defined by implementing the ``LambdaRuntimeHandler`` protocol.
/// - parameters:
/// - configuration: A Lambda runtime configuration object
/// - handlerProvider: A provider of the ``LambdaRuntimeHandler`` to invoke.
///
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
static func run(
configuration: LambdaConfiguration = .init(),
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<some LambdaRuntimeHandler>
) -> Result<Int, Error> {
let _run = { (configuration: LambdaConfiguration) -> Result<Int, Error> in
#if swift(<5.9)
Backtrace.install()
#endif
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel
var result: Result<Int, Error>!
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in
let runtime = LambdaRuntime(
handlerProvider: handlerProvider,
eventLoop: eventLoop,
logger: logger,
configuration: configuration
)
#if DEBUG
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
runtime.shutdown()
}
#endif
runtime.start().flatMap {
runtime.shutdownFuture
}.whenComplete { lifecycleResult in
#if DEBUG
signalSource.cancel()
#endif
eventLoop.shutdownGracefully { error in
if let error = error {
preconditionFailure("Failed to shutdown eventloop: \(error)")
}
}
result = lifecycleResult
}
}
logger.info("shutdown completed")
return result
}
// start local server for debugging in DEBUG mode only
#if DEBUG
if Lambda.env("LOCAL_LAMBDA_SERVER_ENABLED").flatMap(Bool.init) ?? false {
do {
return try Lambda.withLocalServer(
invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT")
) {
_run(configuration)
}
} catch {
return .failure(error)
}
} else {
return _run(configuration)
}
#else
return _run(configuration)
#endif
}
}
enum Lambda {}
// MARK: - Public API
@@ -1,89 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Dispatch
import Logging
import NIOCore
struct LambdaConfiguration: CustomStringConvertible {
let general: General
let lifecycle: Lifecycle
let runtimeEngine: RuntimeEngine
init() {
self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init())
}
init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) {
self.general = general ?? General()
self.lifecycle = lifecycle ?? Lifecycle()
self.runtimeEngine = runtimeEngine ?? RuntimeEngine()
}
struct General: CustomStringConvertible {
let logLevel: Logger.Level
init(logLevel: Logger.Level? = nil) {
self.logLevel = logLevel ?? Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
}
var description: String {
"\(General.self)(logLevel: \(self.logLevel))"
}
}
struct Lifecycle: CustomStringConvertible {
let id: String
let maxTimes: Int
let stopSignal: Signal
init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) {
self.id = id ?? "\(DispatchTime.now().uptimeNanoseconds)"
self.maxTimes = maxTimes ?? Lambda.env("MAX_REQUESTS").flatMap(Int.init) ?? 0
self.stopSignal =
stopSignal ?? Lambda.env("STOP_SIGNAL").flatMap(Int32.init).flatMap(Signal.init) ?? Signal.TERM
precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0")
}
var description: String {
"\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))"
}
}
struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let requestTimeout: TimeAmount?
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
let ipPort =
(address ?? Lambda.env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"]
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
preconditionFailure("invalid ip+port configuration \(ipPort)")
}
self.ip = String(ipPort[0])
self.port = port
self.requestTimeout =
requestTimeout ?? Lambda.env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
}
var description: String {
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))"
}
}
var description: String {
"\(Self.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}
@@ -1,242 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
#if swift(<5.9)
@preconcurrency import Dispatch
#else
import Dispatch
#endif
// MARK: - InitializationContext
/// Lambda runtime initialization context.
/// The Lambda runtime generates and passes the `LambdaInitializationContext` to the Handlers
/// ``ByteBufferLambdaHandler/makeHandler(context:)`` or ``LambdaHandler/init(context:)``
/// as an argument.
public struct LambdaInitializationContext: Sendable {
/// `Logger` to log with.
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public let logger: Logger
/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop
/// `ByteBufferAllocator` to allocate `ByteBuffer`.
public let allocator: ByteBufferAllocator
/// ``LambdaTerminator`` to register shutdown operations.
public let terminator: LambdaTerminator
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) {
self.eventLoop = eventLoop
self.logger = logger
self.allocator = allocator
self.terminator = terminator
}
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
public static func __forTestsOnly(
logger: Logger,
eventLoop: EventLoop
) -> LambdaInitializationContext {
LambdaInitializationContext(
logger: logger,
eventLoop: eventLoop,
allocator: ByteBufferAllocator(),
terminator: LambdaTerminator()
)
}
}
// MARK: - Context
/// Lambda runtime context.
/// The Lambda runtime generates and passes the `LambdaContext` to the Lambda handler as an argument.
public struct LambdaContext: CustomDebugStringConvertible, Sendable {
final class _Storage: Sendable {
let requestID: String
let traceID: String
let invokedFunctionARN: String
let deadline: DispatchWallTime
let cognitoIdentity: String?
let clientContext: String?
let logger: Logger
let eventLoop: EventLoop
let allocator: ByteBufferAllocator
let tasks: DetachedTasksContainer
init(
requestID: String,
traceID: String,
invokedFunctionARN: String,
deadline: DispatchWallTime,
cognitoIdentity: String?,
clientContext: String?,
logger: Logger,
eventLoop: EventLoop,
allocator: ByteBufferAllocator,
tasks: DetachedTasksContainer
) {
self.requestID = requestID
self.traceID = traceID
self.invokedFunctionARN = invokedFunctionARN
self.deadline = deadline
self.cognitoIdentity = cognitoIdentity
self.clientContext = clientContext
self.logger = logger
self.eventLoop = eventLoop
self.allocator = allocator
self.tasks = tasks
}
}
private var storage: _Storage
/// The request ID, which identifies the request that triggered the function invocation.
public var requestID: String {
self.storage.requestID
}
/// The AWS X-Ray tracing header.
public var traceID: String {
self.storage.traceID
}
/// The ARN of the Lambda function, version, or alias that's specified in the invocation.
public var invokedFunctionARN: String {
self.storage.invokedFunctionARN
}
/// The timestamp that the function times out.
public var deadline: DispatchWallTime {
self.storage.deadline
}
/// For invocations from the AWS Mobile SDK, data about the Amazon Cognito identity provider.
public var cognitoIdentity: String? {
self.storage.cognitoIdentity
}
/// For invocations from the AWS Mobile SDK, data about the client application and device.
public var clientContext: String? {
self.storage.clientContext
}
/// `Logger` to log with.
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public var logger: Logger {
self.storage.logger
}
/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
/// This is useful when implementing the ``EventLoopLambdaHandler`` protocol.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public var eventLoop: EventLoop {
self.storage.eventLoop
}
/// `ByteBufferAllocator` to allocate `ByteBuffer`.
/// This is useful when implementing ``EventLoopLambdaHandler``.
public var allocator: ByteBufferAllocator {
self.storage.allocator
}
init(
requestID: String,
traceID: String,
invokedFunctionARN: String,
deadline: DispatchWallTime,
cognitoIdentity: String? = nil,
clientContext: String? = nil,
logger: Logger,
eventLoop: EventLoop,
allocator: ByteBufferAllocator
) {
self.storage = _Storage(
requestID: requestID,
traceID: traceID,
invokedFunctionARN: invokedFunctionARN,
deadline: deadline,
cognitoIdentity: cognitoIdentity,
clientContext: clientContext,
logger: logger,
eventLoop: eventLoop,
allocator: allocator,
tasks: DetachedTasksContainer(
context: DetachedTasksContainer.Context(
eventLoop: eventLoop,
logger: logger
)
)
)
}
public func getRemainingTime() -> TimeAmount {
let deadline = self.deadline.millisSinceEpoch
let now = DispatchWallTime.now().millisSinceEpoch
let remaining = deadline - now
return .milliseconds(remaining)
}
var tasks: DetachedTasksContainer {
self.storage.tasks
}
/// Registers a background task that continues running after the synchronous invocation has completed.
/// This is useful for tasks like flushing metrics or performing clean-up operations without delaying the response.
///
/// - Parameter body: An asynchronous closure that performs the background task.
/// - Warning: You will be billed for the milliseconds of Lambda execution time until the very last
/// background task is finished.
public func detachedBackgroundTask(_ body: @escaping @Sendable () async -> Void) {
Task {
await self.tasks.detached(task: body)
}
}
public var debugDescription: String {
"\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))"
}
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
public static func __forTestsOnly(
requestID: String,
traceID: String,
invokedFunctionARN: String,
timeout: DispatchTimeInterval,
logger: Logger,
eventLoop: EventLoop
) -> LambdaContext {
LambdaContext(
requestID: requestID,
traceID: traceID,
invokedFunctionARN: invokedFunctionARN,
deadline: .now() + timeout,
logger: logger,
eventLoop: eventLoop,
allocator: ByteBufferAllocator()
)
}
}
@@ -1,462 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Dispatch
import NIOCore
// MARK: - SimpleLambdaHandler
/// Strongly typed, processing protocol for a Lambda that takes a user defined
/// ``SimpleLambdaHandler/Event`` and returns a user defined
/// ``SimpleLambdaHandler/Output`` asynchronously.
///
/// - note: Most users should implement the ``LambdaHandler`` protocol instead
/// which defines the Lambda initialization method.
public protocol SimpleLambdaHandler {
/// The lambda function's input. In most cases this should be `Codable`. If your event originates from an
/// AWS service, have a look at [AWSLambdaEvents](https://github.com/swift-server/swift-aws-lambda-events),
/// which provides a number of commonly used AWS Event implementations.
associatedtype Event
/// The lambda function's output. Can be `Void`.
associatedtype Output
init()
/// The Lambda handling method.
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
///
/// - parameters:
/// - event: Event of type `Event` representing the event or request.
/// - context: Runtime ``LambdaContext``.
///
/// - Returns: A Lambda result ot type `Output`.
func handle(_ event: Event, context: LambdaContext) async throws -> Output
/// Encode a response of type ``Output`` to `ByteBuffer`.
/// Concrete Lambda handlers implement this method to provide coding functionality.
/// - parameters:
/// - value: Response of type ``Output``.
/// - buffer: A `ByteBuffer` to encode into, will be overwritten.
func encode(value: Output, into buffer: inout ByteBuffer) throws
/// Decode a `ByteBuffer` to a request or event of type ``Event``.
/// Concrete Lambda handlers implement this method to provide coding functionality.
///
/// - parameters:
/// - buffer: The `ByteBuffer` to decode.
///
/// - Returns: A request or event of type ``Event``.
func decode(buffer: ByteBuffer) throws -> Event
}
@usableFromInline
final class CodableSimpleLambdaHandler<Underlying: SimpleLambdaHandler>: ByteBufferLambdaHandler {
@usableFromInline
let handler: Underlying
@usableFromInline
private(set) var outputBuffer: ByteBuffer
@inlinable
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<CodableSimpleLambdaHandler> {
let promise = context.eventLoop.makePromise(of: CodableSimpleLambdaHandler<Underlying>.self)
promise.completeWithTask {
let handler = Underlying()
return CodableSimpleLambdaHandler(handler: handler, allocator: context.allocator)
}
return promise.futureResult
}
@inlinable
init(handler: Underlying, allocator: ByteBufferAllocator) {
self.handler = handler
self.outputBuffer = allocator.buffer(capacity: 1024 * 1024)
}
@inlinable
func handle(_ buffer: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?> {
let promise = context.eventLoop.makePromise(of: ByteBuffer?.self)
promise.completeWithTask {
let input: Underlying.Event
do {
input = try self.handler.decode(buffer: buffer)
} catch {
throw CodecError.requestDecoding(error)
}
let output = try await self.handler.handle(input, context: context)
do {
self.outputBuffer.clear()
try self.handler.encode(value: output, into: &self.outputBuffer)
return self.outputBuffer
} catch {
throw CodecError.responseEncoding(error)
}
}
return promise.futureResult
}
}
/// Implementation of `ByteBuffer` to `Void` decoding.
extension SimpleLambdaHandler where Output == Void {
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {}
}
extension SimpleLambdaHandler {
/// Initializes and runs the Lambda function.
///
/// If you precede your ``SimpleLambdaHandler`` conformer's declaration with the
/// [@main](https://docs.swift.org/swift-book/ReferenceManual/Attributes.html#ID626)
/// attribute, the system calls the conformer's `main()` method to launch the lambda function.
///
/// The lambda runtime provides a default implementation of the method that manages the launch
/// process.
public static func main() {
_ = Lambda.run(configuration: .init(), handlerType: Self.self)
}
}
// MARK: - LambdaHandler
/// Strongly typed, processing protocol for a Lambda that takes a user defined
/// ``LambdaHandler/Event`` and returns a user defined
/// ``LambdaHandler/Output`` asynchronously.
///
/// - note: Most users should implement this protocol instead of the lower
/// level protocols ``EventLoopLambdaHandler`` and
/// ``ByteBufferLambdaHandler``.
public protocol LambdaHandler {
/// The lambda function's input. In most cases this should be `Codable`. If your event originates from an
/// AWS service, have a look at [AWSLambdaEvents](https://github.com/swift-server/swift-aws-lambda-events),
/// which provides a number of commonly used AWS Event implementations.
associatedtype Event
/// The lambda function's output. Can be `Void`.
associatedtype Output
/// The Lambda initialization method.
/// Use this method to initialize resources that will be used in every request.
///
/// Examples for this can be HTTP or database clients.
/// - parameters:
/// - context: Runtime ``LambdaInitializationContext``.
init(context: LambdaInitializationContext) async throws
/// The Lambda handling method.
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
///
/// - parameters:
/// - event: Event of type `Event` representing the event or request.
/// - context: Runtime ``LambdaContext``.
///
/// - Returns: A Lambda result ot type `Output`.
func handle(_ event: Event, context: LambdaContext) async throws -> Output
/// Encode a response of type ``Output`` to `ByteBuffer`.
/// Concrete Lambda handlers implement this method to provide coding functionality.
/// - parameters:
/// - value: Response of type ``Output``.
/// - buffer: A `ByteBuffer` to encode into, will be overwritten.
func encode(value: Output, into buffer: inout ByteBuffer) throws
/// Decode a `ByteBuffer` to a request or event of type ``Event``.
/// Concrete Lambda handlers implement this method to provide coding functionality.
///
/// - parameters:
/// - buffer: The `ByteBuffer` to decode.
///
/// - Returns: A request or event of type ``Event``.
func decode(buffer: ByteBuffer) throws -> Event
}
@usableFromInline
final class CodableLambdaHandler<Underlying: LambdaHandler>: ByteBufferLambdaHandler {
@usableFromInline
let handler: Underlying
@usableFromInline
private(set) var outputBuffer: ByteBuffer
@inlinable
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<CodableLambdaHandler> {
let promise = context.eventLoop.makePromise(of: CodableLambdaHandler<Underlying>.self)
promise.completeWithTask {
let handler = try await Underlying(context: context)
return CodableLambdaHandler(handler: handler, allocator: context.allocator)
}
return promise.futureResult
}
@inlinable
init(handler: Underlying, allocator: ByteBufferAllocator) {
self.handler = handler
self.outputBuffer = allocator.buffer(capacity: 1024 * 1024)
}
@inlinable
func handle(_ buffer: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?> {
let promise = context.eventLoop.makePromise(of: ByteBuffer?.self)
promise.completeWithTask {
let input: Underlying.Event
do {
input = try self.handler.decode(buffer: buffer)
} catch {
throw CodecError.requestDecoding(error)
}
let output = try await self.handler.handle(input, context: context)
do {
self.outputBuffer.clear()
try self.handler.encode(value: output, into: &self.outputBuffer)
return self.outputBuffer
} catch {
throw CodecError.responseEncoding(error)
}
}
return promise.futureResult
}
}
/// Implementation of `ByteBuffer` to `Void` decoding.
extension LambdaHandler where Output == Void {
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {}
}
extension LambdaHandler {
/// Initializes and runs the Lambda function.
///
/// If you precede your ``LambdaHandler`` conformer's declaration with the
/// [@main](https://docs.swift.org/swift-book/ReferenceManual/Attributes.html#ID626)
/// attribute, the system calls the conformer's `main()` method to launch the lambda function.
///
/// The lambda runtime provides a default implementation of the method that manages the launch
/// process.
public static func main() {
_ = Lambda.run(configuration: .init(), handlerType: Self.self)
}
}
/// unchecked sendable wrapper for the handler
/// this is safe since lambda runtime is designed to calls the handler serially
@usableFromInline
struct UncheckedSendableHandler<Underlying: LambdaHandler, Event, Output>: @unchecked Sendable
where Event == Underlying.Event, Output == Underlying.Output {
@usableFromInline
let underlying: Underlying
@inlinable
init(underlying: Underlying) {
self.underlying = underlying
}
@inlinable
func handle(_ event: Event, context: LambdaContext) async throws -> Output {
try await self.underlying.handle(event, context: context)
}
}
// MARK: - EventLoopLambdaHandler
/// Strongly typed, `EventLoopFuture` based processing protocol for a Lambda that takes a user
/// defined ``EventLoopLambdaHandler/Event`` and returns a user defined ``EventLoopLambdaHandler/Output`` asynchronously.
///
/// - note: To implement a Lambda, implement either ``LambdaHandler`` or the
/// ``EventLoopLambdaHandler`` protocol. The ``LambdaHandler`` will offload
/// the Lambda execution to an async Task making processing safer but slower (due to
/// fewer thread hops).
/// The ``EventLoopLambdaHandler`` will execute the Lambda on the same `EventLoop`
/// as the core runtime engine, making the processing faster but requires more care from the
/// implementation to never block the `EventLoop`. Implement this protocol only in performance
/// critical situations and implement ``LambdaHandler`` in all other circumstances.
public protocol EventLoopLambdaHandler {
/// The lambda functions input. In most cases this should be `Codable`. If your event originates from an
/// AWS service, have a look at [AWSLambdaEvents](https://github.com/swift-server/swift-aws-lambda-events),
/// which provides a number of commonly used AWS Event implementations.
associatedtype Event
/// The lambda functions output. Can be `Void`.
associatedtype Output
/// Create a Lambda handler for the runtime.
///
/// Use this to initialize all your resources that you want to cache between invocations. This could be database
/// connections and HTTP clients for example. It is encouraged to use the given `EventLoop`'s conformance
/// to `EventLoopGroup` when initializing NIO dependencies. This will improve overall performance, as it
/// minimizes thread hopping.
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Self>
/// The Lambda handling method.
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
///
/// - parameters:
/// - context: Runtime ``LambdaContext``.
/// - event: Event of type `Event` representing the event or request.
///
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response of type ``Output`` or an `Error`.
func handle(_ event: Event, context: LambdaContext) -> EventLoopFuture<Output>
/// Encode a response of type ``Output`` to `ByteBuffer`.
/// Concrete Lambda handlers implement this method to provide coding functionality.
/// - parameters:
/// - value: Response of type ``Output``.
/// - buffer: A `ByteBuffer` to encode into, will be overwritten.
///
/// - Returns: A `ByteBuffer` with the encoded version of the `value`.
func encode(value: Output, into buffer: inout ByteBuffer) throws
/// Decode a `ByteBuffer` to a request or event of type ``Event``.
/// Concrete Lambda handlers implement this method to provide coding functionality.
///
/// - parameters:
/// - buffer: The `ByteBuffer` to decode.
///
/// - Returns: A request or event of type ``Event``.
func decode(buffer: ByteBuffer) throws -> Event
}
/// Implementation of `ByteBuffer` to `Void` decoding.
extension EventLoopLambdaHandler where Output == Void {
@inlinable
public func encode(value: Output, into buffer: inout ByteBuffer) throws {}
}
@usableFromInline
final class CodableEventLoopLambdaHandler<Underlying: EventLoopLambdaHandler>: ByteBufferLambdaHandler {
@usableFromInline
let handler: Underlying
@usableFromInline
private(set) var outputBuffer: ByteBuffer
@inlinable
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<CodableEventLoopLambdaHandler> {
Underlying.makeHandler(context: context).map { handler -> CodableEventLoopLambdaHandler<Underlying> in
CodableEventLoopLambdaHandler(handler: handler, allocator: context.allocator)
}
}
@inlinable
init(handler: Underlying, allocator: ByteBufferAllocator) {
self.handler = handler
self.outputBuffer = allocator.buffer(capacity: 1024 * 1024)
}
@inlinable
func handle(_ buffer: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?> {
let input: Underlying.Event
do {
input = try self.handler.decode(buffer: buffer)
} catch {
return context.eventLoop.makeFailedFuture(CodecError.requestDecoding(error))
}
return self.handler.handle(input, context: context).flatMapThrowing { output in
do {
self.outputBuffer.clear()
try self.handler.encode(value: output, into: &self.outputBuffer)
return self.outputBuffer
} catch {
throw CodecError.responseEncoding(error)
}
}
}
}
extension EventLoopLambdaHandler {
/// Initializes and runs the Lambda function.
///
/// If you precede your ``EventLoopLambdaHandler`` conformer's declaration with the
/// [@main](https://docs.swift.org/swift-book/ReferenceManual/Attributes.html#ID626)
/// attribute, the system calls the conformer's `main()` method to launch the lambda function.
///
/// The lambda runtime provides a default implementation of the method that manages the launch
/// process.
public static func main() {
_ = Lambda.run(configuration: .init(), handlerType: Self.self)
}
}
// MARK: - ByteBufferLambdaHandler
/// An `EventLoopFuture` based processing protocol for a Lambda that takes a `ByteBuffer` and returns
/// an optional `ByteBuffer` asynchronously.
///
/// - note: This is a low level protocol designed to power the higher level ``EventLoopLambdaHandler`` and
/// ``LambdaHandler`` based APIs.
/// Most users are not expected to use this protocol.
public protocol ByteBufferLambdaHandler: LambdaRuntimeHandler {
/// Create a Lambda handler for the runtime.
///
/// Use this to initialize all your resources that you want to cache between invocations. This could be database
/// connections and HTTP clients for example. It is encouraged to use the given `EventLoop`'s conformance
/// to `EventLoopGroup` when initializing NIO dependencies. This will improve overall performance, as it
/// minimizes thread hopping.
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Self>
/// The Lambda handling method.
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
///
/// - parameters:
/// - buffer: The event or input payload encoded as `ByteBuffer`.
/// - context: Runtime ``LambdaContext``.
///
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`.
func handle(_ buffer: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?>
}
extension ByteBufferLambdaHandler {
/// Initializes and runs the Lambda function.
///
/// If you precede your ``ByteBufferLambdaHandler`` conformer's declaration with the
/// [@main](https://docs.swift.org/swift-book/ReferenceManual/Attributes.html#ID626)
/// attribute, the system calls the conformer's `main()` method to launch the lambda function.
///
/// The lambda runtime provides a default implementation of the method that manages the launch
/// process.
public static func main() {
_ = Lambda.run(configuration: .init(), handlerType: Self.self)
}
}
// MARK: - LambdaRuntimeHandler
/// An `EventLoopFuture` based processing protocol for a Lambda that takes a `ByteBuffer` and returns
/// an optional `ByteBuffer` asynchronously.
///
/// - note: This is a low level protocol designed to enable use cases where a frameworks initializes the
/// runtime with a handler outside the normal initialization of
/// ``ByteBufferLambdaHandler``, ``EventLoopLambdaHandler`` and ``LambdaHandler`` based APIs.
/// Most users are not expected to use this protocol.
public protocol LambdaRuntimeHandler {
/// The Lambda handling method.
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
///
/// - parameters:
/// - buffer: The event or input payload encoded as `ByteBuffer`.
/// - context: Runtime ``LambdaContext``.
///
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`.
func handle(_ buffer: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?>
}
// MARK: - Other
@usableFromInline
enum CodecError: Error {
case requestDecoding(Error)
case responseEncoding(Error)
case invalidString
}
@@ -1,192 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Dispatch
import Logging
import NIOCore
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
final class LambdaRunner {
private let runtimeClient: LambdaRuntimeClient
private let eventLoop: EventLoop
private let allocator: ByteBufferAllocator
private var isGettingNextInvocation = false
init(eventLoop: EventLoop, configuration: LambdaConfiguration) {
self.eventLoop = eventLoop
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.allocator = ByteBufferAllocator()
}
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize<Handler: LambdaRuntimeHandler>(
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<Handler>,
logger: Logger,
terminator: LambdaTerminator
) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occurred
let context = LambdaInitializationContext(
logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
terminator: terminator
)
return handlerProvider(context)
// Hopping back to "our" EventLoop is important in case the factory returns a future
// that originated from a foreign EventLoop/EventLoopGroup.
// This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
}
}
}
func run(handler: some LambdaRuntimeHandler, logger: Logger) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request invocation from lambda runtime engine
self.isGettingNextInvocation = true
return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in
logger.debug("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, bytes in
// 2. send invocation to handler
self.isGettingNextInvocation = false
let context = LambdaContext(
logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
invocation: invocation
)
// when log level is trace or lower, print the first Kb of the payload
if logger.logLevel <= .trace, let buffer = bytes.getSlice(at: 0, length: max(bytes.readableBytes, 1024)) {
logger.trace(
"sending invocation to lambda handler",
metadata: ["1024 first bytes": .string(String(buffer: buffer))]
)
} else {
logger.debug("sending invocation to lambda handler")
}
return handler.handle(bytes, context: context)
// Hopping back to "our" EventLoop is important in case the handler returns a future that
// originated from a foreign EventLoop/EventLoopGroup.
// This can happen if the handler uses a library (lets say a DB client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.mapResult { result in
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result, context)
}
}.flatMap { invocation, result, context in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError {
error in
logger.error("could not report results to lambda runtime engine: \(error)")
// To discuss:
// Do we want to await the tasks in this case?
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await context.tasks.awaitAll().get()
}
return promise.futureResult
}.map { _ in context }
}
.flatMap { (context: LambdaContext) -> EventLoopFuture<Void> in
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await context.tasks.awaitAll().get()
}
return promise.futureResult
}
}
/// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane)
/// only needed for debugging purposes.
func cancelWaitingForNextInvocation() {
if self.isGettingNextInvocation {
self.runtimeClient.cancel()
}
}
}
extension LambdaContext {
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, invocation: InvocationMetadata) {
self.init(
requestID: invocation.requestID,
traceID: invocation.traceID,
invokedFunctionARN: invocation.invokedFunctionARN,
deadline: DispatchWallTime(millisSinceEpoch: invocation.deadlineInMillisSinceEpoch),
cognitoIdentity: invocation.cognitoIdentity,
clientContext: invocation.clientContext,
logger: logger,
eventLoop: eventLoop,
allocator: allocator
)
}
}
// TODO: move to nio?
extension EventLoopFuture {
// callback does not have side effects, failing with original result
func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture<Value> {
self.flatMapError { error in
callback(error)
return self
}
}
// callback does not have side effects, failing with original result
func peekError(_ callback: @escaping (Error) -> EventLoopFuture<Void>) -> EventLoopFuture<Value> {
self.flatMapError { error in
let promise = self.eventLoop.makePromise(of: Value.self)
callback(error).whenComplete { _ in
promise.completeWith(self)
}
return promise.futureResult
}
}
func mapResult<NewValue>(_ callback: @escaping (Result<Value, Error>) -> NewValue) -> EventLoopFuture<NewValue> {
self.map { value in
callback(.success(value))
}.flatMapErrorThrowing { error in
callback(.failure(error))
}
}
}
extension Result {
private var successful: Bool {
switch self {
case .success:
return true
case .failure:
return false
}
}
}
/// This is safe since lambda runtime synchronizes by dispatching all methods to a single `EventLoop`
extension LambdaRunner: @unchecked Sendable {}
@@ -1,352 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOConcurrencyHelpers
import NIOCore
/// `LambdaRuntime` manages the Lambda process lifecycle.
///
/// Use this API, if you build a higher level web framework which shall be able to run inside the Lambda environment.
public final class LambdaRuntime<Handler: LambdaRuntimeHandler> {
private let eventLoop: EventLoop
private let shutdownPromise: EventLoopPromise<Int>
private let logger: Logger
private let configuration: LambdaConfiguration
private let handlerProvider: (LambdaInitializationContext) -> EventLoopFuture<Handler>
private var state = State.idle {
willSet {
self.eventLoop.assertInEventLoop()
precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)")
}
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerProvider: A provider of the ``Handler`` the `LambdaRuntime` will manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@usableFromInline
convenience init(
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<Handler>,
eventLoop: EventLoop,
logger: Logger
) {
self.init(
handlerProvider: handlerProvider,
eventLoop: eventLoop,
logger: logger,
configuration: .init()
)
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerProvider: A provider of the ``Handler`` the `LambdaRuntime` will manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
init(
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<Handler>,
eventLoop: EventLoop,
logger: Logger,
configuration: LambdaConfiguration
) {
self.eventLoop = eventLoop
self.shutdownPromise = eventLoop.makePromise(of: Int.self)
self.logger = logger
self.configuration = configuration
self.handlerProvider = handlerProvider
}
deinit {
guard case .shutdown = self.state else {
preconditionFailure("invalid state \(self.state)")
}
}
/// The `Lifecycle` shutdown future.
///
/// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda lifecycle has fully shutdown.
public var shutdownFuture: EventLoopFuture<Int> {
self.shutdownPromise.futureResult
}
/// Start the `LambdaRuntime`.
///
/// - Returns: An `EventLoopFuture` that is fulfilled after the Lambda hander has been created and initialized, and a first run has been scheduled.
public func start() -> EventLoopFuture<Void> {
if self.eventLoop.inEventLoop {
return self._start()
} else {
return self.eventLoop.flatSubmit { self._start() }
}
}
private func _start() -> EventLoopFuture<Void> {
// This method must be called on the `EventLoop` the `LambdaRuntime` has been initialized with.
self.eventLoop.assertInEventLoop()
logger.info("lambda runtime starting with \(self.configuration)")
self.state = .initializing
var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let terminator = LambdaTerminator()
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
let startupFuture = runner.initialize(
handlerProvider: self.handlerProvider,
logger: logger,
terminator: terminator
)
startupFuture.flatMap { handler -> EventLoopFuture<Result<Int, Error>> in
// after the startup future has succeeded, we have a handler that we can use
// to `run` the lambda.
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
self.state = .active(runner, handler)
self.run(promise: finishedPromise)
return finishedPromise.futureResult.mapResult { $0 }
}.flatMap { runnerResult -> EventLoopFuture<Int> in
// after the lambda finishPromise has succeeded or failed we need to
// shutdown the handler
terminator.terminate(eventLoop: self.eventLoop).flatMapErrorThrowing { error in
// if, we had an error shutting down the handler, we want to concatenate it with
// the runner result
logger.error("Error shutting down handler: \(error)")
throw LambdaRuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
}.flatMapResult { _ -> Result<Int, Error> in
// we had no error shutting down the lambda. let's return the runner's result
runnerResult
}
}.always { _ in
// triggered when the Lambda has finished its last run or has a startup failure.
self.markShutdown()
}.cascade(to: self.shutdownPromise)
return startupFuture.map { _ in }
}
// MARK: - Private
/// Begin the `LambdaRuntime` shutdown.
public func shutdown() {
// make this method thread safe by dispatching onto the eventloop
self.eventLoop.execute {
let oldState = self.state
self.state = .shuttingdown
if case .active(let runner, _) = oldState {
runner.cancelWaitingForNextInvocation()
}
}
}
private func markShutdown() {
self.state = .shutdown
}
@inline(__always)
private func run(promise: EventLoopPromise<Int>) {
func _run(_ count: Int) {
switch self.state {
case .active(let runner, let handler):
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
return promise.succeed(count)
}
var mlogger = self.logger
mlogger[metadataKey: "lifecycleIteration"] = "\(count)"
let logger = mlogger
runner.run(handler: handler, logger: logger).whenComplete { result in
switch result {
case .success:
logger.log(level: .debug, "lambda invocation sequence completed successfully")
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
_run(count + 1)
case .failure(HTTPClient.Errors.cancelled):
if case .shuttingdown = self.state {
// if we ware shutting down, we expect to that the get next
// invocation request might have been cancelled. For this reason we
// succeed the promise here.
logger.log(level: .info, "lambda invocation sequence has been cancelled for shutdown")
return promise.succeed(count)
}
logger.log(level: .error, "lambda invocation sequence has been cancelled unexpectedly")
promise.fail(HTTPClient.Errors.cancelled)
case .failure(let error):
logger.log(level: .error, "lambda invocation sequence completed with error: \(error)")
promise.fail(error)
}
}
case .shuttingdown:
promise.succeed(count)
default:
preconditionFailure("invalid run state: \(self.state)")
}
}
_run(0)
}
private enum State {
case idle
case initializing
case active(LambdaRunner, any LambdaRuntimeHandler)
case shuttingdown
case shutdown
var order: Int {
switch self {
case .idle:
return 0
case .initializing:
return 1
case .active:
return 2
case .shuttingdown:
return 3
case .shutdown:
return 4
}
}
}
}
public enum LambdaRuntimeFactory {
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerType: The ``SimpleLambdaHandler`` type the `LambdaRuntime` shall create and manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@inlinable
public static func makeRuntime<Handler: SimpleLambdaHandler>(
_ handlerType: Handler.Type,
eventLoop: any EventLoop,
logger: Logger
) -> LambdaRuntime<some ByteBufferLambdaHandler> {
LambdaRuntime<CodableSimpleLambdaHandler<Handler>>(
handlerProvider: CodableSimpleLambdaHandler<Handler>.makeHandler(context:),
eventLoop: eventLoop,
logger: logger
)
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerType: The ``LambdaHandler`` type the `LambdaRuntime` shall create and manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@inlinable
public static func makeRuntime<Handler: LambdaHandler>(
_ handlerType: Handler.Type,
eventLoop: any EventLoop,
logger: Logger
) -> LambdaRuntime<some LambdaRuntimeHandler> {
LambdaRuntime<CodableLambdaHandler<Handler>>(
handlerProvider: CodableLambdaHandler<Handler>.makeHandler(context:),
eventLoop: eventLoop,
logger: logger
)
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerType: The ``EventLoopLambdaHandler`` type the `LambdaRuntime` shall create and manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@inlinable
public static func makeRuntime<Handler: EventLoopLambdaHandler>(
_ handlerType: Handler.Type,
eventLoop: any EventLoop,
logger: Logger
) -> LambdaRuntime<some LambdaRuntimeHandler> {
LambdaRuntime<CodableEventLoopLambdaHandler<Handler>>(
handlerProvider: CodableEventLoopLambdaHandler<Handler>.makeHandler(context:),
eventLoop: eventLoop,
logger: logger
)
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerType: The ``ByteBufferLambdaHandler`` type the `LambdaRuntime` shall create and manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@inlinable
public static func makeRuntime<Handler: ByteBufferLambdaHandler>(
_ handlerType: Handler.Type,
eventLoop: any EventLoop,
logger: Logger
) -> LambdaRuntime<some LambdaRuntimeHandler> {
LambdaRuntime<Handler>(
handlerProvider: Handler.makeHandler(context:),
eventLoop: eventLoop,
logger: logger
)
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerProvider: A provider of the ``LambdaRuntimeHandler`` the `LambdaRuntime` will manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@inlinable
public static func makeRuntime<Handler: LambdaRuntimeHandler>(
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<Handler>,
eventLoop: any EventLoop,
logger: Logger
) -> LambdaRuntime<Handler> {
LambdaRuntime(
handlerProvider: handlerProvider,
eventLoop: eventLoop,
logger: logger
)
}
/// Create a new `LambdaRuntime`.
///
/// - parameters:
/// - handlerProvider: A provider of the ``LambdaRuntimeHandler`` the `LambdaRuntime` will manage.
/// - eventLoop: An `EventLoop` to run the Lambda on.
/// - logger: A `Logger` to log the Lambda events.
@inlinable
public static func makeRuntime<Handler: LambdaRuntimeHandler>(
handlerProvider: @escaping (LambdaInitializationContext) async throws -> Handler,
eventLoop: any EventLoop,
logger: Logger
) -> LambdaRuntime<Handler> {
LambdaRuntime(
handlerProvider: { context in
let promise = eventLoop.makePromise(of: Handler.self)
promise.completeWithTask {
try await handlerProvider(context)
}
return promise.futureResult
},
eventLoop: eventLoop,
logger: logger
)
}
}
/// This is safe since lambda runtime synchronizes by dispatching all methods to a single `EventLoop`
extension LambdaRuntime: @unchecked Sendable {}
@@ -1,155 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOHTTP1
/// An HTTP based client for AWS Runtime Engine. This encapsulates the RESTful methods exposed by the Runtime Engine:
/// * /runtime/invocation/next
/// * /runtime/invocation/response
/// * /runtime/invocation/error
/// * /runtime/init/error
struct LambdaRuntimeClient {
private let eventLoop: EventLoop
private let allocator = ByteBufferAllocator()
private let httpClient: HTTPClient
init(eventLoop: EventLoop, configuration: LambdaConfiguration.RuntimeEngine) {
self.eventLoop = eventLoop
self.httpClient = HTTPClient(eventLoop: eventLoop, configuration: configuration)
}
/// Requests invocation from the control plane.
func getNextInvocation(logger: Logger) -> EventLoopFuture<(InvocationMetadata, ByteBuffer)> {
let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
logger.debug("requesting work from lambda runtime engine using \(url)")
return self.httpClient.get(url: url, headers: LambdaRuntimeClient.defaultHeaders).flatMapThrowing { response in
guard response.status == .ok else {
throw LambdaRuntimeError.badStatusCode(response.status)
}
let invocation = try InvocationMetadata(headers: response.headers)
guard let event = response.body else {
throw LambdaRuntimeError.noBody
}
return (invocation, event)
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw LambdaRuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw LambdaRuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
/// Reports a result to the Runtime Engine.
func reportResults(
logger: Logger,
invocation: InvocationMetadata,
result: Result<ByteBuffer?, Error>
) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + invocation.requestID
var body: ByteBuffer?
let headers: HTTPHeaders
switch result {
case .success(let buffer):
url += Consts.postResponseURLSuffix
body = buffer
headers = LambdaRuntimeClient.defaultHeaders
case .failure(let error):
url += Consts.postErrorURLSuffix
let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)")
let bytes = errorResponse.toJSONBytes()
body = self.allocator.buffer(capacity: bytes.count)
body!.writeBytes(bytes)
headers = LambdaRuntimeClient.errorHeaders
}
logger.debug("reporting results to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw LambdaRuntimeError.badStatusCode(response.status)
}
return ()
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw LambdaRuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw LambdaRuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
/// Reports an initialization error to the Runtime Engine.
func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture<Void> {
let url = Consts.postInitErrorURL
let errorResponse = ErrorResponse(errorType: Consts.initializationError, errorMessage: "\(error)")
let bytes = errorResponse.toJSONBytes()
var body = self.allocator.buffer(capacity: bytes.count)
body.writeBytes(bytes)
logger.warning("reporting initialization error to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, headers: LambdaRuntimeClient.errorHeaders, body: body).flatMapThrowing {
response in
guard response.status == .accepted else {
throw LambdaRuntimeError.badStatusCode(response.status)
}
return ()
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw LambdaRuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw LambdaRuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
/// Cancels the current request, if one is running. Only needed for debugging purposes
func cancel() {
self.httpClient.cancel()
}
}
enum LambdaRuntimeError: Error {
case badStatusCode(HTTPResponseStatus)
case upstreamError(String)
case invocationMissingHeader(String)
case noBody
case json(Error)
case shutdownError(shutdownError: Error, runnerResult: Result<Int, Error>)
}
extension LambdaRuntimeClient {
static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")])
/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]
}
@@ -578,7 +578,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: LambdaRuntimeClient.streamingHeaders
headers: NewLambdaRuntimeClient.streamingHeaders
)
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
@@ -608,7 +608,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
LambdaRuntimeClient.streamingHeaders
NewLambdaRuntimeClient.streamingHeaders
}
let httpRequest = HTTPRequestHead(
@@ -634,7 +634,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: LambdaRuntimeClient.defaultHeaders
headers: NewLambdaRuntimeClient.defaultHeaders
)
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
@@ -650,7 +650,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: LambdaRuntimeClient.errorHeaders
headers: NewLambdaRuntimeClient.errorHeaders
)
if self.reusableErrorBuffer == nil {
@@ -798,4 +798,21 @@ extension LambdaChannelHandler: ChannelInboundHandler {
}
}
private struct RequestCancelEvent {}
extension NewLambdaRuntimeClient {
static let defaultHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown"
]
/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]
}
@@ -12,8 +12,8 @@
//
//===----------------------------------------------------------------------===//
struct NewLambdaRuntimeError: Error {
enum Code {
package struct NewLambdaRuntimeError: Error {
package enum Code {
case closingRuntimeClient
case connectionToControlPlaneLost
@@ -25,9 +25,18 @@ struct NewLambdaRuntimeError: Error {
case lostConnectionToControlPlane
case unexpectedStatusCodeForRequest
case nextInvocationMissingHeaderRequestID
case nextInvocationMissingHeaderDeadline
case nextInvocationMissingHeaderInvokeFuctionARN
}
var code: Code
var underlying: (any Error)?
package init(code: Code, underlying: (any Error)? = nil) {
self.code = code
self.underlying = underlying
}
package var code: Code
package var underlying: (any Error)?
}
@@ -1,148 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOConcurrencyHelpers
import NIOCore
/// Lambda terminator.
/// Utility to manage the lambda shutdown sequence.
public final class LambdaTerminator {
fileprivate typealias Handler = (EventLoop) -> EventLoopFuture<Void>
private var storage: Storage
init() {
self.storage = Storage()
}
/// Register a shutdown handler with the terminator.
///
/// - parameters:
/// - name: Display name for logging purposes.
/// - handler: The shutdown handler to call when terminating the Lambda.
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - Returns: A ``RegistrationKey`` that can be used to de-register the handler when its no longer needed.
@discardableResult
public func register(name: String, handler: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey {
let key = RegistrationKey()
self.storage.add(key: key, name: name, handler: handler)
return key
}
/// De-register a shutdown handler with the terminator.
///
/// - parameters:
/// - key: A ``RegistrationKey`` obtained from calling the register API.
public func deregister(_ key: RegistrationKey) {
self.storage.remove(key)
}
/// Begin the termination cycle.
/// Shutdown handlers are called in the reverse order of being registered.
///
/// - parameters:
/// - eventLoop: The `EventLoop` to run the termination on.
///
/// - Returns: An `EventLoopFuture` with the result of the termination cycle.
func terminate(eventLoop: EventLoop) -> EventLoopFuture<Void> {
func terminate(
_ iterator: IndexingIterator<[(name: String, handler: Handler)]>,
errors: [Error],
promise: EventLoopPromise<Void>
) {
var iterator = iterator
guard let handler = iterator.next()?.handler else {
if errors.isEmpty {
return promise.succeed(())
} else {
return promise.fail(TerminationError(underlying: errors))
}
}
handler(eventLoop).whenComplete { result in
var errors = errors
if case .failure(let error) = result {
errors.append(error)
}
return terminate(iterator, errors: errors, promise: promise)
}
}
// terminate in cascading, reverse order
let promise = eventLoop.makePromise(of: Void.self)
terminate(self.storage.handlers.reversed().makeIterator(), errors: [], promise: promise)
return promise.futureResult
}
}
extension LambdaTerminator {
/// Lambda terminator registration key.
public struct RegistrationKey: Hashable, CustomStringConvertible {
var value: String
init() {
// UUID basically
self.value = LambdaRequestID().uuidString
}
public var description: String {
self.value
}
}
}
extension LambdaTerminator {
fileprivate final class Storage {
private let lock: NIOLock
private var index: [RegistrationKey]
private var map: [RegistrationKey: (name: String, handler: Handler)]
init() {
self.lock = .init()
self.index = []
self.map = [:]
}
func add(key: RegistrationKey, name: String, handler: @escaping Handler) {
self.lock.withLock {
self.index.append(key)
self.map[key] = (name: name, handler: handler)
}
}
func remove(_ key: RegistrationKey) {
self.lock.withLock {
self.index = self.index.filter { $0 != key }
self.map[key] = nil
}
}
var handlers: [(name: String, handler: Handler)] {
self.lock.withLock {
self.index.compactMap { self.map[$0] }
}
}
}
}
extension LambdaTerminator {
struct TerminationError: Error {
let underlying: [Error]
}
}
// Ideally this would not be @unchecked Sendable, but Sendable checks do not understand locks
// We can transition this to an actor once we drop support for older Swift versions
extension LambdaTerminator: @unchecked Sendable {}
extension LambdaTerminator.Storage: @unchecked Sendable {}
@@ -1,131 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
// This functionality is designed to help with Lambda unit testing with XCTest
// For example:
//
// func test() {
// struct MyLambda: LambdaHandler {
// typealias Event = String
// typealias Output = String
//
// init(context: Lambda.InitializationContext) {}
//
// func handle(_ event: String, context: LambdaContext) async throws -> String {
// "echo" + event
// }
// }
//
// let input = UUID().uuidString
// var result: String?
// XCTAssertNoThrow(result = try Lambda.test(MyLambda.self, with: input))
// XCTAssertEqual(result, "echo" + input)
// }
import Dispatch
import Logging
import NIOCore
import NIOPosix
@testable import AWSLambdaRuntime
@testable import AWSLambdaRuntimeCore
extension Lambda {
public struct TestConfig {
public var requestID: String
public var traceID: String
public var invokedFunctionARN: String
public var timeout: DispatchTimeInterval
public init(
requestID: String = "\(DispatchTime.now().uptimeNanoseconds)",
traceID: String =
"Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1",
invokedFunctionARN: String =
"arn:aws:lambda:us-west-1:\(DispatchTime.now().uptimeNanoseconds):function:custom-runtime",
timeout: DispatchTimeInterval = .seconds(5)
) {
self.requestID = requestID
self.traceID = traceID
self.invokedFunctionARN = invokedFunctionARN
self.timeout = timeout
}
}
public static func test<Handler: SimpleLambdaHandler>(
_ handlerType: Handler.Type,
with event: Handler.Event,
using config: TestConfig = .init()
) async throws -> Handler.Output {
let context = Self.makeContext(config: config)
let handler = Handler()
return try await handler.handle(event, context: context.1)
}
public static func test<Handler: LambdaHandler>(
_ handlerType: Handler.Type,
with event: Handler.Event,
using config: TestConfig = .init()
) async throws -> Handler.Output {
let context = Self.makeContext(config: config)
let handler = try await Handler(context: context.0)
return try await handler.handle(event, context: context.1)
}
public static func test<Handler: EventLoopLambdaHandler>(
_ handlerType: Handler.Type,
with event: Handler.Event,
using config: TestConfig = .init()
) async throws -> Handler.Output {
let context = Self.makeContext(config: config)
let handler = try await Handler.makeHandler(context: context.0).get()
return try await handler.handle(event, context: context.1).get()
}
public static func test<Handler: ByteBufferLambdaHandler>(
_ handlerType: Handler.Type,
with buffer: ByteBuffer,
using config: TestConfig = .init()
) async throws -> ByteBuffer? {
let context = Self.makeContext(config: config)
let handler = try await Handler.makeHandler(context: context.0).get()
return try await handler.handle(buffer, context: context.1).get()
}
private static func makeContext(config: TestConfig) -> (LambdaInitializationContext, LambdaContext) {
let logger = Logger(label: "test")
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
try! eventLoopGroup.syncShutdownGracefully()
}
let eventLoop = eventLoopGroup.next()
let initContext = LambdaInitializationContext.__forTestsOnly(
logger: logger,
eventLoop: eventLoop
)
let context = LambdaContext.__forTestsOnly(
requestID: config.requestID,
traceID: config.traceID,
invokedFunctionARN: config.invokedFunctionARN,
timeout: config.timeout,
logger: logger,
eventLoop: eventLoop
)
return (initContext, context)
}
}
@@ -0,0 +1,17 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOPosix
@@ -1,80 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIO
import XCTest
@testable import AWSLambdaRuntimeCore
class DetachedTasksTest: XCTestCase {
actor Expectation {
var isFulfilled = false
func fulfill() {
self.isFulfilled = true
}
}
func testAwaitTasks() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let context = DetachedTasksContainer.Context(
eventLoop: eventLoopGroup.next(),
logger: Logger(label: "test")
)
let expectation = Expectation()
let container = DetachedTasksContainer(context: context)
await container.detached {
try! await Task.sleep(for: .milliseconds(200))
await expectation.fulfill()
}
try await container.awaitAll().get()
let isFulfilled = await expectation.isFulfilled
XCTAssert(isFulfilled)
}
func testAwaitChildrenTasks() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let context = DetachedTasksContainer.Context(
eventLoop: eventLoopGroup.next(),
logger: Logger(label: "test")
)
let expectation1 = Expectation()
let expectation2 = Expectation()
let container = DetachedTasksContainer(context: context)
await container.detached {
await container.detached {
try! await Task.sleep(for: .milliseconds(300))
await expectation1.fulfill()
}
try! await Task.sleep(for: .milliseconds(200))
await container.detached {
try! await Task.sleep(for: .milliseconds(100))
await expectation2.fulfill()
}
}
try await container.awaitAll().get()
let isFulfilled1 = await expectation1.isFulfilled
let isFulfilled2 = await expectation2.isFulfilled
XCTAssert(isFulfilled1)
XCTAssert(isFulfilled2)
}
}
@@ -1,352 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import XCTest
@testable import AWSLambdaRuntimeCore
class LambdaHandlerTest: XCTestCase {
// MARK: - SimpleLambdaHandler
func testBootstrapSimpleNoInit() {
let server = MockLambdaServer(behavior: Behavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct TestBootstrapHandler: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws -> String {
event
}
}
let maxTimes = Int.random(in: 10...20)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: TestBootstrapHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testBootstrapSimpleInit() {
let server = MockLambdaServer(behavior: Behavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct TestBootstrapHandler: SimpleLambdaHandler {
var initialized = false
init() {
XCTAssertFalse(self.initialized)
self.initialized = true
}
func handle(_ event: String, context: LambdaContext) async throws -> String {
event
}
}
let maxTimes = Int.random(in: 10...20)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: TestBootstrapHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
// MARK: - LambdaHandler
func testBootstrapSuccess() {
let server = MockLambdaServer(behavior: Behavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct TestBootstrapHandler: LambdaHandler {
var initialized = false
init(context: LambdaInitializationContext) async throws {
XCTAssertFalse(self.initialized)
try await Task.sleep(nanoseconds: 100 * 1000 * 1000) // 0.1 seconds
self.initialized = true
}
func handle(_ event: String, context: LambdaContext) async throws -> String {
event
}
}
let maxTimes = Int.random(in: 10...20)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: TestBootstrapHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testBootstrapFailure() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct TestBootstrapHandler: LambdaHandler {
var initialized = false
init(context: LambdaInitializationContext) async throws {
XCTAssertFalse(self.initialized)
try await Task.sleep(nanoseconds: 100 * 1000 * 1000) // 0.1 seconds
throw TestError("kaboom")
}
func handle(_ event: String, context: LambdaContext) async throws {
XCTFail("How can this be called if init failed")
}
}
let maxTimes = Int.random(in: 10...20)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: TestBootstrapHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: TestError("kaboom"))
}
func testHandlerSuccess() {
let server = MockLambdaServer(behavior: Behavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws -> String {
event
}
}
let maxTimes = Int.random(in: 1...10)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testVoidHandlerSuccess() {
let server = MockLambdaServer(behavior: Behavior(result: .success(nil)))
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws {}
}
let maxTimes = Int.random(in: 1...10)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testHandlerFailure() {
let server = MockLambdaServer(behavior: Behavior(result: .failure(TestError("boom"))))
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws -> String {
throw TestError("boom")
}
}
let maxTimes = Int.random(in: 1...10)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
// MARK: - EventLoopLambdaHandler
func testEventLoopSuccess() {
let server = MockLambdaServer(behavior: Behavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<String> {
context.eventLoop.makeSucceededFuture(event)
}
}
let maxTimes = Int.random(in: 1...10)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testVoidEventLoopSuccess() {
let server = MockLambdaServer(behavior: Behavior(result: .success(nil)))
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(())
}
}
let maxTimes = Int.random(in: 1...10)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testEventLoopFailure() {
let server = MockLambdaServer(behavior: Behavior(result: .failure(TestError("boom"))))
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<String> {
context.eventLoop.makeFailedFuture(TestError("boom"))
}
}
let maxTimes = Int.random(in: 1...10)
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: maxTimes),
runtimeEngine: .init(address: "127.0.0.1:\(port)")
)
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testEventLoopBootstrapFailure() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Handler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeFailedFuture(TestError("kaboom"))
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<String> {
XCTFail("Must never be called")
return context.eventLoop.makeFailedFuture(TestError("boom"))
}
}
let configuration = LambdaConfiguration(runtimeEngine: .init(address: "127.0.0.1:\(port)"))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: TestError("kaboom"))
}
}
private struct Behavior: LambdaServerBehavior {
let requestId: String
let event: String
let result: Result<String?, TestError>
init(
requestId: String = UUID().uuidString,
event: String = "hello",
result: Result<String?, TestError> = .success("hello")
) {
self.requestId = requestId
self.event = event
self.result = result
}
func getInvocation() -> GetInvocationResult {
.success((requestId: self.requestId, event: self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
switch self.result {
case .success(let expected):
XCTAssertEqual(expected, response, "expecting response to match")
return .success(())
case .failure:
XCTFail("unexpected to fail, but succeeded with: \(response ?? "undefined")")
return .failure(.internalServerError)
}
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
switch self.result {
case .success:
XCTFail("unexpected to succeed, but failed with: \(error)")
return .failure(.internalServerError)
case .failure(let expected):
XCTAssertEqual(expected.description, error.errorMessage, "expecting error to match")
return .success(())
}
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
@@ -1,52 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import AWSLambdaRuntimeCore
import NIOCore
import XCTest
struct EchoHandler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<EchoHandler> {
context.eventLoop.makeSucceededFuture(EchoHandler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<String> {
context.eventLoop.makeSucceededFuture(event)
}
}
struct StartupError: Error {}
struct StartupErrorHandler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<StartupErrorHandler> {
context.eventLoop.makeFailedFuture(StartupError())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<String> {
XCTFail("Must never be called")
return context.eventLoop.makeSucceededFuture(event)
}
}
struct RuntimeError: Error {}
struct RuntimeErrorHandler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<RuntimeErrorHandler> {
context.eventLoop.makeSucceededFuture(RuntimeErrorHandler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<Void> {
context.eventLoop.makeFailedFuture(RuntimeError())
}
}
@@ -1,217 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import XCTest
@testable import AWSLambdaRuntimeCore
class LambdaRunnerTest: XCTestCase {
func testSuccess() {
struct Behavior: LambdaServerBehavior {
let requestId = UUID().uuidString
let event = "hello"
func getInvocation() -> GetInvocationResult {
.success((self.requestId, self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
XCTAssertEqual(self.event, response, "expecting response to match")
return .success(())
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertNoThrow(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self))
}
func testFailure() {
struct Behavior: LambdaServerBehavior {
let requestId = UUID().uuidString
func getInvocation() -> GetInvocationResult {
.success((requestId: self.requestId, event: "hello"))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should report error")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
XCTAssertEqual(String(describing: RuntimeError()), error.errorMessage, "expecting error to match")
return .success(())
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertNoThrow(try runLambda(behavior: Behavior(), handlerType: RuntimeErrorHandler.self))
}
func testCustomProviderSuccess() {
struct Behavior: LambdaServerBehavior {
let requestId = UUID().uuidString
let event = "hello"
func getInvocation() -> GetInvocationResult {
.success((self.requestId, self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
XCTAssertEqual(self.event, response, "expecting response to match")
return .success(())
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertNoThrow(
try runLambda(
behavior: Behavior(),
handlerProvider: { context in
context.eventLoop.makeSucceededFuture(EchoHandler())
}
)
)
}
func testCustomProviderFailure() {
struct Behavior: LambdaServerBehavior {
let requestId = UUID().uuidString
let event = "hello"
func getInvocation() -> GetInvocationResult {
.success((self.requestId, self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report processing")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTAssertEqual(String(describing: CustomError()), error.errorMessage, "expecting error to match")
return .success(())
}
}
struct CustomError: Error {}
XCTAssertThrowsError(
try runLambda(
behavior: Behavior(),
handlerProvider: { context -> EventLoopFuture<EchoHandler> in
context.eventLoop.makeFailedFuture(CustomError())
}
)
) { error in
XCTAssertNotNil(error as? CustomError, "expecting error to match")
}
}
func testCustomAsyncProviderSuccess() {
struct Behavior: LambdaServerBehavior {
let requestId = UUID().uuidString
let event = "hello"
func getInvocation() -> GetInvocationResult {
.success((self.requestId, self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
XCTAssertEqual(self.event, response, "expecting response to match")
return .success(())
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertNoThrow(
try runLambda(
behavior: Behavior(),
handlerProvider: { _ async throws -> EchoHandler in
EchoHandler()
}
)
)
}
func testCustomAsyncProviderFailure() {
struct Behavior: LambdaServerBehavior {
let requestId = UUID().uuidString
let event = "hello"
func getInvocation() -> GetInvocationResult {
.success((self.requestId, self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report processing")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTAssertEqual(String(describing: CustomError()), error.errorMessage, "expecting error to match")
return .success(())
}
}
struct CustomError: Error {}
XCTAssertThrowsError(
try runLambda(
behavior: Behavior(),
handlerProvider: { _ async throws -> EchoHandler in
throw CustomError()
}
)
) { error in
XCTAssertNotNil(error as? CustomError, "expecting error to match")
}
}
}
@@ -1,380 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOFoundationCompat
import NIOHTTP1
import NIOPosix
import NIOTestUtils
import XCTest
@testable import AWSLambdaRuntimeCore
class LambdaRuntimeClientTest: XCTestCase {
func testSuccess() {
let behavior = Behavior()
XCTAssertNoThrow(try runLambda(behavior: behavior, handlerType: EchoHandler.self))
XCTAssertEqual(behavior.state, 6)
}
func testFailure() {
let behavior = Behavior()
XCTAssertNoThrow(try runLambda(behavior: behavior, handlerType: RuntimeErrorHandler.self))
XCTAssertEqual(behavior.state, 10)
}
func testStartupFailure() {
let behavior = Behavior()
XCTAssertThrowsError(try runLambda(behavior: behavior, handlerType: StartupErrorHandler.self)) {
XCTAssert($0 is StartupError)
}
XCTAssertEqual(behavior.state, 1)
}
func testGetInvocationServerInternalError() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.failure(.internalServerError)
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report results")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
func testGetInvocationServerNoBodyError() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.success(("1", ""))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report results")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? LambdaRuntimeError, .noBody)
}
}
func testGetInvocationServerMissingHeaderRequestIDError() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
// no request id -> no context
.success(("", "hello"))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report results")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? LambdaRuntimeError, .invocationMissingHeader(AmazonHeaders.requestID))
}
}
func testProcessResponseInternalServerError() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.success((requestId: "1", event: "event"))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
.failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
func testProcessErrorInternalServerError() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.success((requestId: "1", event: "event"))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report results")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
.failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: RuntimeErrorHandler.self)) {
XCTAssertEqual($0 as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
func testProcessInitErrorOnBootstrapFailure() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
XCTFail("should not get invocation")
return .failure(.internalServerError)
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report results")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
.failure(.internalServerError)
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: StartupErrorHandler.self)) {
XCTAssert($0 is StartupError)
}
}
func testErrorResponseToJSON() {
// we want to check if quotes and back slashes are correctly escaped
let windowsError = ErrorResponse(
errorType: "error",
errorMessage: #"underlyingError: "An error with a windows path C:\Windows\""#
)
let windowsBytes = windowsError.toJSONBytes()
XCTAssertEqual(
#"{"errorType":"error","errorMessage":"underlyingError: \"An error with a windows path C:\\Windows\\\""}"#,
String(decoding: windowsBytes, as: Unicode.UTF8.self)
)
// we want to check if unicode sequences work
let emojiError = ErrorResponse(
errorType: "error",
errorMessage: #"🥑👨👩👧👧👩👩👧👧👨👨👧"#
)
let emojiBytes = emojiError.toJSONBytes()
XCTAssertEqual(
#"{"errorType":"error","errorMessage":"🥑👨‍👩‍👧‍👧👩‍👩‍👧‍👧👨‍👨‍👧"}"#,
String(decoding: emojiBytes, as: Unicode.UTF8.self)
)
}
func testInitializationErrorReport() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let server = NIOHTTP1TestServer(group: eventLoopGroup)
defer { XCTAssertNoThrow(try server.stop()) }
let logger = Logger(label: "TestLogger")
let client = LambdaRuntimeClient(
eventLoop: eventLoopGroup.next(),
configuration: .init(address: "127.0.0.1:\(server.serverPort)")
)
let result = client.reportInitializationError(logger: logger, error: TestError("boom"))
var inboundHeader: HTTPServerRequestPart?
XCTAssertNoThrow(inboundHeader = try server.readInbound())
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else {
XCTFail("Expected to get a head first")
return
}
XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])
var inboundBody: HTTPServerRequestPart?
XCTAssertNoThrow(inboundBody = try server.readInbound())
guard case .body(let body) = try? XCTUnwrap(inboundBody) else {
XCTFail("Expected body after head")
return
}
XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom")
XCTAssertEqual(try server.readInbound(), .end(nil))
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
XCTAssertNoThrow(try result.wait())
}
func testInvocationErrorReport() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let server = NIOHTTP1TestServer(group: eventLoopGroup)
defer { XCTAssertNoThrow(try server.stop()) }
let logger = Logger(label: "TestLogger")
let client = LambdaRuntimeClient(
eventLoop: eventLoopGroup.next(),
configuration: .init(address: "127.0.0.1:\(server.serverPort)")
)
let header = HTTPHeaders([
(AmazonHeaders.requestID, "test"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=1"),
])
var inv: InvocationMetadata?
XCTAssertNoThrow(inv = try InvocationMetadata(headers: header))
guard let invocation = inv else { return }
let result = client.reportResults(
logger: logger,
invocation: invocation,
result: Result.failure(TestError("boom"))
)
var inboundHeader: HTTPServerRequestPart?
XCTAssertNoThrow(inboundHeader = try server.readInbound())
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else {
XCTFail("Expected to get a head first")
return
}
XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])
var inboundBody: HTTPServerRequestPart?
XCTAssertNoThrow(inboundBody = try server.readInbound())
guard case .body(let body) = try? XCTUnwrap(inboundBody) else {
XCTFail("Expected body after head")
return
}
XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom")
XCTAssertEqual(try server.readInbound(), .end(nil))
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
XCTAssertNoThrow(try result.wait())
}
func testInvocationSuccessResponse() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let server = NIOHTTP1TestServer(group: eventLoopGroup)
defer { XCTAssertNoThrow(try server.stop()) }
let logger = Logger(label: "TestLogger")
let client = LambdaRuntimeClient(
eventLoop: eventLoopGroup.next(),
configuration: .init(address: "127.0.0.1:\(server.serverPort)")
)
let header = HTTPHeaders([
(AmazonHeaders.requestID, "test"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=1"),
])
var inv: InvocationMetadata?
XCTAssertNoThrow(inv = try InvocationMetadata(headers: header))
guard let invocation = inv else { return }
let result = client.reportResults(logger: logger, invocation: invocation, result: Result.success(nil))
var inboundHeader: HTTPServerRequestPart?
XCTAssertNoThrow(inboundHeader = try server.readInbound())
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else {
XCTFail("Expected to get a head first")
return
}
XCTAssertFalse(head.headers.contains(name: "lambda-runtime-function-error-type"))
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])
XCTAssertEqual(try server.readInbound(), .end(nil))
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
XCTAssertNoThrow(try result.wait())
}
class Behavior: LambdaServerBehavior {
var state = 0
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
self.state += 1
return .success(())
}
func getInvocation() -> GetInvocationResult {
self.state += 2
return .success(("1", "hello"))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
self.state += 4
return .success(())
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
self.state += 8
return .success(())
}
}
}
@@ -1,158 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOHTTP1
import NIOPosix
import XCTest
@testable import AWSLambdaRuntimeCore
class LambdaRuntimeTest: XCTestCase {
func testShutdownFutureIsFulfilledWithStartUpError() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let runtime = LambdaRuntimeFactory.makeRuntime(StartupErrorHandler.self, eventLoop: eventLoop, logger: logger)
// eventLoop.submit in this case returns an EventLoopFuture<EventLoopFuture<ByteBufferHandler>>
// which is why we need `wait().wait()`
XCTAssertThrowsError(try eventLoop.flatSubmit { runtime.start() }.wait()) {
XCTAssert($0 is StartupError)
}
XCTAssertThrowsError(try runtime.shutdownFuture.wait()) {
XCTAssert($0 is StartupError)
}
}
func testShutdownIsCalledWhenLambdaShutsdown() {
let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let runtime = LambdaRuntimeFactory.makeRuntime(EchoHandler.self, eventLoop: eventLoop, logger: logger)
XCTAssertNoThrow(_ = try eventLoop.flatSubmit { runtime.start() }.wait())
XCTAssertThrowsError(try runtime.shutdownFuture.wait()) {
XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), $0 as? LambdaRuntimeError)
}
}
func testLambdaResultIfShutsdownIsUnclean() {
let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
struct ShutdownError: Error {
let description: String
}
struct ShutdownErrorHandler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<ShutdownErrorHandler> {
// register shutdown operation
context.terminator.register(
name: "test 1",
handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 1"))
}
)
context.terminator.register(
name: "test 2",
handler: { eventLoop in
eventLoop.makeSucceededVoidFuture()
}
)
context.terminator.register(
name: "test 3",
handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 2"))
}
)
context.terminator.register(
name: "test 4",
handler: { eventLoop in
eventLoop.makeSucceededVoidFuture()
}
)
context.terminator.register(
name: "test 5",
handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 3"))
}
)
return context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededVoidFuture()
}
}
let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let runtime = LambdaRuntimeFactory.makeRuntime(ShutdownErrorHandler.self, eventLoop: eventLoop, logger: logger)
XCTAssertNoThrow(try eventLoop.flatSubmit { runtime.start() }.wait())
XCTAssertThrowsError(try runtime.shutdownFuture.wait()) { error in
guard case LambdaRuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else {
XCTFail("Unexpected error: \(error)")
return
}
XCTAssertEqual(
shutdownError as? LambdaTerminator.TerminationError,
LambdaTerminator.TerminationError(underlying: [
ShutdownError(description: "error 3"),
ShutdownError(description: "error 2"),
ShutdownError(description: "error 1"),
])
)
XCTAssertEqual(runtimeError as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
}
struct BadBehavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.failure(.internalServerError)
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report a response")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
}
@@ -1,382 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOPosix
import XCTest
@testable import AWSLambdaRuntimeCore
class LambdaTest: XCTestCase {
func testSuccess() {
let server = MockLambdaServer(behavior: Behavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = Int.random(in: 10...20)
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testFailure() {
let server = MockLambdaServer(behavior: Behavior(result: .failure(RuntimeError())))
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = Int.random(in: 10...20)
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: RuntimeErrorHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testBootstrapFailure() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let result = Lambda.run(configuration: .init(), handlerType: StartupErrorHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: StartupError())
}
func testBootstrapFailureAndReportErrorFailure() {
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
XCTFail("should not get invocation")
return .failure(.internalServerError)
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report a response")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
.failure(.internalServerError)
}
}
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let result = Lambda.run(configuration: .init(), handlerType: StartupErrorHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: StartupError())
}
func testStartStopInDebugMode() {
let server = MockLambdaServer(behavior: Behavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let signal = Signal.ALRM
let maxTimes = 1000
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes, stopSignal: signal))
DispatchQueue(label: "test").async {
// we need to schedule the signal before we start the long running `Lambda.run`, since
// `Lambda.run` will block the main thread.
usleep(100_000)
kill(getpid(), signal.rawValue) // ignore-unacceptable-language
}
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
switch result {
case .success(let invocationCount):
XCTAssertGreaterThan(invocationCount, 0, "should have stopped before any request made")
XCTAssertLessThan(invocationCount, maxTimes, "should have stopped before \(maxTimes)")
case .failure(let error):
XCTFail("Unexpected error: \(error)")
}
}
func testTimeout() {
let timeout: Int64 = 100
let server = MockLambdaServer(behavior: Behavior(requestId: "timeout", event: "\(timeout * 2)"))
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let configuration = LambdaConfiguration(
lifecycle: .init(maxTimes: 1),
runtimeEngine: .init(requestTimeout: .milliseconds(timeout))
)
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: LambdaRuntimeError.upstreamError("timeout"))
}
func testDisconnect() {
let server = MockLambdaServer(behavior: Behavior(requestId: "disconnect"))
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: 1))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(
result,
shouldFailWithError: LambdaRuntimeError.upstreamError("connectionResetByPeer")
)
}
func testBigEvent() {
let event = String(repeating: "*", count: 104_448)
let server = MockLambdaServer(behavior: Behavior(event: event, result: .success(event)))
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: 1))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: 1)
}
func testKeepAliveServer() {
let server = MockLambdaServer(behavior: Behavior(), keepAlive: true)
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = 10
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testNoKeepAliveServer() {
let server = MockLambdaServer(behavior: Behavior(), keepAlive: false)
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = 10
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldHaveRun: maxTimes)
}
func testServerFailure() {
let server = MockLambdaServer(behavior: Behavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
struct Behavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.failure(.internalServerError)
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
.failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
.failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
let result = Lambda.run(configuration: .init(), handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: LambdaRuntimeError.badStatusCode(.internalServerError))
}
func testDeadline() {
let delta = Int.random(in: 1...600)
let milli1 = Date(timeIntervalSinceNow: Double(delta)).millisSinceEpoch
let milli2 = (DispatchWallTime.now() + .seconds(delta)).millisSinceEpoch
XCTAssertEqual(Double(milli1), Double(milli2), accuracy: 2.0)
let now1 = DispatchWallTime.now()
let now2 = DispatchWallTime(millisSinceEpoch: Date().millisSinceEpoch)
XCTAssertEqual(Double(now2.rawValue), Double(now1.rawValue), accuracy: 2_000_000.0)
let future1 = DispatchWallTime.now() + .seconds(delta)
let future2 = DispatchWallTime(millisSinceEpoch: Date(timeIntervalSinceNow: Double(delta)).millisSinceEpoch)
XCTAssertEqual(Double(future1.rawValue), Double(future2.rawValue), accuracy: 2_000_000.0)
let past1 = DispatchWallTime.now() - .seconds(delta)
let past2 = DispatchWallTime(millisSinceEpoch: Date(timeIntervalSinceNow: Double(-delta)).millisSinceEpoch)
XCTAssertEqual(Double(past1.rawValue), Double(past2.rawValue), accuracy: 2_000_000.0)
let context = LambdaContext(
requestID: UUID().uuidString,
traceID: UUID().uuidString,
invokedFunctionARN: UUID().uuidString,
deadline: .now() + .seconds(1),
cognitoIdentity: nil,
clientContext: nil,
logger: Logger(label: "test"),
eventLoop: NIOSingletons.posixEventLoopGroup.next(),
allocator: ByteBufferAllocator()
)
XCTAssertGreaterThan(context.deadline, .now())
let expiredContext = LambdaContext(
requestID: context.requestID,
traceID: context.traceID,
invokedFunctionARN: context.invokedFunctionARN,
deadline: .now() - .seconds(1),
cognitoIdentity: context.cognitoIdentity,
clientContext: context.clientContext,
logger: context.logger,
eventLoop: context.eventLoop,
allocator: context.allocator
)
XCTAssertLessThan(expiredContext.deadline, .now())
}
func testGetRemainingTime() {
let context = LambdaContext(
requestID: UUID().uuidString,
traceID: UUID().uuidString,
invokedFunctionARN: UUID().uuidString,
deadline: .now() + .seconds(1),
cognitoIdentity: nil,
clientContext: nil,
logger: Logger(label: "test"),
eventLoop: NIOSingletons.posixEventLoopGroup.next(),
allocator: ByteBufferAllocator()
)
XCTAssertLessThanOrEqual(context.getRemainingTime(), .seconds(1))
XCTAssertGreaterThan(context.getRemainingTime(), .milliseconds(800))
}
#if compiler(>=5.6)
func testSendable() async throws {
struct Handler: EventLoopLambdaHandler {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<String> {
context.eventLoop.makeSucceededFuture("hello")
}
}
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let server = MockLambdaServer(behavior: Behavior(), port: 0)
var port: Int?
XCTAssertNoThrow(port = try server.start().wait())
guard let port else { return XCTFail("Expected the server to have started") }
defer { XCTAssertNoThrow(try server.stop().wait()) }
let logger = Logger(label: "TestLogger")
let configuration = LambdaConfiguration(
runtimeEngine: .init(address: "127.0.0.1:\(port)", requestTimeout: .milliseconds(100))
)
let handler1 = Handler()
let task = Task.detached {
print(configuration.description)
logger.info("hello")
let runner = LambdaRunner(eventLoop: eventLoopGroup.next(), configuration: configuration)
try await runner.run(
handler: CodableEventLoopLambdaHandler(
handler: handler1,
allocator: ByteBufferAllocator()
),
logger: logger
).get()
try await runner.initialize(
handlerType: CodableEventLoopLambdaHandler<Handler>.self,
logger: logger,
terminator: LambdaTerminator()
).flatMap { handler2 in
runner.run(handler: handler2, logger: logger)
}.get()
}
try await task.value
}
#endif
}
private struct Behavior: LambdaServerBehavior {
let requestId: String
let event: String
let result: Result<String?, RuntimeError>
init(
requestId: String = UUID().uuidString,
event: String = "hello",
result: Result<String?, RuntimeError> = .success("hello")
) {
self.requestId = requestId
self.event = event
self.result = result
}
func getInvocation() -> GetInvocationResult {
.success((requestId: self.requestId, event: self.event))
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
switch self.result {
case .success(let expected):
XCTAssertEqual(expected, response, "expecting response to match")
return .success(())
case .failure:
XCTFail("unexpected to fail, but succeeded with: \(response ?? "undefined")")
return .failure(.internalServerError)
}
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTAssertEqual(self.requestId, requestId, "expecting requestId to match")
switch self.result {
case .success:
XCTFail("unexpected to succeed, but failed with: \(error)")
return .failure(.internalServerError)
case .failure(let expected):
XCTAssertEqual(String(describing: expected), error.errorMessage, "expecting error to match")
return .success(())
}
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report init error")
return .failure(.internalServerError)
}
}
struct FailedBootstrapBehavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
XCTFail("should not get invocation")
return .failure(.internalServerError)
}
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report a response")
return .failure(.internalServerError)
}
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
.success(())
}
}
+2 -145
View File
@@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
@@ -12,153 +12,10 @@
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOPosix
import XCTest
@testable import AWSLambdaRuntimeCore
func runLambda<Handler: SimpleLambdaHandler>(behavior: LambdaServerBehavior, handlerType: Handler.Type) throws {
try runLambda(behavior: behavior, handlerProvider: CodableSimpleLambdaHandler<Handler>.makeHandler(context:))
}
func runLambda<Handler: LambdaHandler>(behavior: LambdaServerBehavior, handlerType: Handler.Type) throws {
try runLambda(behavior: behavior, handlerProvider: CodableLambdaHandler<Handler>.makeHandler(context:))
}
func runLambda<Handler: EventLoopLambdaHandler>(behavior: LambdaServerBehavior, handlerType: Handler.Type) throws {
try runLambda(behavior: behavior, handlerProvider: CodableEventLoopLambdaHandler<Handler>.makeHandler(context:))
}
func runLambda<Handler: EventLoopLambdaHandler>(
behavior: LambdaServerBehavior,
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<Handler>
) throws {
try runLambda(
behavior: behavior,
handlerProvider: { context in
handlerProvider(context).map {
CodableEventLoopLambdaHandler(handler: $0, allocator: context.allocator)
}
}
)
}
func runLambda<Handler: EventLoopLambdaHandler>(
behavior: LambdaServerBehavior,
handlerProvider: @escaping (LambdaInitializationContext) async throws -> Handler
) throws {
try runLambda(
behavior: behavior,
handlerProvider: { context in
let handler = try await handlerProvider(context)
return CodableEventLoopLambdaHandler(handler: handler, allocator: context.allocator)
}
)
}
func runLambda<Handler: ByteBufferLambdaHandler>(
behavior: LambdaServerBehavior,
handlerProvider: @escaping (LambdaInitializationContext) async throws -> Handler
) throws {
let eventLoopGroup = NIOSingletons.posixEventLoopGroup.next()
try runLambda(
behavior: behavior,
handlerProvider: { context in
let promise = eventLoopGroup.next().makePromise(of: Handler.self)
promise.completeWithTask {
try await handlerProvider(context)
}
return promise.futureResult
}
)
}
func runLambda(
behavior: LambdaServerBehavior,
handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture<some ByteBufferLambdaHandler>
) throws {
let eventLoopGroup = NIOSingletons.posixEventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let server = MockLambdaServer(behavior: behavior, port: 0)
let port = try server.start().wait()
let configuration = LambdaConfiguration(
runtimeEngine: .init(address: "127.0.0.1:\(port)", requestTimeout: .milliseconds(100))
)
let terminator = LambdaTerminator()
let runner = LambdaRunner(eventLoop: eventLoopGroup.next(), configuration: configuration)
defer { XCTAssertNoThrow(try server.stop().wait()) }
try runner.initialize(handlerProvider: handlerProvider, logger: logger, terminator: terminator).flatMap { handler in
runner.run(handler: handler, logger: logger)
}.wait()
}
func assertLambdaRuntimeResult(
_ result: Result<Int, Error>,
shouldHaveRun: Int = 0,
shouldFailWithError: Error? = nil,
file: StaticString = #file,
line: UInt = #line
) {
switch result {
case .success where shouldFailWithError != nil:
XCTFail("should fail with \(shouldFailWithError!)", file: file, line: line)
case .success(let count) where shouldFailWithError == nil:
XCTAssertEqual(shouldHaveRun, count, "should have run \(shouldHaveRun) times", file: file, line: line)
case .failure(let error) where shouldFailWithError == nil:
XCTFail("should succeed, but failed with \(error)", file: file, line: line)
case .failure(let error) where shouldFailWithError != nil:
XCTAssertEqual(
String(describing: shouldFailWithError!),
String(describing: error),
"expected error to mactch",
file: file,
line: line
)
default:
XCTFail("invalid state")
}
}
struct TestError: Error, Equatable, CustomStringConvertible {
let description: String
init(_ description: String) {
self.description = description
}
}
import Foundation
extension Date {
var millisSinceEpoch: Int64 {
Int64(self.timeIntervalSince1970 * 1000)
}
}
extension LambdaRuntimeError: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
// technically incorrect, but good enough for our tests
String(describing: lhs) == String(describing: rhs)
}
}
extension LambdaTerminator.TerminationError: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
guard lhs.underlying.count == rhs.underlying.count else {
return false
}
// technically incorrect, but good enough for our tests
return String(describing: lhs) == String(describing: rhs)
}
}
// for backward compatibility in tests
extension LambdaRunner {
func initialize<Handler: ByteBufferLambdaHandler>(
handlerType: Handler.Type,
logger: Logger,
terminator: LambdaTerminator
) -> EventLoopFuture<Handler> {
self.initialize(handlerProvider: handlerType.makeHandler(context:), logger: logger, terminator: terminator)
}
}
@@ -1,251 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOFoundationCompat
import NIOPosix
import XCTest
@testable import AWSLambdaRuntime
@testable import AWSLambdaRuntimeCore
class CodableLambdaTest: XCTestCase {
var eventLoopGroup: EventLoopGroup!
let allocator = ByteBufferAllocator()
override func setUp() {
self.eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
}
override func tearDown() {
XCTAssertNoThrow(try self.eventLoopGroup.syncShutdownGracefully())
}
func testCodableVoidEventLoopFutureHandler() {
struct Handler: EventLoopLambdaHandler {
var expected: Request?
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
func handle(_ event: Request, context: LambdaContext) -> EventLoopFuture<Void> {
XCTAssertEqual(event, self.expected)
return context.eventLoop.makeSucceededVoidFuture()
}
}
let context = self.newContext()
let request = Request(requestId: UUID().uuidString)
let handler = CodableEventLoopLambdaHandler(
handler: Handler(expected: request),
allocator: context.allocator
)
var inputBuffer = context.allocator.buffer(capacity: 1024)
XCTAssertNoThrow(try JSONEncoder().encode(request, into: &inputBuffer))
var outputBuffer: ByteBuffer?
XCTAssertNoThrow(outputBuffer = try handler.handle(inputBuffer, context: context).wait())
XCTAssertEqual(outputBuffer?.readableBytes, 0)
}
func testCodableEventLoopFutureHandler() {
struct Handler: EventLoopLambdaHandler {
var expected: Request?
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
func handle(_ event: Request, context: LambdaContext) -> EventLoopFuture<Response> {
XCTAssertEqual(event, self.expected)
return context.eventLoop.makeSucceededFuture(Response(requestId: event.requestId))
}
}
let context = self.newContext()
let request = Request(requestId: UUID().uuidString)
var response: Response?
let handler = CodableEventLoopLambdaHandler(
handler: Handler(expected: request),
allocator: context.allocator
)
var inputBuffer = context.allocator.buffer(capacity: 1024)
XCTAssertNoThrow(try JSONEncoder().encode(request, into: &inputBuffer))
var outputBuffer: ByteBuffer?
XCTAssertNoThrow(outputBuffer = try handler.handle(inputBuffer, context: context).wait())
XCTAssertNoThrow(response = try JSONDecoder().decode(Response.self, from: XCTUnwrap(outputBuffer)))
XCTAssertEqual(response?.requestId, request.requestId)
}
func testCodableVoidHandler() async throws {
struct Handler: LambdaHandler {
init(context: AWSLambdaRuntimeCore.LambdaInitializationContext) async throws {}
var expected: Request?
func handle(_ event: Request, context: LambdaContext) async throws {
XCTAssertEqual(event, self.expected)
}
}
let context = self.newContext()
let request = Request(requestId: UUID().uuidString)
var underlying = try await Handler(context: self.newInitContext())
underlying.expected = request
let handler = CodableLambdaHandler(
handler: underlying,
allocator: context.allocator
)
var inputBuffer = context.allocator.buffer(capacity: 1024)
XCTAssertNoThrow(try JSONEncoder().encode(request, into: &inputBuffer))
var outputBuffer: ByteBuffer?
XCTAssertNoThrow(outputBuffer = try handler.handle(inputBuffer, context: context).wait())
XCTAssertEqual(outputBuffer?.readableBytes, 0)
}
func testCodableHandler() async throws {
struct Handler: LambdaHandler {
init(context: AWSLambdaRuntimeCore.LambdaInitializationContext) async throws {}
var expected: Request?
func handle(_ event: Request, context: LambdaContext) async throws -> Response {
XCTAssertEqual(event, self.expected)
return Response(requestId: event.requestId)
}
}
let context = self.newContext()
let request = Request(requestId: UUID().uuidString)
var response: Response?
var underlying = try await Handler(context: self.newInitContext())
underlying.expected = request
let handler = CodableLambdaHandler(
handler: underlying,
allocator: context.allocator
)
var inputBuffer = context.allocator.buffer(capacity: 1024)
XCTAssertNoThrow(try JSONEncoder().encode(request, into: &inputBuffer))
var outputBuffer: ByteBuffer?
XCTAssertNoThrow(outputBuffer = try handler.handle(inputBuffer, context: context).wait())
XCTAssertNoThrow(response = try JSONDecoder().decode(Response.self, from: XCTUnwrap(outputBuffer)))
XCTAssertNoThrow(try handler.handle(inputBuffer, context: context).wait())
XCTAssertEqual(response?.requestId, request.requestId)
}
func testCodableVoidSimpleHandler() async throws {
struct Handler: SimpleLambdaHandler {
var expected: Request?
func handle(_ event: Request, context: LambdaContext) async throws {
XCTAssertEqual(event, self.expected)
}
}
let context = self.newContext()
let request = Request(requestId: UUID().uuidString)
var underlying = Handler()
underlying.expected = request
let handler = CodableSimpleLambdaHandler(
handler: underlying,
allocator: context.allocator
)
var inputBuffer = context.allocator.buffer(capacity: 1024)
XCTAssertNoThrow(try JSONEncoder().encode(request, into: &inputBuffer))
var outputBuffer: ByteBuffer?
XCTAssertNoThrow(outputBuffer = try handler.handle(inputBuffer, context: context).wait())
XCTAssertEqual(outputBuffer?.readableBytes, 0)
}
func testCodableSimpleHandler() async throws {
struct Handler: SimpleLambdaHandler {
var expected: Request?
func handle(_ event: Request, context: LambdaContext) async throws -> Response {
XCTAssertEqual(event, self.expected)
return Response(requestId: event.requestId)
}
}
let context = self.newContext()
let request = Request(requestId: UUID().uuidString)
var response: Response?
var underlying = Handler()
underlying.expected = request
let handler = CodableSimpleLambdaHandler(
handler: underlying,
allocator: context.allocator
)
var inputBuffer = context.allocator.buffer(capacity: 1024)
XCTAssertNoThrow(try JSONEncoder().encode(request, into: &inputBuffer))
var outputBuffer: ByteBuffer?
XCTAssertNoThrow(outputBuffer = try handler.handle(inputBuffer, context: context).wait())
XCTAssertNoThrow(response = try JSONDecoder().decode(Response.self, from: XCTUnwrap(outputBuffer)))
XCTAssertNoThrow(try handler.handle(inputBuffer, context: context).wait())
XCTAssertEqual(response?.requestId, request.requestId)
}
// convenience method
func newContext() -> LambdaContext {
LambdaContext(
requestID: UUID().uuidString,
traceID: "abc123",
invokedFunctionARN: "aws:arn:",
deadline: .now() + .seconds(3),
cognitoIdentity: nil,
clientContext: nil,
logger: Logger(label: "test"),
eventLoop: self.eventLoopGroup.next(),
allocator: ByteBufferAllocator()
)
}
func newInitContext() -> LambdaInitializationContext {
LambdaInitializationContext(
logger: Logger(label: "test"),
eventLoop: self.eventLoopGroup.next(),
allocator: ByteBufferAllocator(),
terminator: LambdaTerminator()
)
}
}
private struct Request: Codable, Equatable {
let requestId: String
init(requestId: String) {
self.requestId = requestId
}
}
private struct Response: Codable, Equatable {
let requestId: String
init(requestId: String) {
self.requestId = requestId
}
}
@@ -0,0 +1,18 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import AWSLambdaRuntime
import AWSLambdaTesting
import NIOCore
import Testing
-102
View File
@@ -1,102 +0,0 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import AWSLambdaRuntime
import AWSLambdaTesting
import NIOCore
import XCTest
class LambdaTestingTests: XCTestCase {
func testBasics() async throws {
struct MyLambda: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws -> String {
event
}
}
let uuid = UUID().uuidString
let result = try await Lambda.test(MyLambda.self, with: uuid)
XCTAssertEqual(result, uuid)
}
func testCodableClosure() async throws {
struct Request: Codable {
let name: String
}
struct Response: Codable {
let message: String
}
struct MyLambda: SimpleLambdaHandler {
func handle(_ event: Request, context: LambdaContext) async throws -> Response {
Response(message: "echo" + event.name)
}
}
let request = Request(name: UUID().uuidString)
let response = try await Lambda.test(MyLambda.self, with: request)
XCTAssertEqual(response.message, "echo" + request.name)
}
func testCodableVoidClosure() async throws {
struct Request: Codable {
let name: String
}
struct MyLambda: SimpleLambdaHandler {
// DIRTY HACK: To verify the handler was actually invoked, we change a global variable.
static var VoidLambdaHandlerInvokeCount: Int = 0
func handle(_ event: Request, context: LambdaContext) async throws {
Self.VoidLambdaHandlerInvokeCount += 1
}
}
let request = Request(name: UUID().uuidString)
MyLambda.VoidLambdaHandlerInvokeCount = 0
try await Lambda.test(MyLambda.self, with: request)
XCTAssertEqual(MyLambda.VoidLambdaHandlerInvokeCount, 1)
}
func testInvocationFailure() async throws {
struct MyError: Error {}
struct MyLambda: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws {
throw MyError()
}
}
do {
try await Lambda.test(MyLambda.self, with: UUID().uuidString)
XCTFail("expected to throw")
} catch {
XCTAssert(error is MyError)
}
}
func testAsyncLongRunning() async throws {
struct MyLambda: SimpleLambdaHandler {
func handle(_ event: String, context: LambdaContext) async throws -> String {
try await Task.sleep(nanoseconds: 500 * 1000 * 1000)
return event
}
}
let uuid = UUID().uuidString
let result = try await Lambda.test(MyLambda.self, with: uuid)
XCTAssertEqual(result, uuid)
}
}