mirror of
https://github.com/swift-server/swift-aws-lambda-runtime.git
synced 2026-05-03 07:22:27 +00:00
termination handler (#251)
motivation: make it simpler to register shutdown hooks changes: * introduce Terminator helper that allow registering and de-registaring shutdown handlers * expose the new terminator hanler on the InitializationContext and deprecate ShutdownContext * deprecate the Handler::shutdown protocol requirment * update the runtime code to use the new terminator instead of calling shutdown on the handler * add and adjust tests
This commit is contained in:
@@ -38,10 +38,14 @@ extension Lambda {
|
||||
/// `ByteBufferAllocator` to allocate `ByteBuffer`
|
||||
public let allocator: ByteBufferAllocator
|
||||
|
||||
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator) {
|
||||
/// `Terminator` to register shutdown operations
|
||||
public let terminator: LambdaTerminator
|
||||
|
||||
init(logger: Logger, eventLoop: EventLoop, allocator: ByteBufferAllocator, terminator: LambdaTerminator) {
|
||||
self.eventLoop = eventLoop
|
||||
self.logger = logger
|
||||
self.allocator = allocator
|
||||
self.terminator = terminator
|
||||
}
|
||||
|
||||
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
|
||||
@@ -52,7 +56,8 @@ extension Lambda {
|
||||
InitializationContext(
|
||||
logger: logger,
|
||||
eventLoop: eventLoop,
|
||||
allocator: ByteBufferAllocator()
|
||||
allocator: ByteBufferAllocator(),
|
||||
terminator: LambdaTerminator()
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -205,27 +210,3 @@ public struct LambdaContext: CustomDebugStringConvertible {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - ShutdownContext
|
||||
|
||||
extension Lambda {
|
||||
/// Lambda runtime shutdown context.
|
||||
/// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument.
|
||||
public final class ShutdownContext {
|
||||
/// `Logger` to log with
|
||||
///
|
||||
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
|
||||
public let logger: Logger
|
||||
|
||||
/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
|
||||
///
|
||||
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
|
||||
/// Most importantly the `EventLoop` must never be blocked.
|
||||
public let eventLoop: EventLoop
|
||||
|
||||
internal init(logger: Logger, eventLoop: EventLoop) {
|
||||
self.eventLoop = eventLoop
|
||||
self.logger = logger
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,19 +176,6 @@ public protocol ByteBufferLambdaHandler {
|
||||
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
|
||||
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`
|
||||
func handle(_ event: ByteBuffer, context: LambdaContext) -> EventLoopFuture<ByteBuffer?>
|
||||
|
||||
/// Clean up the Lambda resources asynchronously.
|
||||
/// Concrete Lambda handlers implement this method to shutdown resources like `HTTPClient`s and database connections.
|
||||
///
|
||||
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
|
||||
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
|
||||
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
|
||||
}
|
||||
|
||||
extension ByteBufferLambdaHandler {
|
||||
public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
|
||||
context.eventLoop.makeSucceededFuture(())
|
||||
}
|
||||
}
|
||||
|
||||
extension ByteBufferLambdaHandler {
|
||||
|
||||
@@ -34,13 +34,16 @@ extension Lambda {
|
||||
/// Run the user provided initializer. This *must* only be called once.
|
||||
///
|
||||
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
|
||||
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
|
||||
func initialize<Handler: ByteBufferLambdaHandler>(logger: Logger, terminator: LambdaTerminator, handlerType: Handler.Type) -> EventLoopFuture<Handler> {
|
||||
logger.debug("initializing lambda")
|
||||
// 1. create the handler from the factory
|
||||
// 2. report initialization error if one occured
|
||||
let context = InitializationContext(logger: logger,
|
||||
eventLoop: self.eventLoop,
|
||||
allocator: self.allocator)
|
||||
// 2. report initialization error if one occurred
|
||||
let context = InitializationContext(
|
||||
logger: logger,
|
||||
eventLoop: self.eventLoop,
|
||||
allocator: self.allocator,
|
||||
terminator: terminator
|
||||
)
|
||||
return Handler.makeHandler(context: context)
|
||||
// Hopping back to "our" EventLoop is important in case the factory returns a future
|
||||
// that originated from a foreign EventLoop/EventLoopGroup.
|
||||
|
||||
@@ -74,23 +74,22 @@ public final class LambdaRuntime<Handler: ByteBufferLambdaHandler> {
|
||||
|
||||
var logger = self.logger
|
||||
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
|
||||
let terminator = LambdaTerminator()
|
||||
let runner = Lambda.Runner(eventLoop: self.eventLoop, configuration: self.configuration)
|
||||
|
||||
let startupFuture = runner.initialize(logger: logger, handlerType: Handler.self)
|
||||
startupFuture.flatMap { handler -> EventLoopFuture<(Handler, Result<Int, Error>)> in
|
||||
let startupFuture = runner.initialize(logger: logger, terminator: terminator, handlerType: Handler.self)
|
||||
startupFuture.flatMap { handler -> EventLoopFuture<Result<Int, Error>> in
|
||||
// after the startup future has succeeded, we have a handler that we can use
|
||||
// to `run` the lambda.
|
||||
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
|
||||
self.state = .active(runner, handler)
|
||||
self.run(promise: finishedPromise)
|
||||
return finishedPromise.futureResult.mapResult { (handler, $0) }
|
||||
}
|
||||
.flatMap { handler, runnerResult -> EventLoopFuture<Int> in
|
||||
return finishedPromise.futureResult.mapResult { $0 }
|
||||
}.flatMap { runnerResult -> EventLoopFuture<Int> in
|
||||
// after the lambda finishPromise has succeeded or failed we need to
|
||||
// shutdown the handler
|
||||
let shutdownContext = Lambda.ShutdownContext(logger: logger, eventLoop: self.eventLoop)
|
||||
return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
|
||||
// if, we had an error shuting down the lambda, we want to concatenate it with
|
||||
terminator.terminate(eventLoop: self.eventLoop).flatMapErrorThrowing { error in
|
||||
// if, we had an error shutting down the handler, we want to concatenate it with
|
||||
// the runner result
|
||||
logger.error("Error shutting down handler: \(error)")
|
||||
throw Lambda.RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftAWSLambdaRuntime open source project
|
||||
//
|
||||
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import NIOConcurrencyHelpers
|
||||
import NIOCore
|
||||
|
||||
/// Lambda terminator.
|
||||
/// Utility to manage the lambda shutdown sequence.
|
||||
public final class LambdaTerminator {
|
||||
private typealias Handler = (EventLoop) -> EventLoopFuture<Void>
|
||||
|
||||
private var storage: Storage
|
||||
|
||||
init() {
|
||||
self.storage = Storage()
|
||||
}
|
||||
|
||||
/// Register a shutdown handler with the terminator
|
||||
///
|
||||
/// - parameters:
|
||||
/// - name: Display name for logging purposes
|
||||
/// - handler: The shutdown handler to call when terminating the Lambda.
|
||||
/// Shutdown handlers are called in the reverse order of being registered.
|
||||
///
|
||||
/// - Returns: A `RegistrationKey` that can be used to de-register the handler when its no longer needed.
|
||||
@discardableResult
|
||||
public func register(name: String, handler: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey {
|
||||
let key = RegistrationKey()
|
||||
self.storage.add(key: key, name: name, handler: handler)
|
||||
return key
|
||||
}
|
||||
|
||||
/// De-register a shutdown handler with the terminator
|
||||
///
|
||||
/// - parameters:
|
||||
/// - key: A `RegistrationKey` obtained from calling the register API.
|
||||
public func deregister(_ key: RegistrationKey) {
|
||||
self.storage.remove(key)
|
||||
}
|
||||
|
||||
/// Begin the termination cycle
|
||||
/// Shutdown handlers are called in the reverse order of being registered.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - eventLoop: The `EventLoop` to run the termination on.
|
||||
///
|
||||
/// - Returns: An `EventLoopFuture` with the result of the termination cycle.
|
||||
internal func terminate(eventLoop: EventLoop) -> EventLoopFuture<Void> {
|
||||
func terminate(_ iterator: IndexingIterator<[(name: String, handler: Handler)]>, errors: [Error], promise: EventLoopPromise<Void>) {
|
||||
var iterator = iterator
|
||||
guard let handler = iterator.next()?.handler else {
|
||||
if errors.isEmpty {
|
||||
return promise.succeed(())
|
||||
} else {
|
||||
return promise.fail(TerminationError(underlying: errors))
|
||||
}
|
||||
}
|
||||
handler(eventLoop).whenComplete { result in
|
||||
var errors = errors
|
||||
if case .failure(let error) = result {
|
||||
errors.append(error)
|
||||
}
|
||||
return terminate(iterator, errors: errors, promise: promise)
|
||||
}
|
||||
}
|
||||
|
||||
// terminate in cascading, reverse order
|
||||
let promise = eventLoop.makePromise(of: Void.self)
|
||||
terminate(self.storage.handlers.reversed().makeIterator(), errors: [], promise: promise)
|
||||
return promise.futureResult
|
||||
}
|
||||
}
|
||||
|
||||
extension LambdaTerminator {
|
||||
/// Lambda terminator registration key.
|
||||
public struct RegistrationKey: Hashable, CustomStringConvertible {
|
||||
var value: String
|
||||
|
||||
init() {
|
||||
// UUID basically
|
||||
self.value = LambdaRequestID().uuidString
|
||||
}
|
||||
|
||||
public var description: String {
|
||||
self.value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension LambdaTerminator {
|
||||
private final class Storage {
|
||||
private let lock: Lock
|
||||
private var index: [RegistrationKey]
|
||||
private var map: [RegistrationKey: (name: String, handler: Handler)]
|
||||
|
||||
init() {
|
||||
self.lock = .init()
|
||||
self.index = []
|
||||
self.map = [:]
|
||||
}
|
||||
|
||||
func add(key: RegistrationKey, name: String, handler: @escaping Handler) {
|
||||
self.lock.withLock {
|
||||
self.index.append(key)
|
||||
self.map[key] = (name: name, handler: handler)
|
||||
}
|
||||
}
|
||||
|
||||
func remove(_ key: RegistrationKey) {
|
||||
self.lock.withLock {
|
||||
self.index = self.index.filter { $0 != key }
|
||||
self.map[key] = nil
|
||||
}
|
||||
}
|
||||
|
||||
var handlers: [(name: String, handler: Handler)] {
|
||||
self.lock.withLock {
|
||||
self.index.compactMap { self.map[$0] }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension LambdaTerminator {
|
||||
struct TerminationError: Error {
|
||||
let underlying: [Error]
|
||||
}
|
||||
}
|
||||
@@ -51,8 +51,7 @@ extension Lambda {
|
||||
public init(requestID: String = "\(DispatchTime.now().uptimeNanoseconds)",
|
||||
traceID: String = "Root=\(DispatchTime.now().uptimeNanoseconds);Parent=\(DispatchTime.now().uptimeNanoseconds);Sampled=1",
|
||||
invokedFunctionARN: String = "arn:aws:lambda:us-west-1:\(DispatchTime.now().uptimeNanoseconds):function:custom-runtime",
|
||||
timeout: DispatchTimeInterval = .seconds(5))
|
||||
{
|
||||
timeout: DispatchTimeInterval = .seconds(5)) {
|
||||
self.requestID = requestID
|
||||
self.traceID = traceID
|
||||
self.invokedFunctionARN = invokedFunctionARN
|
||||
|
||||
@@ -66,23 +66,37 @@ class LambdaRuntimeTest: XCTestCase {
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
|
||||
|
||||
struct ShutdownError: Error {}
|
||||
struct ShutdownError: Error {
|
||||
let description: String
|
||||
}
|
||||
|
||||
struct ShutdownErrorHandler: EventLoopLambdaHandler {
|
||||
typealias Event = String
|
||||
typealias Output = Void
|
||||
|
||||
static func makeHandler(context: Lambda.InitializationContext) -> EventLoopFuture<ShutdownErrorHandler> {
|
||||
context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
|
||||
// register shutdown operation
|
||||
context.terminator.register(name: "test 1", handler: { eventLoop in
|
||||
eventLoop.makeFailedFuture(ShutdownError(description: "error 1"))
|
||||
})
|
||||
context.terminator.register(name: "test 2", handler: { eventLoop in
|
||||
eventLoop.makeSucceededVoidFuture()
|
||||
})
|
||||
context.terminator.register(name: "test 3", handler: { eventLoop in
|
||||
eventLoop.makeFailedFuture(ShutdownError(description: "error 2"))
|
||||
})
|
||||
context.terminator.register(name: "test 4", handler: { eventLoop in
|
||||
eventLoop.makeSucceededVoidFuture()
|
||||
})
|
||||
context.terminator.register(name: "test 5", handler: { eventLoop in
|
||||
eventLoop.makeFailedFuture(ShutdownError(description: "error 3"))
|
||||
})
|
||||
return context.eventLoop.makeSucceededFuture(ShutdownErrorHandler())
|
||||
}
|
||||
|
||||
func handle(_ event: String, context: LambdaContext) -> EventLoopFuture<Void> {
|
||||
context.eventLoop.makeSucceededVoidFuture()
|
||||
}
|
||||
|
||||
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
|
||||
context.eventLoop.makeFailedFuture(ShutdownError())
|
||||
}
|
||||
}
|
||||
|
||||
let eventLoop = eventLoopGroup.next()
|
||||
@@ -95,7 +109,11 @@ class LambdaRuntimeTest: XCTestCase {
|
||||
XCTFail("Unexpected error: \(error)"); return
|
||||
}
|
||||
|
||||
XCTAssert(shutdownError is ShutdownError)
|
||||
XCTAssertEqual(shutdownError as? LambdaTerminator.TerminationError, LambdaTerminator.TerminationError(underlying: [
|
||||
ShutdownError(description: "error 3"),
|
||||
ShutdownError(description: "error 2"),
|
||||
ShutdownError(description: "error 1"),
|
||||
]))
|
||||
XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,10 +23,11 @@ func runLambda<Handler: ByteBufferLambdaHandler>(behavior: LambdaServerBehavior,
|
||||
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
|
||||
let logger = Logger(label: "TestLogger")
|
||||
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
|
||||
let terminator = LambdaTerminator()
|
||||
let runner = Lambda.Runner(eventLoop: eventLoopGroup.next(), configuration: configuration)
|
||||
let server = try MockLambdaServer(behavior: behavior).start().wait()
|
||||
defer { XCTAssertNoThrow(try server.stop().wait()) }
|
||||
try runner.initialize(logger: logger, handlerType: handlerType).flatMap { handler in
|
||||
try runner.initialize(logger: logger, terminator: terminator, handlerType: handlerType).flatMap { handler in
|
||||
runner.run(logger: logger, handler: handler)
|
||||
}.wait()
|
||||
}
|
||||
@@ -66,3 +67,13 @@ extension Lambda.RuntimeError: Equatable {
|
||||
String(describing: lhs) == String(describing: rhs)
|
||||
}
|
||||
}
|
||||
|
||||
extension LambdaTerminator.TerminationError: Equatable {
|
||||
public static func == (lhs: Self, rhs: Self) -> Bool {
|
||||
guard lhs.underlying.count == rhs.underlying.count else {
|
||||
return false
|
||||
}
|
||||
// technically incorrect, but good enough for our tests
|
||||
return String(describing: lhs) == String(describing: rhs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,7 +172,8 @@ class CodableLambdaTest: XCTestCase {
|
||||
Lambda.InitializationContext(
|
||||
logger: Logger(label: "test"),
|
||||
eventLoop: self.eventLoopGroup.next(),
|
||||
allocator: ByteBufferAllocator()
|
||||
allocator: ByteBufferAllocator(),
|
||||
terminator: LambdaTerminator()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user