mirror of
https://github.com/swift-server/swift-aws-lambda-runtime.git
synced 2026-05-03 07:22:27 +00:00
b1553d2766
Closing https://github.com/swift-server/swift-aws-lambda-runtime/issues/584 The LocalServer now queues concurrent `POST /invoke` requests from testing client applications and ensures that the requests are delivered to the Lambda Runtime one by one, just like the AWS Lambda Runtime environment does. The `Pool` has now two modes : pure FIFO (one element get exactly one `next()`) and one mode where multiple elements can get pushed and multiple `next(for requestId:String)` can be called concurrently. The two modes are needed because invocations are 1:1 (one `POST /invoke` is always by one matching `GET /next`) but responses are n:n (a response can have multiple chunks and concurrent invocations can trigger multiple `next(for requestId: String)` I made a couple of additional changes while working on this PR - I moved the `Pool` code in a separate file for improved readability - I removed an instance of `DispatchTime` that was hiding in the code, unnoticed until today - I removed the `async` requirement on `Pool.push(_)` function. This was not required (thank you @t089 for having reported this) - I removed the `fatalError()` that was in the `Pool` implementation. The pool now throws an error when `next()` is invoked concurrently, making it easier to test. - I added extensive unit tests to validate the Pool behavior - I added a test to verify that a rapid succession of client invocations are correctly queued and return no error - I moved a `continuation(resume:)` outside of a lock. Generally speaking, it's a bad idea to resume continuation while owning a lock. I suspect this is causing a error during test execution when we spawn and tear down mutliple `Task` very quickly. In some rare occasions, the test was failing with an invalid assertion in NIO : `NIOCore/NIOAsyncWriter.swift:177: Fatal error: Deinited NIOAsyncWriter without calling finish()` --------- Co-authored-by: Sebastien Stormacq <stormacq@amazon.lu> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
521 lines
18 KiB
Swift
521 lines
18 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the SwiftAWSLambdaRuntime open source project
|
|
//
|
|
// Copyright SwiftAWSLambdaRuntime project authors
|
|
// Copyright (c) Amazon.com, Inc. or its affiliates.
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
#if LocalServerSupport
|
|
|
|
import NIOCore
|
|
import Testing
|
|
|
|
@testable import AWSLambdaRuntime
|
|
|
|
struct PoolTests {
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testBasicPushAndIteration() async throws {
|
|
let pool = LambdaHTTPServer.Pool<String>()
|
|
|
|
// Push values
|
|
pool.push("first")
|
|
pool.push("second")
|
|
|
|
// Iterate and verify order
|
|
var values = [String]()
|
|
for try await value in pool {
|
|
values.append(value)
|
|
if values.count == 2 { break }
|
|
}
|
|
|
|
#expect(values == ["first", "second"])
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testPoolCancellation() async throws {
|
|
let pool = LambdaHTTPServer.Pool<String>()
|
|
|
|
// Create a task that will be cancelled
|
|
let task = Task {
|
|
for try await _ in pool {
|
|
Issue.record("Should not receive any values after cancellation")
|
|
}
|
|
}
|
|
|
|
// Cancel the task immediately
|
|
task.cancel()
|
|
|
|
// This should complete without receiving any values
|
|
do {
|
|
try await task.value
|
|
} catch is CancellationError {
|
|
// this might happen depending on the order on which the cancellation is handled
|
|
}
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testConcurrentPushAndIteration() async throws {
|
|
let pool = LambdaHTTPServer.Pool<Int>()
|
|
let iterations = 1000
|
|
|
|
// Start consumer task first
|
|
let consumer = Task { @Sendable in
|
|
var receivedValues = Set<Int>()
|
|
var count = 0
|
|
for try await value in pool {
|
|
receivedValues.insert(value)
|
|
count += 1
|
|
if count >= iterations { break }
|
|
}
|
|
return receivedValues
|
|
}
|
|
|
|
// Create multiple producer tasks
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
for i in 0..<iterations {
|
|
group.addTask {
|
|
pool.push(i)
|
|
}
|
|
}
|
|
try await group.waitForAll()
|
|
}
|
|
|
|
// Wait for consumer to complete
|
|
let receivedValues = try await consumer.value
|
|
|
|
// Verify all values were received exactly once
|
|
#expect(receivedValues.count == iterations)
|
|
#expect(Set(0..<iterations) == receivedValues)
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testPushToWaitingConsumer() async throws {
|
|
let pool = LambdaHTTPServer.Pool<String>()
|
|
let expectedValue = "test value"
|
|
|
|
// Start a consumer that will wait for a value
|
|
let consumer = Task {
|
|
for try await value in pool {
|
|
#expect(value == expectedValue)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Give consumer time to start waiting
|
|
try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds
|
|
|
|
// Push a value
|
|
pool.push(expectedValue)
|
|
|
|
// Wait for consumer to complete
|
|
try await consumer.value
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testStressTest() async throws {
|
|
let pool = LambdaHTTPServer.Pool<Int>()
|
|
let producerCount = 10
|
|
let messagesPerProducer = 1000
|
|
|
|
// Start consumer
|
|
let consumer = Task { @Sendable in
|
|
var receivedValues = [Int]()
|
|
var count = 0
|
|
for try await value in pool {
|
|
receivedValues.append(value)
|
|
count += 1
|
|
if count >= producerCount * messagesPerProducer { break }
|
|
}
|
|
return receivedValues
|
|
}
|
|
|
|
// Create multiple producers
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
for p in 0..<producerCount {
|
|
group.addTask {
|
|
for i in 0..<messagesPerProducer {
|
|
pool.push(p * messagesPerProducer + i)
|
|
}
|
|
}
|
|
}
|
|
try await group.waitForAll()
|
|
}
|
|
|
|
// Wait for consumer to complete
|
|
let receivedValues = try await consumer.value
|
|
|
|
// Verify we received all values
|
|
#expect(receivedValues.count == producerCount * messagesPerProducer)
|
|
#expect(Set(receivedValues).count == producerCount * messagesPerProducer)
|
|
}
|
|
|
|
// in Swift 6.0, the error returned by #expect(throwing:) macro is a tuple ()
|
|
// I decided to skip these tests on Swift 6.0
|
|
#if swift(>=6.1)
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testConcurrentNext() async throws {
|
|
let pool = LambdaHTTPServer.Pool<String>()
|
|
|
|
// Create two tasks that will both wait for elements to be available
|
|
let error = await #expect(throws: LambdaHTTPServer.Pool<String>.PoolError.self) {
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
|
|
// one of the two task will throw a PoolError
|
|
|
|
group.addTask {
|
|
for try await _ in pool {
|
|
}
|
|
Issue.record("Loop 1 should not complete")
|
|
}
|
|
|
|
group.addTask {
|
|
for try await _ in pool {
|
|
}
|
|
Issue.record("Loop 2 should not complete")
|
|
}
|
|
try await group.waitForAll()
|
|
}
|
|
}
|
|
|
|
// Verify it's the correct error cause
|
|
if case .nextCalledTwice = error?.cause {
|
|
// This is the expected error
|
|
} else {
|
|
Issue.record("Expected nextCalledTwice error, got: \(String(describing: error?.cause))")
|
|
}
|
|
}
|
|
|
|
// MARK: - Invariant Tests for RequestId-specific functionality
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testRequestIdSpecificNext() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
// Push responses with different requestIds
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data1")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "data2")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data3")))
|
|
|
|
// Get specific responses
|
|
let response1 = try await pool.next(for: "req1")
|
|
#expect(response1.requestId == "req1")
|
|
#expect(String(buffer: response1.body!) == "data1")
|
|
|
|
let response2 = try await pool.next(for: "req2")
|
|
#expect(response2.requestId == "req2")
|
|
#expect(String(buffer: response2.body!) == "data2")
|
|
|
|
let response3 = try await pool.next(for: "req1")
|
|
#expect(response3.requestId == "req1")
|
|
#expect(String(buffer: response3.body!) == "data3")
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testStreamingResponsesWithSameRequestId() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
let requestId = "streaming-req"
|
|
|
|
let chunks = try await withThrowingTaskGroup(of: [String].self) { group in
|
|
// Start consumer task
|
|
group.addTask {
|
|
var chunks: [String] = []
|
|
var isComplete = false
|
|
|
|
while !isComplete {
|
|
let response = try await pool.next(for: requestId)
|
|
if let body = response.body {
|
|
chunks.append(String(buffer: body))
|
|
}
|
|
if response.final {
|
|
isComplete = true
|
|
}
|
|
}
|
|
return chunks
|
|
}
|
|
|
|
// Start producer task
|
|
group.addTask {
|
|
// Give consumer time to start waiting
|
|
try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds
|
|
|
|
// Push multiple chunks for the same requestId
|
|
pool.push(
|
|
LambdaHTTPServer.LocalServerResponse(
|
|
id: requestId,
|
|
body: ByteBuffer(string: "chunk1"),
|
|
final: false
|
|
)
|
|
)
|
|
pool.push(
|
|
LambdaHTTPServer.LocalServerResponse(
|
|
id: requestId,
|
|
body: ByteBuffer(string: "chunk2"),
|
|
final: false
|
|
)
|
|
)
|
|
pool.push(
|
|
LambdaHTTPServer.LocalServerResponse(id: requestId, body: ByteBuffer(string: "chunk3"), final: true)
|
|
)
|
|
|
|
return [] // Producer doesn't return chunks
|
|
}
|
|
|
|
// Wait for consumer to complete and return its result
|
|
for try await result in group {
|
|
if !result.isEmpty {
|
|
group.cancelAll()
|
|
return result
|
|
}
|
|
}
|
|
return []
|
|
}
|
|
|
|
#expect(chunks == ["chunk1", "chunk2", "chunk3"])
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testMixedWaitingModesError() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
let error = await #expect(throws: LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>.PoolError.self) {
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
// Start a FIFO consumer
|
|
group.addTask {
|
|
for try await _ in pool {
|
|
// This should block waiting for any item
|
|
}
|
|
}
|
|
|
|
// Start a requestId-specific consumer after a delay
|
|
group.addTask {
|
|
// Give FIFO task time to start waiting
|
|
try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds
|
|
|
|
// Try to use requestId-specific next - should fail with mixedWaitingModes
|
|
_ = try await pool.next(for: "req1")
|
|
}
|
|
|
|
// Wait for the first task to complete (which should be the error)
|
|
try await group.next()
|
|
group.cancelAll()
|
|
}
|
|
}
|
|
|
|
// Verify it's the correct error cause
|
|
if case .mixedWaitingModes = error?.cause {
|
|
// This is the expected error
|
|
} else {
|
|
Issue.record("Expected mixedWaitingModes error, got: \(String(describing: error?.cause))")
|
|
}
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testMixedWaitingModesErrorReverse() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
let error = await #expect(throws: LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>.PoolError.self) {
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
// Start a requestId-specific consumer
|
|
group.addTask {
|
|
_ = try await pool.next(for: "req1")
|
|
}
|
|
|
|
// Start a FIFO consumer after a delay
|
|
group.addTask {
|
|
// Give specific task time to start waiting
|
|
try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds
|
|
|
|
// Try to use FIFO next - should fail with mixedWaitingModes
|
|
for try await _ in pool {
|
|
break
|
|
}
|
|
}
|
|
|
|
// Wait for the first task to complete (which should be the error)
|
|
try await group.next()
|
|
group.cancelAll()
|
|
}
|
|
}
|
|
|
|
// Verify it's the correct error cause
|
|
if case .mixedWaitingModes = error?.cause {
|
|
// This is the expected error
|
|
} else {
|
|
Issue.record("Expected mixedWaitingModes error, got: \(String(describing: error?.cause))")
|
|
}
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testDuplicateRequestIdWaitError() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
let error = await #expect(throws: LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>.PoolError.self) {
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
// Start first consumer waiting for specific requestId
|
|
group.addTask {
|
|
_ = try await pool.next(for: "req1")
|
|
}
|
|
|
|
// Start second consumer for same requestId after a delay
|
|
group.addTask {
|
|
// Give first task time to start waiting
|
|
try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds
|
|
|
|
// Try to wait for the same requestId - should fail
|
|
_ = try await pool.next(for: "req1")
|
|
}
|
|
|
|
// Wait for the first task to complete (which should be the error)
|
|
try await group.next()
|
|
group.cancelAll()
|
|
}
|
|
}
|
|
|
|
// Verify it's the correct error cause and requestId
|
|
if case let .duplicateRequestIdWait(requestId) = error?.cause {
|
|
#expect(requestId == "req1")
|
|
} else {
|
|
Issue.record("Expected duplicateRequestIdWait error, got: \(String(describing: error?.cause))")
|
|
}
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testConcurrentRequestIdConsumers() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
let results = try await withThrowingTaskGroup(of: (String, String).self) { group in
|
|
// Start multiple consumers for different requestIds
|
|
group.addTask {
|
|
let response = try await pool.next(for: "req1")
|
|
return ("req1", String(buffer: response.body!))
|
|
}
|
|
|
|
group.addTask {
|
|
let response = try await pool.next(for: "req2")
|
|
return ("req2", String(buffer: response.body!))
|
|
}
|
|
|
|
group.addTask {
|
|
let response = try await pool.next(for: "req3")
|
|
return ("req3", String(buffer: response.body!))
|
|
}
|
|
|
|
// Start producer task
|
|
group.addTask {
|
|
// Give tasks time to start waiting
|
|
try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds
|
|
|
|
// Push responses in different order
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req3", body: ByteBuffer(string: "data3")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data1")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "data2")))
|
|
|
|
return ("producer", "") // Producer doesn't return meaningful data
|
|
}
|
|
|
|
// Collect results from consumers
|
|
var consumerResults: [String: String] = [:]
|
|
for try await (requestId, data) in group {
|
|
if requestId != "producer" {
|
|
consumerResults[requestId] = data
|
|
}
|
|
if consumerResults.count == 3 {
|
|
group.cancelAll()
|
|
break
|
|
}
|
|
}
|
|
return consumerResults
|
|
}
|
|
|
|
// Verify each consumer gets the correct response
|
|
#expect(results["req1"] == "data1")
|
|
#expect(results["req2"] == "data2")
|
|
#expect(results["req3"] == "data3")
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testCancellationCleansUpAllContinuations() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
// Test that cancellation properly cleans up all continuations
|
|
await #expect(throws: CancellationError.self) {
|
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
|
// Start multiple consumers for different requestIds
|
|
group.addTask {
|
|
_ = try await pool.next(for: "req1")
|
|
}
|
|
|
|
group.addTask {
|
|
_ = try await pool.next(for: "req2")
|
|
}
|
|
|
|
group.addTask {
|
|
_ = try await pool.next(for: "req3")
|
|
}
|
|
|
|
// Give tasks time to start waiting then cancel all
|
|
try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds
|
|
group.cancelAll()
|
|
|
|
try await group.waitForAll()
|
|
}
|
|
}
|
|
|
|
// Pool should be back to clean state - verify by pushing and consuming normally
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "new-req", body: ByteBuffer(string: "new-data")))
|
|
let response = try await pool.next(for: "new-req")
|
|
#expect(String(buffer: response.body!) == "new-data")
|
|
}
|
|
|
|
@Test
|
|
@available(LambdaSwift 2.0, *)
|
|
func testBufferOrderingWithRequestIds() async throws {
|
|
let pool = LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>()
|
|
|
|
// Push multiple responses for the same requestId
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "first")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "other")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "second")))
|
|
pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "third")))
|
|
|
|
// Consume in order - should get FIFO order for the same requestId
|
|
let first = try await pool.next(for: "req1")
|
|
#expect(String(buffer: first.body!) == "first")
|
|
|
|
let second = try await pool.next(for: "req1")
|
|
#expect(String(buffer: second.body!) == "second")
|
|
|
|
let other = try await pool.next(for: "req2")
|
|
#expect(String(buffer: other.body!) == "other")
|
|
|
|
let third = try await pool.next(for: "req1")
|
|
#expect(String(buffer: third.body!) == "third")
|
|
}
|
|
#endif //swift >= 6.1
|
|
|
|
}
|
|
#endif // trait
|