mirror of
https://github.com/swift-server/swift-aws-lambda-runtime.git
synced 2026-05-03 07:22:27 +00:00
This is a proposal to fix issue #507 **changes** - `LambdaRuntime.init()` uses a `Mutex<Bool>` to make sure only one instance is created - `LambdaRuntime.init()` can now throw an error in case an instance already exists (I did not use `fatalError()` to make it easier to test) - All `convenience init()` methods catch possible errors instead of re-throwing it to a void breaking the user-facing API - Renamed existing `LambdaRuntimeError` to `LambdaRuntimeClientError` - Introduced a new type `LambdaRuntimeError` to represent the double initialization error --------- Co-authored-by: Fabian Fett <fabianfett@apple.com> Co-authored-by: Adam Fowler <adamfowler71@gmail.com>
This commit is contained in:
committed by
GitHub
parent
bf2385ae08
commit
344d30b401
@@ -15,10 +15,7 @@ let package = Package(
|
||||
// during CI, the dependency on local version of swift-aws-lambda-runtime is added dynamically below
|
||||
.package(
|
||||
url: "https://github.com/swift-server/swift-aws-lambda-runtime.git",
|
||||
branch: "ff-package-traits",
|
||||
traits: [
|
||||
.trait(name: "FoundationJSONSupport")
|
||||
]
|
||||
branch: "main"
|
||||
)
|
||||
],
|
||||
targets: [
|
||||
|
||||
@@ -48,11 +48,9 @@ extension Lambda {
|
||||
@usableFromInline
|
||||
static func withLocalServer(
|
||||
invocationEndpoint: String? = nil,
|
||||
logger: Logger,
|
||||
_ body: sending @escaping () async throws -> Void
|
||||
) async throws {
|
||||
var logger = Logger(label: "LocalServer")
|
||||
logger.logLevel = Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
|
||||
|
||||
try await LambdaHTTPServer.withLocalServer(
|
||||
invocationEndpoint: invocationEndpoint,
|
||||
logger: logger
|
||||
@@ -93,7 +91,7 @@ internal struct LambdaHTTPServer {
|
||||
case serverReturned(Swift.Result<Void, any Error>)
|
||||
}
|
||||
|
||||
struct UnsafeTransferBox<Value>: @unchecked Sendable {
|
||||
fileprivate struct UnsafeTransferBox<Value>: @unchecked Sendable {
|
||||
let value: Value
|
||||
|
||||
init(value: sending Value) {
|
||||
@@ -133,6 +131,7 @@ internal struct LambdaHTTPServer {
|
||||
}
|
||||
}
|
||||
|
||||
// it's ok to keep this at `info` level because it is only used for local testing and unit tests
|
||||
logger.info(
|
||||
"Server started and listening",
|
||||
metadata: [
|
||||
@@ -202,12 +201,18 @@ internal struct LambdaHTTPServer {
|
||||
return result
|
||||
|
||||
case .serverReturned(let result):
|
||||
logger.error(
|
||||
"Server shutdown before closure completed",
|
||||
metadata: [
|
||||
"error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")"
|
||||
]
|
||||
)
|
||||
|
||||
if result.maybeError is CancellationError {
|
||||
logger.trace("Server's task cancelled")
|
||||
} else {
|
||||
logger.error(
|
||||
"Server shutdown before closure completed",
|
||||
metadata: [
|
||||
"error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")"
|
||||
]
|
||||
)
|
||||
}
|
||||
|
||||
switch await group.next()! {
|
||||
case .closureResult(let result):
|
||||
return result
|
||||
@@ -265,9 +270,12 @@ internal struct LambdaHTTPServer {
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch let error as CancellationError {
|
||||
logger.trace("The task was cancelled", metadata: ["error": "\(error)"])
|
||||
} catch {
|
||||
logger.error("Hit error: \(error)")
|
||||
}
|
||||
|
||||
} onCancel: {
|
||||
channel.channel.close(promise: nil)
|
||||
}
|
||||
|
||||
@@ -13,8 +13,8 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Logging
|
||||
import NIOConcurrencyHelpers
|
||||
import NIOCore
|
||||
import Synchronization
|
||||
|
||||
#if canImport(FoundationEssentials)
|
||||
import FoundationEssentials
|
||||
@@ -22,13 +22,14 @@ import FoundationEssentials
|
||||
import Foundation
|
||||
#endif
|
||||
|
||||
// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today.
|
||||
// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this
|
||||
// sadly crashes the compiler today.
|
||||
public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: StreamingLambdaHandler {
|
||||
// TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore
|
||||
// This is our guardian to ensure only one LambdaRuntime is running at the time
|
||||
// We use an Atomic here to ensure thread safety
|
||||
private let _isRunning = Atomic<Bool>(false)
|
||||
|
||||
public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
|
||||
@usableFromInline
|
||||
let handlerMutex: NIOLockedValueBox<Handler?>
|
||||
/// we protect the handler behind a Mutex to ensure that we only ever have one copy of it
|
||||
let handlerStorage: SendingStorage<Handler>
|
||||
@usableFromInline
|
||||
let logger: Logger
|
||||
@usableFromInline
|
||||
@@ -39,7 +40,7 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
|
||||
eventLoop: EventLoop = Lambda.defaultEventLoop,
|
||||
logger: Logger = Logger(label: "LambdaRuntime")
|
||||
) {
|
||||
self.handlerMutex = NIOLockedValueBox(handler)
|
||||
self.handlerStorage = SendingStorage(handler)
|
||||
self.eventLoop = eventLoop
|
||||
|
||||
// by setting the log level here, we understand it can not be changed dynamically at runtime
|
||||
@@ -62,14 +63,24 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
|
||||
}
|
||||
#endif
|
||||
|
||||
@inlinable
|
||||
/// Make sure only one run() is called at a time
|
||||
// @inlinable
|
||||
internal func _run() async throws {
|
||||
let handler = self.handlerMutex.withLockedValue { handler in
|
||||
let result = handler
|
||||
handler = nil
|
||||
return result
|
||||
|
||||
// we use an atomic global variable to ensure only one LambdaRuntime is running at the time
|
||||
let (_, original) = _isRunning.compareExchange(expected: false, desired: true, ordering: .acquiringAndReleasing)
|
||||
|
||||
// if the original value was already true, run() is already running
|
||||
if original {
|
||||
throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
|
||||
}
|
||||
|
||||
defer {
|
||||
_isRunning.store(false, ordering: .releasing)
|
||||
}
|
||||
|
||||
// The handler can be non-sendable, we want to ensure we only ever have one copy of it
|
||||
let handler = try? self.handlerStorage.get()
|
||||
guard let handler else {
|
||||
throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
|
||||
}
|
||||
@@ -100,8 +111,10 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
|
||||
#if LocalServerSupport
|
||||
// we're not running on Lambda and we're compiled in DEBUG mode,
|
||||
// let's start a local server for testing
|
||||
try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"))
|
||||
{
|
||||
try await Lambda.withLocalServer(
|
||||
invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"),
|
||||
logger: self.logger
|
||||
) {
|
||||
|
||||
try await LambdaRuntimeClient.withRuntimeClient(
|
||||
configuration: .init(ip: "127.0.0.1", port: 7000),
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
package struct LambdaRuntimeError: Error {
|
||||
@usableFromInline
|
||||
package enum Code: Sendable {
|
||||
/// internal error codes for LambdaRuntimeClient
|
||||
case closingRuntimeClient
|
||||
|
||||
case connectionToControlPlaneLost
|
||||
|
||||
@@ -13,8 +13,11 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Dispatch
|
||||
import NIOConcurrencyHelpers
|
||||
import NIOPosix
|
||||
|
||||
// import Synchronization
|
||||
|
||||
enum Consts {
|
||||
static let apiPrefix = "/2018-06-01"
|
||||
static let invocationURLPrefix = "\(apiPrefix)/runtime/invocation"
|
||||
@@ -132,3 +135,34 @@ extension AmazonHeaders {
|
||||
return "\(version)-\(datePadding)\(dateValue)-\(identifier)"
|
||||
}
|
||||
}
|
||||
|
||||
/// Temporary storage for value being sent from one isolation domain to another
|
||||
// use NIOLockedValueBox instead of Mutex to avoid compiler crashes on 6.0
|
||||
// see https://github.com/swiftlang/swift/issues/78048
|
||||
@usableFromInline
|
||||
struct SendingStorage<Value>: ~Copyable, @unchecked Sendable {
|
||||
@usableFromInline
|
||||
struct ValueAlreadySentError: Error {
|
||||
@usableFromInline
|
||||
init() {}
|
||||
}
|
||||
|
||||
@usableFromInline
|
||||
// let storage: Mutex<Value?>
|
||||
let storage: NIOLockedValueBox<Value?>
|
||||
|
||||
@inlinable
|
||||
init(_ value: sending Value) {
|
||||
self.storage = .init(value)
|
||||
}
|
||||
|
||||
@inlinable
|
||||
func get() throws -> Value {
|
||||
// try self.storage.withLock {
|
||||
try self.storage.withLockedValue {
|
||||
guard let value = $0 else { throw ValueAlreadySentError() }
|
||||
$0 = nil
|
||||
return value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +123,7 @@ struct HttpServer {
|
||||
}
|
||||
}
|
||||
}
|
||||
// it's ok to keep this at `info` level because it is only used for local testing and unit tests
|
||||
logger.info("Server shutting down")
|
||||
}
|
||||
|
||||
|
||||
@@ -147,12 +147,11 @@ struct LambdaRuntimeClientTests {
|
||||
(event: String, context: LambdaContext) in
|
||||
"Hello \(event)"
|
||||
}
|
||||
var logger = Logger(label: "LambdaRuntime")
|
||||
logger.logLevel = .debug
|
||||
|
||||
let serviceGroup = ServiceGroup(
|
||||
services: [runtime],
|
||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||
logger: logger
|
||||
logger: Logger(label: "TestLambdaRuntimeGracefulShutdown")
|
||||
)
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
group.addTask {
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftAWSLambdaRuntime open source project
|
||||
//
|
||||
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import Logging
|
||||
import NIOCore
|
||||
import Synchronization
|
||||
import Testing
|
||||
|
||||
@testable import AWSLambdaRuntime
|
||||
|
||||
@Suite("LambdaRuntimeTests")
|
||||
struct LambdaRuntimeTests {
|
||||
|
||||
@Test("LambdaRuntime can only be run once")
|
||||
func testLambdaRuntimerunOnce() async throws {
|
||||
|
||||
// First runtime
|
||||
let runtime1 = LambdaRuntime(
|
||||
handler: MockHandler(),
|
||||
eventLoop: Lambda.defaultEventLoop,
|
||||
logger: Logger(label: "LambdaRuntimeTests.Runtime1")
|
||||
)
|
||||
|
||||
// Second runtime
|
||||
let runtime2 = LambdaRuntime(
|
||||
handler: MockHandler(),
|
||||
eventLoop: Lambda.defaultEventLoop,
|
||||
logger: Logger(label: "LambdaRuntimeTests.Runtime2")
|
||||
)
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
|
||||
// start the first runtime
|
||||
taskGroup.addTask {
|
||||
// ChannelError will be thrown when we cancel the task group
|
||||
await #expect(throws: ChannelError.self) {
|
||||
try await runtime1.run()
|
||||
}
|
||||
}
|
||||
|
||||
// wait a small amount to ensure runtime1 task is started
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
|
||||
// Running the second runtime should trigger LambdaRuntimeError
|
||||
await #expect(throws: LambdaRuntimeError.self) {
|
||||
try await runtime2.run()
|
||||
}
|
||||
|
||||
// cancel runtime 1 / task 1
|
||||
taskGroup.cancelAll()
|
||||
}
|
||||
|
||||
// Running the second runtime should work now
|
||||
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
|
||||
taskGroup.addTask {
|
||||
// ChannelError will be thrown when we cancel the task group
|
||||
await #expect(throws: ChannelError.self) {
|
||||
try await runtime2.run()
|
||||
}
|
||||
}
|
||||
|
||||
// Set timeout and cancel the runtime 2
|
||||
try await Task.sleep(for: .seconds(2))
|
||||
taskGroup.cancelAll()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct MockHandler: StreamingLambdaHandler {
|
||||
mutating func handle(
|
||||
_ event: NIOCore.ByteBuffer,
|
||||
responseWriter: some AWSLambdaRuntime.LambdaResponseStreamWriter,
|
||||
context: AWSLambdaRuntime.LambdaContext
|
||||
) async throws {
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user