mirror of
https://github.com/swift-server/swift-aws-lambda-runtime.git
synced 2026-05-03 07:22:27 +00:00
fef8c0d130
Fix the issue described at https://github.com/awslabs/swift-aws-lambda-runtime/issues/640 Here is the proposed fix: I added a new function `assumeIsolatedOnEventLoop` — a nonisolated method that: 1. Calls `self.eventLoop.preconditionInEventLoop()` to verify we're on the correct event loop (NIO's own thread-identity check, which always works) 2. Uses `unsafeBitCast` to strip the isolated annotation, the same pattern NIO uses internally and that I found on the Swift Forums. See: https://github.com/swiftlang/swift/blob/main/stdlib/public/Concurrency/ExecutorAssertions.swift#L348 See: https://forums.swift.org/t/actor-assumeisolated-erroneously-crashes-when-using-a-dispatch-queue-as-the-underlying-executor/72434/3 --------- Co-authored-by: Sebastien Stormacq <stormacq@amazon.lu> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
549 lines
22 KiB
Swift
549 lines
22 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the SwiftAWSLambdaRuntime open source project
|
|
//
|
|
// Copyright SwiftAWSLambdaRuntime project authors
|
|
// Copyright (c) Amazon.com, Inc. or its affiliates.
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
import Logging
|
|
import NIOCore
|
|
import NIOHTTP1
|
|
import NIOPosix
|
|
|
|
@available(LambdaSwift 2.0, *)
|
|
@usableFromInline
|
|
final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
|
|
@usableFromInline
|
|
nonisolated let unownedExecutor: UnownedSerialExecutor
|
|
|
|
@usableFromInline
|
|
struct Configuration: Sendable {
|
|
var ip: String
|
|
var port: Int
|
|
|
|
@usableFromInline
|
|
init(ip: String, port: Int) {
|
|
self.ip = ip
|
|
self.port = port
|
|
}
|
|
}
|
|
|
|
@usableFromInline
|
|
struct Writer: LambdaRuntimeClientResponseStreamWriter, Sendable {
|
|
private var runtimeClient: LambdaRuntimeClient
|
|
|
|
fileprivate init(runtimeClient: LambdaRuntimeClient) {
|
|
self.runtimeClient = runtimeClient
|
|
}
|
|
|
|
@usableFromInline
|
|
func write(_ buffer: NIOCore.ByteBuffer, hasCustomHeaders: Bool = false) async throws {
|
|
try await self.runtimeClient.write(buffer, hasCustomHeaders: hasCustomHeaders)
|
|
}
|
|
|
|
@usableFromInline
|
|
func finish() async throws {
|
|
try await self.runtimeClient.writeAndFinish(nil)
|
|
}
|
|
|
|
@usableFromInline
|
|
func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws {
|
|
try await self.runtimeClient.writeAndFinish(buffer)
|
|
}
|
|
|
|
@usableFromInline
|
|
func reportError(_ error: any Error) async throws {
|
|
try await self.runtimeClient.reportError(error)
|
|
}
|
|
}
|
|
|
|
private typealias ConnectionContinuation = CheckedContinuation<
|
|
NIOLoopBound<LambdaChannelHandler<LambdaRuntimeClient>>, any Error
|
|
>
|
|
|
|
private enum ConnectionState {
|
|
case disconnected
|
|
case connecting([ConnectionContinuation])
|
|
case connected(Channel, LambdaChannelHandler<LambdaRuntimeClient>)
|
|
}
|
|
|
|
enum LambdaState {
|
|
/// this is the "normal" state. Transitions to `waitingForNextInvocation`
|
|
case idle(previousRequestID: String?)
|
|
/// this is the state while we wait for an invocation. A next call is running.
|
|
/// Transitions to `waitingForResponse`
|
|
case waitingForNextInvocation
|
|
/// The invocation was forwarded to the handler and we wait for a response.
|
|
/// Transitions to `sendingResponse` or `sentResponse`.
|
|
case waitingForResponse(requestID: String)
|
|
case sendingResponse(requestID: String)
|
|
case sentResponse(requestID: String)
|
|
}
|
|
|
|
enum ClosingState {
|
|
case notClosing
|
|
case closing(CheckedContinuation<Void, Never>)
|
|
case closed
|
|
}
|
|
|
|
private let eventLoop: any EventLoop
|
|
private let logger: Logger
|
|
private let configuration: Configuration
|
|
|
|
private var connectionState: ConnectionState = .disconnected
|
|
|
|
private var lambdaState: LambdaState = .idle(previousRequestID: nil)
|
|
private var closingState: ClosingState = .notClosing
|
|
|
|
// connections that are currently being closed. In the `run` method we must await all of them
|
|
// being fully closed before we can return from it.
|
|
private var closingConnections: [any Channel] = []
|
|
|
|
@inlinable
|
|
static func withRuntimeClient<Result>(
|
|
configuration: Configuration,
|
|
eventLoop: any EventLoop,
|
|
logger: Logger,
|
|
_ body: (LambdaRuntimeClient) async throws -> Result
|
|
) async throws -> Result {
|
|
let runtime = LambdaRuntimeClient(configuration: configuration, eventLoop: eventLoop, logger: logger)
|
|
let result: Swift.Result<Result, any Error>
|
|
do {
|
|
result = .success(try await body(runtime))
|
|
} catch {
|
|
result = .failure(error)
|
|
}
|
|
await runtime.close()
|
|
return try result.get()
|
|
}
|
|
|
|
@usableFromInline
|
|
init(configuration: Configuration, eventLoop: any EventLoop, logger: Logger) {
|
|
self.unownedExecutor = eventLoop.executor.asUnownedSerialExecutor()
|
|
self.configuration = configuration
|
|
self.eventLoop = eventLoop
|
|
self.logger = logger
|
|
}
|
|
|
|
/// Assume that the current context is isolated to this actor's event loop and execute the closure.
|
|
///
|
|
/// This is a workaround for `Actor.assumeIsolated` which can crash on open-source Swift toolchains
|
|
/// built with runtime assertions enabled. In those toolchains, `assumeIsolated` performs a strict
|
|
/// runtime check via `_taskIsCurrentExecutor` that fails when called from NIO callbacks
|
|
/// (e.g. `whenComplete`, channel handler methods) because there is no Swift Concurrency task
|
|
/// tracking the current executor in thread-local storage.
|
|
///
|
|
/// We use `eventLoop.preconditionInEventLoop()` as our safety check instead, then perform
|
|
/// the same unsafe cast that `assumeIsolated` does internally after its check passes.
|
|
/// See: https://github.com/swiftlang/swift/blob/main/stdlib/public/Concurrency/ExecutorAssertions.swift#L348
|
|
/// See: https://forums.swift.org/t/actor-assumeisolated-erroneously-crashes-when-using-a-dispatch-queue-as-the-underlying-executor/72434/3
|
|
private nonisolated func assumeIsolatedOnEventLoop(
|
|
_ operation: (isolated LambdaRuntimeClient) -> Void
|
|
) {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
// This is safe: we verified we're on the event loop, which is this actor's executor.
|
|
withoutActuallyEscaping(operation) { escapingOperation in
|
|
let strippedOperation = unsafeBitCast(
|
|
escapingOperation,
|
|
to: ((LambdaRuntimeClient) -> Void).self
|
|
)
|
|
strippedOperation(self)
|
|
}
|
|
}
|
|
|
|
@usableFromInline
|
|
func close() async {
|
|
self.logger.trace("Close lambda runtime client")
|
|
|
|
guard case .notClosing = self.closingState else {
|
|
return
|
|
}
|
|
await withCheckedContinuation { continuation in
|
|
self.closingState = .closing(continuation)
|
|
|
|
switch self.connectionState {
|
|
case .disconnected:
|
|
if self.closingConnections.isEmpty {
|
|
return continuation.resume()
|
|
}
|
|
|
|
case .connecting(let continuations):
|
|
for continuation in continuations {
|
|
continuation.resume(throwing: LambdaRuntimeError(code: .closingRuntimeClient))
|
|
}
|
|
self.connectionState = .connecting([])
|
|
|
|
case .connected(let channel, _):
|
|
channel.close(mode: .all, promise: nil)
|
|
}
|
|
}
|
|
}
|
|
|
|
@usableFromInline
|
|
func nextInvocation() async throws -> (Invocation, Writer) {
|
|
|
|
try Task.checkCancellation()
|
|
|
|
return try await withTaskCancellationHandler {
|
|
switch self.lambdaState {
|
|
case .idle:
|
|
self.lambdaState = .waitingForNextInvocation
|
|
let handler = try await self.makeOrGetConnection()
|
|
let invocation = try await handler.nextInvocation()
|
|
|
|
guard case .waitingForNextInvocation = self.lambdaState else {
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
self.lambdaState = .waitingForResponse(requestID: invocation.metadata.requestID)
|
|
return (invocation, Writer(runtimeClient: self))
|
|
|
|
case .waitingForNextInvocation,
|
|
.waitingForResponse,
|
|
.sendingResponse,
|
|
.sentResponse:
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
} onCancel: {
|
|
Task {
|
|
await self.close()
|
|
}
|
|
}
|
|
}
|
|
|
|
private func write(_ buffer: NIOCore.ByteBuffer, hasCustomHeaders: Bool = false) async throws {
|
|
switch self.lambdaState {
|
|
case .idle, .sentResponse:
|
|
throw LambdaRuntimeError(code: .writeAfterFinishHasBeenSent)
|
|
|
|
case .waitingForNextInvocation:
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
|
|
case .waitingForResponse(let requestID):
|
|
self.lambdaState = .sendingResponse(requestID: requestID)
|
|
fallthrough
|
|
|
|
case .sendingResponse(let requestID):
|
|
let handler = try await self.makeOrGetConnection()
|
|
guard case .sendingResponse(requestID) = self.lambdaState else {
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
return try await handler.writeResponseBodyPart(
|
|
buffer,
|
|
requestID: requestID,
|
|
hasCustomHeaders: hasCustomHeaders
|
|
)
|
|
}
|
|
}
|
|
|
|
private func writeAndFinish(_ buffer: NIOCore.ByteBuffer?) async throws {
|
|
switch self.lambdaState {
|
|
case .idle, .sentResponse:
|
|
throw LambdaRuntimeError(code: .finishAfterFinishHasBeenSent)
|
|
|
|
case .waitingForNextInvocation:
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
|
|
case .waitingForResponse(let requestID):
|
|
fallthrough
|
|
|
|
case .sendingResponse(let requestID):
|
|
self.lambdaState = .sentResponse(requestID: requestID)
|
|
let handler = try await self.makeOrGetConnection()
|
|
guard case .sentResponse(requestID) = self.lambdaState else {
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
try await handler.finishResponseRequest(finalData: buffer, requestID: requestID)
|
|
guard case .sentResponse(requestID) = self.lambdaState else {
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
self.lambdaState = .idle(previousRequestID: requestID)
|
|
}
|
|
}
|
|
|
|
private func reportError(_ error: any Error) async throws {
|
|
switch self.lambdaState {
|
|
case .idle, .waitingForNextInvocation, .sentResponse:
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
|
|
case .waitingForResponse(let requestID):
|
|
fallthrough
|
|
|
|
case .sendingResponse(let requestID):
|
|
self.lambdaState = .sentResponse(requestID: requestID)
|
|
let handler = try await self.makeOrGetConnection()
|
|
guard case .sentResponse(requestID) = self.lambdaState else {
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
try await handler.reportError(error, requestID: requestID)
|
|
guard case .sentResponse(requestID) = self.lambdaState else {
|
|
fatalError("Invalid state: \(self.lambdaState)")
|
|
}
|
|
self.lambdaState = .idle(previousRequestID: requestID)
|
|
}
|
|
}
|
|
|
|
private func channelClosed(_ channel: any Channel) {
|
|
// Check if this is an old channel that we're already tracking as closed
|
|
// This handles the race condition where:
|
|
// 1. connectionWillClose() is called, adding the channel to closingConnections
|
|
// 2. A new connection is established (connectionState = .connected with new channel)
|
|
// 3. The old channel's closeFuture fires (closingState might be .closed)
|
|
// 4. We receive channelClosed() for the OLD channel while NEW channel is connected
|
|
if self.closingConnections.contains(where: { $0 === channel }) {
|
|
// If this channel is still the currently connected channel, let the main
|
|
// state-handling logic below run instead of treating it as an old channel.
|
|
if case .connected(let stateChannel, _) = self.connectionState, channel === stateChannel {
|
|
// Remove from tracking and fall through to the main switch statement
|
|
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
|
|
self.closingConnections.remove(at: index)
|
|
}
|
|
} else {
|
|
// This is an old channel that's finishing its close operation
|
|
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
|
|
self.closingConnections.remove(at: index)
|
|
}
|
|
|
|
// If we're in closing state and all connections are now closed, complete the close
|
|
if case .closing(let continuation) = self.closingState,
|
|
self.closingConnections.isEmpty
|
|
{
|
|
self.closingState = .closed
|
|
continuation.resume()
|
|
}
|
|
|
|
self.logger.trace(
|
|
"Old channel closed after new connection established",
|
|
metadata: ["channel": "\(channel)"]
|
|
)
|
|
return
|
|
}
|
|
}
|
|
|
|
switch (self.connectionState, self.closingState) {
|
|
case (_, .closed):
|
|
// This should not happen, but if it does, it means we're receiving a close
|
|
// notification for a channel after the runtime client has fully closed.
|
|
// Log it but don't crash - this could be a legitimate race condition.
|
|
self.logger.warning(
|
|
"Received channelClosed after closingState is .closed",
|
|
metadata: [
|
|
"channel": "\(channel)",
|
|
"connectionState": "\(self.connectionState)",
|
|
]
|
|
)
|
|
return
|
|
|
|
case (.disconnected, .notClosing):
|
|
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
|
|
self.closingConnections.remove(at: index)
|
|
}
|
|
|
|
case (.disconnected, .closing(let continuation)):
|
|
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
|
|
self.closingConnections.remove(at: index)
|
|
}
|
|
|
|
if self.closingConnections.isEmpty {
|
|
self.closingState = .closed
|
|
continuation.resume()
|
|
}
|
|
|
|
case (.connecting(let array), .notClosing):
|
|
self.connectionState = .disconnected
|
|
for continuation in array {
|
|
continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost))
|
|
}
|
|
|
|
case (.connecting(let array), .closing(let continuation)):
|
|
self.connectionState = .disconnected
|
|
precondition(array.isEmpty, "If we are closing we should have failed all connection attempts already")
|
|
if self.closingConnections.isEmpty {
|
|
self.closingState = .closed
|
|
continuation.resume()
|
|
}
|
|
|
|
case (.connected(let currentChannel, _), .notClosing):
|
|
// Only transition to disconnected if this is the CURRENT channel closing
|
|
if currentChannel === channel {
|
|
self.connectionState = .disconnected
|
|
} else {
|
|
// This is an old channel closing - remove from tracking
|
|
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
|
|
self.closingConnections.remove(at: index)
|
|
}
|
|
|
|
self.logger.trace(
|
|
"Old channel closing while new connection is active",
|
|
metadata: [
|
|
"closingChannel": "\(channel)",
|
|
"currentChannel": "\(currentChannel)",
|
|
]
|
|
)
|
|
}
|
|
|
|
case (.connected(let currentChannel, _), .closing(let continuation)):
|
|
// Only transition to disconnected if this is the CURRENT channel closing
|
|
if currentChannel === channel {
|
|
self.connectionState = .disconnected
|
|
} else {
|
|
// This is an old channel closing - remove from tracking
|
|
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
|
|
self.closingConnections.remove(at: index)
|
|
}
|
|
}
|
|
|
|
if self.closingConnections.isEmpty {
|
|
self.closingState = .closed
|
|
continuation.resume()
|
|
}
|
|
}
|
|
}
|
|
|
|
private func makeOrGetConnection() async throws -> LambdaChannelHandler<LambdaRuntimeClient> {
|
|
switch self.connectionState {
|
|
case .disconnected:
|
|
self.connectionState = .connecting([])
|
|
break
|
|
case .connecting(var array):
|
|
// Since we do get sequential invocations this case normally should never be hit.
|
|
// We'll support it anyway.
|
|
let loopBound = try await withCheckedThrowingContinuation { (continuation: ConnectionContinuation) in
|
|
array.append(continuation)
|
|
self.connectionState = .connecting(array)
|
|
}
|
|
return loopBound.value
|
|
case .connected(_, let handler):
|
|
return handler
|
|
}
|
|
|
|
let bootstrap = ClientBootstrap(group: self.eventLoop)
|
|
.channelInitializer { channel in
|
|
do {
|
|
try channel.pipeline.syncOperations.addHTTPClientHandlers()
|
|
// Lambda quotas... An invocation payload is maximal 6MB in size:
|
|
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
|
|
try channel.pipeline.syncOperations.addHandler(
|
|
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
|
|
)
|
|
try channel.pipeline.syncOperations.addHandler(
|
|
LambdaChannelHandler(
|
|
delegate: self,
|
|
logger: self.logger,
|
|
configuration: self.configuration
|
|
)
|
|
)
|
|
return channel.eventLoop.makeSucceededFuture(())
|
|
} catch {
|
|
return channel.eventLoop.makeFailedFuture(error)
|
|
}
|
|
}
|
|
.connectTimeout(.seconds(2))
|
|
|
|
do {
|
|
// connect directly via socket address to avoid happy eyeballs (perf)
|
|
let address = try SocketAddress(ipAddress: self.configuration.ip, port: self.configuration.port)
|
|
let channel = try await bootstrap.connect(to: address).get()
|
|
let handler = try channel.pipeline.syncOperations.handler(
|
|
type: LambdaChannelHandler<LambdaRuntimeClient>.self
|
|
)
|
|
self.logger.trace(
|
|
"Connection to control plane created",
|
|
metadata: [
|
|
"lambda_port": "\(self.configuration.port)",
|
|
"lambda_ip": "\(self.configuration.ip)",
|
|
]
|
|
)
|
|
channel.closeFuture.whenComplete { _ in
|
|
self.assumeIsolatedOnEventLoop { runtimeClient in
|
|
// close the channel
|
|
runtimeClient.channelClosed(channel)
|
|
// Note: Do NOT set connectionState = .disconnected here!
|
|
// The channelClosed() method handles state transitions properly,
|
|
// checking if this is the current channel or an old one.
|
|
}
|
|
}
|
|
|
|
switch self.connectionState {
|
|
case .disconnected, .connected:
|
|
fatalError("Unexpected state: \(self.connectionState)")
|
|
|
|
case .connecting(let array):
|
|
self.connectionState = .connected(channel, handler)
|
|
defer {
|
|
let loopBound = NIOLoopBound(handler, eventLoop: self.eventLoop)
|
|
for continuation in array {
|
|
continuation.resume(returning: loopBound)
|
|
}
|
|
}
|
|
return handler
|
|
}
|
|
} catch {
|
|
|
|
switch self.connectionState {
|
|
case .disconnected, .connected:
|
|
fatalError("Unexpected state: \(self.connectionState)")
|
|
|
|
case .connecting(let array):
|
|
self.connectionState = .disconnected
|
|
defer {
|
|
for continuation in array {
|
|
continuation.resume(throwing: error)
|
|
}
|
|
}
|
|
throw error
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@available(LambdaSwift 2.0, *)
|
|
extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
|
|
nonisolated func connectionErrorHappened(_ error: any Error, channel: any Channel) {}
|
|
|
|
nonisolated func connectionWillClose(channel: any Channel) {
|
|
self.assumeIsolatedOnEventLoop { isolated in
|
|
switch isolated.connectionState {
|
|
case .disconnected:
|
|
// this case should never happen. But whatever
|
|
if channel.isActive {
|
|
isolated.closingConnections.append(channel)
|
|
}
|
|
|
|
case .connecting(let continuations):
|
|
// this case should never happen. But whatever
|
|
if channel.isActive {
|
|
isolated.closingConnections.append(channel)
|
|
}
|
|
|
|
for continuation in continuations {
|
|
continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost))
|
|
}
|
|
|
|
case .connected(let stateChannel, _):
|
|
guard channel === stateChannel else {
|
|
// This is an old channel closing - add to tracking
|
|
isolated.closingConnections.append(channel)
|
|
isolated.logger.trace(
|
|
"Old channel will close while new connection is active",
|
|
metadata: [
|
|
"closingChannel": "\(channel)",
|
|
"currentChannel": "\(stateChannel)",
|
|
]
|
|
)
|
|
return
|
|
}
|
|
|
|
isolated.connectionState = .disconnected
|
|
}
|
|
}
|
|
}
|
|
}
|