Prefix data structures with Lambda instead of namespacing them (#256)

motivation: consisten naming convention

changes:
* Lambda.InitializationContext -> LambdaInitializationContext
* Lambda.Runner -> LambdaRunner
* Lambda.Configuration -> LambdaConfiguration
* Lambda.RuntimeError -> LambdaRuntimeError
* adjust call sites, tests, and examples
This commit is contained in:
tomer doron
2022-04-15 04:33:54 -07:00
committed by GitHub
parent d35e827121
commit e5b44962bd
28 changed files with 359 additions and 366 deletions
+1 -1
View File
@@ -25,7 +25,7 @@ struct BenchmarkHandler: EventLoopLambdaHandler {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Self> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Self> {
context.eventLoop.makeSucceededFuture(BenchmarkHandler())
}
@@ -25,7 +25,7 @@ struct BenchmarkHandler: EventLoopLambdaHandler {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Self> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Self> {
context.eventLoop.makeSucceededFuture(BenchmarkHandler())
}
@@ -20,7 +20,7 @@ struct HelloWorldHandler: LambdaHandler {
typealias Event = String
typealias Output = String
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
// setup your resources that you want to reuse here.
}
+1 -1
View File
@@ -21,7 +21,7 @@ struct MyLambda: LambdaHandler {
typealias Event = String
typealias Output = String
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
// setup your resources that you want to reuse for every invocation here.
}
+1 -1
View File
@@ -21,7 +21,7 @@ struct MyLambda: LambdaHandler {
typealias Event = Request
typealias Output = Response
init(context: Lambda.InitializationContext) async throws {}
init(context: LambdaInitializationContext) async throws {}
func handle(_ request: Request, context: LambdaContext) async throws -> Response {
// switch over the error type "requested" by the request, and trigger such error accordingly
+1 -1
View File
@@ -30,7 +30,7 @@ struct MyLambda: LambdaHandler {
let calculator: ExchangeRatesCalculator
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
// the ExchangeRatesCalculator() can be reused over and over
self.calculator = ExchangeRatesCalculator()
}
+1 -1
View File
@@ -30,7 +30,7 @@ struct MyLambda: LambdaHandler {
typealias Event = Request
typealias Output = Response
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
// setup your resources that you want to reuse for every invocation here.
}
@@ -23,7 +23,7 @@ struct MyLambda: LambdaHandler {
typealias Event = Request
typealias Output = Response
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
// setup your resources that you want to reuse for every invocation here.
}
+1 -1
View File
@@ -21,7 +21,7 @@ struct MyLambda: LambdaHandler {
typealias Event = String
typealias Output = String
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
// setup your resources that you want to reuse for every invocation here.
}
@@ -38,17 +38,17 @@ struct Invocation: Hashable {
init(headers: HTTPHeaders) throws {
guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else {
throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.requestID)
throw LambdaRuntimeError.invocationMissingHeader(AmazonHeaders.requestID)
}
guard let deadline = headers.first(name: AmazonHeaders.deadline),
let unixTimeInMilliseconds = Int64(deadline)
else {
throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.deadline)
throw LambdaRuntimeError.invocationMissingHeader(AmazonHeaders.deadline)
}
guard let invokedFunctionARN = headers.first(name: AmazonHeaders.invokedFunctionARN) else {
throw Lambda.RuntimeError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN)
throw LambdaRuntimeError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN)
}
self.requestID = requestID
@@ -22,13 +22,13 @@ import NIOPosix
/// This means we can avoid locks and other concurrency concern we would otherwise need to build into the client
internal final class HTTPClient {
private let eventLoop: EventLoop
private let configuration: Lambda.Configuration.RuntimeEngine
private let configuration: LambdaConfiguration.RuntimeEngine
private let targetHost: String
private var state = State.disconnected
private var executing = false
init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
init(eventLoop: EventLoop, configuration: LambdaConfiguration.RuntimeEngine) {
self.eventLoop = eventLoop
self.configuration = configuration
self.targetHost = "\(self.configuration.ip):\(self.configuration.port)"
@@ -35,7 +35,7 @@ extension Lambda {
/// - invocationEndpoint: The endpoint to post events to.
/// - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call.
///
/// - note: This API is designed stricly for local testing and is behind a DEBUG flag
/// - note: This API is designed strictly for local testing and is behind a DEBUG flag
internal static func withLocalServer<Value>(invocationEndpoint: String? = nil, _ body: @escaping () -> Value) throws -> Value {
let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint)
try server.start().wait()
@@ -55,7 +55,7 @@ private enum LocalLambda {
private let invocationEndpoint: String
public init(invocationEndpoint: String?) {
let configuration = Lambda.Configuration()
let configuration = LambdaConfiguration()
var logger = Logger(label: "LocalLambdaServer")
logger.logLevel = configuration.general.logLevel
self.logger = logger
@@ -299,5 +299,4 @@ private enum LocalLambda {
case cantBind
}
}
#endif
+14 -10
View File
@@ -24,14 +24,6 @@ import NIOCore
import NIOPosix
public enum Lambda {
/// Utility to access/read environment variables
public static func env(_ name: String) -> String? {
guard let value = getenv(name) else {
return nil
}
return String(cString: value)
}
/// Run a Lambda defined by implementing the ``ByteBufferLambdaHandler`` protocol.
/// The Runtime will manage the Lambdas application lifecycle automatically. It will invoke the
/// ``ByteBufferLambdaHandler/makeHandler(context:)`` to create a new Handler.
@@ -42,10 +34,10 @@ public enum Lambda {
///
/// - note: This is a blocking operation that will run forever, as its lifecycle is managed by the AWS Lambda Runtime Engine.
internal static func run<Handler: ByteBufferLambdaHandler>(
configuration: Configuration = .init(),
configuration: LambdaConfiguration = .init(),
handlerType: Handler.Type
) -> Result<Int, Error> {
let _run = { (configuration: Configuration) -> Result<Int, Error> in
let _run = { (configuration: LambdaConfiguration) -> Result<Int, Error> in
Backtrace.install()
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel
@@ -97,3 +89,15 @@ public enum Lambda {
#endif
}
}
// MARK: - Public API
extension Lambda {
/// Utility to access/read environment variables
public static func env(_ name: String) -> String? {
guard let value = getenv(name) else {
return nil
}
return String(cString: value)
}
}
@@ -16,73 +16,71 @@ import Dispatch
import Logging
import NIOCore
extension Lambda {
internal struct Configuration: CustomStringConvertible {
let general: General
let lifecycle: Lifecycle
let runtimeEngine: RuntimeEngine
internal struct LambdaConfiguration: CustomStringConvertible {
let general: General
let lifecycle: Lifecycle
let runtimeEngine: RuntimeEngine
init() {
self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init())
}
init() {
self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init())
}
init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) {
self.general = general ?? General()
self.lifecycle = lifecycle ?? Lifecycle()
self.runtimeEngine = runtimeEngine ?? RuntimeEngine()
}
init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) {
self.general = general ?? General()
self.lifecycle = lifecycle ?? Lifecycle()
self.runtimeEngine = runtimeEngine ?? RuntimeEngine()
}
struct General: CustomStringConvertible {
let logLevel: Logger.Level
struct General: CustomStringConvertible {
let logLevel: Logger.Level
init(logLevel: Logger.Level? = nil) {
self.logLevel = logLevel ?? env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
}
var description: String {
"\(General.self)(logLevel: \(self.logLevel))"
}
}
struct Lifecycle: CustomStringConvertible {
let id: String
let maxTimes: Int
let stopSignal: Signal
init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) {
self.id = id ?? "\(DispatchTime.now().uptimeNanoseconds)"
self.maxTimes = maxTimes ?? env("MAX_REQUESTS").flatMap(Int.init) ?? 0
self.stopSignal = stopSignal ?? env("STOP_SIGNAL").flatMap(Int32.init).flatMap(Signal.init) ?? Signal.TERM
precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0")
}
var description: String {
"\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))"
}
}
struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let requestTimeout: TimeAmount?
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
let ipPort = (address ?? env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"]
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
preconditionFailure("invalid ip+port configuration \(ipPort)")
}
self.ip = String(ipPort[0])
self.port = port
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
}
var description: String {
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))"
}
init(logLevel: Logger.Level? = nil) {
self.logLevel = logLevel ?? Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
}
var description: String {
"\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
"\(General.self)(logLevel: \(self.logLevel))"
}
}
struct Lifecycle: CustomStringConvertible {
let id: String
let maxTimes: Int
let stopSignal: Signal
init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) {
self.id = id ?? "\(DispatchTime.now().uptimeNanoseconds)"
self.maxTimes = maxTimes ?? Lambda.env("MAX_REQUESTS").flatMap(Int.init) ?? 0
self.stopSignal = stopSignal ?? Lambda.env("STOP_SIGNAL").flatMap(Int32.init).flatMap(Signal.init) ?? Signal.TERM
precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0")
}
var description: String {
"\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))"
}
}
struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let requestTimeout: TimeAmount?
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
let ipPort = (address ?? Lambda.env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"]
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
preconditionFailure("invalid ip+port configuration \(ipPort)")
}
self.ip = String(ipPort[0])
self.port = port
self.requestTimeout = requestTimeout ?? Lambda.env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
}
var description: String {
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))"
}
}
var description: String {
"\(Self.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}
@@ -24,48 +24,46 @@ import NIOCore
// MARK: - InitializationContext
extension Lambda {
/// Lambda runtime initialization context.
/// The Lambda runtime generates and passes the `InitializationContext` to the Handlers
/// ``ByteBufferLambdaHandler/makeHandler(context:)`` or ``LambdaHandler/init(context:)``
/// as an argument.
public struct InitializationContext: _AWSLambdaSendable {
/// `Logger` to log with
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public let logger: Logger
/// Lambda runtime initialization context.
/// The Lambda runtime generates and passes the `LambdaInitializationContext` to the Handlers
/// ``ByteBufferLambdaHandler/makeHandler(context:)`` or ``LambdaHandler/init(context:)``
/// as an argument.
public struct LambdaInitializationContext: _AWSLambdaSendable {
/// `Logger` to log with
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public let logger: Logger
/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop
/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop
/// `ByteBufferAllocator` to allocate `ByteBuffer`
public let allocator: ByteBufferAllocator
/// `ByteBufferAllocator` to allocate `ByteBuffer`
public let allocator: ByteBufferAllocator
/// `Terminator` to register shutdown operations
public let terminator: LambdaTerminator
/// `Terminator` to register shutdown operations
public let terminator: LambdaTerminator
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) {
self.eventLoop = eventLoop
self.logger = logger
self.allocator = allocator
self.terminator = terminator
}
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) {
self.eventLoop = eventLoop
self.logger = logger
self.allocator = allocator
self.terminator = terminator
}
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
public static func __forTestsOnly(
logger: Logger,
eventLoop: EventLoop
) -> InitializationContext {
InitializationContext(
logger: logger,
eventLoop: eventLoop,
allocator: ByteBufferAllocator(),
terminator: LambdaTerminator()
)
}
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
public static func __forTestsOnly(
logger: Logger,
eventLoop: EventLoop
) -> LambdaInitializationContext {
LambdaInitializationContext(
logger: logger,
eventLoop: eventLoop,
allocator: ByteBufferAllocator(),
terminator: LambdaTerminator()
)
}
}
@@ -33,7 +33,7 @@ public protocol LambdaHandler: EventLoopLambdaHandler {
/// Examples for this can be HTTP or database clients.
/// - parameters:
/// - context: Runtime `InitializationContext`.
init(context: Lambda.InitializationContext) async throws
init(context: LambdaInitializationContext) async throws
/// The Lambda handling method
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
@@ -48,7 +48,7 @@ public protocol LambdaHandler: EventLoopLambdaHandler {
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension LambdaHandler {
public static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Self> {
public static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Self> {
let promise = context.eventLoop.makePromise(of: Self.self)
promise.completeWithTask {
try await Self(context: context)
@@ -181,7 +181,7 @@ public protocol ByteBufferLambdaHandler {
/// connections and HTTP clients for example. It is encouraged to use the given `EventLoop`'s conformance
/// to `EventLoopGroup` when initializing NIO dependencies. This will improve overall performance, as it
/// minimizes thread hopping.
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Self>
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Self>
/// The Lambda handling method
/// Concrete Lambda handlers implement this method to provide the Lambda functionality.
+69 -71
View File
@@ -16,91 +16,89 @@ import Dispatch
import Logging
import NIOCore
extension Lambda {
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
internal final class Runner {
private let runtimeClient: RuntimeClient
private let eventLoop: EventLoop
private let allocator: ByteBufferAllocator
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
internal final class LambdaRunner {
private let runtimeClient: LambdaRuntimeClient
private let eventLoop: EventLoop
private let allocator: ByteBufferAllocator
private var isGettingNextInvocation = false
private var isGettingNextInvocation = false
init(eventLoop: EventLoop, configuration: Configuration) {
self.eventLoop = eventLoop
self.runtimeClient = RuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.allocator = ByteBufferAllocator()
}
init(eventLoop: EventLoop, configuration: LambdaConfiguration) {
self.eventLoop = eventLoop
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.allocator = ByteBufferAllocator()
}
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: LambdaTerminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occurred
let context = InitializationContext(
/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: LambdaTerminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
logger.debug("initializing lambda")
// 1. create the handler from the factory
// 2. report initialization error if one occurred
let context = LambdaInitializationContext(
logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
terminator: terminator
)
return Handler.makeHandler(context: context)
// Hopping back to "our" EventLoop is important in case the factory returns a future
// that originated from a foreign EventLoop/EventLoopGroup.
// This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
}
}
}
func run<Handler: ByteBufferLambdaHandler>(logger: Logger, handler: Handler) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request invocation from lambda runtime engine
self.isGettingNextInvocation = true
return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, bytes in
// 2. send invocation to handler
self.isGettingNextInvocation = false
let context = LambdaContext(
logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
terminator: terminator
invocation: invocation
)
return Handler.makeHandler(context: context)
// Hopping back to "our" EventLoop is important in case the factory returns a future
// that originated from a foreign EventLoop/EventLoopGroup.
// This can happen if the factory uses a library (let's say a database client) that manages its own threads/loops
logger.debug("sending invocation to lambda handler \(handler)")
return handler.handle(bytes, context: context)
// Hopping back to "our" EventLoop is important in case the handler returns a future that
// originiated from a foreign EventLoop/EventLoopGroup.
// This can happen if the handler uses a library (lets say a DB client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
logger.error("failed reporting initialization error to lambda runtime engine: \(reportingError)")
.mapResult { result in
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result)
}
}
func run<Handler: ByteBufferLambdaHandler>(logger: Logger, handler: Handler) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request invocation from lambda runtime engine
self.isGettingNextInvocation = true
return self.runtimeClient.getNextInvocation(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, bytes in
// 2. send invocation to handler
self.isGettingNextInvocation = false
let context = LambdaContext(
logger: logger,
eventLoop: self.eventLoop,
allocator: self.allocator,
invocation: invocation
)
logger.debug("sending invocation to lambda handler \(handler)")
return handler.handle(bytes, context: context)
// Hopping back to "our" EventLoop is important in case the handler returns a future that
// originiated from a foreign EventLoop/EventLoopGroup.
// This can happen if the handler uses a library (lets say a DB client) that manages its own threads/loops
// for whatever reason and returns a future that originated from that foreign EventLoop.
.hop(to: self.eventLoop)
.mapResult { result in
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result)
}
}.flatMap { invocation, result in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("could not report results to lambda runtime engine: \(error)")
}
}.flatMap { invocation, result in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("could not report results to lambda runtime engine: \(error)")
}
}
}
/// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane)
/// only needed for debugging purposes.
func cancelWaitingForNextInvocation() {
if self.isGettingNextInvocation {
self.runtimeClient.cancel()
}
/// cancels the current run, if we are waiting for next invocation (long poll from Lambda control plane)
/// only needed for debugging purposes.
func cancelWaitingForNextInvocation() {
if self.isGettingNextInvocation {
self.runtimeClient.cancel()
}
}
}
@@ -23,7 +23,7 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
private let eventLoop: EventLoop
private let shutdownPromise: EventLoopPromise<Int>
private let logger: Logger
private let configuration: Lambda.Configuration
private let configuration: LambdaConfiguration
private var state = State.idle {
willSet {
@@ -41,7 +41,7 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
self.init(eventLoop: eventLoop, logger: logger, configuration: .init())
}
init(eventLoop: EventLoop, logger: Logger, configuration: Lambda.Configuration) {
init(eventLoop: EventLoop, logger: Logger, configuration: LambdaConfiguration) {
self.eventLoop = eventLoop
self.shutdownPromise = eventLoop.makePromise(of: Int.self)
self.logger = logger
@@ -82,7 +82,7 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let terminator = LambdaTerminator()
let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
let startupFuture = runner.initialize(logger: logger, terminator: terminator, handlerType: Handler.self)
startupFuture.flatMap { handler -> EventLoopFuture<Result<Int, Error>> in
@@ -99,7 +99,7 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
// if, we had an error shutting down the handler, we want to concatenate it with
// the runner result
logger.error("Error shutting down handler: \(error)")
throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
throw LambdaRuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
}.flatMapResult { _ -> Result<Int, Error> in
// we had no error shutting down the lambda. let's return the runner's result
runnerResult
@@ -176,7 +176,7 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
private enum State {
case idle
case initializing
case active(Lambda.Runner, Handler)
case active(LambdaRunner, Handler)
case shuttingdown
case shutdown
@@ -26,123 +26,119 @@ import NIOHTTP1
/// * /runtime/invocation/response
/// * /runtime/invocation/error
/// * /runtime/init/error
extension Lambda {
internal struct RuntimeClient {
private let eventLoop: EventLoop
private let allocator = ByteBufferAllocator()
private let httpClient: HTTPClient
internal struct LambdaRuntimeClient {
private let eventLoop: EventLoop
private let allocator = ByteBufferAllocator()
private let httpClient: HTTPClient
init(eventLoop: EventLoop, configuration: Configuration.RuntimeEngine) {
self.eventLoop = eventLoop
self.httpClient = HTTPClient(eventLoop: eventLoop, configuration: configuration)
}
init(eventLoop: EventLoop, configuration: LambdaConfiguration.RuntimeEngine) {
self.eventLoop = eventLoop
self.httpClient = HTTPClient(eventLoop: eventLoop, configuration: configuration)
}
/// Requests invocation from the control plane.
func getNextInvocation(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> {
let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
logger.debug("requesting work from lambda runtime engine using \(url)")
return self.httpClient.get(url: url, headers: RuntimeClient.defaultHeaders).flatMapThrowing { response in
guard response.status == .ok else {
throw RuntimeError.badStatusCode(response.status)
}
let invocation = try Invocation(headers: response.headers)
guard let event = response.body else {
throw RuntimeError.noBody
}
return (invocation, event)
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw RuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw RuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
/// Requests invocation from the control plane.
func getNextInvocation(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> {
let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
logger.debug("requesting work from lambda runtime engine using \(url)")
return self.httpClient.get(url: url, headers: LambdaRuntimeClient.defaultHeaders).flatMapThrowing { response in
guard response.status == .ok else {
throw LambdaRuntimeError.badStatusCode(response.status)
}
let invocation = try Invocation(headers: response.headers)
guard let event = response.body else {
throw LambdaRuntimeError.noBody
}
return (invocation, event)
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw LambdaRuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw LambdaRuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
/// Reports a result to the Runtime Engine.
func reportResults(logger: Logger, invocation: Invocation, result: Result<ByteBuffer?, Error>) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + invocation.requestID
var body: ByteBuffer?
let headers: HTTPHeaders
/// Reports a result to the Runtime Engine.
func reportResults(logger: Logger, invocation: Invocation, result: Result<ByteBuffer?, Error>) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + invocation.requestID
var body: ByteBuffer?
let headers: HTTPHeaders
switch result {
case .success(let buffer):
url += Consts.postResponseURLSuffix
body = buffer
headers = RuntimeClient.defaultHeaders
case .failure(let error):
url += Consts.postErrorURLSuffix
let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)")
let bytes = errorResponse.toJSONBytes()
body = self.allocator.buffer(capacity: bytes.count)
body!.writeBytes(bytes)
headers = RuntimeClient.errorHeaders
}
logger.debug("reporting results to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw RuntimeError.badStatusCode(response.status)
}
return ()
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw RuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw RuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
/// Reports an initialization error to the Runtime Engine.
func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture<Void> {
let url = Consts.postInitErrorURL
let errorResponse = ErrorResponse(errorType: Consts.initializationError, errorMessage: "\(error)")
switch result {
case .success(let buffer):
url += Consts.postResponseURLSuffix
body = buffer
headers = LambdaRuntimeClient.defaultHeaders
case .failure(let error):
url += Consts.postErrorURLSuffix
let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)")
let bytes = errorResponse.toJSONBytes()
var body = self.allocator.buffer(capacity: bytes.count)
body.writeBytes(bytes)
logger.warning("reporting initialization error to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, headers: RuntimeClient.errorHeaders, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw RuntimeError.badStatusCode(response.status)
}
return ()
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw RuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw RuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
body = self.allocator.buffer(capacity: bytes.count)
body!.writeBytes(bytes)
headers = LambdaRuntimeClient.errorHeaders
}
logger.debug("reporting results to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw LambdaRuntimeError.badStatusCode(response.status)
}
return ()
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw LambdaRuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw LambdaRuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
/// Cancels the current request, if one is running. Only needed for debugging purposes
func cancel() {
self.httpClient.cancel()
/// Reports an initialization error to the Runtime Engine.
func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture<Void> {
let url = Consts.postInitErrorURL
let errorResponse = ErrorResponse(errorType: Consts.initializationError, errorMessage: "\(error)")
let bytes = errorResponse.toJSONBytes()
var body = self.allocator.buffer(capacity: bytes.count)
body.writeBytes(bytes)
logger.warning("reporting initialization error to lambda runtime engine using \(url)")
return self.httpClient.post(url: url, headers: LambdaRuntimeClient.errorHeaders, body: body).flatMapThrowing { response in
guard response.status == .accepted else {
throw LambdaRuntimeError.badStatusCode(response.status)
}
return ()
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
throw LambdaRuntimeError.upstreamError("timeout")
case HTTPClient.Errors.connectionResetByPeer:
throw LambdaRuntimeError.upstreamError("connectionResetByPeer")
default:
throw error
}
}
}
}
extension Lambda {
internal enum RuntimeError: Error {
case badStatusCode(HTTPResponseStatus)
case upstreamError(String)
case invocationMissingHeader(String)
case noBody
case json(Error)
case shutdownError(shutdownError: Error, runnerResult: Result<Int, Error>)
/// Cancels the current request, if one is running. Only needed for debugging purposes
func cancel() {
self.httpClient.cancel()
}
}
extension Lambda.RuntimeClient {
internal enum LambdaRuntimeError: Error {
case badStatusCode(HTTPResponseStatus)
case upstreamError(String)
case invocationMissingHeader(String)
case noBody
case json(Error)
case shutdownError(shutdownError: Error, runnerResult: Result<Int, Error>)
}
extension LambdaRuntimeClient {
internal static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")])
/// These headers must be sent along an invocation or initialization error report
@@ -72,7 +72,7 @@ extension Lambda {
let eventLoop = eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Handler.self)
let initContext = Lambda.InitializationContext.__forTestsOnly(
let initContext = LambdaInitializationContext.__forTestsOnly(
logger: logger,
eventLoop: eventLoop
)
@@ -33,7 +33,7 @@ class LambdaHandlerTest: XCTestCase {
var initialized = false
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
XCTAssertFalse(self.initialized)
try await Task.sleep(nanoseconds: 100 * 1000 * 1000) // 0.1 seconds
self.initialized = true
@@ -45,7 +45,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 10 ... 20)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: TestBootstrapHandler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -62,7 +62,7 @@ class LambdaHandlerTest: XCTestCase {
var initialized = false
init(context: Lambda.InitializationContext) async throws {
init(context: LambdaInitializationContext) async throws {
XCTAssertFalse(self.initialized)
try await Task.sleep(nanoseconds: 100 * 1000 * 1000) // 0.1 seconds
throw TestError("kaboom")
@@ -74,7 +74,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 10 ... 20)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: TestBootstrapHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: TestError("kaboom"))
}
@@ -89,7 +89,7 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = String
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: String, context: LambdaContext) async throws -> String {
event
@@ -97,7 +97,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -112,13 +112,13 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = Void
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: String, context: LambdaContext) async throws {}
}
let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
@@ -134,7 +134,7 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = String
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: String, context: LambdaContext) async throws -> String {
throw TestError("boom")
@@ -142,7 +142,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -159,7 +159,7 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
@@ -169,7 +169,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -183,7 +183,7 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = Void
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
@@ -193,7 +193,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -207,7 +207,7 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
@@ -217,7 +217,7 @@ class LambdaHandlerTest: XCTestCase {
}
let maxTimes = Int.random(in: 1 ... 10)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: Handler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -231,7 +231,7 @@ class LambdaHandlerTest: XCTestCase {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeFailedFuture(TestError("kaboom"))
}
@@ -20,7 +20,7 @@ struct EchoHandler: EventLoopLambdaHandler {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<EchoHandler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<EchoHandler> {
context.eventLoop.makeSucceededFuture(EchoHandler())
}
@@ -35,7 +35,7 @@ struct StartupErrorHandler: EventLoopLambdaHandler {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<StartupErrorHandler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<StartupErrorHandler> {
context.eventLoop.makeFailedFuture(StartupError())
}
@@ -51,7 +51,7 @@ struct RuntimeErrorHandler: EventLoopLambdaHandler {
typealias Event = String
typealias Output = Void
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<RuntimeErrorHandler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<RuntimeErrorHandler> {
context.eventLoop.makeSucceededFuture(RuntimeErrorHandler())
}
@@ -64,7 +64,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
XCTAssertEqual($0 as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
@@ -90,7 +90,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? Lambda.RuntimeError, .noBody)
XCTAssertEqual($0 as? LambdaRuntimeError, .noBody)
}
}
@@ -117,7 +117,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? Lambda.RuntimeError, .invocationMissingHeader(AmazonHeaders.requestID))
XCTAssertEqual($0 as? LambdaRuntimeError, .invocationMissingHeader(AmazonHeaders.requestID))
}
}
@@ -142,7 +142,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: EchoHandler.self)) {
XCTAssertEqual($0 as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
XCTAssertEqual($0 as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
@@ -167,7 +167,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handlerType: RuntimeErrorHandler.self)) {
XCTAssertEqual($0 as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
XCTAssertEqual($0 as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
@@ -223,7 +223,7 @@ class LambdaRuntimeClientTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop()) }
let logger = Logger(label: "TestLogger")
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let client = LambdaRuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let result = client.reportInitializationError(logger: logger, error: TestError("boom"))
var inboundHeader: HTTPServerRequestPart?
@@ -252,7 +252,7 @@ class LambdaRuntimeClientTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop()) }
let logger = Logger(label: "TestLogger")
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let client = LambdaRuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let header = HTTPHeaders([
(AmazonHeaders.requestID, "test"),
@@ -292,7 +292,7 @@ class LambdaRuntimeClientTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop()) }
let logger = Logger(label: "TestLogger")
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let client = LambdaRuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
let header = HTTPHeaders([
(AmazonHeaders.requestID, "test"),
@@ -55,7 +55,7 @@ class LambdaRuntimeTest: XCTestCase {
XCTAssertNoThrow(_ = try eventLoop.flatSubmit { runtime.start() }.wait())
XCTAssertThrowsError(_ = try runtime.shutdownFuture.wait()) {
XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), $0 as? Lambda.RuntimeError)
XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), $0 as? LambdaRuntimeError)
}
}
@@ -74,7 +74,7 @@ class LambdaRuntimeTest: XCTestCase {
typealias Event = String
typealias Output = Void
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<ShutdownErrorHandler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<ShutdownErrorHandler> {
// register shutdown operation
context.terminator.register(name: "test 1", handler: { eventLoop in
eventLoop.makeFailedFuture(ShutdownError(description: "error 1"))
@@ -105,7 +105,7 @@ class LambdaRuntimeTest: XCTestCase {
XCTAssertNoThrow(try eventLoop.flatSubmit { runtime.start() }.wait())
XCTAssertThrowsError(try runtime.shutdownFuture.wait()) { error in
guard case Lambda.RuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else {
guard case LambdaRuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else {
XCTFail("Unexpected error: \(error)"); return
}
@@ -114,7 +114,7 @@ class LambdaRuntimeTest: XCTestCase {
ShutdownError(description: "error 2"),
ShutdownError(description: "error 1"),
]))
XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
XCTAssertEqual(runtimeError as? LambdaRuntimeError, .badStatusCode(.internalServerError))
}
}
}
@@ -30,7 +30,7 @@ class LambdaTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = Int.random(in: 10 ... 20)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -41,7 +41,7 @@ class LambdaTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = Int.random(in: 10 ... 20)
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: RuntimeErrorHandler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -92,7 +92,7 @@ class LambdaTest: XCTestCase {
let signal = Signal.ALRM
let maxTimes = 1000
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes, stopSignal: signal))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes, stopSignal: signal))
DispatchQueue(label: "test").async {
// we need to schedule the signal before we start the long running `Lambda.run`, since
@@ -117,10 +117,10 @@ class LambdaTest: XCTestCase {
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: 1),
runtimeEngine: .init(requestTimeout: .milliseconds(timeout)))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: 1),
runtimeEngine: .init(requestTimeout: .milliseconds(timeout)))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: Lambda.RuntimeError.upstreamError("timeout"))
assertLambdaRuntimeResult(result, shouldFailWithError: LambdaRuntimeError.upstreamError("timeout"))
}
func testDisconnect() {
@@ -128,9 +128,9 @@ class LambdaTest: XCTestCase {
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: 1))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: 1))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: Lambda.RuntimeError.upstreamError("connectionResetByPeer"))
assertLambdaRuntimeResult(result, shouldFailWithError: LambdaRuntimeError.upstreamError("connectionResetByPeer"))
}
func testBigEvent() {
@@ -139,7 +139,7 @@ class LambdaTest: XCTestCase {
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: 1))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: 1))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: 1)
}
@@ -150,7 +150,7 @@ class LambdaTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = 10
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -161,7 +161,7 @@ class LambdaTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop().wait()) }
let maxTimes = 10
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
let configuration = LambdaConfiguration(lifecycle: .init(maxTimes: maxTimes))
let result = Lambda.run(configuration: configuration, handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shoudHaveRun: maxTimes)
}
@@ -191,7 +191,7 @@ class LambdaTest: XCTestCase {
}
let result = Lambda.run(configuration: .init(), handlerType: EchoHandler.self)
assertLambdaRuntimeResult(result, shouldFailWithError: Lambda.RuntimeError.badStatusCode(.internalServerError))
assertLambdaRuntimeResult(result, shouldFailWithError: LambdaRuntimeError.badStatusCode(.internalServerError))
}
func testDeadline() {
@@ -262,7 +262,7 @@ class LambdaTest: XCTestCase {
typealias Event = String
typealias Output = String
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
@@ -278,13 +278,13 @@ class LambdaTest: XCTestCase {
defer { XCTAssertNoThrow(try server.stop().wait()) }
let logger = Logger(label: "TestLogger")
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let configuration = LambdaConfiguration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let handler1 = Handler()
let task = Task.detached {
print(configuration.description)
logger.info("hello")
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
let runner = LambdaRunner(eventLoop: eventLoopGroup.next(), configuration: configuration)
try runner.run(logger: logger, handler: handler1).wait()
+4 -4
View File
@@ -22,9 +22,9 @@ func runLambda<Handler: ByteBufferLambdaHandler>(behavior: LambdaServerBehavior,
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let logger = Logger(label: "TestLogger")
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let configuration = LambdaConfiguration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
let terminator = LambdaTerminator()
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
let runner = LambdaRunner(eventLoop: eventLoopGroup.next(), configuration: configuration)
let server = try MockLambdaServer(behavior: behavior).start().wait()
defer { XCTAssertNoThrow(try server.stop().wait()) }
try runner.initialize(logger: logger, terminator: terminator, handlerType: handlerType).flatMap { handler in
@@ -61,8 +61,8 @@ extension Date {
}
}
extension Lambda.RuntimeError: Equatable {
public static func == (lhs: Lambda.RuntimeError, rhs: Lambda.RuntimeError) -> Bool {
extension LambdaRuntimeError: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
// technically incorrect, but good enough for our tests
String(describing: lhs) == String(describing: rhs)
}
@@ -43,7 +43,7 @@ class CodableLambdaTest: XCTestCase {
var expected: Request?
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
@@ -72,7 +72,7 @@ class CodableLambdaTest: XCTestCase {
var expected: Request?
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<Handler> {
static func makeHandler(context: LambdaInitializationContext) -> EventLoopFuture<Handler> {
context.eventLoop.makeSucceededFuture(Handler())
}
@@ -99,7 +99,7 @@ class CodableLambdaTest: XCTestCase {
var expected: Request?
init(context: Lambda.InitializationContext) async throws {}
init(context: LambdaInitializationContext) async throws {}
func handle(_ event: Request, context: LambdaContext) async throws {
XCTAssertEqual(event, self.expected)
@@ -128,7 +128,7 @@ class CodableLambdaTest: XCTestCase {
var expected: Request?
init(context: Lambda.InitializationContext) async throws {}
init(context: LambdaInitializationContext) async throws {}
func handle(_ event: Request, context: LambdaContext) async throws -> Response {
XCTAssertEqual(event, self.expected)
@@ -168,8 +168,8 @@ class CodableLambdaTest: XCTestCase {
)
}
func newInitContext() -> Lambda.InitializationContext {
Lambda.InitializationContext(
func newInitContext() -> LambdaInitializationContext {
LambdaInitializationContext(
logger: Logger(label: "test"),
eventLoop: self.eventLoopGroup.next(),
allocator: ByteBufferAllocator(),
+4 -4
View File
@@ -33,7 +33,7 @@ class LambdaTestingTests: XCTestCase {
typealias Event = Request
typealias Output = Response
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: Request, context: LambdaContext) async throws -> Response {
Response(message: "echo" + event.name)
@@ -57,7 +57,7 @@ class LambdaTestingTests: XCTestCase {
typealias Event = Request
typealias Output = Void
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: Request, context: LambdaContext) async throws {
LambdaTestingTests.VoidLambdaHandlerInvokeCount += 1
@@ -77,7 +77,7 @@ class LambdaTestingTests: XCTestCase {
typealias Event = String
typealias Output = Void
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: String, context: LambdaContext) async throws {
throw MyError()
@@ -94,7 +94,7 @@ class LambdaTestingTests: XCTestCase {
typealias Event = String
typealias Output = String
init(context: Lambda.InitializationContext) {}
init(context: LambdaInitializationContext) {}
func handle(_ event: String, context: LambdaContext) async throws -> String {
try await Task.sleep(nanoseconds: 500 * 1000 * 1000)