LambdaRuntimeClient works with Invocation (#19)

### Motivation:

- We want to store different entities that are needed when executing a handler within the LambdaContext (Logger, EventLoop, ByteBufferAllocator, …)
- Currently the LambdaRuntimeClient creates the LambdaContext. Having the LambdaContext with the Logger, EventLoop and ByteBufferAllocator be created from the LambdaRuntimeClient feels to me too much for me.
- Conceptionally the Lambda control plane api call is “get next Invocation” (API naming)

### Changes:

- LambdaRuntimeClient responds with an Invocation and does not use the LambdaContext at all anymore.
- LambdaRunner creates the LambdaContext with the Invocation, Logger and EventLoop.
- LambdaContext has been renamed to Lambda.Context
- Lambda.Context is a class now, since it is conceptionally not a value type and might be passed around a lot
- Lambda.Context properties `traceId`, `invokedFunctionArn`, `deadline` are not optional anymore since they will be always set when executing a lambda
- Creating an Invocation can fail with LambdaRuntimeClientError.invocationMissingHeader(String), if non optional headers are not present
- the test MockLambdaServer and the performance test MockServer always return headers for deadline, traceId and function arn (static for now – could be changed with Behaviour flag?!)

### Open ends:

- we will need to build some kind of Deadline into the context (See also #9 - probably for a different PR)
- we have a stupid mapping between ByteBuffer and [UInt8] in the LambdaRunner for now (marked with two TODOs). I don’t want to change this in this PR since it will lead to huge merge conflicts down the road with the potentiall API changes we have in mind.
This commit is contained in:
Fabian Fett
2020-03-08 23:10:03 +01:00
committed by GitHub
parent 08f75f51c7
commit ddee38c367
13 changed files with 138 additions and 91 deletions
+6 -1
View File
@@ -108,7 +108,12 @@ internal final class HTTPHandler: ChannelInboundHandler {
case .json:
responseBody = "{ \"body\": \"\(requestId)\" }"
}
responseHeaders = [(AmazonHeaders.requestID, requestId)]
responseHeaders = [
(AmazonHeaders.requestID, requestId),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).timeIntervalSince1970 * 1000)),
]
} else if request.head.uri.hasSuffix("/response") {
responseStatus = .accepted
} else {
+4 -4
View File
@@ -50,7 +50,7 @@ public typealias LambdaCodableCallback<Out> = (LambdaCodableResult<Out>) -> Void
/// A processing closure for a Lambda that takes an `In` and returns an `Out` via `LambdaCodableCallback<Out>` asynchronously,
/// having `In` and `Out` extending `Decodable` and `Encodable` respectively.
public typealias LambdaCodableClosure<In, Out> = (LambdaContext, In, LambdaCodableCallback<Out>) -> Void
public typealias LambdaCodableClosure<In, Out> = (Lambda.Context, In, LambdaCodableCallback<Out>) -> Void
/// A processing protocol for a Lambda that takes an `In` and returns an `Out` via `LambdaCodableCallback<Out>` asynchronously,
/// having `In` and `Out` extending `Decodable` and `Encodable` respectively.
@@ -58,7 +58,7 @@ public protocol LambdaCodableHandler: LambdaHandler {
associatedtype In: Decodable
associatedtype Out: Encodable
func handle(context: LambdaContext, payload: In, callback: @escaping LambdaCodableCallback<Out>)
func handle(context: Lambda.Context, payload: In, callback: @escaping LambdaCodableCallback<Out>)
var codec: LambdaCodableCodec<In, Out> { get }
}
@@ -79,7 +79,7 @@ public class LambdaCodableCodec<In: Decodable, Out: Encodable> {
/// Default implementation of `Encodable` -> `[UInt8]` encoding and `[UInt8]` -> `Decodable' decoding
public extension LambdaCodableHandler {
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping (LambdaResult) -> Void) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping (LambdaResult) -> Void) {
switch self.codec.decode(payload) {
case .failure(let error):
return callback(.failure(Errors.requestDecoding(error)))
@@ -133,7 +133,7 @@ private struct LambdaClosureWrapper<In: Decodable, Out: Encodable>: LambdaCodabl
self.closure = closure
}
public func handle(context: LambdaContext, payload: In, callback: @escaping LambdaCodableCallback<Out>) {
public func handle(context: Lambda.Context, payload: In, callback: @escaping LambdaCodableCallback<Out>) {
self.closure(context, payload, callback)
}
}
+4 -4
View File
@@ -46,16 +46,16 @@ public typealias LambdaStringResult = Result<String, Error>
public typealias LambdaStringCallback = (LambdaStringResult) -> Void
/// A processing closure for a Lambda that takes a `String` and returns a `LambdaStringResult` via `LambdaStringCallback` asynchronously.
public typealias LambdaStringClosure = (LambdaContext, String, LambdaStringCallback) -> Void
public typealias LambdaStringClosure = (Lambda.Context, String, LambdaStringCallback) -> Void
/// A processing protocol for a Lambda that takes a `String` and returns a `LambdaStringResult` via `LambdaStringCallback` asynchronously.
public protocol LambdaStringHandler: LambdaHandler {
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback)
func handle(context: Lambda.Context, payload: String, callback: @escaping LambdaStringCallback)
}
/// Default implementation of `String` -> `[UInt8]` encoding and `[UInt8]` -> `String' decoding
public extension LambdaStringHandler {
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
self.handle(context: context, payload: String(decoding: payload, as: UTF8.self)) { result in
switch result {
case .success(let string):
@@ -73,7 +73,7 @@ private struct LambdaClosureWrapper: LambdaStringHandler {
self.closure = closure
}
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback) {
func handle(context: Lambda.Context, payload: String, callback: @escaping LambdaStringCallback) {
self.closure(context, payload, callback)
}
}
+35 -37
View File
@@ -77,6 +77,38 @@ public enum Lambda {
}
}
public class Context {
// from aws
public let requestId: String
public let traceId: String
public let invokedFunctionArn: String
public let deadline: String
public let cognitoIdentity: String?
public let clientContext: String?
// utility
public let logger: Logger
internal init(requestId: String,
traceId: String,
invokedFunctionArn: String,
deadline: String,
cognitoIdentity: String? = nil,
clientContext: String? = nil,
logger: Logger) {
self.requestId = requestId
self.traceId = traceId
self.invokedFunctionArn = invokedFunctionArn
self.cognitoIdentity = cognitoIdentity
self.clientContext = clientContext
self.deadline = deadline
// mutate logger with context
var logger = logger
logger[metadataKey: "awsRequestId"] = .string(requestId)
logger[metadataKey: "awsTraceId"] = .string(traceId)
self.logger = logger
}
}
private final class Lifecycle {
private let eventLoop: EventLoop
private let logger: Logger
@@ -258,7 +290,7 @@ public typealias LambdaResult = Result<[UInt8], Error>
public typealias LambdaCallback = (LambdaResult) -> Void
/// A processing closure for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
public typealias LambdaClosure = (LambdaContext, [UInt8], LambdaCallback) -> Void
public typealias LambdaClosure = (Lambda.Context, [UInt8], LambdaCallback) -> Void
/// A result type for a Lambda initialization.
public typealias LambdaInitResult = Result<Void, Error>
@@ -270,7 +302,7 @@ public typealias LambdaInitCallBack = (LambdaInitResult) -> Void
public protocol LambdaHandler {
/// Initializes the `LambdaHandler`.
func initialize(callback: @escaping LambdaInitCallBack)
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback)
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback)
}
extension LambdaHandler {
@@ -280,40 +312,6 @@ extension LambdaHandler {
}
}
public struct LambdaContext {
// from aws
public let requestId: String
public let traceId: String?
public let invokedFunctionArn: String?
public let cognitoIdentity: String?
public let clientContext: String?
public let deadline: String?
// utliity
public let logger: Logger
public init(requestId: String,
traceId: String? = nil,
invokedFunctionArn: String? = nil,
cognitoIdentity: String? = nil,
clientContext: String? = nil,
deadline: String? = nil,
logger: Logger) {
self.requestId = requestId
self.traceId = traceId
self.invokedFunctionArn = invokedFunctionArn
self.cognitoIdentity = cognitoIdentity
self.clientContext = clientContext
self.deadline = deadline
// mutate logger with context
var logger = logger
logger[metadataKey: "awsRequestId"] = .string(requestId)
if let traceId = traceId {
logger[metadataKey: "awsTraceId"] = .string(traceId)
}
self.logger = logger
}
}
@usableFromInline
internal typealias LambdaLifecycleResult = Result<Int, Error>
@@ -323,7 +321,7 @@ private struct LambdaClosureWrapper: LambdaHandler {
self.closure = closure
}
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
self.closure(context, payload, callback)
}
}
+36 -5
View File
@@ -54,17 +54,36 @@ internal struct LambdaRunner {
// 1. request work from lambda runtime engine
return self.runtimeClient.requestWork(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { context, payload in
}.flatMap { invocation, payload in
// 2. send work to handler
let context = Lambda.Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
logger.debug("sending work to lambda handler \(self.lambdaHandler)")
// TODO: This is just for now, so that we can work with ByteBuffers only
// in the LambdaRuntimeClient
let bytes = [UInt8](payload.readableBytesView)
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: payload).map { (context, $0) }
}.flatMap { context, result in
payload: bytes)
.map {
// TODO: This mapping shall be removed as soon as the LambdaHandler protocol
// works with ByteBuffer? instead of [UInt8]
let mappedResult: Result<ByteBuffer, Error>
switch $0 {
case .success(let bytes):
var buffer = ByteBufferAllocator().buffer(capacity: bytes.count)
buffer.writeBytes(bytes)
mappedResult = .success(buffer)
case .failure(let error):
mappedResult = .failure(error)
}
return (invocation, mappedResult)
}
}.flatMap { invocation, result in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, context: context, result: result).peekError { error in
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("failed reporting results to lambda runtime engine: \(error)")
}
}.always { result in
@@ -88,7 +107,7 @@ private extension LambdaHandler {
return promise.futureResult
}
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: LambdaContext, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: Lambda.Context, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
// offloading so user code never blocks the eventloop
let promise = eventLoop.makePromise(of: LambdaResult.self)
if offload {
@@ -106,6 +125,18 @@ private extension LambdaHandler {
}
}
private extension Lambda.Context {
convenience init(logger: Logger, eventLoop: EventLoop, invocation: Invocation) {
self.init(requestId: invocation.requestId,
traceId: invocation.traceId,
invokedFunctionArn: invocation.invokedFunctionArn,
deadline: invocation.deadlineDate,
cognitoIdentity: invocation.cognitoIdentity,
clientContext: invocation.clientContext,
logger: logger)
}
}
// TODO: move to nio?
private extension EventLoopFuture {
// callback does not have side effects, failing with original result
@@ -33,20 +33,18 @@ internal struct LambdaRuntimeClient {
}
/// Requests work from the Runtime Engine.
func requestWork(logger: Logger) -> EventLoopFuture<(LambdaContext, [UInt8])> {
func requestWork(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> {
let url = Consts.invocationURLPrefix + Consts.requestWorkURLSuffix
logger.debug("requesting work from lambda runtime engine using \(url)")
return self.httpClient.get(url: url).flatMapThrowing { response in
guard response.status == .ok else {
throw LambdaRuntimeClientError.badStatusCode(response.status)
}
guard let payload = response.readWholeBody() else {
let invocation = try Invocation(headers: response.headers)
guard let payload = response.body else {
throw LambdaRuntimeClientError.noBody
}
guard let context = LambdaContext(logger: logger, response: response) else {
throw LambdaRuntimeClientError.noContext
}
return (context, payload)
return (invocation, payload)
}.flatMapErrorThrowing { error in
switch error {
case HTTPClient.Errors.timeout:
@@ -60,14 +58,13 @@ internal struct LambdaRuntimeClient {
}
/// Reports a result to the Runtime Engine.
func reportResults(logger: Logger, context: LambdaContext, result: LambdaResult) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + context.requestId
func reportResults(logger: Logger, invocation: Invocation, result: Result<ByteBuffer, Error>) -> EventLoopFuture<Void> {
var url = Consts.invocationURLPrefix + "/" + invocation.requestId
var body: ByteBuffer
switch result {
case .success(let data):
case .success(let buffer):
url += Consts.postResponseURLSuffix
body = self.allocator.buffer(capacity: data.count)
body.writeBytes(data)
body = buffer
case .failure(let error):
url += Consts.postErrorURLSuffix
// TODO: make FunctionError a const
@@ -132,8 +129,8 @@ internal struct LambdaRuntimeClient {
internal enum LambdaRuntimeClientError: Error, Equatable {
case badStatusCode(HTTPResponseStatus)
case upstreamError(String)
case invocationMissingHeader(String)
case noBody
case noContext
case json(JsonCodecError)
}
@@ -182,25 +179,36 @@ private extension HTTPClient.Response {
}
}
private extension LambdaContext {
init?(logger: Logger, response: HTTPClient.Response) {
guard let requestId = response.headerValue(AmazonHeaders.requestID) else {
return nil
internal struct Invocation {
let requestId: String
let deadlineDate: String
let invokedFunctionArn: String
let traceId: String
let clientContext: String?
let cognitoIdentity: String?
init(headers: HTTPHeaders) throws {
guard let requestId = headers.first(name: AmazonHeaders.requestID), !requestId.isEmpty else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.requestID)
}
if requestId.isEmpty {
return nil
guard let unixTimeMilliseconds = headers.first(name: AmazonHeaders.deadline) else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.deadline)
}
let traceId = response.headerValue(AmazonHeaders.traceID)
let invokedFunctionArn = response.headerValue(AmazonHeaders.invokedFunctionARN)
let cognitoIdentity = response.headerValue(AmazonHeaders.cognitoIdentity)
let clientContext = response.headerValue(AmazonHeaders.clientContext)
let deadline = response.headerValue(AmazonHeaders.deadline)
self = LambdaContext(requestId: requestId,
traceId: traceId,
invokedFunctionArn: invokedFunctionArn,
cognitoIdentity: cognitoIdentity,
clientContext: clientContext,
deadline: deadline,
logger: logger)
guard let invokedFunctionArn = headers.first(name: AmazonHeaders.invokedFunctionARN) else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.invokedFunctionARN)
}
guard let traceId = headers.first(name: AmazonHeaders.traceID) else {
throw LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.traceID)
}
self.requestId = requestId
self.deadlineDate = unixTimeMilliseconds
self.invokedFunctionArn = invokedFunctionArn
self.traceId = traceId
self.clientContext = headers["Lambda-Runtime-Client-Context"].first
self.cognitoIdentity = headers["Lambda-Runtime-Cognito-Identity"].first
}
}
@@ -150,7 +150,7 @@ private struct Response: Codable {
}
private struct CodableEchoHandler: LambdaCodableHandler {
func handle(context: LambdaContext, payload: Request, callback: @escaping LambdaCodableCallback<Response>) {
func handle(context: Lambda.Context, payload: Request, callback: @escaping LambdaCodableCallback<Response>) {
callback(.success(Response(requestId: payload.requestId)))
}
}
@@ -120,7 +120,7 @@ private struct BadBehavior: LambdaServerBehavior {
}
private struct StringEchoHandler: LambdaStringHandler {
func handle(context: LambdaContext, payload: String, callback: @escaping LambdaStringCallback) {
func handle(context: Lambda.Context, payload: String, callback: @escaping LambdaStringCallback) {
callback(.success(payload))
}
}
@@ -27,7 +27,7 @@ extension LambdaRuntimeClientTest {
return [
("testGetWorkServerInternalError", testGetWorkServerInternalError),
("testGetWorkServerNoBodyError", testGetWorkServerNoBodyError),
("testGetWorkServerNoContextError", testGetWorkServerNoContextError),
("testGetWorkServerMissingHeaderRequestIDError", testGetWorkServerMissingHeaderRequestIDError),
("testProcessResponseInternalServerError", testProcessResponseInternalServerError),
("testProcessErrorInternalServerError", testProcessErrorInternalServerError),
("testProcessInitErrorInternalServerError", testProcessInitErrorInternalServerError),
@@ -68,7 +68,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
func testGetWorkServerNoContextError() throws {
func testGetWorkServerMissingHeaderRequestIDError() throws {
struct Behavior: LambdaServerBehavior {
func getWork() -> GetWorkResult {
// no request id -> no context
@@ -91,7 +91,7 @@ class LambdaRuntimeClientTest: XCTestCase {
}
}
XCTAssertThrowsError(try runLambda(behavior: Behavior(), handler: EchoHandler())) { error in
XCTAssertEqual(error as? LambdaRuntimeClientError, LambdaRuntimeClientError.noContext)
XCTAssertEqual(error as? LambdaRuntimeClientError, LambdaRuntimeClientError.invocationMissingHeader(AmazonHeaders.requestID))
}
}
+1 -1
View File
@@ -86,7 +86,7 @@ class LambdaTest: XCTestCase {
func testStartStop() throws {
let server = try MockLambdaServer(behavior: GoodBehavior()).start().wait()
struct MyHandler: LambdaHandler {
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
callback(.success(payload))
}
}
@@ -140,7 +140,12 @@ internal final class HTTPHandler: ChannelInboundHandler {
}
responseStatus = .ok
responseBody = result
responseHeaders = [(AmazonHeaders.requestID, requestId)]
responseHeaders = [
(AmazonHeaders.requestID, requestId),
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).timeIntervalSince1970 * 1000)),
]
case .failure(let error):
responseStatus = .init(statusCode: error.rawValue)
}
+3 -3
View File
@@ -38,7 +38,7 @@ final class EchoHandler: LambdaHandler {
callback(.success(()))
}
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
callback(.success(payload))
}
}
@@ -50,7 +50,7 @@ struct FailedHandler: LambdaHandler {
self.reason = reason
}
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
callback(.failure(Error(description: self.reason)))
}
@@ -66,7 +66,7 @@ struct FailedInitializerHandler: LambdaHandler {
self.reason = reason
}
func handle(context: LambdaContext, payload: [UInt8], callback: @escaping LambdaCallback) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
callback(.success(payload))
}