mirror of
https://github.com/swift-server/swift-aws-lambda-runtime.git
synced 2026-05-03 07:22:27 +00:00
performance tests (#16)
motivation: benchmark for comparison of warm/cold runs changes: * refactor configuration * add mock server that can be used by perf tests * add simple perf test script * change redundant classes to structs, make remaining classes final * make offloading opt-in * safer locking * fix format
This commit is contained in:
committed by
GitHub Enterprise
parent
5c724da6b3
commit
c3d4ad5a39
@@ -8,5 +8,6 @@
|
||||
--patternlet inline
|
||||
--stripunusedargs unnamed-only
|
||||
--comments ignore
|
||||
--ifdef no-indent
|
||||
|
||||
# rules
|
||||
|
||||
+10
-12
@@ -2,26 +2,24 @@
|
||||
|
||||
import PackageDescription
|
||||
|
||||
var targets: [PackageDescription.Target] = [
|
||||
.target(name: "SwiftAwsLambda", dependencies: ["Logging", "Backtrace", "NIOHTTP1"]),
|
||||
.target(name: "SwiftAwsLambdaSample", dependencies: ["SwiftAwsLambda"]),
|
||||
.target(name: "SwiftAwsLambdaStringSample", dependencies: ["SwiftAwsLambda"]),
|
||||
.target(name: "SwiftAwsLambdaCodableSample", dependencies: ["SwiftAwsLambda"]),
|
||||
.testTarget(name: "SwiftAwsLambdaTests", dependencies: ["SwiftAwsLambda"]),
|
||||
]
|
||||
|
||||
let package = Package(
|
||||
name: "swift-aws-lambda",
|
||||
products: [
|
||||
.library(name: "SwiftAwsLambda", targets: ["SwiftAwsLambda"]),
|
||||
.executable(name: "SwiftAwsLambdaSample", targets: ["SwiftAwsLambdaSample"]),
|
||||
.executable(name: "SwiftAwsLambdaStringSample", targets: ["SwiftAwsLambdaStringSample"]),
|
||||
.executable(name: "SwiftAwsLambdaCodableSample", targets: ["SwiftAwsLambdaCodableSample"]),
|
||||
],
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/apple/swift-nio.git", from: "2.8.0"),
|
||||
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
|
||||
.package(url: "https://github.com/ianpartridge/swift-backtrace.git", from: "1.1.0"),
|
||||
],
|
||||
targets: targets
|
||||
targets: [
|
||||
.target(name: "SwiftAwsLambda", dependencies: ["Logging", "Backtrace", "NIOHTTP1"]),
|
||||
.testTarget(name: "SwiftAwsLambdaTests", dependencies: ["SwiftAwsLambda"]),
|
||||
// samples
|
||||
.target(name: "SwiftAwsLambdaSample", dependencies: ["SwiftAwsLambda"]),
|
||||
.target(name: "SwiftAwsLambdaStringSample", dependencies: ["SwiftAwsLambda"]),
|
||||
.target(name: "SwiftAwsLambdaCodableSample", dependencies: ["SwiftAwsLambda"]),
|
||||
// perf tests
|
||||
.target(name: "MockServer", dependencies: ["Logging", "NIOHTTP1"]),
|
||||
]
|
||||
)
|
||||
|
||||
@@ -0,0 +1,179 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftAwsLambda open source project
|
||||
//
|
||||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAwsLambda project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftAwsLambda project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import Logging
|
||||
import NIO
|
||||
import NIOHTTP1
|
||||
|
||||
internal struct MockServer {
|
||||
private let logger: Logger
|
||||
private let group: EventLoopGroup
|
||||
private let host: String
|
||||
private let port: Int
|
||||
private let mode: Mode
|
||||
private let keepAlive: Bool
|
||||
|
||||
public init() {
|
||||
var logger = Logger(label: "MockServer")
|
||||
logger.logLevel = env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
|
||||
self.logger = logger
|
||||
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
|
||||
self.host = env("HOST") ?? "127.0.0.1"
|
||||
self.port = env("PORT").flatMap(Int.init) ?? 7000
|
||||
self.mode = env("MODE").flatMap(Mode.init) ?? .string
|
||||
self.keepAlive = env("KEEP_ALIVE").flatMap(Bool.init) ?? true
|
||||
}
|
||||
|
||||
func start() throws {
|
||||
let bootstrap = ServerBootstrap(group: group)
|
||||
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
.childChannelInitializer { channel in
|
||||
channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in
|
||||
channel.pipeline.addHandler(HTTPHandler(logger: self.logger,
|
||||
keepAlive: self.keepAlive,
|
||||
mode: self.mode))
|
||||
}
|
||||
}
|
||||
try bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture<Void> in
|
||||
guard let localAddress = channel.localAddress else {
|
||||
return channel.eventLoop.makeFailedFuture(ServerError.cantBind)
|
||||
}
|
||||
self.logger.info("\(self) started and listening on \(localAddress)")
|
||||
return channel.eventLoop.makeSucceededFuture(())
|
||||
}.wait()
|
||||
}
|
||||
}
|
||||
|
||||
internal final class HTTPHandler: ChannelInboundHandler {
|
||||
public typealias InboundIn = HTTPServerRequestPart
|
||||
public typealias OutboundOut = HTTPServerResponsePart
|
||||
|
||||
private let logger: Logger
|
||||
private let mode: Mode
|
||||
private let keepAlive: Bool
|
||||
|
||||
private var requestHead: HTTPRequestHead!
|
||||
private var requestBody: ByteBuffer?
|
||||
|
||||
public init(logger: Logger, keepAlive: Bool, mode: Mode) {
|
||||
self.logger = logger
|
||||
self.mode = mode
|
||||
self.keepAlive = keepAlive
|
||||
}
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let requestPart = unwrapInboundIn(data)
|
||||
|
||||
switch requestPart {
|
||||
case .head(let head):
|
||||
self.requestHead = head
|
||||
self.requestBody?.clear()
|
||||
case .body(var buffer):
|
||||
if self.requestBody == nil {
|
||||
self.requestBody = buffer
|
||||
} else {
|
||||
self.requestBody!.writeBuffer(&buffer)
|
||||
}
|
||||
case .end:
|
||||
self.processRequest(context: context)
|
||||
}
|
||||
}
|
||||
|
||||
func processRequest(context: ChannelHandlerContext) {
|
||||
self.logger.debug("\(self) processing \(self.requestHead.uri)")
|
||||
|
||||
var responseStatus: HTTPResponseStatus
|
||||
var responseBody: String?
|
||||
var responseHeaders: [(String, String)]?
|
||||
|
||||
if self.requestHead.uri.hasSuffix("/next") {
|
||||
let requestId = UUID().uuidString
|
||||
responseStatus = .ok
|
||||
switch self.mode {
|
||||
case .string:
|
||||
responseBody = requestId
|
||||
case .json:
|
||||
responseBody = "{ \"body\": \"\(requestId)\" }"
|
||||
}
|
||||
responseHeaders = [(AmazonHeaders.requestID, requestId)]
|
||||
} else if self.requestHead.uri.hasSuffix("/response") {
|
||||
responseStatus = .accepted
|
||||
} else {
|
||||
responseStatus = .notFound
|
||||
}
|
||||
self.writeResponse(context: context, status: responseStatus, headers: responseHeaders, body: responseBody)
|
||||
}
|
||||
|
||||
func writeResponse(context: ChannelHandlerContext, status: HTTPResponseStatus, headers: [(String, String)]? = nil, body: String? = nil) {
|
||||
var headers = HTTPHeaders(headers ?? [])
|
||||
headers.add(name: "Content-Length", value: "\(body?.utf8.count ?? 0)")
|
||||
headers.add(name: "Connection", value: self.keepAlive ? "keep-alive" : "close")
|
||||
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: status, headers: headers)
|
||||
|
||||
context.write(wrapOutboundOut(.head(head))).whenFailure { error in
|
||||
self.logger.error("\(self) write error \(error)")
|
||||
}
|
||||
|
||||
if let b = body {
|
||||
var buffer = context.channel.allocator.buffer(capacity: b.utf8.count)
|
||||
buffer.writeString(b)
|
||||
context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in
|
||||
self.logger.error("\(self) write error \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in
|
||||
if case .failure(let error) = result {
|
||||
self.logger.error("\(self) write error \(error)")
|
||||
}
|
||||
if !self.self.keepAlive {
|
||||
context.close().whenFailure { error in
|
||||
self.logger.error("\(self) close error \(error)")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal enum ServerError: Error {
|
||||
case notReady
|
||||
case cantBind
|
||||
}
|
||||
|
||||
internal enum AmazonHeaders {
|
||||
static let requestID = "Lambda-Runtime-Aws-Request-Id"
|
||||
static let traceID = "Lambda-Runtime-Trace-Id"
|
||||
static let clientContext = "X-Amz-Client-Context"
|
||||
static let cognitoIdentity = "X-Amz-Cognito-Identity"
|
||||
static let deadline = "Lambda-Runtime-Deadline-Ms"
|
||||
static let invokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn"
|
||||
}
|
||||
|
||||
internal enum Mode: String {
|
||||
case string
|
||||
case json
|
||||
}
|
||||
|
||||
func env(_ name: String) -> String? {
|
||||
guard let value = getenv(name) else {
|
||||
return nil
|
||||
}
|
||||
return String(utf8String: value)
|
||||
}
|
||||
|
||||
// main
|
||||
let server = MockServer()
|
||||
try! server.start()
|
||||
dispatchMain()
|
||||
@@ -12,7 +12,6 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import NIO
|
||||
import NIOConcurrencyHelpers
|
||||
import NIOHTTP1
|
||||
@@ -20,45 +19,40 @@ import NIOHTTP1
|
||||
/// A barebone HTTP client to interact with AWS Runtime Engine which is an HTTP server.
|
||||
internal class HTTPClient {
|
||||
private let eventLoop: EventLoop
|
||||
private let config: Lambda.Config.RuntimeEngine
|
||||
private let configuration: Lambda.Configuration.RuntimeEngine
|
||||
|
||||
private var _state = State.disconnected
|
||||
private var state = State.disconnected
|
||||
private let lock = Lock()
|
||||
|
||||
init(eventLoop: EventLoop, config: Lambda.Config.RuntimeEngine) {
|
||||
init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
|
||||
self.eventLoop = eventLoop
|
||||
self.config = config
|
||||
}
|
||||
|
||||
private var state: State {
|
||||
get {
|
||||
return self.lock.withLock {
|
||||
self._state
|
||||
}
|
||||
}
|
||||
set {
|
||||
self.lock.withLockVoid {
|
||||
self._state = newValue
|
||||
}
|
||||
}
|
||||
self.configuration = configuration
|
||||
}
|
||||
|
||||
func get(url: String, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
|
||||
return self.execute(Request(url: self.config.baseURL.appendingPathComponent(url), method: .GET, timeout: timeout ?? self.config.requestTimeout))
|
||||
return self.execute(Request(url: self.configuration.baseURL.appendingPathComponent(url),
|
||||
method: .GET,
|
||||
timeout: timeout ?? self.configuration.requestTimeout))
|
||||
}
|
||||
|
||||
func post(url: String, body: ByteBuffer, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
|
||||
return self.execute(Request(url: self.config.baseURL.appendingPathComponent(url), method: .POST, body: body, timeout: timeout ?? self.config.requestTimeout))
|
||||
return self.execute(Request(url: self.configuration.baseURL.appendingPathComponent(url),
|
||||
method: .POST,
|
||||
body: body,
|
||||
timeout: timeout ?? self.configuration.requestTimeout))
|
||||
}
|
||||
|
||||
private func execute(_ request: Request) -> EventLoopFuture<Response> {
|
||||
self.lock.lock()
|
||||
switch self.state {
|
||||
case .connected(let channel):
|
||||
guard channel.isActive else {
|
||||
// attempt to reconnect
|
||||
self.state = .disconnected
|
||||
self.lock.unlock()
|
||||
return self.execute(request)
|
||||
}
|
||||
self.lock.unlock()
|
||||
let promise = channel.eventLoop.makePromise(of: Response.self)
|
||||
let wrapper = HTTPRequestWrapper(request: request, promise: promise)
|
||||
return channel.writeAndFlush(wrapper).flatMap {
|
||||
@@ -66,7 +60,8 @@ internal class HTTPClient {
|
||||
}
|
||||
case .disconnected:
|
||||
return self.connect().flatMap {
|
||||
self.execute(request)
|
||||
self.lock.unlock()
|
||||
return self.execute(request)
|
||||
}
|
||||
default:
|
||||
preconditionFailure("invalid state \(self.state)")
|
||||
@@ -81,11 +76,11 @@ internal class HTTPClient {
|
||||
let bootstrap = ClientBootstrap(group: eventLoop)
|
||||
.channelInitializer { channel in
|
||||
channel.pipeline.addHTTPClientHandlers().flatMap {
|
||||
channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.config.keepAlive),
|
||||
UnaryHandler(keepAlive: self.config.keepAlive)])
|
||||
channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.configuration.keepAlive),
|
||||
UnaryHandler(keepAlive: self.configuration.keepAlive)])
|
||||
}
|
||||
}
|
||||
return bootstrap.connect(host: self.config.baseURL.host, port: self.config.baseURL.port).flatMapThrowing { channel in
|
||||
return bootstrap.connect(host: self.configuration.baseURL.host, port: self.configuration.baseURL.port).flatMapThrowing { channel in
|
||||
self.state = .connected(channel)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import Foundation // for JSON
|
||||
|
||||
/// Extension to the `Lambda` companion to enable execution of Lambdas that take and return `Codable` payloads.
|
||||
/// This is the most common way to use this library in AWS Lambda, since its JSON based.
|
||||
@@ -32,13 +32,13 @@ extension Lambda {
|
||||
}
|
||||
|
||||
// for testing
|
||||
internal static func run<In: Decodable, Out: Encodable>(maxTimes: Int = 0, closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
|
||||
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
|
||||
internal static func run<In: Decodable, Out: Encodable>(configuration: Configuration = .init(), closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
|
||||
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
|
||||
}
|
||||
|
||||
// for testing
|
||||
internal static func run<Handler>(handler: Handler, maxTimes: Int = 0) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
|
||||
return self.run(handler: handler as LambdaHandler, maxTimes: maxTimes)
|
||||
internal static func run<Handler>(handler: Handler, configuration: Configuration = .init()) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
|
||||
return self.run(handler: handler as LambdaHandler, configuration: configuration)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,9 +104,10 @@ public extension LambdaCodableHandler {
|
||||
/// LambdaCodableJsonCodec is an implementation of `LambdaCodableCodec` which does `Encodable` -> `[UInt8]` encoding and `[UInt8]` -> `Decodable' decoding
|
||||
/// using JSONEncoder and JSONDecoder respectively.
|
||||
// This is a class as encoder amd decoder are a class, which means its cheaper to hold a reference to both in a class then a struct.
|
||||
private class LambdaCodableJsonCodec<In: Decodable, Out: Encodable>: LambdaCodableCodec<In, Out> {
|
||||
private final class LambdaCodableJsonCodec<In: Decodable, Out: Encodable>: LambdaCodableCodec<In, Out> {
|
||||
private let encoder = JSONEncoder()
|
||||
private let decoder = JSONDecoder()
|
||||
|
||||
public override func encode(_ value: Out) -> Result<[UInt8], Error> {
|
||||
do {
|
||||
return .success(try [UInt8](self.encoder.encode(value)))
|
||||
|
||||
@@ -29,13 +29,13 @@ extension Lambda {
|
||||
}
|
||||
|
||||
// for testing
|
||||
internal static func run(maxTimes: Int = 0, _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
|
||||
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
|
||||
internal static func run(configuration: Configuration = .init(), _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
|
||||
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
|
||||
}
|
||||
|
||||
// for testing
|
||||
internal static func run(handler: LambdaStringHandler, maxTimes: Int = 0) -> LambdaLifecycleResult {
|
||||
return self.run(handler: handler as LambdaHandler, maxTimes: maxTimes)
|
||||
internal static func run(handler: LambdaStringHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
|
||||
return self.run(handler: handler as LambdaHandler, configuration: configuration)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,13 +13,13 @@
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
#if os(Linux)
|
||||
import Glibc
|
||||
import Glibc
|
||||
#else
|
||||
import Darwin.C
|
||||
import Darwin.C
|
||||
#endif
|
||||
|
||||
import Backtrace
|
||||
import Foundation
|
||||
import Foundation // for URL
|
||||
import Logging
|
||||
import NIO
|
||||
import NIOConcurrencyHelpers
|
||||
@@ -28,6 +28,7 @@ public enum Lambda {
|
||||
/// Run a Lambda defined by implementing the `LambdaClosure` closure.
|
||||
///
|
||||
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
|
||||
@inlinable
|
||||
public static func run(_ closure: @escaping LambdaClosure) {
|
||||
self.run(closure: closure)
|
||||
}
|
||||
@@ -35,35 +36,38 @@ public enum Lambda {
|
||||
/// Run a Lambda defined by implementing the `LambdaHandler` protocol.
|
||||
///
|
||||
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
|
||||
@inlinable
|
||||
public static func run(_ handler: LambdaHandler) {
|
||||
self.run(handler: handler)
|
||||
}
|
||||
|
||||
// for testing and internal use
|
||||
@usableFromInline
|
||||
@discardableResult
|
||||
internal static func run(maxTimes: Int = 0, stopSignal: Signal = .TERM, closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
|
||||
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes, stopSignal: stopSignal)
|
||||
internal static func run(configuration: Configuration = .init(), closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
|
||||
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
|
||||
}
|
||||
|
||||
// for testing and internal use
|
||||
@usableFromInline
|
||||
@discardableResult
|
||||
internal static func run(handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .TERM) -> LambdaLifecycleResult {
|
||||
internal static func run(handler: LambdaHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
|
||||
do {
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
|
||||
defer { try! eventLoopGroup.syncShutdownGracefully() }
|
||||
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, handler: handler, maxTimes: maxTimes, stopSignal: stopSignal).wait()
|
||||
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, handler: handler, configuration: configuration).wait()
|
||||
return .success(result)
|
||||
} catch {
|
||||
return .failure(error)
|
||||
}
|
||||
}
|
||||
|
||||
internal static func runAsync(eventLoopGroup: EventLoopGroup, handler: LambdaHandler, maxTimes: Int = 0, stopSignal: Signal = .TERM) -> EventLoopFuture<Int> {
|
||||
internal static func runAsync(eventLoopGroup: EventLoopGroup, handler: LambdaHandler, configuration: Configuration) -> EventLoopFuture<Int> {
|
||||
Backtrace.install()
|
||||
let logger = Logger(label: "Lambda")
|
||||
let config = Config(lifecycle: .init(maxTimes: maxTimes))
|
||||
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, config: config, handler: handler)
|
||||
let signalSource = trap(signal: stopSignal) { signal in
|
||||
var logger = Logger(label: "Lambda")
|
||||
logger.logLevel = configuration.general.logLevel
|
||||
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, handler: handler)
|
||||
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
|
||||
logger.info("intercepted signal: \(signal)")
|
||||
lifecycle.stop()
|
||||
}
|
||||
@@ -73,19 +77,19 @@ public enum Lambda {
|
||||
}
|
||||
}
|
||||
|
||||
private class Lifecycle {
|
||||
private final class Lifecycle {
|
||||
private let eventLoop: EventLoop
|
||||
private let logger: Logger
|
||||
private let config: Lambda.Config
|
||||
private let configuration: Configuration
|
||||
private let handler: LambdaHandler
|
||||
|
||||
private var _state = LifecycleState.idle
|
||||
private let stateLock = Lock()
|
||||
|
||||
init(eventLoop: EventLoop, logger: Logger, config: Lambda.Config, handler: LambdaHandler) {
|
||||
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, handler: LambdaHandler) {
|
||||
self.eventLoop = eventLoop
|
||||
self.logger = logger
|
||||
self.config = config
|
||||
self.configuration = configuration
|
||||
self.handler = handler
|
||||
}
|
||||
|
||||
@@ -108,11 +112,11 @@ public enum Lambda {
|
||||
}
|
||||
|
||||
func start() -> EventLoopFuture<Int> {
|
||||
logger.info("lambda lifecycle starting with \(self.config)")
|
||||
logger.info("lambda lifecycle starting with \(self.configuration)")
|
||||
self.state = .initializing
|
||||
var logger = self.logger
|
||||
logger[metadataKey: "lifecycleId"] = .string(self.config.lifecycle.id)
|
||||
let runner = LambdaRunner(eventLoop: self.eventLoop, config: self.config, lambdaHandler: self.handler)
|
||||
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
|
||||
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration, lambdaHandler: self.handler)
|
||||
return runner.initialize(logger: logger).flatMap { _ in
|
||||
self.state = .active
|
||||
return self.run(logger: logger, runner: runner, count: 0)
|
||||
@@ -132,7 +136,7 @@ public enum Lambda {
|
||||
private func run(logger: Logger, runner: LambdaRunner, count: Int) -> EventLoopFuture<Int> {
|
||||
switch self.state {
|
||||
case .active:
|
||||
if self.config.lifecycle.maxTimes > 0, count >= self.config.lifecycle.maxTimes {
|
||||
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
|
||||
return self.eventLoop.makeSucceededFuture(count)
|
||||
}
|
||||
var logger = logger
|
||||
@@ -149,31 +153,49 @@ public enum Lambda {
|
||||
}
|
||||
}
|
||||
|
||||
internal struct Config: CustomStringConvertible {
|
||||
@usableFromInline
|
||||
internal struct Configuration: CustomStringConvertible {
|
||||
let general: General
|
||||
let lifecycle: Lifecycle
|
||||
let runtimeEngine: RuntimeEngine
|
||||
|
||||
var description: String {
|
||||
return "\(Config.self):\n \(self.lifecycle)\n \(self.runtimeEngine)"
|
||||
@usableFromInline
|
||||
init() {
|
||||
self.init(general: .init(), lifecycle: .init(), runtimeEngine: .init())
|
||||
}
|
||||
|
||||
init(lifecycle: Lifecycle = .init(), runtimeEngine: RuntimeEngine = .init()) {
|
||||
self.lifecycle = lifecycle
|
||||
self.runtimeEngine = runtimeEngine
|
||||
init(general: General? = nil, lifecycle: Lifecycle? = nil, runtimeEngine: RuntimeEngine? = nil) {
|
||||
self.general = general ?? General()
|
||||
self.lifecycle = lifecycle ?? Lifecycle()
|
||||
self.runtimeEngine = runtimeEngine ?? RuntimeEngine()
|
||||
}
|
||||
|
||||
struct General: CustomStringConvertible {
|
||||
let logLevel: Logger.Level
|
||||
|
||||
init(logLevel: Logger.Level? = nil) {
|
||||
self.logLevel = logLevel ?? env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
|
||||
}
|
||||
|
||||
var description: String {
|
||||
return "\(General.self)(logLevel: \(self.logLevel))"
|
||||
}
|
||||
}
|
||||
|
||||
struct Lifecycle: CustomStringConvertible {
|
||||
let id: String
|
||||
let maxTimes: Int
|
||||
let stopSignal: Signal
|
||||
|
||||
init(id: String? = nil, maxTimes: Int? = nil) {
|
||||
self.id = id ?? NSUUID().uuidString
|
||||
self.maxTimes = maxTimes ?? 0
|
||||
init(id: String? = nil, maxTimes: Int? = nil, stopSignal: Signal? = nil) {
|
||||
self.id = id ?? UUID().uuidString
|
||||
self.maxTimes = maxTimes ?? env("MAX_REQUESTS").flatMap(Int.init) ?? 0
|
||||
self.stopSignal = stopSignal ?? env("STOP_SIGNAL").flatMap(Int32.init).flatMap(Signal.init) ?? Signal.TERM
|
||||
precondition(self.maxTimes >= 0, "maxTimes must be equal or larger than 0")
|
||||
}
|
||||
|
||||
var description: String {
|
||||
return "\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes))"
|
||||
return "\(Lifecycle.self)(id: \(self.id), maxTimes: \(self.maxTimes), stopSignal: \(self.stopSignal))"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,17 +203,24 @@ public enum Lambda {
|
||||
let baseURL: HTTPURL
|
||||
let keepAlive: Bool
|
||||
let requestTimeout: TimeAmount?
|
||||
let offload: Bool
|
||||
|
||||
init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
|
||||
self.baseURL = HTTPURL(baseURL ?? Environment.string(Consts.hostPortEnvVariableName).flatMap { "http://\($0)" } ?? "http://\(Defaults.host):\(Defaults.port)")
|
||||
self.keepAlive = keepAlive ?? true
|
||||
self.requestTimeout = requestTimeout ?? Environment.int(Consts.requestTimeoutEnvVariableName).flatMap { .milliseconds(Int64($0)) }
|
||||
init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil, offload: Bool? = nil) {
|
||||
self.baseURL = HTTPURL(baseURL ?? "http://\(env("AWS_LAMBDA_RUNTIME_API") ?? "127.0.0.1:7000")")
|
||||
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true
|
||||
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
|
||||
self.offload = offload ?? env("OFFLOAD").flatMap(Bool.init) ?? false
|
||||
}
|
||||
|
||||
var description: String {
|
||||
return "\(RuntimeEngine.self)(baseURL: \(self.baseURL), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout)))"
|
||||
return "\(RuntimeEngine.self)(baseURL: \(self.baseURL), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout)), offload: \(self.offload)"
|
||||
}
|
||||
}
|
||||
|
||||
@usableFromInline
|
||||
var description: String {
|
||||
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
|
||||
}
|
||||
}
|
||||
|
||||
internal struct HTTPURL: Equatable, CustomStringConvertible {
|
||||
@@ -308,6 +337,7 @@ public struct LambdaContext {
|
||||
}
|
||||
}
|
||||
|
||||
@usableFromInline
|
||||
internal typealias LambdaLifecycleResult = Result<Int, Error>
|
||||
|
||||
private struct LambdaClosureWrapper: LambdaHandler {
|
||||
|
||||
@@ -12,22 +12,24 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import Dispatch // for offloading
|
||||
import Logging
|
||||
import NIO
|
||||
|
||||
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
|
||||
internal final class LambdaRunner {
|
||||
internal struct LambdaRunner {
|
||||
private let runtimeClient: LambdaRuntimeClient
|
||||
private let lambdaHandler: LambdaHandler
|
||||
private let eventLoop: EventLoop
|
||||
private let lifecycleId: String
|
||||
private let offload: Bool
|
||||
|
||||
init(eventLoop: EventLoop, config: Lambda.Config, lambdaHandler: LambdaHandler) {
|
||||
init(eventLoop: EventLoop, configuration: Lambda.Configuration, lambdaHandler: LambdaHandler) {
|
||||
self.eventLoop = eventLoop
|
||||
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, config: config.runtimeEngine)
|
||||
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
|
||||
self.lambdaHandler = lambdaHandler
|
||||
self.lifecycleId = config.lifecycle.id
|
||||
self.lifecycleId = configuration.lifecycle.id
|
||||
self.offload = configuration.runtimeEngine.offload
|
||||
}
|
||||
|
||||
/// Run the user provided initializer. This *must* only be called once.
|
||||
@@ -36,7 +38,9 @@ internal final class LambdaRunner {
|
||||
func initialize(logger: Logger) -> EventLoopFuture<Void> {
|
||||
logger.info("initializing lambda")
|
||||
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
|
||||
return self.lambdaHandler.initialize(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId).peekError { error in
|
||||
return self.lambdaHandler.initialize(eventLoop: self.eventLoop,
|
||||
lifecycleId: self.lifecycleId,
|
||||
offload: self.offload).peekError { error in
|
||||
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
|
||||
// We're going to bail out because the init failed, so there's not a lot we can do other than log
|
||||
// that we couldn't report this error back to the runtime.
|
||||
@@ -46,14 +50,18 @@ internal final class LambdaRunner {
|
||||
}
|
||||
|
||||
func run(logger: Logger) -> EventLoopFuture<Void> {
|
||||
logger.info("lambda invocation sequence starting")
|
||||
logger.debug("lambda invocation sequence starting")
|
||||
// 1. request work from lambda runtime engine
|
||||
return self.runtimeClient.requestWork(logger: logger).peekError { error in
|
||||
logger.error("could not fetch work from lambda runtime engine: \(error)")
|
||||
}.flatMap { context, payload in
|
||||
// 2. send work to handler
|
||||
logger.info("sending work to lambda handler \(self.lambdaHandler)")
|
||||
return self.lambdaHandler.handle(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId, context: context, payload: payload).map { (context, $0) }
|
||||
logger.debug("sending work to lambda handler \(self.lambdaHandler)")
|
||||
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
|
||||
lifecycleId: self.lifecycleId,
|
||||
offload: self.offload,
|
||||
context: context,
|
||||
payload: payload).map { (context, $0) }
|
||||
}.flatMap { context, result in
|
||||
// 3. report results to runtime engine
|
||||
self.runtimeClient.reportResults(logger: logger, context: context, result: result).peekError { error in
|
||||
@@ -61,25 +69,35 @@ internal final class LambdaRunner {
|
||||
}
|
||||
}.always { result in
|
||||
// we are done!
|
||||
logger.info("lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")")
|
||||
logger.log(level: result.successful ? .info : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private extension LambdaHandler {
|
||||
func initialize(eventLoop: EventLoop, lifecycleId: String) -> EventLoopFuture<Void> {
|
||||
func initialize(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
|
||||
// offloading so user code never blocks the eventloop
|
||||
let promise = eventLoop.makePromise(of: Void.self)
|
||||
DispatchQueue(label: "lambda-\(lifecycleId)").async {
|
||||
if offload {
|
||||
DispatchQueue(label: "lambda-\(lifecycleId)").async {
|
||||
self.initialize { promise.completeWith($0) }
|
||||
}
|
||||
} else {
|
||||
self.initialize { promise.completeWith($0) }
|
||||
}
|
||||
return promise.futureResult
|
||||
}
|
||||
|
||||
func handle(eventLoop: EventLoop, lifecycleId: String, context: LambdaContext, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
|
||||
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: LambdaContext, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
|
||||
// offloading so user code never blocks the eventloop
|
||||
let promise = eventLoop.makePromise(of: LambdaResult.self)
|
||||
DispatchQueue(label: "lambda-\(lifecycleId)").async {
|
||||
if offload {
|
||||
DispatchQueue(label: "lambda-\(lifecycleId)").async {
|
||||
self.handle(context: context, payload: payload) { result in
|
||||
promise.succeed(result)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.handle(context: context, payload: payload) { result in
|
||||
promise.succeed(result)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import Foundation // for JSON
|
||||
import Logging
|
||||
import NIO
|
||||
import NIOHTTP1
|
||||
@@ -22,20 +22,20 @@ import NIOHTTP1
|
||||
/// * /runtime/invocation/response
|
||||
/// * /runtime/invocation/error
|
||||
/// * /runtime/init/error
|
||||
internal class LambdaRuntimeClient {
|
||||
internal struct LambdaRuntimeClient {
|
||||
private let eventLoop: EventLoop
|
||||
private let allocator = ByteBufferAllocator()
|
||||
private let httpClient: HTTPClient
|
||||
|
||||
init(eventLoop: EventLoop, config: Lambda.Config.RuntimeEngine) {
|
||||
init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
|
||||
self.eventLoop = eventLoop
|
||||
self.httpClient = HTTPClient(eventLoop: eventLoop, config: config)
|
||||
self.httpClient = HTTPClient(eventLoop: eventLoop, configuration: configuration)
|
||||
}
|
||||
|
||||
/// Requests work from the Runtime Engine.
|
||||
func requestWork(logger: Logger) -> EventLoopFuture<(LambdaContext, [UInt8])> {
|
||||
let url = Consts.invocationURLPrefix + Consts.requestWorkURLSuffix
|
||||
logger.info("requesting work from lambda runtime engine using \(url)")
|
||||
logger.debug("requesting work from lambda runtime engine using \(url)")
|
||||
return self.httpClient.get(url: url).flatMapThrowing { response in
|
||||
guard response.status == .ok else {
|
||||
throw LambdaRuntimeClientError.badStatusCode(response.status)
|
||||
@@ -80,7 +80,7 @@ internal class LambdaRuntimeClient {
|
||||
body.writeString(json)
|
||||
}
|
||||
}
|
||||
logger.info("reporting results to lambda runtime engine using \(url)")
|
||||
logger.debug("reporting results to lambda runtime engine using \(url)")
|
||||
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
|
||||
guard response.status == .accepted else {
|
||||
throw LambdaRuntimeClientError.badStatusCode(response.status)
|
||||
@@ -109,7 +109,7 @@ internal class LambdaRuntimeClient {
|
||||
case .success(let json):
|
||||
body = self.allocator.buffer(capacity: json.utf8.count)
|
||||
body.writeString(json)
|
||||
logger.info("reporting initialization error to lambda runtime engine using \(url)")
|
||||
logger.warning("reporting initialization error to lambda runtime engine using \(url)")
|
||||
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
|
||||
guard response.status == .accepted else {
|
||||
throw LambdaRuntimeClientError.badStatusCode(response.status)
|
||||
|
||||
@@ -15,15 +15,7 @@
|
||||
import Dispatch
|
||||
import NIO
|
||||
|
||||
internal enum Defaults {
|
||||
static let host = "127.0.0.1"
|
||||
static let port = 8080
|
||||
}
|
||||
|
||||
internal enum Consts {
|
||||
static let hostPortEnvVariableName = "AWS_LAMBDA_RUNTIME_API"
|
||||
static let requestTimeoutEnvVariableName = "REQUEST_TIMEOUT"
|
||||
|
||||
private static let apiPrefix = "/2018-06-01"
|
||||
static let invocationURLPrefix = "\(apiPrefix)/runtime/invocation"
|
||||
static let requestWorkURLSuffix = "/next"
|
||||
@@ -43,28 +35,11 @@ internal enum AmazonHeaders {
|
||||
}
|
||||
|
||||
/// Utility to read environment variables
|
||||
internal enum Environment {
|
||||
static func string(name: String, defaultValue: String) -> String {
|
||||
return self.string(name) ?? defaultValue
|
||||
}
|
||||
|
||||
static func string(_ name: String) -> String? {
|
||||
guard let value = getenv(name) else {
|
||||
return nil
|
||||
}
|
||||
return String(validatingUTF8: value)
|
||||
}
|
||||
|
||||
static func int(name: String, defaultValue: Int) -> Int {
|
||||
return self.int(name) ?? defaultValue
|
||||
}
|
||||
|
||||
static func int(_ name: String) -> Int? {
|
||||
guard let value = string(name) else {
|
||||
return nil
|
||||
}
|
||||
return Int(value)
|
||||
internal func env(_ name: String) -> String? {
|
||||
guard let value = getenv(name) else {
|
||||
return nil
|
||||
}
|
||||
return String(utf8String: value)
|
||||
}
|
||||
|
||||
/// Helper function to trap signals
|
||||
@@ -79,6 +54,7 @@ internal func trap(signal sig: Signal, handler: @escaping (Signal) -> Void) -> D
|
||||
return signalSource
|
||||
}
|
||||
|
||||
@usableFromInline
|
||||
internal enum Signal: Int32 {
|
||||
case HUP = 1
|
||||
case INT = 2
|
||||
|
||||
@@ -14,13 +14,16 @@
|
||||
|
||||
import SwiftAwsLambda
|
||||
|
||||
private class Request: Codable {}
|
||||
private class Response: Codable {}
|
||||
private struct Request: Codable {
|
||||
let body: String
|
||||
}
|
||||
|
||||
private struct Response: Codable {
|
||||
let body: String
|
||||
}
|
||||
|
||||
// in this example we are receiving and responding with codables. Request and Response above are examples of how to use
|
||||
// codables to model your reqeuest and response objects
|
||||
Lambda.run { (_, _: Request, callback) in
|
||||
callback(.success(Response()))
|
||||
Lambda.run { (_, request: Request, callback) in
|
||||
callback(.success(Response(body: String(request.body.reversed()))))
|
||||
}
|
||||
|
||||
print("Bye!")
|
||||
|
||||
@@ -19,5 +19,3 @@ Lambda.run { (_, payload: [UInt8], callback) in
|
||||
// as an example, respond with the reverse the input payload
|
||||
callback(.success(payload.reversed()))
|
||||
}
|
||||
|
||||
print("Bye!")
|
||||
|
||||
@@ -19,5 +19,3 @@ Lambda.run { (_, payload: String, callback) in
|
||||
// as an example, respond with the reverse the input payload
|
||||
callback(.success(String(payload.reversed())))
|
||||
}
|
||||
|
||||
print("Bye!")
|
||||
|
||||
@@ -23,13 +23,13 @@ import XCTest
|
||||
///
|
||||
|
||||
#if os(Linux) || os(FreeBSD)
|
||||
@testable import SwiftAwsLambdaTests
|
||||
@testable import SwiftAwsLambdaTests
|
||||
|
||||
XCTMain([
|
||||
testCase(CodableLambdaTest.allTests),
|
||||
testCase(LambdaRunnerTest.allTests),
|
||||
testCase(LambdaRuntimeClientTest.allTests),
|
||||
testCase(LambdaTest.allTests),
|
||||
testCase(StringLambdaTest.allTests),
|
||||
])
|
||||
XCTMain([
|
||||
testCase(CodableLambdaTest.allTests),
|
||||
testCase(LambdaRunnerTest.allTests),
|
||||
testCase(LambdaRuntimeClientTest.allTests),
|
||||
testCase(LambdaTest.allTests),
|
||||
testCase(StringLambdaTest.allTests),
|
||||
])
|
||||
#endif
|
||||
|
||||
@@ -18,8 +18,9 @@ import XCTest
|
||||
class CodableLambdaTest: XCTestCase {
|
||||
func testSuceess() throws {
|
||||
let maxTimes = Int.random(in: 1 ... 10)
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
|
||||
let result = Lambda.run(handler: CodableEchoHandler(), maxTimes: maxTimes)
|
||||
let result = Lambda.run(handler: CodableEchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: maxTimes)
|
||||
}
|
||||
@@ -33,8 +34,9 @@ class CodableLambdaTest: XCTestCase {
|
||||
|
||||
func testClosureSuccess() throws {
|
||||
let maxTimes = Int.random(in: 1 ... 10)
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
|
||||
let result = Lambda.run(maxTimes: maxTimes) { (_, payload: Request, callback) in
|
||||
let result = Lambda.run(configuration: configuration) { (_, payload: Request, callback) in
|
||||
callback(.success(Response(requestId: payload.requestId)))
|
||||
}
|
||||
try server.stop().wait()
|
||||
@@ -68,7 +70,7 @@ private func assertLambdaLifecycleResult(result: LambdaLifecycleResult, shoudHav
|
||||
}
|
||||
|
||||
// TODO: taking advantage of the fact we know the serialization is json
|
||||
private class GoodBehavior: LambdaServerBehavior {
|
||||
private struct GoodBehavior: LambdaServerBehavior {
|
||||
let requestId = NSUUID().uuidString
|
||||
|
||||
func getWork() -> GetWorkResult {
|
||||
@@ -107,7 +109,7 @@ private class GoodBehavior: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class BadBehavior: LambdaServerBehavior {
|
||||
private struct BadBehavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .failure(.internalServerError)
|
||||
}
|
||||
@@ -125,21 +127,21 @@ private class BadBehavior: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class Request: Codable {
|
||||
private struct Request: Codable {
|
||||
let requestId: String
|
||||
init(requestId: String) {
|
||||
self.requestId = requestId
|
||||
}
|
||||
}
|
||||
|
||||
private class Response: Codable {
|
||||
private struct Response: Codable {
|
||||
let requestId: String
|
||||
init(requestId: String) {
|
||||
self.requestId = requestId
|
||||
}
|
||||
}
|
||||
|
||||
private class CodableEchoHandler: LambdaCodableHandler {
|
||||
private struct CodableEchoHandler: LambdaCodableHandler {
|
||||
func handle(context: LambdaContext, payload: Request, callback: @escaping LambdaCodableCallback<Response>) {
|
||||
callback(.success(Response(requestId: payload.requestId)))
|
||||
}
|
||||
|
||||
@@ -18,8 +18,9 @@ import XCTest
|
||||
class StringLambdaTest: XCTestCase {
|
||||
func testSuceess() throws {
|
||||
let maxTimes = Int.random(in: 1 ... 10)
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
|
||||
let result = Lambda.run(handler: StringEchoHandler(), maxTimes: maxTimes)
|
||||
let result = Lambda.run(handler: StringEchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: maxTimes)
|
||||
}
|
||||
@@ -33,8 +34,9 @@ class StringLambdaTest: XCTestCase {
|
||||
|
||||
func testClosureSuccess() throws {
|
||||
let maxTimes = Int.random(in: 1 ... 10)
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
|
||||
let result = Lambda.run(maxTimes: maxTimes) { (_, payload: String, callback) in
|
||||
let result = Lambda.run(configuration: configuration) { (_, payload: String, callback) in
|
||||
callback(.success(payload))
|
||||
}
|
||||
try server.stop().wait()
|
||||
@@ -67,7 +69,7 @@ private func assertLambdaLifecycleResult(result: LambdaLifecycleResult, shoudHav
|
||||
}
|
||||
}
|
||||
|
||||
private class GoodBehavior: LambdaServerBehavior {
|
||||
private struct GoodBehavior: LambdaServerBehavior {
|
||||
let requestId = NSUUID().uuidString
|
||||
let payload = "hello"
|
||||
func getWork() -> GetWorkResult {
|
||||
@@ -91,7 +93,7 @@ private class GoodBehavior: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class BadBehavior: LambdaServerBehavior {
|
||||
private struct BadBehavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .failure(.internalServerError)
|
||||
}
|
||||
@@ -109,7 +111,7 @@ private class BadBehavior: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class StringEchoHandler: LambdaStringHandler {
|
||||
private struct StringEchoHandler: LambdaStringHandler {
|
||||
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback) {
|
||||
callback(.success(payload))
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import XCTest
|
||||
|
||||
class LambdaRunnerTest: XCTestCase {
|
||||
func testSuccess() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
let requestId = NSUUID().uuidString
|
||||
let payload = "hello"
|
||||
func getWork() -> GetWorkResult {
|
||||
@@ -44,7 +44,7 @@ class LambdaRunnerTest: XCTestCase {
|
||||
}
|
||||
|
||||
func testFailure() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
static let error = "boom"
|
||||
let requestId = NSUUID().uuidString
|
||||
func getWork() -> GetWorkResult {
|
||||
|
||||
@@ -17,7 +17,7 @@ import XCTest
|
||||
|
||||
class LambdaRuntimeClientTest: XCTestCase {
|
||||
func testGetWorkServerInternalError() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .failure(.internalServerError)
|
||||
}
|
||||
@@ -43,7 +43,7 @@ class LambdaRuntimeClientTest: XCTestCase {
|
||||
}
|
||||
|
||||
func testGetWorkServerNoBodyError() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .success(("1", ""))
|
||||
}
|
||||
@@ -69,7 +69,7 @@ class LambdaRuntimeClientTest: XCTestCase {
|
||||
}
|
||||
|
||||
func testGetWorkServerNoContextError() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
// no request id -> no context
|
||||
return .success(("", "hello"))
|
||||
@@ -96,7 +96,7 @@ class LambdaRuntimeClientTest: XCTestCase {
|
||||
}
|
||||
|
||||
func testProcessResponseInternalServerError() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .success((requestId: "1", payload: "payload"))
|
||||
}
|
||||
@@ -121,7 +121,7 @@ class LambdaRuntimeClientTest: XCTestCase {
|
||||
}
|
||||
|
||||
func testProcessErrorInternalServerError() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .success((requestId: "1", payload: "payload"))
|
||||
}
|
||||
@@ -146,7 +146,7 @@ class LambdaRuntimeClientTest: XCTestCase {
|
||||
}
|
||||
|
||||
func testProcessInitErrorInternalServerError() throws {
|
||||
class Behavior: LambdaServerBehavior {
|
||||
struct Behavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
XCTFail("should not get work")
|
||||
return .failure(.internalServerError)
|
||||
|
||||
@@ -19,9 +19,10 @@ import XCTest
|
||||
class LambdaTest: XCTestCase {
|
||||
func testSuceess() throws {
|
||||
let maxTimes = Int.random(in: 10 ... 20)
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
|
||||
let handler = EchoHandler()
|
||||
let result = Lambda.run(handler: handler, maxTimes: maxTimes)
|
||||
let result = Lambda.run(handler: handler, configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: maxTimes)
|
||||
XCTAssertEqual(handler.initializeCalls, 1)
|
||||
@@ -52,8 +53,9 @@ class LambdaTest: XCTestCase {
|
||||
|
||||
func testClosureSuccess() throws {
|
||||
let maxTimes = Int.random(in: 10 ... 20)
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
|
||||
let result = Lambda.run(maxTimes: maxTimes) { (_, payload: [UInt8], callback: LambdaCallback) in
|
||||
let result = Lambda.run(configuration: configuration) { (_, payload: [UInt8], callback: LambdaCallback) in
|
||||
callback(.success(payload))
|
||||
}
|
||||
try server.stop().wait()
|
||||
@@ -77,57 +79,64 @@ class LambdaTest: XCTestCase {
|
||||
}
|
||||
}
|
||||
let signal = Signal.ALRM
|
||||
let max = 50
|
||||
let maxTimes = 50
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes, stopSignal: signal))
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
|
||||
let future = Lambda.runAsync(eventLoopGroup: eventLoopGroup, handler: MyHandler(), maxTimes: max, stopSignal: signal)
|
||||
let future = Lambda.runAsync(eventLoopGroup: eventLoopGroup, handler: MyHandler(), configuration: configuration)
|
||||
DispatchQueue(label: "test").async {
|
||||
usleep(100_000)
|
||||
kill(getpid(), signal.rawValue)
|
||||
}
|
||||
let result = try future.wait()
|
||||
XCTAssertGreaterThan(result, 0, "should have stopped before any request made")
|
||||
XCTAssertLessThan(result, max, "should have stopped before \(max)")
|
||||
XCTAssertLessThan(result, maxTimes, "should have stopped before \(maxTimes)")
|
||||
try server.stop().wait()
|
||||
try eventLoopGroup.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
func testTimeout() throws {
|
||||
let timeout = 100
|
||||
setenv(Consts.requestTimeoutEnvVariableName, "\(timeout)", 1)
|
||||
let timeout: Int64 = 100
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: 1),
|
||||
runtimeEngine: .init(requestTimeout: .milliseconds(timeout)))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior(requestId: "timeout", payload: "\(timeout * 2)")).start().wait()
|
||||
let result = Lambda.run(handler: EchoHandler(), maxTimes: 1)
|
||||
let result = Lambda.run(handler: EchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shouldFailWithError: LambdaRuntimeClientError.upstreamError("timeout"))
|
||||
unsetenv(Consts.requestTimeoutEnvVariableName)
|
||||
}
|
||||
|
||||
func testDisconnect() throws {
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: 1))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior(requestId: "disconnect")).start().wait()
|
||||
let result = Lambda.run(handler: EchoHandler(), maxTimes: 1)
|
||||
let result = Lambda.run(handler: EchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shouldFailWithError: LambdaRuntimeClientError.upstreamError("connectionResetByPeer"))
|
||||
}
|
||||
|
||||
func testBigPayload() throws {
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: 1))
|
||||
let payload = String(repeating: "*", count: 104_448)
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior(payload: payload)).start().wait()
|
||||
let result = Lambda.run(handler: EchoHandler(), maxTimes: 1)
|
||||
let result = Lambda.run(handler: EchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: 1)
|
||||
}
|
||||
|
||||
func testKeepAliveServer() throws {
|
||||
let maxTimes = 10
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior(), keepAlive: true).start().wait()
|
||||
let result = Lambda.run(handler: EchoHandler(), maxTimes: 10)
|
||||
let result = Lambda.run(handler: EchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: 10)
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: maxTimes)
|
||||
}
|
||||
|
||||
func testNoKeepAliveServer() throws {
|
||||
let maxTimes = 10
|
||||
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes))
|
||||
let server = try MockLambdaServer(behavior: GoodBehavior(), keepAlive: false).start().wait()
|
||||
let result = Lambda.run(handler: EchoHandler(), maxTimes: 10)
|
||||
let result = Lambda.run(handler: EchoHandler(), configuration: configuration)
|
||||
try server.stop().wait()
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: 10)
|
||||
assertLambdaLifecycleResult(result: result, shoudHaveRun: maxTimes)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +157,7 @@ private func assertLambdaLifecycleResult(result: LambdaLifecycleResult, shoudHav
|
||||
}
|
||||
}
|
||||
|
||||
private class GoodBehavior: LambdaServerBehavior {
|
||||
private struct GoodBehavior: LambdaServerBehavior {
|
||||
let requestId: String
|
||||
let payload: String
|
||||
|
||||
@@ -178,7 +187,7 @@ private class GoodBehavior: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class BadBehavior: LambdaServerBehavior {
|
||||
private struct BadBehavior: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
return .failure(.internalServerError)
|
||||
}
|
||||
@@ -197,7 +206,7 @@ private class BadBehavior: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class GoodBehaviourWhenInitFails: LambdaServerBehavior {
|
||||
private struct GoodBehaviourWhenInitFails: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
XCTFail("should not get work")
|
||||
return .failure(.internalServerError)
|
||||
@@ -218,7 +227,7 @@ private class GoodBehaviourWhenInitFails: LambdaServerBehavior {
|
||||
}
|
||||
}
|
||||
|
||||
private class BadBehaviourWhenInitFails: LambdaServerBehavior {
|
||||
private struct BadBehaviourWhenInitFails: LambdaServerBehavior {
|
||||
func getWork() -> GetWorkResult {
|
||||
XCTFail("should not get work")
|
||||
return .failure(.internalServerError)
|
||||
|
||||
@@ -12,23 +12,24 @@
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import Foundation
|
||||
import Foundation // for JSON
|
||||
import Logging
|
||||
import NIO
|
||||
import NIOHTTP1
|
||||
@testable import SwiftAwsLambda
|
||||
|
||||
internal class MockLambdaServer {
|
||||
internal final class MockLambdaServer {
|
||||
private let logger = Logger(label: "MockLambdaServer")
|
||||
private let behavior: LambdaServerBehavior
|
||||
private let host: String
|
||||
private let port: Int
|
||||
private let keepAlive: Bool
|
||||
private let group: EventLoopGroup
|
||||
|
||||
private var channel: Channel?
|
||||
private var shutdown = false
|
||||
|
||||
public init(behavior: LambdaServerBehavior, host: String = Defaults.host, port: Int = Defaults.port, keepAlive: Bool = true) {
|
||||
public init(behavior: LambdaServerBehavior, host: String = "127.0.0.1", port: Int = 7000, keepAlive: Bool = true) {
|
||||
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
|
||||
self.behavior = behavior
|
||||
self.host = host
|
||||
@@ -96,9 +97,10 @@ internal final class HTTPHandler: ChannelInboundHandler {
|
||||
self.requestBody?.clear()
|
||||
case .body(var buffer):
|
||||
if self.requestBody == nil {
|
||||
self.requestBody = context.channel.allocator.buffer(capacity: buffer.readableBytes)
|
||||
self.requestBody = buffer
|
||||
} else {
|
||||
self.requestBody!.writeBuffer(&buffer)
|
||||
}
|
||||
self.requestBody!.writeBuffer(&buffer)
|
||||
case .end:
|
||||
self.processRequest(context: context)
|
||||
}
|
||||
@@ -129,7 +131,7 @@ internal final class HTTPHandler: ChannelInboundHandler {
|
||||
}
|
||||
} else if self.requestHead.uri.hasSuffix(Consts.requestWorkURLSuffix) {
|
||||
switch self.behavior.getWork() {
|
||||
case .success(let requestId, let result):
|
||||
case .success(let (requestId, result)):
|
||||
if requestId == "timeout" {
|
||||
usleep((UInt32(result) ?? 0) * 1000)
|
||||
} else if requestId == "disconnect" {
|
||||
|
||||
@@ -21,8 +21,8 @@ func runLambda(behavior: LambdaServerBehavior, handler: LambdaHandler) throws {
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
|
||||
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
|
||||
let logger = Logger(label: "TestLogger")
|
||||
let config = Lambda.Config(lifecycle: .init(), runtimeEngine: .init(requestTimeout: .milliseconds(100)))
|
||||
let runner = LambdaRunner(eventLoop: eventLoopGroup.next(), config: config, lambdaHandler: handler)
|
||||
let configuration = Lambda.Configuration(runtimeEngine: .init(requestTimeout: .milliseconds(100)))
|
||||
let runner = LambdaRunner(eventLoop: eventLoopGroup.next(), configuration: configuration, lambdaHandler: handler)
|
||||
let server = try MockLambdaServer(behavior: behavior).start().wait()
|
||||
defer { XCTAssertNoThrow(try server.stop().wait()) }
|
||||
try runner.initialize(logger: logger).flatMap {
|
||||
@@ -43,7 +43,7 @@ class EchoHandler: LambdaHandler {
|
||||
}
|
||||
}
|
||||
|
||||
class FailedHandler: LambdaHandler {
|
||||
struct FailedHandler: LambdaHandler {
|
||||
private let reason: String
|
||||
|
||||
public init(_ reason: String) {
|
||||
@@ -59,7 +59,7 @@ class FailedHandler: LambdaHandler {
|
||||
}
|
||||
}
|
||||
|
||||
class FailedInitializerHandler: LambdaHandler {
|
||||
struct FailedInitializerHandler: LambdaHandler {
|
||||
private let reason: String
|
||||
|
||||
public init(_ reason: String) {
|
||||
|
||||
@@ -38,8 +38,8 @@ This library is designed to simplify implementing an AWS Lambda using the Swift
|
||||
Or more typically, a simple closure that receives a json payload and replies with a json response via `Codable`:
|
||||
|
||||
```swift
|
||||
private class Request: Codable {}
|
||||
private class Response: Codable {}
|
||||
private struct Request: Codable {}
|
||||
private struct Response: Codable {}
|
||||
|
||||
// in this example we are receiving and responding with codables. Request and Response above are examples of how to use
|
||||
// codables to model your reqeuest and response objects
|
||||
|
||||
Executable
+126
@@ -0,0 +1,126 @@
|
||||
#!/bin/bash
|
||||
##===----------------------------------------------------------------------===##
|
||||
##
|
||||
## This source file is part of the SwiftAwsLambda open source project
|
||||
##
|
||||
## Copyright (c) 2017-2018 Apple Inc. and the SwiftAwsLambda project authors
|
||||
## Licensed under Apache License v2.0
|
||||
##
|
||||
## See LICENSE.txt for license information
|
||||
## See CONTRIBUTORS.txt for the list of SwiftAwsLambda project authors
|
||||
##
|
||||
## SPDX-License-Identifier: Apache-2.0
|
||||
##
|
||||
##===----------------------------------------------------------------------===##
|
||||
|
||||
set -eu
|
||||
|
||||
export HOST=127.0.0.1
|
||||
export PORT=3000
|
||||
export AWS_LAMBDA_RUNTIME_API="$HOST:$PORT"
|
||||
export LOG_LEVEL=warning # important, otherwise log becomes a bottleneck
|
||||
|
||||
# using gdate on mdarwin for nanoseconds
|
||||
if [[ $(uname -s) == "Linux" ]]; then
|
||||
shopt -s expand_aliases
|
||||
alias gdate="date"
|
||||
fi
|
||||
|
||||
swift build -c release -Xswiftc -g
|
||||
|
||||
cleanup() {
|
||||
kill -9 $server_pid
|
||||
}
|
||||
|
||||
trap "cleanup" ERR
|
||||
|
||||
iterations=100
|
||||
results=()
|
||||
|
||||
#------------------
|
||||
# string
|
||||
#------------------
|
||||
|
||||
export MODE=string
|
||||
|
||||
# start (fork) mock server
|
||||
pkill -9 MockServer && echo "killed previous servers" && sleep 1
|
||||
echo "starting server in $MODE mode"
|
||||
(./.build/release/MockServer) &
|
||||
server_pid=$!
|
||||
sleep 1
|
||||
kill -0 $server_pid # check server is alive
|
||||
|
||||
# cold start
|
||||
echo "running $MODE mode cold test"
|
||||
cold=()
|
||||
export MAX_REQUESTS=1
|
||||
for (( i=0; i<$iterations; i++ )); do
|
||||
start=$(gdate +%s%N)
|
||||
./.build/release/SwiftAwsLambdaStringSample
|
||||
end=$(gdate +%s%N)
|
||||
cold+=( $(($end-$start)) )
|
||||
done
|
||||
sum_cold=$(IFS=+; echo "$((${cold[*]}))")
|
||||
avg_cold=$(($sum_cold/$iterations))
|
||||
results+=( "$MODE, cold: $avg_cold (ns)" )
|
||||
|
||||
# normal calls
|
||||
echo "running $MODE mode warm test"
|
||||
export MAX_REQUESTS=$iterations
|
||||
start=$(gdate +%s%N)
|
||||
./.build/release/SwiftAwsLambdaStringSample
|
||||
end=$(gdate +%s%N)
|
||||
sum_warm=$(($end-$start-$avg_cold)) # substract by avg cold since the first call is cold
|
||||
avg_warm=$(($sum_warm/($iterations-1))) # substract since the first call is cold
|
||||
results+=( "$MODE, warm: $avg_warm (ns)" )
|
||||
|
||||
#------------------
|
||||
# JSON
|
||||
#------------------
|
||||
|
||||
export MODE=json
|
||||
|
||||
# start (fork) mock server
|
||||
pkill -9 MockServer && echo "killed previous servers" && sleep 1
|
||||
echo "starting server in $MODE mode"
|
||||
(./.build/release/MockServer) &
|
||||
server_pid=$!
|
||||
sleep 1
|
||||
kill -0 $server_pid # check server is alive
|
||||
|
||||
# cold start
|
||||
echo "running $MODE mode cold test"
|
||||
cold=()
|
||||
export MAX_REQUESTS=1
|
||||
for (( i=0; i<$iterations; i++ )); do
|
||||
start=$(gdate +%s%N)
|
||||
./.build/release/SwiftAwsLambdaCodableSample
|
||||
end=$(gdate +%s%N)
|
||||
cold+=( $(($end-$start)) )
|
||||
done
|
||||
sum_cold=$(IFS=+; echo "$((${cold[*]}))")
|
||||
avg_cold=$(($sum_cold/$iterations))
|
||||
results+=( "$MODE, cold: $avg_cold (ns)" )
|
||||
|
||||
# normal calls
|
||||
echo "running $MODE mode warm test"
|
||||
export MAX_REQUESTS=$iterations
|
||||
start=$(gdate +%s%N)
|
||||
./.build/release/SwiftAwsLambdaCodableSample
|
||||
end=$(gdate +%s%N)
|
||||
sum_warm=$(($end-$start-$avg_cold)) # substract by avg cold since the first call is cold
|
||||
avg_warm=$(($sum_warm/($iterations-1))) # substract since the first call is cold
|
||||
results+=( "$MODE, warm: $avg_warm (ns)" )
|
||||
|
||||
# print results
|
||||
echo "-----------------------------"
|
||||
echo "results"
|
||||
echo "-----------------------------"
|
||||
for i in "${results[@]}"; do
|
||||
echo $i
|
||||
done
|
||||
echo "-----------------------------"
|
||||
|
||||
# cleanup
|
||||
cleanup
|
||||
@@ -14,6 +14,7 @@
|
||||
##===----------------------------------------------------------------------===##
|
||||
|
||||
set -eu
|
||||
|
||||
here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
|
||||
function replace_acceptable_years() {
|
||||
|
||||
Reference in New Issue
Block a user