From 34e89b402741c044a275123867695da2974f79bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 15 Jan 2026 20:58:03 +0100 Subject: [PATCH] Fix Test hangs in Lambda+LocalServer (#630) (#631) # Fix test hangs caused by Pool cancellation race conditions ## Summary This PR fixes two related race conditions in `Lambda+LocalServer+Pool.swift` that were causing the test suite to hang approximately 10% of the time. ## Problem The test suite exhibited intermittent hangs (~10% frequency) due to two bugs in the Pool implementation: 1. **Individual task cancellation bug**: When one task waiting for a specific `requestId` was cancelled, the cancellation handler would incorrectly cancel ALL waiting tasks instead of just the cancelled one. 2. **Server shutdown hang**: When the server shut down, waiting continuations in the pools were never cancelled, causing handlers to wait indefinitely for responses that would never arrive. ## Root Causes ### Root Cause #1: Cancellation Handler Removes ALL Continuations The `onCancel` handler in `Pool._next()` was removing all continuations from the `waitingForSpecific` dictionary when any single task was cancelled: ```swift onCancel: { // BUG: Removes ALL continuations, not just the cancelled task's for continuation in state.waitingForSpecific.values { toCancel.append(continuation) } state.waitingForSpecific.removeAll() } ``` This caused unrelated concurrent invocations to fail with `CancellationError` when one client cancelled their request. ### Root Cause #2: No Pool Cleanup During Server Shutdown When the server shut down (e.g., test completes), the task group was cancelled but the pools' waiting continuations were never notified. The `/invoke` endpoint handlers would continue waiting for responses that would never arrive because the Lambda function had stopped. ## Solution ### Fix #1: Only Remove Specific Continuation on Cancellation Modified the cancellation handler to only remove the continuation for the specific cancelled task: ```swift onCancel: { // Only remove THIS task's continuation let continuationToCancel = self.lock.withLock { state -> CheckedContinuation? in if let requestId = requestId { return state.waitingForSpecific.removeValue(forKey: requestId) } else { let cont = state.waitingForAny state.waitingForAny = nil return cont } } continuationToCancel?.resume(throwing: CancellationError()) } ``` ### Fix #2: Add Pool Cleanup During Server Shutdown Added `cancelAll()` method to the Pool class and call it during server shutdown: ```swift func cancelAll() { let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation] in var toCancel: [CheckedContinuation] = [] if let continuation = state.waitingForAny { toCancel.append(continuation) state.waitingForAny = nil } for continuation in state.waitingForSpecific.values { toCancel.append(continuation) } state.waitingForSpecific.removeAll() return toCancel } for continuation in continuationsToCancel { continuation.resume(throwing: CancellationError()) } } ``` Called during server shutdown: ```swift let serverOrHandlerResult1 = await group.next()! group.cancelAll() // Cancel all waiting continuations in the pools to prevent hangs server.invocationPool.cancelAll() server.responsePool.cancelAll() ``` ## Changes ### Modified Files - **Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift** - Fixed cancellation handler in `_next()` to only remove specific continuation - Added `cancelAll()` method for server shutdown cleanup - **Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift** - Call `cancelAll()` on both pools during server shutdown ### New Files - **Tests/AWSLambdaRuntimeTests/LocalServerPoolCancellationTests.swift** - Added comprehensive test suite with 3 tests - `testCancellationOnlyAffectsOwnTask`: Verifies only the cancelled task receives CancellationError - `testConcurrentInvocationsWithCancellation`: Tests real-world scenario with 5 concurrent invocations - `testFIFOModeCancellation`: Ensures FIFO mode cancellation works correctly ## Testing ### Before Fix - Test suite hung ~10% of the time - When 1 task was cancelled, all 5 concurrent tasks received `CancellationError` - Streaming tests would occasionally hang during shutdown ### After Fix - All 91 tests pass consistently without hangs - When 1 task is cancelled, only that specific task receives `CancellationError` - Other tasks continue waiting normally - Server shutdown properly cleans up all waiting continuations - Multiple consecutive test runs confirm stability ### Test Coverage The new test suite reproduces both bugs and verifies the fixes: 1. **testCancellationOnlyAffectsOwnTask**: Creates 3 tasks waiting for different requestIds, cancels only one, and verifies the others are not affected 2. **testConcurrentInvocationsWithCancellation**: Simulates 5 concurrent invocations with one cancellation 3. **testFIFOModeCancellation**: Tests FIFO mode to ensure it still works correctly --------- Co-authored-by: Sebastien Stormacq --- .../HTTPServer/Lambda+LocalServer+Pool.swift | 51 ++-- .../HTTPServer/Lambda+LocalServer.swift | 4 + .../LocalServerPoolCancellationTests.swift | 256 ++++++++++++++++++ 3 files changed, 294 insertions(+), 17 deletions(-) create mode 100644 Tests/AWSLambdaRuntimeTests/LocalServerPoolCancellationTests.swift diff --git a/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift index cbbca58c..83ef8a6c 100644 --- a/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift +++ b/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift @@ -131,27 +131,20 @@ extension LambdaHTTPServer { } } } onCancel: { - // Ensure we properly handle cancellation by removing stored continuation - let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation] in - var toCancel: [CheckedContinuation] = [] - - if let continuation = state.waitingForAny { - toCancel.append(continuation) + // Only remove THIS task's continuation + let continuationToCancel = self.lock.withLock { state -> CheckedContinuation? in + if let requestId = requestId { + // Remove only the continuation for this specific requestId + return state.waitingForSpecific.removeValue(forKey: requestId) + } else { + // Remove only the FIFO continuation + let cont = state.waitingForAny state.waitingForAny = nil + return cont } - - for continuation in state.waitingForSpecific.values { - toCancel.append(continuation) - } - state.waitingForSpecific.removeAll() - - return toCancel } - // Resume all continuations outside the lock to avoid potential deadlocks - for continuation in continuationsToCancel { - continuation.resume(throwing: CancellationError()) - } + continuationToCancel?.resume(throwing: CancellationError()) } } @@ -169,6 +162,30 @@ extension LambdaHTTPServer { self } + /// Cancel all waiting continuations - used during server shutdown + func cancelAll() { + let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation] in + var toCancel: [CheckedContinuation] = [] + + if let continuation = state.waitingForAny { + toCancel.append(continuation) + state.waitingForAny = nil + } + + for continuation in state.waitingForSpecific.values { + toCancel.append(continuation) + } + state.waitingForSpecific.removeAll() + + return toCancel + } + + // Resume all continuations outside the lock + for continuation in continuationsToCancel { + continuation.resume(throwing: CancellationError()) + } + } + struct PoolError: Error { let cause: Cause var message: String { diff --git a/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift index ba0aec17..84db6505 100644 --- a/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift @@ -223,6 +223,10 @@ internal struct LambdaHTTPServer { let serverOrHandlerResult1 = await group.next()! group.cancelAll() + // Cancel all waiting continuations in the pools to prevent hangs + server.invocationPool.cancelAll() + server.responsePool.cancelAll() + switch serverOrHandlerResult1 { case .closureResult(let result): return result diff --git a/Tests/AWSLambdaRuntimeTests/LocalServerPoolCancellationTests.swift b/Tests/AWSLambdaRuntimeTests/LocalServerPoolCancellationTests.swift new file mode 100644 index 00000000..8bc14595 --- /dev/null +++ b/Tests/AWSLambdaRuntimeTests/LocalServerPoolCancellationTests.swift @@ -0,0 +1,256 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright SwiftAWSLambdaRuntime project authors +// Copyright (c) Amazon.com, Inc. or its affiliates. +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if LocalServerSupport +import Testing +import NIOCore +import Synchronization + +@testable import AWSLambdaRuntime + +@Suite("LocalServer Pool Cancellation Tests") +struct LocalServerPoolCancellationTests { + + /// Test that reproduces Issue #2: Cancellation handler removes ALL continuations + /// + /// This test demonstrates the bug where cancelling one task waiting in `waitingForSpecific` + /// causes ALL other waiting tasks to also receive CancellationError, even though they + /// weren't cancelled. + /// + /// Expected behavior: Only the cancelled task should receive CancellationError + /// Actual behavior: ALL waiting tasks receive CancellationError + @Test("Cancelling one task should not affect other waiting tasks") + @available(LambdaSwift 2.0, *) + func testCancellationOnlyAffectsOwnTask() async throws { + #if compiler(>=6.0) + let pool = LambdaHTTPServer.Pool(name: "Test Pool") + + let cancelledFlags = Mutex<[Bool]>([false, false, false]) + + // Create 3 tasks waiting for different requestIds + let task1 = Task { @Sendable in + do { + _ = try await pool.next(for: "request-1") + } catch is CancellationError { + cancelledFlags.withLock { $0[0] = true } + } + } + + let task2 = Task { @Sendable in + do { + _ = try await pool.next(for: "request-2") + } catch is CancellationError { + cancelledFlags.withLock { $0[1] = true } + } + } + + let task3 = Task { @Sendable in + do { + _ = try await pool.next(for: "request-3") + } catch is CancellationError { + cancelledFlags.withLock { $0[2] = true } + } + } + + // Let tasks register their continuations + try await Task.sleep(for: .milliseconds(100)) + + // Cancel only task 2 + task2.cancel() + + // Give cancellation time to propagate + try await Task.sleep(for: .milliseconds(100)) + + // Check cancellation status + let flags = cancelledFlags.withLock { $0 } + + #expect(flags[1] == true, "Task 2 should be cancelled") + + // With the bug, task1 and task3 will also be cancelled + if flags[0] || flags[2] { + Issue.record("BUG REPRODUCED: Other tasks were cancelled when only task 2 should have been cancelled") + } + + #expect(flags[0] == false, "Task 1 should NOT be cancelled") + #expect(flags[2] == false, "Task 3 should NOT be cancelled") + + // Clean up - cancel all tasks + task1.cancel() + task2.cancel() + task3.cancel() + + _ = await task1.result + _ = await task2.result + _ = await task3.result + + #else + throw XCTSkip("This test requires Swift 6.0 or later") + #endif + } + + /// Test concurrent invocations with one being cancelled + /// + /// This simulates the real-world scenario where multiple clients invoke the Lambda + /// function simultaneously, and one client's connection drops. + @Test("Multiple concurrent invocations with one cancellation") + @available(LambdaSwift 2.0, *) + func testConcurrentInvocationsWithCancellation() async throws { + #if compiler(>=6.0) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Timeout task + group.addTask { + try await Task.sleep(for: .seconds(10)) + throw TestError.timeout + } + + // Main test task + group.addTask { + let pool = LambdaHTTPServer.Pool(name: "Concurrent Test Pool") + + let cancelledCount = Mutex(0) + + // Spawn 5 concurrent tasks waiting for different requestIds + var tasks: [Task] = [] + for i in 1...5 { + let task = Task { @Sendable in + do { + _ = try await pool.next(for: "request-\(i)") + } catch is CancellationError { + cancelledCount.withLock { $0 += 1 } + } + } + tasks.append(task) + } + + // Let all tasks register their continuations + try await Task.sleep(for: .milliseconds(200)) + + // Cancel task 3 (index 2) + tasks[2].cancel() + + // Give cancellation time to propagate + try await Task.sleep(for: .milliseconds(200)) + + // Check how many tasks were cancelled + let count = cancelledCount.withLock { $0 } + + // Expected: 1 cancelled + // Actual (with bug): 5 cancelled + if count > 1 { + Issue.record("BUG REPRODUCED: \(count) tasks were cancelled, but only 1 should have been cancelled") + } + + #expect(count == 1, "Only 1 task should be cancelled, but \(count) were cancelled") + + // Clean up - cancel all remaining tasks + for task in tasks { + task.cancel() + } + + for task in tasks { + _ = await task.result + } + } + + // Wait for first task to complete (should be main test, not timeout) + try await group.next() + group.cancelAll() + } + + #else + throw XCTSkip("This test requires Swift 6.0 or later") + #endif + } + + /// Test that FIFO mode doesn't have the same issue + /// + /// FIFO mode only allows one waiter at a time, so this bug shouldn't affect it. + @Test("FIFO mode cancellation works correctly") + @available(LambdaSwift 2.0, *) + func testFIFOModeCancellation() async throws { + #if compiler(>=6.0) + let pool = LambdaHTTPServer.Pool(name: "FIFO Test Pool") + + try await withThrowingTaskGroup(of: Void.self) { group in + + // Timeout + group.addTask { + try await Task.sleep(for: .seconds(10)) + throw TestError.timeout + } + + // Main test + group.addTask { + let task = Task { @Sendable in + do { + guard let item = try await pool.next() else { + return "error: nil item" + } + return "success: \(item.id)" + } catch is CancellationError { + return "cancelled" + } catch { + return "error: \(error)" + } + } + + // Let task register continuation + try await Task.sleep(for: .milliseconds(100)) + + // Cancel the task + task.cancel() + + // Wait for result + let result = await task.value + + #expect(result == "cancelled", "Task should be cancelled") + } + + try await group.next() + group.cancelAll() + } + #else + throw XCTSkip("This test requires Swift 6.0 or later") + #endif + } +} + +// MARK: - Test Helpers + +extension LocalServerPoolCancellationTests { + + struct TestItem: Sendable { + let id: String + let data: String + } + + enum TestResult: Sendable { + case success(String) + case cancelled(String) + case error(String, any Error) + } + + enum TestError: Error { + case timeout + } +} + +// Make TestItem conform to LocalServerResponse protocol if needed +extension LocalServerPoolCancellationTests.TestItem { + var requestId: String? { id } +} + +#endif