mirror of
https://github.com/swift-server/async-http-client.git
synced 2026-05-03 07:32:29 +00:00
59bfb96afb
Motivation We currently don't handle large headers well which trigger a channel writability change event. Modification Add failing (but currently skipped) tests which reproduces the issue Result We can reliably reproduce the large request header issue in an integration and unit test. Note that the actual fix is not included to make reviewing easier and will come in a follow up PR.
1459 lines
55 KiB
Swift
1459 lines
55 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the AsyncHTTPClient open source project
|
|
//
|
|
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
import AsyncHTTPClient
|
|
import Atomics
|
|
import Foundation
|
|
import Logging
|
|
import NIOConcurrencyHelpers
|
|
import NIOCore
|
|
import NIOEmbedded
|
|
import NIOHPACK
|
|
import NIOHTTP1
|
|
import NIOHTTP2
|
|
import NIOHTTPCompression
|
|
import NIOPosix
|
|
import NIOSSL
|
|
import NIOTLS
|
|
import NIOTransportServices
|
|
import XCTest
|
|
#if canImport(Darwin)
|
|
import Darwin
|
|
#elseif canImport(Glibc)
|
|
import Glibc
|
|
#endif
|
|
|
|
/// Are we testing NIO Transport services
|
|
func isTestingNIOTS() -> Bool {
|
|
#if canImport(Network)
|
|
return ProcessInfo.processInfo.environment["DISABLE_TS_TESTS"] != "true"
|
|
#else
|
|
return false
|
|
#endif
|
|
}
|
|
|
|
func getDefaultEventLoopGroup(numberOfThreads: Int) -> EventLoopGroup {
|
|
#if canImport(Network)
|
|
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *),
|
|
isTestingNIOTS() {
|
|
return NIOTSEventLoopGroup(loopCount: numberOfThreads, defaultQoS: .default)
|
|
}
|
|
#endif
|
|
return MultiThreadedEventLoopGroup(numberOfThreads: numberOfThreads)
|
|
}
|
|
|
|
let canBindIPv6Loopback: Bool = {
|
|
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
defer { try! elg.syncShutdownGracefully() }
|
|
let serverChannel = try? ServerBootstrap(group: elg)
|
|
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
|
.bind(host: "::1", port: 0)
|
|
.wait()
|
|
let didBind = (serverChannel != nil)
|
|
try! serverChannel?.close().wait()
|
|
return didBind
|
|
}()
|
|
|
|
/// Runs the given block in the context of a non-English C locale (in this case, German).
|
|
/// Throws an XCTSkip error if the locale is not supported by the system.
|
|
func withCLocaleSetToGerman(_ body: () throws -> Void) throws {
|
|
guard let germanLocale = newlocale(LC_TIME_MASK | LC_NUMERIC_MASK, "de_DE", nil) else {
|
|
if errno == ENOENT {
|
|
throw XCTSkip("System does not support locale de_DE")
|
|
} else {
|
|
XCTFail("Failed to create locale de_DE")
|
|
return
|
|
}
|
|
}
|
|
defer { freelocale(germanLocale) }
|
|
|
|
let oldLocale = uselocale(germanLocale)
|
|
defer { _ = uselocale(oldLocale) }
|
|
try body()
|
|
}
|
|
|
|
class TestHTTPDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = Void
|
|
|
|
init(backpressureEventLoop: EventLoop? = nil) {
|
|
self.backpressureEventLoop = backpressureEventLoop
|
|
}
|
|
|
|
var backpressureEventLoop: EventLoop?
|
|
|
|
enum State {
|
|
case idle
|
|
case head(HTTPResponseHead)
|
|
case body(HTTPResponseHead, ByteBuffer)
|
|
case end
|
|
case error(Error)
|
|
}
|
|
|
|
var state = State.idle
|
|
|
|
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
|
|
self.state = .head(head)
|
|
return (self.backpressureEventLoop ?? task.eventLoop).makeSucceededFuture(())
|
|
}
|
|
|
|
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
|
|
switch self.state {
|
|
case .head(let head):
|
|
self.state = .body(head, buffer)
|
|
case .body(let head, var body):
|
|
var buffer = buffer
|
|
body.writeBuffer(&buffer)
|
|
self.state = .body(head, body)
|
|
default:
|
|
preconditionFailure("expecting head or body")
|
|
}
|
|
return (self.backpressureEventLoop ?? task.eventLoop).makeSucceededFuture(())
|
|
}
|
|
|
|
func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
|
|
}
|
|
|
|
class CountingDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = Int
|
|
|
|
var count = 0
|
|
|
|
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
|
|
let str = buffer.getString(at: 0, length: buffer.readableBytes)
|
|
if str?.starts(with: "id:") ?? false {
|
|
self.count += 1
|
|
}
|
|
return task.eventLoop.makeSucceededFuture(())
|
|
}
|
|
|
|
func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Int {
|
|
return self.count
|
|
}
|
|
}
|
|
|
|
class DelayOnHeadDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = ByteBuffer
|
|
|
|
let eventLoop: EventLoop
|
|
let didReceiveHead: (HTTPResponseHead, EventLoopPromise<Void>) -> Void
|
|
|
|
private var data: ByteBuffer
|
|
|
|
private var mayReceiveData = false
|
|
|
|
private var expectError = false
|
|
|
|
init(eventLoop: EventLoop, didReceiveHead: @escaping (HTTPResponseHead, EventLoopPromise<Void>) -> Void) {
|
|
self.eventLoop = eventLoop
|
|
self.didReceiveHead = didReceiveHead
|
|
self.data = ByteBuffer()
|
|
}
|
|
|
|
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
|
|
XCTAssertFalse(self.mayReceiveData)
|
|
XCTAssertFalse(self.expectError)
|
|
|
|
let promise = self.eventLoop.makePromise(of: Void.self)
|
|
promise.futureResult.whenComplete {
|
|
switch $0 {
|
|
case .success:
|
|
self.mayReceiveData = true
|
|
case .failure:
|
|
self.expectError = true
|
|
}
|
|
}
|
|
|
|
self.didReceiveHead(head, promise)
|
|
return promise.futureResult
|
|
}
|
|
|
|
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
|
|
XCTAssertTrue(self.mayReceiveData)
|
|
XCTAssertFalse(self.expectError)
|
|
self.data.writeImmutableBuffer(buffer)
|
|
return self.eventLoop.makeSucceededFuture(())
|
|
}
|
|
|
|
func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Response {
|
|
XCTAssertTrue(self.mayReceiveData)
|
|
XCTAssertFalse(self.expectError)
|
|
return self.data
|
|
}
|
|
|
|
func didReceiveError(task: HTTPClient.Task<ByteBuffer>, _ error: Error) {
|
|
XCTAssertTrue(self.expectError)
|
|
}
|
|
}
|
|
|
|
enum TemporaryFileHelpers {
|
|
private static var temporaryDirectory: String {
|
|
#if targetEnvironment(simulator)
|
|
// Simulator temp directories are so long (and contain the user name) that they're not usable
|
|
// for UNIX Domain Socket paths (which are limited to 103 bytes).
|
|
return "/tmp"
|
|
#else
|
|
#if os(Android)
|
|
return "/data/local/tmp"
|
|
#elseif os(Linux)
|
|
return "/tmp"
|
|
#else
|
|
if #available(macOS 10.12, iOS 10, tvOS 10, watchOS 3, *) {
|
|
return FileManager.default.temporaryDirectory.path
|
|
} else {
|
|
return "/tmp"
|
|
}
|
|
#endif // os
|
|
#endif // targetEnvironment
|
|
}
|
|
|
|
private static func openTemporaryFile() -> (CInt, String) {
|
|
let template = "\(temporaryDirectory)/ahc_XXXXXX"
|
|
var templateBytes = template.utf8 + [0]
|
|
let templateBytesCount = templateBytes.count
|
|
let fd = templateBytes.withUnsafeMutableBufferPointer { ptr in
|
|
ptr.baseAddress!.withMemoryRebound(to: Int8.self, capacity: templateBytesCount) { ptr in
|
|
mkstemp(ptr)
|
|
}
|
|
}
|
|
templateBytes.removeLast()
|
|
return (fd, String(decoding: templateBytes, as: Unicode.UTF8.self))
|
|
}
|
|
|
|
/// This function creates a filename that can be used for a temporary UNIX domain socket path.
|
|
///
|
|
/// If the temporary directory is too long to store a UNIX domain socket path, it will `chdir` into the temporary
|
|
/// directory and return a short-enough path. The iOS simulator is known to have too long paths.
|
|
internal static func withTemporaryUnixDomainSocketPathName<T>(directory: String = temporaryDirectory,
|
|
_ body: (String) throws -> T) throws -> T {
|
|
// this is racy but we're trying to create the shortest possible path so we can't add a directory...
|
|
let (fd, path) = self.openTemporaryFile()
|
|
close(fd)
|
|
try! FileManager.default.removeItem(atPath: path)
|
|
|
|
let saveCurrentDirectory = FileManager.default.currentDirectoryPath
|
|
let restoreSavedCWD: Bool
|
|
let shortEnoughPath: String
|
|
do {
|
|
_ = try SocketAddress(unixDomainSocketPath: path)
|
|
// this seems to be short enough for a UDS
|
|
shortEnoughPath = path
|
|
restoreSavedCWD = false
|
|
} catch SocketAddressError.unixDomainSocketPathTooLong {
|
|
FileManager.default.changeCurrentDirectoryPath(URL(fileURLWithPath: path).deletingLastPathComponent().absoluteString)
|
|
shortEnoughPath = URL(fileURLWithPath: path).lastPathComponent
|
|
restoreSavedCWD = true
|
|
print("WARNING: Path '\(path)' could not be used as UNIX domain socket path, using chdir & '\(shortEnoughPath)'")
|
|
}
|
|
defer {
|
|
if FileManager.default.fileExists(atPath: path) {
|
|
try? FileManager.default.removeItem(atPath: path)
|
|
}
|
|
if restoreSavedCWD {
|
|
FileManager.default.changeCurrentDirectoryPath(saveCurrentDirectory)
|
|
}
|
|
}
|
|
return try body(shortEnoughPath)
|
|
}
|
|
|
|
/// This function creates a filename that can be used as a temporary file.
|
|
internal static func withTemporaryFilePath<T>(
|
|
directory: String = temporaryDirectory,
|
|
_ body: (String) throws -> T
|
|
) throws -> T {
|
|
let (fd, path) = self.openTemporaryFile()
|
|
close(fd)
|
|
try! FileManager.default.removeItem(atPath: path)
|
|
|
|
defer {
|
|
if FileManager.default.fileExists(atPath: path) {
|
|
try? FileManager.default.removeItem(atPath: path)
|
|
}
|
|
}
|
|
return try body(path)
|
|
}
|
|
|
|
internal static func makeTemporaryFilePath(
|
|
directory: String = temporaryDirectory
|
|
) -> String {
|
|
let (fd, path) = self.openTemporaryFile()
|
|
close(fd)
|
|
try! FileManager.default.removeItem(atPath: path)
|
|
return path
|
|
}
|
|
|
|
internal static func removeTemporaryFile(
|
|
at path: String
|
|
) {
|
|
if FileManager.default.fileExists(atPath: path) {
|
|
try? FileManager.default.removeItem(atPath: path)
|
|
}
|
|
}
|
|
|
|
internal static func fileSize(path: String) throws -> Int? {
|
|
return try FileManager.default.attributesOfItem(atPath: path)[.size] as? Int
|
|
}
|
|
|
|
internal static func fileExists(path: String) -> Bool {
|
|
return FileManager.default.fileExists(atPath: path)
|
|
}
|
|
}
|
|
|
|
enum TestTLS {
|
|
static let certificate = try! NIOSSLCertificate(bytes: Array(cert.utf8), format: .pem)
|
|
static let privateKey = try! NIOSSLPrivateKey(bytes: Array(key.utf8), format: .pem)
|
|
}
|
|
|
|
internal final class HTTPBin<RequestHandler: ChannelInboundHandler> where
|
|
RequestHandler.InboundIn == HTTPServerRequestPart,
|
|
RequestHandler.OutboundOut == HTTPServerResponsePart {
|
|
enum BindTarget {
|
|
case unixDomainSocket(String)
|
|
case localhostIPv4RandomPort
|
|
case localhostIPv6RandomPort
|
|
}
|
|
|
|
enum Mode {
|
|
// refuses all connections
|
|
case refuse
|
|
// supports http1.1 connections only, which can be either plain text or encrypted
|
|
case http1_1(ssl: Bool = false, compress: Bool = false)
|
|
// supports http1.1 and http2 connections which must be always encrypted
|
|
case http2(
|
|
compress: Bool = false,
|
|
settings: HTTP2Settings? = nil
|
|
)
|
|
|
|
// supports request decompression and http response compression
|
|
var compress: Bool {
|
|
switch self {
|
|
case .refuse:
|
|
return false
|
|
case .http1_1(ssl: _, compress: let compress), .http2(compress: let compress, _):
|
|
return compress
|
|
}
|
|
}
|
|
|
|
var httpSettings: HTTP2Settings {
|
|
switch self {
|
|
case .http1_1, .http2(_, nil), .refuse:
|
|
return [
|
|
HTTP2Setting(parameter: .maxConcurrentStreams, value: 10),
|
|
HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize),
|
|
]
|
|
case .http2(_, .some(let customSettings)):
|
|
return customSettings
|
|
}
|
|
}
|
|
}
|
|
|
|
enum Proxy {
|
|
case none
|
|
case simulate(authorization: String?)
|
|
}
|
|
|
|
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
|
|
private let activeConnCounterHandler: ConnectionsCountHandler
|
|
var activeConnections: Int {
|
|
return self.activeConnCounterHandler.currentlyActiveConnections
|
|
}
|
|
|
|
var createdConnections: Int {
|
|
return self.activeConnCounterHandler.createdConnections
|
|
}
|
|
|
|
var port: Int {
|
|
return Int(self.serverChannel.localAddress!.port!)
|
|
}
|
|
|
|
var socketAddress: SocketAddress {
|
|
return self.serverChannel.localAddress!
|
|
}
|
|
|
|
var baseURL: String {
|
|
let scheme: String = {
|
|
switch mode {
|
|
case .http1_1, .refuse:
|
|
return "http"
|
|
case .http2:
|
|
return "https"
|
|
}
|
|
}()
|
|
let host: String = {
|
|
switch self.socketAddress {
|
|
case .v4:
|
|
return self.socketAddress.ipAddress!
|
|
case .v6:
|
|
return "[\(self.socketAddress.ipAddress!)]"
|
|
case .unixDomainSocket:
|
|
return self.socketAddress.pathname!
|
|
}
|
|
}()
|
|
|
|
return "\(scheme)://\(host):\(self.port)/"
|
|
}
|
|
|
|
private let mode: Mode
|
|
private let sslContext: NIOSSLContext?
|
|
private var serverChannel: Channel!
|
|
private let isShutdown = ManagedAtomic(false)
|
|
private let handlerFactory: (Int) -> (RequestHandler)
|
|
|
|
init(
|
|
_ mode: Mode = .http1_1(ssl: false, compress: false),
|
|
proxy: Proxy = .none,
|
|
bindTarget: BindTarget = .localhostIPv4RandomPort,
|
|
handlerFactory: @escaping (Int) -> (RequestHandler)
|
|
) {
|
|
self.mode = mode
|
|
self.sslContext = HTTPBin.sslContext(for: mode)
|
|
self.handlerFactory = handlerFactory
|
|
|
|
let socketAddress: SocketAddress
|
|
switch bindTarget {
|
|
case .localhostIPv4RandomPort:
|
|
socketAddress = try! SocketAddress(ipAddress: "127.0.0.1", port: 0)
|
|
case .localhostIPv6RandomPort:
|
|
socketAddress = try! SocketAddress(ipAddress: "::1", port: 0)
|
|
case .unixDomainSocket(let path):
|
|
socketAddress = try! SocketAddress(unixDomainSocketPath: path)
|
|
}
|
|
|
|
self.activeConnCounterHandler = ConnectionsCountHandler()
|
|
|
|
let connectionIDAtomic = ManagedAtomic(0)
|
|
|
|
self.serverChannel = try! ServerBootstrap(group: self.group)
|
|
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
|
.serverChannelInitializer { channel in
|
|
channel.pipeline.addHandler(self.activeConnCounterHandler)
|
|
}.childChannelInitializer { channel in
|
|
do {
|
|
let connectionID = connectionIDAtomic.loadThenWrappingIncrement(ordering: .relaxed)
|
|
|
|
if case .refuse = mode {
|
|
throw HTTPBinError.refusedConnection
|
|
}
|
|
|
|
// if we need to simulate a proxy, we need to add those handlers first
|
|
if case .simulate(authorization: let expectedAuthorization) = proxy {
|
|
try self.syncAddHTTPProxyHandlers(
|
|
to: channel,
|
|
connectionID: connectionID,
|
|
expectedAuthorization: expectedAuthorization
|
|
)
|
|
return channel.eventLoop.makeSucceededVoidFuture()
|
|
}
|
|
|
|
// if a connection has been established, we need to negotiate TLS before
|
|
// anything else. Depending on the negotiation, the HTTPHandlers will be added.
|
|
if let sslContext = self.sslContext {
|
|
try self.addTLSHandlerAndUpgrader(
|
|
to: channel,
|
|
sslContext: sslContext,
|
|
connectionID: connectionID
|
|
)
|
|
return channel.eventLoop.makeSucceededVoidFuture()
|
|
}
|
|
|
|
// if neither HTTP Proxy nor TLS are wanted, we can add HTTP1 handlers directly
|
|
try self.syncAddHTTP1Handlers(to: channel, connectionID: connectionID)
|
|
return channel.eventLoop.makeSucceededVoidFuture()
|
|
} catch {
|
|
return channel.eventLoop.makeFailedFuture(error)
|
|
}
|
|
}.bind(to: socketAddress).wait()
|
|
}
|
|
|
|
private func syncAddHTTPProxyHandlers(
|
|
to channel: Channel,
|
|
connectionID: Int,
|
|
expectedAuthorization: String?
|
|
) throws {
|
|
let sync = channel.pipeline.syncOperations
|
|
let promise = channel.eventLoop.makePromise(of: Void.self)
|
|
|
|
let responseEncoder = HTTPResponseEncoder()
|
|
let requestDecoder = ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .forwardBytes))
|
|
let proxySimulator = HTTPProxySimulator(promise: promise, expectedAuhorization: expectedAuthorization)
|
|
|
|
try sync.addHandler(responseEncoder)
|
|
try sync.addHandler(requestDecoder)
|
|
try sync.addHandler(proxySimulator)
|
|
|
|
promise.futureResult.flatMap { _ in
|
|
channel.pipeline.removeHandler(proxySimulator)
|
|
}.flatMap { _ in
|
|
channel.pipeline.removeHandler(responseEncoder)
|
|
}.flatMap { _ in
|
|
channel.pipeline.removeHandler(requestDecoder)
|
|
}.whenComplete { result in
|
|
switch result {
|
|
case .failure:
|
|
channel.close(mode: .all, promise: nil)
|
|
case .success:
|
|
self.httpProxyEstablished(channel, connectionID: connectionID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func syncAddHTTP1Handlers(to channel: Channel, connectionID: Int) throws {
|
|
let sync = channel.pipeline.syncOperations
|
|
try sync.configureHTTPServerPipeline(withPipeliningAssistance: true, withErrorHandling: true)
|
|
|
|
if self.mode.compress {
|
|
try sync.addHandler(HTTPResponseCompressor())
|
|
}
|
|
|
|
try sync.addHandler(self.handlerFactory(connectionID))
|
|
}
|
|
|
|
private func httpProxyEstablished(_ channel: Channel, connectionID: Int) {
|
|
do {
|
|
// if a connection has been established, we need to negotiate TLS before
|
|
// anything else. Depending on the negotiation, the HTTPHandlers will be added.
|
|
if let sslContext = self.sslContext {
|
|
try self.addTLSHandlerAndUpgrader(
|
|
to: channel,
|
|
sslContext: sslContext,
|
|
connectionID: connectionID
|
|
)
|
|
return
|
|
}
|
|
|
|
// if neither HTTP Proxy nor TLS are wanted, we can add HTTP1 handlers directly
|
|
try self.syncAddHTTP1Handlers(to: channel, connectionID: connectionID)
|
|
} catch {
|
|
// in case of an while modifying the pipeline we should close the connection
|
|
channel.close(mode: .all, promise: nil)
|
|
}
|
|
}
|
|
|
|
private static func tlsConfiguration(for mode: Mode) -> TLSConfiguration? {
|
|
var configuration: TLSConfiguration?
|
|
|
|
switch mode {
|
|
case .refuse, .http1_1(ssl: false, compress: _):
|
|
break
|
|
case .http2:
|
|
configuration = .makeServerConfiguration(
|
|
certificateChain: [.certificate(TestTLS.certificate)],
|
|
privateKey: .privateKey(TestTLS.privateKey)
|
|
)
|
|
configuration!.applicationProtocols = NIOHTTP2SupportedALPNProtocols
|
|
case .http1_1(ssl: true, compress: _):
|
|
configuration = .makeServerConfiguration(
|
|
certificateChain: [.certificate(TestTLS.certificate)],
|
|
privateKey: .privateKey(TestTLS.privateKey)
|
|
)
|
|
}
|
|
|
|
return configuration
|
|
}
|
|
|
|
private static func sslContext(for mode: Mode) -> NIOSSLContext? {
|
|
if let tlsConfiguration = self.tlsConfiguration(for: mode) {
|
|
return try! NIOSSLContext(configuration: tlsConfiguration)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
private func addTLSHandlerAndUpgrader(to channel: Channel, sslContext: NIOSSLContext, connectionID: Int) throws {
|
|
let sslHandler = NIOSSLServerHandler(context: sslContext)
|
|
|
|
// copy pasted from NIOHTTP2
|
|
let alpnHandler = ApplicationProtocolNegotiationHandler { result in
|
|
do {
|
|
switch result {
|
|
case .negotiated("h2"):
|
|
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
|
|
let http2Handler = NIOHTTP2Handler(
|
|
mode: .server,
|
|
initialSettings: self.mode.httpSettings
|
|
)
|
|
let multiplexer = HTTP2StreamMultiplexer(
|
|
mode: .server,
|
|
channel: channel,
|
|
inboundStreamInitializer: { channel in
|
|
do {
|
|
let sync = channel.pipeline.syncOperations
|
|
|
|
try sync.addHandler(HTTP2FramePayloadToHTTP1ServerCodec())
|
|
if self.mode.compress {
|
|
try sync.addHandler(HTTPResponseCompressor())
|
|
}
|
|
try sync.addHandler(self.handlerFactory(connectionID))
|
|
|
|
return channel.eventLoop.makeSucceededVoidFuture()
|
|
} catch {
|
|
return channel.eventLoop.makeFailedFuture(error)
|
|
}
|
|
}
|
|
)
|
|
|
|
let sync = channel.pipeline.syncOperations
|
|
|
|
try sync.addHandler(http2Handler)
|
|
try sync.addHandler(multiplexer)
|
|
case .negotiated("http/1.1"), .fallback:
|
|
// Explicit or implicit HTTP/1.1 choice.
|
|
try self.syncAddHTTP1Handlers(to: channel, connectionID: connectionID)
|
|
case .negotiated:
|
|
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
|
|
// of a user configuration error. We're going to close the connection directly.
|
|
channel.close(mode: .all, promise: nil)
|
|
throw NIOHTTP2Errors.invalidALPNToken()
|
|
}
|
|
|
|
return channel.eventLoop.makeSucceededVoidFuture()
|
|
} catch {
|
|
return channel.eventLoop.makeFailedFuture(error)
|
|
}
|
|
}
|
|
|
|
try channel.pipeline.syncOperations.addHandler(alpnHandler)
|
|
try channel.pipeline.syncOperations.addHandler(sslHandler, position: .before(alpnHandler))
|
|
}
|
|
|
|
func shutdown() throws {
|
|
self.isShutdown.store(true, ordering: .relaxed)
|
|
try self.group.syncShutdownGracefully()
|
|
}
|
|
|
|
deinit {
|
|
assert(self.isShutdown.load(ordering: .relaxed), "HTTPBin not shutdown before deinit")
|
|
}
|
|
}
|
|
|
|
extension HTTPBin where RequestHandler == HTTPBinHandler {
|
|
convenience init(
|
|
_ mode: Mode = .http1_1(ssl: false, compress: false),
|
|
proxy: Proxy = .none,
|
|
bindTarget: BindTarget = .localhostIPv4RandomPort
|
|
) {
|
|
self.init(mode, proxy: proxy, bindTarget: bindTarget) { HTTPBinHandler(connectionID: $0) }
|
|
}
|
|
}
|
|
|
|
enum HTTPBinError: Error {
|
|
case refusedConnection
|
|
case invalidProxyRequest
|
|
}
|
|
|
|
final class HTTPProxySimulator: ChannelInboundHandler, RemovableChannelHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias InboundOut = HTTPServerResponsePart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
// the promise to succeed, once the proxy connection is setup
|
|
let promise: EventLoopPromise<Void>
|
|
let expectedAuhorization: String?
|
|
|
|
var head: HTTPResponseHead
|
|
|
|
init(promise: EventLoopPromise<Void>, expectedAuhorization: String?) {
|
|
self.promise = promise
|
|
self.expectedAuhorization = expectedAuhorization
|
|
self.head = HTTPResponseHead(version: .init(major: 1, minor: 1), status: .ok, headers: .init([("Content-Length", "0")]))
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
let request = self.unwrapInboundIn(data)
|
|
switch request {
|
|
case .head(let head):
|
|
guard head.method == .CONNECT else {
|
|
self.head.status = .badRequest
|
|
return
|
|
}
|
|
|
|
if let expectedAuhorization = self.expectedAuhorization {
|
|
guard let authorization = head.headers["proxy-authorization"].first,
|
|
expectedAuhorization == authorization else {
|
|
self.head.status = .proxyAuthenticationRequired
|
|
return
|
|
}
|
|
}
|
|
|
|
case .body:
|
|
()
|
|
case .end:
|
|
let okay = self.head.status != .ok
|
|
context.write(self.wrapOutboundOut(.head(self.head)), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
if okay {
|
|
self.promise.fail(HTTPBinError.invalidProxyRequest)
|
|
} else {
|
|
self.promise.succeed(())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
internal struct HTTPResponseBuilder {
|
|
var head: HTTPResponseHead
|
|
var body: ByteBuffer?
|
|
|
|
init(_ version: HTTPVersion = HTTPVersion(major: 1, minor: 1), status: HTTPResponseStatus, headers: HTTPHeaders = HTTPHeaders()) {
|
|
self.head = HTTPResponseHead(version: version, status: status, headers: headers)
|
|
}
|
|
|
|
mutating func add(_ part: ByteBuffer) {
|
|
if var body = body {
|
|
var part = part
|
|
body.writeBuffer(&part)
|
|
self.body = body
|
|
} else {
|
|
self.body = part
|
|
}
|
|
}
|
|
}
|
|
|
|
internal struct RequestInfo: Codable, Equatable {
|
|
var data: String
|
|
var requestNumber: Int
|
|
var connectionNumber: Int
|
|
}
|
|
|
|
internal final class HTTPBinHandler: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
var resps = CircularBuffer<HTTPResponseBuilder>()
|
|
var responseHeaders = HTTPHeaders()
|
|
var delay: TimeAmount = .seconds(0)
|
|
let creationDate = Date()
|
|
var shouldClose = false
|
|
var isServingRequest = false
|
|
let connectionID: Int
|
|
var requestId: Int = 0
|
|
|
|
init(connectionID: Int) {
|
|
self.connectionID = connectionID
|
|
}
|
|
|
|
func parseAndSetOptions(from head: HTTPRequestHead) {
|
|
if let delay = head.headers["X-internal-delay"].first {
|
|
if let milliseconds = Int64(delay) {
|
|
self.delay = TimeAmount.milliseconds(milliseconds)
|
|
} else {
|
|
assertionFailure("Invalid interval format")
|
|
}
|
|
} else {
|
|
self.delay = .nanoseconds(0)
|
|
}
|
|
|
|
for header in head.headers {
|
|
let needle = "x-send-back-header-"
|
|
if header.name.lowercased().starts(with: needle) {
|
|
self.responseHeaders.add(name: String(header.name.dropFirst(needle.count)),
|
|
value: header.value)
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeEvents(context: ChannelHandlerContext, isContentLengthRequired: Bool = false) {
|
|
let headers: HTTPHeaders
|
|
if isContentLengthRequired {
|
|
headers = HTTPHeaders([("Content-Length", "50")])
|
|
} else {
|
|
headers = HTTPHeaders()
|
|
}
|
|
|
|
context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok, headers: headers))), promise: nil)
|
|
for i in 0..<10 {
|
|
let msg = "id: \(i)"
|
|
var buf = context.channel.allocator.buffer(capacity: msg.count)
|
|
buf.writeString(msg)
|
|
context.writeAndFlush(wrapOutboundOut(.body(.byteBuffer(buf))), promise: nil)
|
|
Thread.sleep(forTimeInterval: 0.05)
|
|
}
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
|
|
func writeChunked(context: ChannelHandlerContext) {
|
|
// This tests receiving chunks very fast: please do not insert delays here!
|
|
let headers = HTTPHeaders([("Transfer-Encoding", "chunked")])
|
|
|
|
context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok, headers: headers))), promise: nil)
|
|
for i in 0..<10 {
|
|
let msg = "id: \(i)"
|
|
var buf = context.channel.allocator.buffer(capacity: msg.count)
|
|
buf.writeString(msg)
|
|
context.write(wrapOutboundOut(.body(.byteBuffer(buf))), promise: nil)
|
|
}
|
|
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
|
|
func writeManyChunks(context: ChannelHandlerContext) {
|
|
// This tests receiving a lot of tiny chunks: they must all be sent in a single flush or the test doesn't work.
|
|
let headers = HTTPHeaders([("Transfer-Encoding", "chunked")])
|
|
|
|
context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok, headers: headers))), promise: nil)
|
|
let message = ByteBuffer(integer: UInt8(ascii: "a"))
|
|
|
|
// This number (10k) is load-bearing and a bit magic: it has been experimentally verified as being sufficient to blow the stack
|
|
// in the old implementation on all testing platforms. Please don't change it without good reason.
|
|
for _ in 0..<10_000 {
|
|
context.write(wrapOutboundOut(.body(.byteBuffer(message))), promise: nil)
|
|
}
|
|
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
self.isServingRequest = true
|
|
switch self.unwrapInboundIn(data) {
|
|
case .head(let req):
|
|
self.responseHeaders = HTTPHeaders()
|
|
self.requestId += 1
|
|
self.parseAndSetOptions(from: req)
|
|
let urlComponents = URLComponents(string: req.uri)!
|
|
switch urlComponents.percentEncodedPath {
|
|
case "/":
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "X-Is-This-Slash", value: "Yes")
|
|
self.resps.append(HTTPResponseBuilder(status: .ok, headers: headers))
|
|
return
|
|
case "/echo-uri":
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "X-Calling-URI", value: req.uri)
|
|
self.resps.append(HTTPResponseBuilder(status: .ok, headers: headers))
|
|
return
|
|
case "/echo-method":
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "X-Method-Used", value: req.method.rawValue)
|
|
self.resps.append(HTTPResponseBuilder(status: .ok, headers: headers))
|
|
return
|
|
case "/ok":
|
|
self.resps.append(HTTPResponseBuilder(status: .ok))
|
|
return
|
|
case "/get":
|
|
if req.method != .GET {
|
|
self.resps.append(HTTPResponseBuilder(status: .methodNotAllowed))
|
|
return
|
|
}
|
|
self.resps.append(HTTPResponseBuilder(status: .ok))
|
|
return
|
|
case "/stats":
|
|
var body = context.channel.allocator.buffer(capacity: 1)
|
|
body.writeString("Just some stats mate.")
|
|
var builder = HTTPResponseBuilder(status: .ok)
|
|
builder.add(body)
|
|
|
|
self.resps.append(builder)
|
|
case "/post":
|
|
if req.method != .POST {
|
|
self.resps.append(HTTPResponseBuilder(status: .methodNotAllowed))
|
|
return
|
|
}
|
|
self.resps.append(HTTPResponseBuilder(status: .ok))
|
|
return
|
|
case "/redirect/302":
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "location", value: "/ok")
|
|
self.resps.append(HTTPResponseBuilder(status: .found, headers: headers))
|
|
return
|
|
case "/redirect/https":
|
|
let port = self.value(for: "port", from: urlComponents.query!)
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "Location", value: "https://localhost:\(port)/ok")
|
|
self.resps.append(HTTPResponseBuilder(status: .found, headers: headers))
|
|
return
|
|
case "/redirect/loopback":
|
|
let port = self.value(for: "port", from: urlComponents.query!)
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "Location", value: "http://127.0.0.1:\(port)/echohostheader")
|
|
self.resps.append(HTTPResponseBuilder(status: .found, headers: headers))
|
|
return
|
|
case "/redirect/infinite1":
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "Location", value: "/redirect/infinite2")
|
|
self.resps.append(HTTPResponseBuilder(status: .found, headers: headers))
|
|
return
|
|
case "/redirect/infinite2":
|
|
var headers = self.responseHeaders
|
|
headers.add(name: "Location", value: "/redirect/infinite1")
|
|
self.resps.append(HTTPResponseBuilder(status: .found, headers: headers))
|
|
return
|
|
case "/redirect/target":
|
|
var headers = self.responseHeaders
|
|
let targetURL = req.headers["X-Target-Redirect-URL"].first ?? ""
|
|
headers.add(name: "Location", value: targetURL)
|
|
self.resps.append(HTTPResponseBuilder(status: .found, headers: headers))
|
|
return
|
|
case "/percent%20encoded":
|
|
if req.method != .GET {
|
|
self.resps.append(HTTPResponseBuilder(status: .methodNotAllowed))
|
|
return
|
|
}
|
|
self.resps.append(HTTPResponseBuilder(status: .ok))
|
|
return
|
|
case "/percent%2Fencoded/hello":
|
|
if req.method != .GET {
|
|
self.resps.append(HTTPResponseBuilder(status: .methodNotAllowed))
|
|
return
|
|
}
|
|
self.resps.append(HTTPResponseBuilder(status: .ok))
|
|
return
|
|
case "/echohostheader":
|
|
var builder = HTTPResponseBuilder(status: .ok)
|
|
let hostValue = req.headers["Host"].first ?? ""
|
|
let buff = context.channel.allocator.buffer(string: hostValue)
|
|
builder.add(buff)
|
|
self.resps.append(builder)
|
|
return
|
|
case "/wait":
|
|
return
|
|
case "/close":
|
|
context.close(promise: nil)
|
|
return
|
|
case "/custom":
|
|
context.writeAndFlush(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok))), promise: nil)
|
|
return
|
|
case "/events/10/1": // TODO: parse path
|
|
self.writeEvents(context: context)
|
|
return
|
|
case "/events/10/content-length":
|
|
self.writeEvents(context: context, isContentLengthRequired: true)
|
|
case "/chunked":
|
|
self.writeChunked(context: context)
|
|
return
|
|
case "/mega-chunked":
|
|
self.writeManyChunks(context: context)
|
|
return
|
|
case "/close-on-response":
|
|
var headers = self.responseHeaders
|
|
headers.replaceOrAdd(name: "connection", value: "close")
|
|
|
|
var builder = HTTPResponseBuilder(.http1_1, status: .ok, headers: headers)
|
|
builder.body = ByteBuffer(string: "some body content")
|
|
|
|
// We're forcing this closed now.
|
|
self.shouldClose = true
|
|
self.resps.append(builder)
|
|
default:
|
|
context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .notFound))), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
return
|
|
}
|
|
case .body(let body):
|
|
if self.resps.isEmpty {
|
|
return
|
|
}
|
|
var response = self.resps.removeFirst()
|
|
response.add(body)
|
|
self.resps.prepend(response)
|
|
case .end:
|
|
if self.resps.isEmpty {
|
|
return
|
|
}
|
|
var response = self.resps.removeFirst()
|
|
response.head.headers.add(contentsOf: self.responseHeaders)
|
|
context.write(wrapOutboundOut(.head(response.head)), promise: nil)
|
|
if let body = response.body {
|
|
let requestInfo = RequestInfo(data: String(buffer: body),
|
|
requestNumber: self.requestId,
|
|
connectionNumber: self.connectionID)
|
|
let responseBody = try! JSONEncoder().encodeAsByteBuffer(requestInfo,
|
|
allocator: context.channel.allocator)
|
|
context.write(wrapOutboundOut(.body(.byteBuffer(responseBody))), promise: nil)
|
|
} else {
|
|
let requestInfo = RequestInfo(data: "",
|
|
requestNumber: self.requestId,
|
|
connectionNumber: self.connectionID)
|
|
let responseBody = try! JSONEncoder().encodeAsByteBuffer(requestInfo,
|
|
allocator: context.channel.allocator)
|
|
context.write(wrapOutboundOut(.body(.byteBuffer(responseBody))), promise: nil)
|
|
}
|
|
context.eventLoop.scheduleTask(in: self.delay) {
|
|
guard context.channel.isActive else {
|
|
context.close(promise: nil)
|
|
return
|
|
}
|
|
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { result in
|
|
self.isServingRequest = false
|
|
switch result {
|
|
case .success:
|
|
if self.responseHeaders[canonicalForm: "X-Close-Connection"].contains("true") ||
|
|
self.shouldClose {
|
|
context.close(promise: nil)
|
|
}
|
|
case .failure(let error):
|
|
assertionFailure("\(error)")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func value(for key: String, from query: String) -> String {
|
|
let components = query.components(separatedBy: "&").map {
|
|
$0.trimmingCharacters(in: .whitespaces)
|
|
}
|
|
|
|
for component in components {
|
|
let nameAndValue = component.components(separatedBy: "=").map {
|
|
$0.trimmingCharacters(in: .whitespaces)
|
|
}
|
|
|
|
if nameAndValue[0] == key {
|
|
return nameAndValue[1]
|
|
}
|
|
}
|
|
fatalError("parameter \(key) is missing from query: \(query)")
|
|
}
|
|
}
|
|
|
|
final class ConnectionsCountHandler: ChannelInboundHandler {
|
|
typealias InboundIn = Channel
|
|
|
|
private let activeConns = ManagedAtomic(0)
|
|
private let createdConns = ManagedAtomic(0)
|
|
|
|
var createdConnections: Int {
|
|
self.createdConns.load(ordering: .relaxed)
|
|
}
|
|
|
|
var currentlyActiveConnections: Int {
|
|
self.activeConns.load(ordering: .relaxed)
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
let channel = self.unwrapInboundIn(data)
|
|
|
|
_ = self.activeConns.loadThenWrappingIncrement(ordering: .relaxed)
|
|
_ = self.createdConns.loadThenWrappingIncrement(ordering: .relaxed)
|
|
channel.closeFuture.whenComplete { _ in
|
|
_ = self.activeConns.loadThenWrappingDecrement(ordering: .relaxed)
|
|
}
|
|
|
|
context.fireChannelRead(data)
|
|
}
|
|
}
|
|
|
|
internal final class CloseWithoutClosingServerHandler: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
private var callback: (() -> Void)?
|
|
private var onClosePromise: EventLoopPromise<Void>?
|
|
|
|
init(_ callback: @escaping () -> Void) {
|
|
self.callback = callback
|
|
}
|
|
|
|
func handlerAdded(context: ChannelHandlerContext) {
|
|
self.onClosePromise = context.eventLoop.makePromise()
|
|
self.onClosePromise!.futureResult.whenSuccess(self.callback!)
|
|
self.callback = nil
|
|
}
|
|
|
|
func handlerRemoved(context: ChannelHandlerContext) {
|
|
assert(self.onClosePromise == nil)
|
|
}
|
|
|
|
func channelInactive(context: ChannelHandlerContext) {
|
|
if let onClosePromise = self.onClosePromise {
|
|
self.onClosePromise = nil
|
|
onClosePromise.succeed(())
|
|
}
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
guard case .end = self.unwrapInboundIn(data) else {
|
|
return
|
|
}
|
|
|
|
// We're gonna send a response back here, with Connection: close, but we will
|
|
// not close the connection. This reproduces #324.
|
|
let headers = HTTPHeaders([
|
|
("Host", "CloseWithoutClosingServerHandler"),
|
|
("Content-Length", "0"),
|
|
("Connection", "close"),
|
|
])
|
|
let head = HTTPResponseHead(version: .init(major: 1, minor: 1), status: .ok, headers: headers)
|
|
|
|
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
}
|
|
|
|
final class ExpectClosureServerHandler: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
private let onClosePromise: EventLoopPromise<Void>
|
|
|
|
init(onClosePromise: EventLoopPromise<Void>) {
|
|
self.onClosePromise = onClosePromise
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
switch self.unwrapInboundIn(data) {
|
|
case .head:
|
|
let head = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["Content-Length": "0"])
|
|
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
case .body, .end:
|
|
()
|
|
}
|
|
}
|
|
|
|
func channelInactive(context: ChannelHandlerContext) {
|
|
self.onClosePromise.succeed(())
|
|
}
|
|
}
|
|
|
|
struct EventLoopFutureTimeoutError: Error {}
|
|
|
|
extension EventLoopFuture {
|
|
func timeout(after failDelay: TimeAmount) -> EventLoopFuture<Value> {
|
|
let promise = self.eventLoop.makePromise(of: Value.self)
|
|
|
|
self.whenComplete { result in
|
|
switch result {
|
|
case .success(let value):
|
|
promise.succeed(value)
|
|
case .failure(let error):
|
|
promise.fail(error)
|
|
}
|
|
}
|
|
|
|
self.eventLoop.scheduleTask(in: failDelay) {
|
|
promise.fail(EventLoopFutureTimeoutError())
|
|
}
|
|
|
|
return promise.futureResult
|
|
}
|
|
}
|
|
|
|
struct CollectEverythingLogHandler: LogHandler {
|
|
var metadata: Logger.Metadata = [:]
|
|
var logLevel: Logger.Level = .info
|
|
let logStore: LogStore
|
|
|
|
class LogStore {
|
|
struct Entry {
|
|
var level: Logger.Level
|
|
var message: String
|
|
var metadata: [String: String]
|
|
}
|
|
|
|
var lock = NIOLock()
|
|
var logs: [Entry] = []
|
|
|
|
var allEntries: [Entry] {
|
|
get {
|
|
return self.lock.withLock { self.logs }
|
|
}
|
|
set {
|
|
self.lock.withLock { self.logs = newValue }
|
|
}
|
|
}
|
|
|
|
func append(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?) {
|
|
self.lock.withLock {
|
|
self.logs.append(Entry(level: level,
|
|
message: message.description,
|
|
metadata: metadata?.mapValues { $0.description } ?? [:]))
|
|
}
|
|
}
|
|
}
|
|
|
|
init(logStore: LogStore) {
|
|
self.logStore = logStore
|
|
}
|
|
|
|
func log(level: Logger.Level,
|
|
message: Logger.Message,
|
|
metadata: Logger.Metadata?,
|
|
file: String, function: String, line: UInt) {
|
|
self.logStore.append(level: level, message: message, metadata: self.metadata.merging(metadata ?? [:]) { $1 })
|
|
}
|
|
|
|
subscript(metadataKey key: String) -> Logger.Metadata.Value? {
|
|
get {
|
|
return self.metadata[key]
|
|
}
|
|
set {
|
|
self.metadata[key] = newValue
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A ``HTTPClientResponseDelegate`` that buffers the incoming response parts for the consumer. The consumer can
|
|
/// consume the bytes by calling ``next()`` on the delegate.
|
|
///
|
|
/// The sole purpose of this class is to enable straight-line stream tests.
|
|
class ResponseStreamDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = Void
|
|
|
|
enum State {
|
|
/// The delegate is in the idle state. There are no http response parts to be buffered
|
|
/// and the consumer did not signal a demand. Transitions to all other states are allowed.
|
|
case idle
|
|
/// The consumer has signaled a demand for more bytes, but none where available. Can
|
|
/// transition to `.idle` (when new bytes arrive), `.finished` (when the stream finishes or fails)
|
|
case waitingForBytes(EventLoopPromise<ByteBuffer?>)
|
|
/// The consumer has signaled no further demand but bytes keep arriving. Valid transitions
|
|
/// to `.idle` (when bytes are consumed), `.finished` (when bytes are consumed, and the
|
|
/// stream has ended), `.failed` (if an error is forwarded)
|
|
case buffering(ByteBuffer, done: Bool)
|
|
/// Stores an error for consumption. Valid transitions are: `.finished`, when the error was consumed.
|
|
case failed(Error)
|
|
/// The stream has finished and all bytes or errors where consumed.
|
|
case finished
|
|
}
|
|
|
|
let eventLoop: EventLoop
|
|
private var state: State = .idle
|
|
|
|
init(eventLoop: EventLoop) {
|
|
self.eventLoop = eventLoop
|
|
}
|
|
|
|
func next() -> EventLoopFuture<ByteBuffer?> {
|
|
if self.eventLoop.inEventLoop {
|
|
return self.next0()
|
|
} else {
|
|
return self.eventLoop.flatSubmit {
|
|
self.next0()
|
|
}
|
|
}
|
|
}
|
|
|
|
private func next0() -> EventLoopFuture<ByteBuffer?> {
|
|
switch self.state {
|
|
case .idle:
|
|
let promise = self.eventLoop.makePromise(of: ByteBuffer?.self)
|
|
self.state = .waitingForBytes(promise)
|
|
return promise.futureResult
|
|
|
|
case .buffering(let byteBuffer, done: false):
|
|
self.state = .idle
|
|
return self.eventLoop.makeSucceededFuture(byteBuffer)
|
|
|
|
case .buffering(let byteBuffer, done: true):
|
|
self.state = .finished
|
|
return self.eventLoop.makeSucceededFuture(byteBuffer)
|
|
|
|
case .waitingForBytes:
|
|
preconditionFailure("Don't call `.next` twice")
|
|
|
|
case .failed(let error):
|
|
self.state = .finished
|
|
return self.eventLoop.makeFailedFuture(error)
|
|
|
|
case .finished:
|
|
return self.eventLoop.makeSucceededFuture(nil)
|
|
}
|
|
}
|
|
|
|
// MARK: HTTPClientResponseDelegate
|
|
|
|
func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
}
|
|
|
|
func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
}
|
|
|
|
func didSendRequest(task: HTTPClient.Task<Response>) {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
}
|
|
|
|
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
return task.eventLoop.makeSucceededVoidFuture()
|
|
}
|
|
|
|
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
|
switch self.state {
|
|
case .idle:
|
|
self.state = .buffering(buffer, done: false)
|
|
case .waitingForBytes(let promise):
|
|
self.state = .idle
|
|
promise.succeed(buffer)
|
|
case .buffering(var byteBuffer, done: false):
|
|
var buffer = buffer
|
|
byteBuffer.writeBuffer(&buffer)
|
|
self.state = .buffering(byteBuffer, done: false)
|
|
case .buffering(_, done: true), .finished, .failed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
}
|
|
|
|
return task.eventLoop.makeSucceededVoidFuture()
|
|
}
|
|
|
|
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
|
switch self.state {
|
|
case .idle:
|
|
self.state = .failed(error)
|
|
case .waitingForBytes(let promise):
|
|
self.state = .finished
|
|
promise.fail(error)
|
|
case .buffering(_, done: false):
|
|
self.state = .failed(error)
|
|
case .buffering(_, done: true), .finished, .failed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
}
|
|
}
|
|
|
|
func didFinishRequest(task: HTTPClient.Task<Response>) throws {
|
|
self.eventLoop.preconditionInEventLoop()
|
|
|
|
switch self.state {
|
|
case .idle:
|
|
self.state = .finished
|
|
case .waitingForBytes(let promise):
|
|
self.state = .finished
|
|
promise.succeed(nil)
|
|
case .buffering(let byteBuffer, done: false):
|
|
self.state = .buffering(byteBuffer, done: true)
|
|
case .buffering(_, done: true), .finished, .failed:
|
|
preconditionFailure("Invalid state: \(self.state)")
|
|
}
|
|
}
|
|
}
|
|
|
|
class HTTPEchoHandler: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
let request = self.unwrapInboundIn(data)
|
|
switch request {
|
|
case .head(let requestHead):
|
|
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok, headers: requestHead.headers))), promise: nil)
|
|
case .body(let bytes):
|
|
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil)
|
|
case .end:
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenSuccess {
|
|
context.close(promise: nil)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
final class HTTPEchoHeaders: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
let request = self.unwrapInboundIn(data)
|
|
switch request {
|
|
case .head(let requestHead):
|
|
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok, headers: requestHead.headers))), promise: nil)
|
|
case .body:
|
|
break
|
|
case .end:
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenSuccess {
|
|
context.close(promise: nil)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
final class HTTP200DelayedHandler: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
var pendingBodyParts: Int?
|
|
|
|
init(bodyPartsBeforeResponse: Int) {
|
|
self.pendingBodyParts = bodyPartsBeforeResponse
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
let request = self.unwrapInboundIn(data)
|
|
switch request {
|
|
case .head:
|
|
// Once we have received one response, all further requests are responded to immediately.
|
|
if self.pendingBodyParts == nil {
|
|
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
case .body:
|
|
if let pendingBodyParts = self.pendingBodyParts {
|
|
if pendingBodyParts > 0 {
|
|
self.pendingBodyParts = pendingBodyParts - 1
|
|
} else {
|
|
self.pendingBodyParts = nil
|
|
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
}
|
|
case .end:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
private let cert = """
|
|
-----BEGIN CERTIFICATE-----
|
|
MIICmDCCAYACCQCPC8JDqMh1zzANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJ1
|
|
czAgFw0xODEwMzExNTU1MjJaGA8yMTE4MTAwNzE1NTUyMlowDTELMAkGA1UEBhMC
|
|
dXMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDiC+TGmbSP/nWWN1tj
|
|
yNfnWCU5ATjtIOfdtP6ycx8JSeqkvyNXG21kNUn14jTTU8BglGL2hfVpCbMisUdb
|
|
d3LpP8unSsvlOWwORFOViSy4YljSNM/FNoMtavuITA/sEELYgjWkz2o/uHPZHud9
|
|
+JQwGJgqIlMa3mr2IaaUZlWN3D1u88bzJYhpt3YyxRy9+OEoOKy36KdWwhKzV3S8
|
|
kXb0Y1GbAo68jJ9RfzeLy290mIs9qG2y1CNXWO6sxf6B//LaalizZiCfzYAVKcNR
|
|
9oNYsEJc5KB/+DsAGTzR7mL+oiU4h/vwVb2GTDat5C+PFGi6j1ujxYTRPO538ljg
|
|
dslnAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAFYhA7sw8odOsRO8/DUklBOjPnmn
|
|
a078oSumgPXXw6AgcoAJv/Qthjo6CCEtrjYfcA9jaBw9/Tii7mDmqDRS5c9ZPL8+
|
|
NEPdHjFCFBOEvlL6uHOgw0Z9Wz+5yCXnJ8oNUEgc3H2NbbzJF6sMBXSPtFS2NOK8
|
|
OsAI9OodMrDd6+lwljrmFoCCkJHDEfE637IcsbgFKkzhO/oNCRK6OrudG4teDahz
|
|
Au4LoEYwT730QKC/VQxxEVZobjn9/sTrq9CZlbPYHxX4fz6e00sX7H9i49vk9zQ5
|
|
5qCm9ljhrQPSa42Q62PPE2BEEGSP2KBm0J+H3vlvCD6+SNc/nMZjrRmgjrI=
|
|
-----END CERTIFICATE-----
|
|
"""
|
|
|
|
private let key = """
|
|
-----BEGIN PRIVATE KEY-----
|
|
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDiC+TGmbSP/nWW
|
|
N1tjyNfnWCU5ATjtIOfdtP6ycx8JSeqkvyNXG21kNUn14jTTU8BglGL2hfVpCbMi
|
|
sUdbd3LpP8unSsvlOWwORFOViSy4YljSNM/FNoMtavuITA/sEELYgjWkz2o/uHPZ
|
|
Hud9+JQwGJgqIlMa3mr2IaaUZlWN3D1u88bzJYhpt3YyxRy9+OEoOKy36KdWwhKz
|
|
V3S8kXb0Y1GbAo68jJ9RfzeLy290mIs9qG2y1CNXWO6sxf6B//LaalizZiCfzYAV
|
|
KcNR9oNYsEJc5KB/+DsAGTzR7mL+oiU4h/vwVb2GTDat5C+PFGi6j1ujxYTRPO53
|
|
8ljgdslnAgMBAAECggEBANZNWFNAnYJ2R5xmVuo/GxFk68Ujd4i4TZpPYbhkk+QG
|
|
g8I0w5htlEQQkVHfZx2CpTvq8feuAH/YhlA5qeD5WaPwq26q5qsmyV6tQGDgb9lO
|
|
w85l6ySZDbwdVOJe2il/MSB6MclSKvTGNm59chJnfHYsmvY3HHq4qsc2F+tRKYMW
|
|
pY75LgEbaTUV69J3cbC1wAeVjv0q/krND+YkhYpTxNZhbazK/FHOCvY+zFu9fg0L
|
|
zpwbn5fb6wIvqG7tXp7koa3QMn64AXmO/fb5mBd8G2vBGYnxwb7Egwdg/3Dw+BXu
|
|
ynQLP7ixWsE2KNfR9Ce1i3YvEo6QDTv2340I3dntxkECgYEA9vdaL4PGyvEbpim4
|
|
kqz1vuug8Iq0nTVDo6jmgH1o+XdcIbW3imXtgi5zUJpj4oDD7/4aufiJZjG64i/v
|
|
phe11xeUvh5QNNOzeMymVDoJut97F97KKKTv7bG8Rpon/WzH2I0SoAkECCwmdWAJ
|
|
H3nvOCnXEkpbCqmIUvHVURPRDn8CgYEA6lCk3EzFQlbXs3Sj5op61R3Mscx7/35A
|
|
eGv5axzbENHt1so+s3Zvyyi1bo4VBcwnKVCvQjmTuLiqrc9VfX8XdbiTUNnEr2u3
|
|
992Ja6DEJTZ9gy5WiviwYnwU2HpjwOVNBb17T0NLoRHkDZ6iXj7NZgwizOki5p3j
|
|
/hS0pObSIRkCgYEAiEdOGNIarHoHy9VR6H5QzR2xHYssx2NRA8p8B4MsnhxjVqaz
|
|
tUcxnJiNQXkwjRiJBrGthdnD2ASxH4dcMsb6rMpyZcbMc5ouewZS8j9khx4zCqUB
|
|
4RPC4eMmBb+jOZEBZlnSYUUYWHokbrij0B61BsTvzUQCoQuUElEoaSkKP3kCgYEA
|
|
mwdqXHvK076jjo9w1drvtEu4IDc8H2oH++TsrEr2QiWzaDZ9z71f8BnqGNCW5jQS
|
|
AQrqOjXgIArGmqMgXB0Xh4LsrUS4Fpx9ptiD0JsYy8pGtuGUzvQFt9OC80ve7kSI
|
|
dnDMwj+zLUmqCrzXjuWcfpUu/UaPGeiDbZuDfcteYhkCgYBLyL5JY7Qd4gVQIhFX
|
|
7Sv3sNJN3KZCQHEzut7IwojaxgpuxiFvgsoXXuYolVCQp32oWbYcE2Yke+hOKsTE
|
|
sCMAWZiSGN2Nrfea730IYAXkUm8bpEd3VxDXEEv13nxVeQof+JGMdlkldFGaBRDU
|
|
oYQsPj00S3/GA9WDapwe81Wl2A==
|
|
-----END PRIVATE KEY-----
|
|
"""
|