mirror of
https://github.com/swift-server/async-http-client.git
synced 2026-05-03 07:32:29 +00:00
ffcd1e1a1c
Motivation: TaskHandler unconditionally releases it's connection on error, this can lead to double release. This issue actually indicates a more general issue where handler continues to handle errors even after its state is `.endOrError`. We need to fix this by ignoring all subsequent errors. Modifications: 1. Check state before calling out delegate and pool 2. Replace all error callouts with call to `errorCaught` Result: Fixes #294
2631 lines
123 KiB
Swift
2631 lines
123 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the AsyncHTTPClient open source project
|
|
//
|
|
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
/* NOT @testable */ import AsyncHTTPClient // Tests that need @testable go into HTTPClientInternalTests.swift
|
|
#if canImport(Network)
|
|
import Network
|
|
#endif
|
|
import Logging
|
|
import NIO
|
|
import NIOConcurrencyHelpers
|
|
import NIOFoundationCompat
|
|
import NIOHTTP1
|
|
import NIOHTTPCompression
|
|
import NIOSSL
|
|
import NIOTestUtils
|
|
import NIOTransportServices
|
|
import XCTest
|
|
|
|
class HTTPClientTests: XCTestCase {
|
|
typealias Request = HTTPClient.Request
|
|
|
|
var clientGroup: EventLoopGroup!
|
|
var serverGroup: EventLoopGroup!
|
|
var defaultHTTPBin: HTTPBin!
|
|
var defaultClient: HTTPClient!
|
|
var backgroundLogStore: CollectEverythingLogHandler.LogStore!
|
|
|
|
var defaultHTTPBinURLPrefix: String {
|
|
return "http://localhost:\(self.defaultHTTPBin.port)/"
|
|
}
|
|
|
|
override func setUp() {
|
|
XCTAssertNil(self.clientGroup)
|
|
XCTAssertNil(self.serverGroup)
|
|
XCTAssertNil(self.defaultHTTPBin)
|
|
XCTAssertNil(self.defaultClient)
|
|
XCTAssertNil(self.backgroundLogStore)
|
|
|
|
self.clientGroup = getDefaultEventLoopGroup(numberOfThreads: 1)
|
|
self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
|
self.defaultHTTPBin = HTTPBin()
|
|
self.backgroundLogStore = CollectEverythingLogHandler.LogStore()
|
|
var backgroundLogger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: self.backgroundLogStore!)
|
|
})
|
|
backgroundLogger.logLevel = .trace
|
|
self.defaultClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
backgroundActivityLogger: backgroundLogger)
|
|
}
|
|
|
|
override func tearDown() {
|
|
if let defaultClient = self.defaultClient {
|
|
XCTAssertNoThrow(try defaultClient.syncShutdown())
|
|
self.defaultClient = nil
|
|
}
|
|
|
|
XCTAssertNotNil(self.defaultHTTPBin)
|
|
XCTAssertNoThrow(try self.defaultHTTPBin.shutdown())
|
|
self.defaultHTTPBin = nil
|
|
|
|
XCTAssertNotNil(self.clientGroup)
|
|
XCTAssertNoThrow(try self.clientGroup.syncShutdownGracefully())
|
|
self.clientGroup = nil
|
|
|
|
XCTAssertNotNil(self.serverGroup)
|
|
XCTAssertNoThrow(try self.serverGroup.syncShutdownGracefully())
|
|
self.serverGroup = nil
|
|
|
|
XCTAssertNotNil(self.backgroundLogStore)
|
|
self.backgroundLogStore = nil
|
|
}
|
|
|
|
func testRequestURI() throws {
|
|
let request1 = try Request(url: "https://someserver.com:8888/some/path?foo=bar")
|
|
XCTAssertEqual(request1.url.host, "someserver.com")
|
|
XCTAssertEqual(request1.url.path, "/some/path")
|
|
XCTAssertEqual(request1.url.query!, "foo=bar")
|
|
XCTAssertEqual(request1.port, 8888)
|
|
XCTAssertTrue(request1.useTLS)
|
|
|
|
let request2 = try Request(url: "https://someserver.com")
|
|
XCTAssertEqual(request2.url.path, "")
|
|
|
|
let request3 = try Request(url: "unix:///tmp/file")
|
|
XCTAssertNil(request3.url.host)
|
|
XCTAssertEqual(request3.host, "")
|
|
XCTAssertEqual(request3.url.path, "/tmp/file")
|
|
XCTAssertEqual(request3.port, 80)
|
|
XCTAssertFalse(request3.useTLS)
|
|
|
|
let request4 = try Request(url: "http+unix://%2Ftmp%2Ffile/file/path")
|
|
XCTAssertEqual(request4.host, "")
|
|
XCTAssertEqual(request4.url.host, "/tmp/file")
|
|
XCTAssertEqual(request4.url.path, "/file/path")
|
|
XCTAssertFalse(request4.useTLS)
|
|
|
|
let request5 = try Request(url: "https+unix://%2Ftmp%2Ffile/file/path")
|
|
XCTAssertEqual(request5.host, "")
|
|
XCTAssertEqual(request5.url.host, "/tmp/file")
|
|
XCTAssertEqual(request5.url.path, "/file/path")
|
|
XCTAssertTrue(request5.useTLS)
|
|
}
|
|
|
|
func testBadRequestURI() throws {
|
|
XCTAssertThrowsError(try Request(url: "some/path"), "should throw") { error in
|
|
XCTAssertEqual(error as! HTTPClientError, HTTPClientError.emptyScheme)
|
|
}
|
|
XCTAssertThrowsError(try Request(url: "app://somewhere/some/path?foo=bar"), "should throw") { error in
|
|
XCTAssertEqual(error as! HTTPClientError, HTTPClientError.unsupportedScheme("app"))
|
|
}
|
|
XCTAssertThrowsError(try Request(url: "https:/foo"), "should throw") { error in
|
|
XCTAssertEqual(error as! HTTPClientError, HTTPClientError.emptyHost)
|
|
}
|
|
XCTAssertThrowsError(try Request(url: "http+unix:///path"), "should throw") { error in
|
|
XCTAssertEqual(error as! HTTPClientError, HTTPClientError.missingSocketPath)
|
|
}
|
|
}
|
|
|
|
func testSchemaCasing() throws {
|
|
XCTAssertNoThrow(try Request(url: "hTTpS://someserver.com:8888/some/path?foo=bar"))
|
|
XCTAssertNoThrow(try Request(url: "uNIx:///some/path"))
|
|
XCTAssertNoThrow(try Request(url: "hTtP+uNIx://%2Fsome%2Fpath/"))
|
|
XCTAssertNoThrow(try Request(url: "hTtPS+uNIx://%2Fsome%2Fpath/"))
|
|
}
|
|
|
|
func testURLSocketPathInitializers() throws {
|
|
let url1 = URL(httpURLWithSocketPath: "/tmp/file")
|
|
XCTAssertNotNil(url1)
|
|
if let url = url1 {
|
|
XCTAssertEqual(url.scheme, "http+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file")
|
|
XCTAssertEqual(url.path, "/")
|
|
XCTAssertEqual(url.absoluteString, "http+unix://%2Ftmp%2Ffile/")
|
|
}
|
|
|
|
let url2 = URL(httpURLWithSocketPath: "/tmp/file", uri: "/file/path")
|
|
XCTAssertNotNil(url2)
|
|
if let url = url2 {
|
|
XCTAssertEqual(url.scheme, "http+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file")
|
|
XCTAssertEqual(url.path, "/file/path")
|
|
XCTAssertEqual(url.absoluteString, "http+unix://%2Ftmp%2Ffile/file/path")
|
|
}
|
|
|
|
let url3 = URL(httpURLWithSocketPath: "/tmp/file", uri: "file/path")
|
|
XCTAssertNotNil(url3)
|
|
if let url = url3 {
|
|
XCTAssertEqual(url.scheme, "http+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file")
|
|
XCTAssertEqual(url.path, "/file/path")
|
|
XCTAssertEqual(url.absoluteString, "http+unix://%2Ftmp%2Ffile/file/path")
|
|
}
|
|
|
|
let url4 = URL(httpURLWithSocketPath: "/tmp/file with spacesと漢字", uri: "file/path")
|
|
XCTAssertNotNil(url4)
|
|
if let url = url4 {
|
|
XCTAssertEqual(url.scheme, "http+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file with spacesと漢字")
|
|
XCTAssertEqual(url.path, "/file/path")
|
|
XCTAssertEqual(url.absoluteString, "http+unix://%2Ftmp%2Ffile%20with%20spaces%E3%81%A8%E6%BC%A2%E5%AD%97/file/path")
|
|
}
|
|
|
|
let url5 = URL(httpsURLWithSocketPath: "/tmp/file")
|
|
XCTAssertNotNil(url5)
|
|
if let url = url5 {
|
|
XCTAssertEqual(url.scheme, "https+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file")
|
|
XCTAssertEqual(url.path, "/")
|
|
XCTAssertEqual(url.absoluteString, "https+unix://%2Ftmp%2Ffile/")
|
|
}
|
|
|
|
let url6 = URL(httpsURLWithSocketPath: "/tmp/file", uri: "/file/path")
|
|
XCTAssertNotNil(url6)
|
|
if let url = url6 {
|
|
XCTAssertEqual(url.scheme, "https+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file")
|
|
XCTAssertEqual(url.path, "/file/path")
|
|
XCTAssertEqual(url.absoluteString, "https+unix://%2Ftmp%2Ffile/file/path")
|
|
}
|
|
|
|
let url7 = URL(httpsURLWithSocketPath: "/tmp/file", uri: "file/path")
|
|
XCTAssertNotNil(url7)
|
|
if let url = url7 {
|
|
XCTAssertEqual(url.scheme, "https+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file")
|
|
XCTAssertEqual(url.path, "/file/path")
|
|
XCTAssertEqual(url.absoluteString, "https+unix://%2Ftmp%2Ffile/file/path")
|
|
}
|
|
|
|
let url8 = URL(httpsURLWithSocketPath: "/tmp/file with spacesと漢字", uri: "file/path")
|
|
XCTAssertNotNil(url8)
|
|
if let url = url8 {
|
|
XCTAssertEqual(url.scheme, "https+unix")
|
|
XCTAssertEqual(url.host, "/tmp/file with spacesと漢字")
|
|
XCTAssertEqual(url.path, "/file/path")
|
|
XCTAssertEqual(url.absoluteString, "https+unix://%2Ftmp%2Ffile%20with%20spaces%E3%81%A8%E6%BC%A2%E5%AD%97/file/path")
|
|
}
|
|
|
|
let url9 = URL(httpURLWithSocketPath: "/tmp/file", uri: " ")
|
|
XCTAssertNil(url9)
|
|
|
|
let url10 = URL(httpsURLWithSocketPath: "/tmp/file", uri: " ")
|
|
XCTAssertNil(url10)
|
|
}
|
|
|
|
func testConvenienceExecuteMethods() throws {
|
|
XCTAssertNoThrow(XCTAssertEqual(["GET"[...]],
|
|
try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["POST"[...]],
|
|
try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["PATCH"[...]],
|
|
try self.defaultClient.patch(url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["PUT"[...]],
|
|
try self.defaultClient.put(url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["DELETE"[...]],
|
|
try self.defaultClient.delete(url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["GET"[...]],
|
|
try self.defaultClient.execute(url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["CHECKOUT"[...]],
|
|
try self.defaultClient.execute(.CHECKOUT, url: self.defaultHTTPBinURLPrefix + "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
}
|
|
|
|
func testConvenienceExecuteMethodsOverSocket() throws {
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let localSocketPathHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(path))
|
|
defer {
|
|
XCTAssertNoThrow(try localSocketPathHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(["GET"[...]],
|
|
try self.defaultClient.execute(socketPath: path, urlPath: "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["GET"[...]],
|
|
try self.defaultClient.execute(.GET, socketPath: path, urlPath: "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["POST"[...]],
|
|
try self.defaultClient.execute(.POST, socketPath: path, urlPath: "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
})
|
|
}
|
|
|
|
func testConvenienceExecuteMethodsOverSecureSocket() throws {
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let localSocketPathHTTPBin = HTTPBin(ssl: true, bindTarget: .unixDomainSocket(path))
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localSocketPathHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(["GET"[...]],
|
|
try localClient.execute(secureSocketPath: path, urlPath: "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["GET"[...]],
|
|
try localClient.execute(.GET, secureSocketPath: path, urlPath: "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
XCTAssertNoThrow(XCTAssertEqual(["POST"[...]],
|
|
try localClient.execute(.POST, secureSocketPath: path, urlPath: "echo-method").wait().headers[canonicalForm: "X-Method-Used"]))
|
|
})
|
|
}
|
|
|
|
func testGet() throws {
|
|
let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
|
|
XCTAssertEqual(.ok, response.status)
|
|
}
|
|
|
|
func testGetWithDifferentEventLoopBackpressure() throws {
|
|
let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "events/10/1")
|
|
let delegate = TestHTTPDelegate(backpressureEventLoop: self.serverGroup.next())
|
|
let task = self.defaultClient.execute(request: request, delegate: delegate)
|
|
try task.wait()
|
|
}
|
|
|
|
func testPost() throws {
|
|
let response = try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "post", body: .string("1234")).wait()
|
|
let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) }
|
|
let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!)
|
|
|
|
XCTAssertEqual(.ok, response.status)
|
|
XCTAssertEqual("1234", data.data)
|
|
}
|
|
|
|
func testGetHttps() throws {
|
|
let localHTTPBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
let response = try localClient.get(url: "https://localhost:\(localHTTPBin.port)/get").wait()
|
|
XCTAssertEqual(.ok, response.status)
|
|
}
|
|
|
|
func testGetHttpsWithIP() throws {
|
|
let localHTTPBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
let response = try localClient.get(url: "https://127.0.0.1:\(localHTTPBin.port)/get").wait()
|
|
XCTAssertEqual(.ok, response.status)
|
|
}
|
|
|
|
func testPostHttps() throws {
|
|
let localHTTPBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
let request = try Request(url: "https://localhost:\(localHTTPBin.port)/post", method: .POST, body: .string("1234"))
|
|
|
|
let response = try localClient.execute(request: request).wait()
|
|
let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) }
|
|
let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!)
|
|
|
|
XCTAssertEqual(.ok, response.status)
|
|
XCTAssertEqual("1234", data.data)
|
|
}
|
|
|
|
func testHttpRedirect() throws {
|
|
let httpsBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none, redirectConfiguration: .follow(max: 10, allowCycles: true)))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try httpsBin.shutdown())
|
|
}
|
|
|
|
var response = try localClient.get(url: self.defaultHTTPBinURLPrefix + "redirect/302").wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
response = try localClient.get(url: self.defaultHTTPBinURLPrefix + "redirect/https?port=\(httpsBin.port)").wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { httpSocketPath in
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { httpsSocketPath in
|
|
let socketHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(httpSocketPath))
|
|
let socketHTTPSBin = HTTPBin(ssl: true, bindTarget: .unixDomainSocket(httpsSocketPath))
|
|
defer {
|
|
XCTAssertNoThrow(try socketHTTPBin.shutdown())
|
|
XCTAssertNoThrow(try socketHTTPSBin.shutdown())
|
|
}
|
|
|
|
// From HTTP or HTTPS to HTTP+UNIX should fail to redirect
|
|
var targetURL = "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/ok"
|
|
var request = try Request(url: self.defaultHTTPBinURLPrefix + "redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
var response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .found)
|
|
XCTAssertEqual(response.headers.first(name: "Location"), targetURL)
|
|
|
|
request = try Request(url: "https://localhost:\(httpsBin.port)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .found)
|
|
XCTAssertEqual(response.headers.first(name: "Location"), targetURL)
|
|
|
|
// From HTTP or HTTPS to HTTPS+UNIX should also fail to redirect
|
|
targetURL = "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/ok"
|
|
request = try Request(url: self.defaultHTTPBinURLPrefix + "redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .found)
|
|
XCTAssertEqual(response.headers.first(name: "Location"), targetURL)
|
|
|
|
request = try Request(url: "https://localhost:\(httpsBin.port)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .found)
|
|
XCTAssertEqual(response.headers.first(name: "Location"), targetURL)
|
|
|
|
// ... while HTTP+UNIX to HTTP, HTTPS, or HTTP(S)+UNIX should succeed
|
|
targetURL = self.defaultHTTPBinURLPrefix + "ok"
|
|
request = try Request(url: "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
targetURL = "https://localhost:\(httpsBin.port)/ok"
|
|
request = try Request(url: "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
targetURL = "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/ok"
|
|
request = try Request(url: "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
targetURL = "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/ok"
|
|
request = try Request(url: "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
// ... and HTTPS+UNIX to HTTP, HTTPS, or HTTP(S)+UNIX should succeed
|
|
targetURL = self.defaultHTTPBinURLPrefix + "ok"
|
|
request = try Request(url: "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
targetURL = "https://localhost:\(httpsBin.port)/ok"
|
|
request = try Request(url: "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
targetURL = "http+unix://\(httpSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/ok"
|
|
request = try Request(url: "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
|
|
targetURL = "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/ok"
|
|
request = try Request(url: "https+unix://\(httpsSocketPath.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)/redirect/target", method: .GET, headers: ["X-Target-Redirect-URL": targetURL], body: nil)
|
|
|
|
response = try localClient.execute(request: request).wait()
|
|
XCTAssertEqual(response.status, .ok)
|
|
})
|
|
})
|
|
}
|
|
|
|
func testHttpHostRedirect() {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none, redirectConfiguration: .follow(max: 10, allowCycles: true)))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
|
|
let url = self.defaultHTTPBinURLPrefix + "redirect/loopback?port=\(self.defaultHTTPBin.port)"
|
|
var maybeResponse: HTTPClient.Response?
|
|
XCTAssertNoThrow(maybeResponse = try localClient.get(url: url).wait())
|
|
guard let response = maybeResponse, let body = response.body else {
|
|
XCTFail("request failed")
|
|
return
|
|
}
|
|
let hostName = try? JSONDecoder().decode(RequestInfo.self, from: body).data
|
|
XCTAssertEqual("127.0.0.1:\(self.defaultHTTPBin.port)", hostName)
|
|
}
|
|
|
|
func testPercentEncoded() throws {
|
|
let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "percent%20encoded").wait()
|
|
XCTAssertEqual(.ok, response.status)
|
|
}
|
|
|
|
func testPercentEncodedBackslash() throws {
|
|
let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "percent%2Fencoded/hello").wait()
|
|
XCTAssertEqual(.ok, response.status)
|
|
}
|
|
|
|
func testMultipleContentLengthHeaders() throws {
|
|
let body = ByteBuffer(string: "hello world!")
|
|
|
|
var headers = HTTPHeaders()
|
|
headers.add(name: "Content-Length", value: "12")
|
|
let request = try Request(url: self.defaultHTTPBinURLPrefix + "post", method: .POST, headers: headers, body: .byteBuffer(body))
|
|
let response = try self.defaultClient.execute(request: request).wait()
|
|
// if the library adds another content length header we'll get a bad request error.
|
|
XCTAssertEqual(.ok, response.status)
|
|
}
|
|
|
|
func testStreaming() throws {
|
|
var request = try Request(url: self.defaultHTTPBinURLPrefix + "events/10/1")
|
|
request.headers.add(name: "Accept", value: "text/event-stream")
|
|
|
|
let delegate = CountingDelegate()
|
|
let count = try self.defaultClient.execute(request: request, delegate: delegate).wait()
|
|
|
|
XCTAssertEqual(10, count)
|
|
}
|
|
|
|
func testRemoteClose() throws {
|
|
XCTAssertThrowsError(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "close").wait(), "Should fail") { error in
|
|
guard case let error = error as? HTTPClientError, error == .remoteConnectionClosed else {
|
|
return XCTFail("Should fail with remoteConnectionClosed")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testReadTimeout() throws {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(timeout: HTTPClient.Configuration.Timeout(read: .milliseconds(150))))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: self.defaultHTTPBinURLPrefix + "wait").wait(), "Should fail") { error in
|
|
guard case let error = error as? HTTPClientError, error == .readTimeout else {
|
|
return XCTFail("Should fail with readTimeout")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testConnectTimeout() throws {
|
|
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(timeout: .init(connect: .milliseconds(100), read: .milliseconds(150))))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try httpClient.syncShutdown())
|
|
}
|
|
|
|
// This must throw as 198.51.100.254 is reserved for documentation only
|
|
XCTAssertThrowsError(try httpClient.get(url: "http://198.51.100.254:65535/get").wait()) { error in
|
|
XCTAssertEqual(.connectTimeout(.milliseconds(100)), error as? ChannelError)
|
|
}
|
|
}
|
|
|
|
func testDeadline() throws {
|
|
XCTAssertThrowsError(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "wait", deadline: .now() + .milliseconds(150)).wait(), "Should fail") { error in
|
|
guard case let error = error as? HTTPClientError, error == .readTimeout else {
|
|
return XCTFail("Should fail with readTimeout")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testCancel() throws {
|
|
let queue = DispatchQueue(label: "nio-test")
|
|
let request = try Request(url: self.defaultHTTPBinURLPrefix + "wait")
|
|
let task = self.defaultClient.execute(request: request, delegate: TestHTTPDelegate())
|
|
|
|
queue.asyncAfter(deadline: .now() + .milliseconds(100)) {
|
|
task.cancel()
|
|
}
|
|
|
|
XCTAssertThrowsError(try task.wait(), "Should fail") { error in
|
|
guard case let error = error as? HTTPClientError, error == .cancelled else {
|
|
return XCTFail("Should fail with cancelled")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testStressCancel() throws {
|
|
let request = try Request(url: self.defaultHTTPBinURLPrefix + "wait", method: .GET)
|
|
let tasks = (1...100).map { _ -> HTTPClient.Task<TestHTTPDelegate.Response> in
|
|
let task = self.defaultClient.execute(request: request, delegate: TestHTTPDelegate())
|
|
task.cancel()
|
|
return task
|
|
}
|
|
|
|
for task in tasks {
|
|
switch (Result { try task.futureResult.timeout(after: .seconds(10)).wait() }) {
|
|
case .success:
|
|
XCTFail("Shouldn't succeed")
|
|
return
|
|
case .failure(let error):
|
|
guard let clientError = error as? HTTPClientError, clientError == .cancelled else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func testHTTPClientAuthorization() {
|
|
var authorization = HTTPClient.Authorization.basic(username: "aladdin", password: "opensesame")
|
|
XCTAssertEqual(authorization.headerValue, "Basic YWxhZGRpbjpvcGVuc2VzYW1l")
|
|
|
|
authorization = HTTPClient.Authorization.bearer(tokens: "mF_9.B5f-4.1JqM")
|
|
XCTAssertEqual(authorization.headerValue, "Bearer mF_9.B5f-4.1JqM")
|
|
}
|
|
|
|
func testProxyPlaintext() throws {
|
|
let localHTTPBin = HTTPBin(simulateProxy: .plaintext)
|
|
let localClient = HTTPClient(
|
|
eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(proxy: .server(host: "localhost", port: localHTTPBin.port))
|
|
)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
let res = try localClient.get(url: "http://test/ok").wait()
|
|
XCTAssertEqual(res.status, .ok)
|
|
}
|
|
|
|
func testProxyTLS() throws {
|
|
let localHTTPBin = HTTPBin(simulateProxy: .tls)
|
|
let localClient = HTTPClient(
|
|
eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(
|
|
certificateVerification: .none,
|
|
proxy: .server(host: "localhost", port: localHTTPBin.port)
|
|
)
|
|
)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
let res = try localClient.get(url: "https://test/ok").wait()
|
|
XCTAssertEqual(res.status, .ok)
|
|
}
|
|
|
|
func testProxyPlaintextWithCorrectlyAuthorization() throws {
|
|
let localHTTPBin = HTTPBin(simulateProxy: .plaintext)
|
|
let localClient = HTTPClient(
|
|
eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(proxy: .server(host: "localhost", port: localHTTPBin.port, authorization: .basic(username: "aladdin", password: "opensesame")))
|
|
)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
let res = try localClient.get(url: "http://test/ok").wait()
|
|
XCTAssertEqual(res.status, .ok)
|
|
}
|
|
|
|
func testProxyPlaintextWithIncorrectlyAuthorization() throws {
|
|
let localHTTPBin = HTTPBin(simulateProxy: .plaintext)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(proxy: .server(host: "localhost",
|
|
port: localHTTPBin.port,
|
|
authorization: .basic(username: "aladdin",
|
|
password: "opensesamefoo"))))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
XCTAssertThrowsError(try localClient.get(url: "http://test/ok").wait(), "Should fail") { error in
|
|
guard case let error = error as? HTTPClientError, error == .proxyAuthenticationRequired else {
|
|
return XCTFail("Should fail with HTTPClientError.proxyAuthenticationRequired")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testUploadStreaming() throws {
|
|
let body: HTTPClient.Body = .stream(length: 8) { writer in
|
|
let buffer = ByteBuffer(string: "1234")
|
|
return writer.write(.byteBuffer(buffer)).flatMap {
|
|
let buffer = ByteBuffer(string: "4321")
|
|
return writer.write(.byteBuffer(buffer))
|
|
}
|
|
}
|
|
|
|
let response = try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "post", body: body).wait()
|
|
let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) }
|
|
let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!)
|
|
|
|
XCTAssertEqual(.ok, response.status)
|
|
XCTAssertEqual("12344321", data.data)
|
|
}
|
|
|
|
func testNoContentLengthForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/nocontentlength").wait(), "Should fail") { error in
|
|
guard case let error = error as? NIOSSLError, error == .uncleanShutdown else {
|
|
return XCTFail("Should fail with NIOSSLError.uncleanShutdown")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testNoContentLengthWithIgnoreErrorForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none, ignoreUncleanSSLShutdown: true))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
let response = try localClient.get(url: "https://localhost:\(localHTTPBin.port)/nocontentlength").wait()
|
|
let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) }
|
|
let string = String(decoding: bytes!, as: UTF8.self)
|
|
|
|
XCTAssertEqual(.ok, response.status)
|
|
XCTAssertEqual("foo", string)
|
|
}
|
|
|
|
func testCorrectContentLengthForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
let response = try localClient.get(url: "https://localhost:\(localHTTPBin.port)/").wait()
|
|
let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) }
|
|
let string = String(decoding: bytes!, as: UTF8.self)
|
|
|
|
XCTAssertEqual(.notFound, response.status)
|
|
XCTAssertEqual("Not Found", string)
|
|
}
|
|
|
|
func testNoContentForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
let response = try localClient.get(url: "https://localhost:\(localHTTPBin.port)/nocontent").wait()
|
|
|
|
XCTAssertEqual(.noContent, response.status)
|
|
XCTAssertEqual(response.body, nil)
|
|
}
|
|
|
|
func testNoResponseForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/noresponse").wait(), "Should fail") { error in
|
|
guard case let sslError = error as? NIOSSLError, sslError == .uncleanShutdown else {
|
|
return XCTFail("Should fail with NIOSSLError.uncleanShutdown")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testNoResponseWithIgnoreErrorForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none, ignoreUncleanSSLShutdown: true))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/noresponse").wait(), "Should fail") { error in
|
|
guard case let sslError = error as? NIOSSLError, sslError == .uncleanShutdown else {
|
|
return XCTFail("Should fail with NIOSSLError.uncleanShutdown")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testWrongContentLengthForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/wrongcontentlength").wait(), "Should fail") { error in
|
|
XCTAssertEqual(.uncleanShutdown, error as? NIOSSLError)
|
|
}
|
|
}
|
|
|
|
func testWrongContentLengthWithIgnoreErrorForSSLUncleanShutdown() throws {
|
|
// NIOTS deals with ssl unclean shutdown internally
|
|
guard !isTestingNIOTS() else { return }
|
|
|
|
let localHTTPBin = HttpBinForSSLUncleanShutdown()
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none,
|
|
ignoreUncleanSSLShutdown: true))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
localHTTPBin.shutdown()
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/wrongcontentlength").wait(), "Should fail") { error in
|
|
XCTAssertEqual(.invalidEOFState, error as? HTTPParserError)
|
|
}
|
|
}
|
|
|
|
func testEventLoopArgument() throws {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(redirectConfiguration: .follow(max: 10, allowCycles: true)))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
|
|
class EventLoopValidatingDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = Bool
|
|
|
|
let eventLoop: EventLoop
|
|
var result = false
|
|
|
|
init(eventLoop: EventLoop) {
|
|
self.eventLoop = eventLoop
|
|
}
|
|
|
|
func didReceiveHead(task: HTTPClient.Task<Bool>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
|
|
self.result = task.eventLoop === self.eventLoop
|
|
return task.eventLoop.makeSucceededFuture(())
|
|
}
|
|
|
|
func didFinishRequest(task: HTTPClient.Task<Bool>) throws -> Bool {
|
|
return self.result
|
|
}
|
|
}
|
|
|
|
let eventLoop = self.clientGroup.next()
|
|
let delegate = EventLoopValidatingDelegate(eventLoop: eventLoop)
|
|
var request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get")
|
|
var response = try localClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: eventLoop)).wait()
|
|
XCTAssertEqual(true, response)
|
|
|
|
// redirect
|
|
request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "redirect/302")
|
|
response = try localClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: eventLoop)).wait()
|
|
XCTAssertEqual(true, response)
|
|
}
|
|
|
|
func testDecompression() throws {
|
|
let localHTTPBin = HTTPBin(compress: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(decompression: .enabled(limit: .none)))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
var body = ""
|
|
for _ in 1...1000 {
|
|
body += "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
|
|
}
|
|
|
|
for algorithm in [nil, "gzip", "deflate"] {
|
|
var request = try HTTPClient.Request(url: "http://localhost:\(localHTTPBin.port)/post", method: .POST)
|
|
request.body = .string(body)
|
|
if let algorithm = algorithm {
|
|
request.headers.add(name: "Accept-Encoding", value: algorithm)
|
|
}
|
|
|
|
let response = try localClient.execute(request: request).wait()
|
|
let bytes = response.body!.getData(at: 0, length: response.body!.readableBytes)!
|
|
let data = try JSONDecoder().decode(RequestInfo.self, from: bytes)
|
|
|
|
XCTAssertEqual(.ok, response.status)
|
|
XCTAssertGreaterThan(body.count, response.headers["Content-Length"].first.flatMap { Int($0) }!)
|
|
if let algorithm = algorithm {
|
|
XCTAssertEqual(algorithm, response.headers["Content-Encoding"].first)
|
|
} else {
|
|
XCTAssertEqual("deflate", response.headers["Content-Encoding"].first)
|
|
}
|
|
XCTAssertEqual(body, data.data)
|
|
}
|
|
}
|
|
|
|
func testDecompressionLimit() throws {
|
|
let localHTTPBin = HTTPBin(compress: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), configuration: .init(decompression: .enabled(limit: .ratio(1))))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
var request = try HTTPClient.Request(url: "http://localhost:\(localHTTPBin.port)/post", method: .POST)
|
|
request.body = .byteBuffer(ByteBuffer(bytes: [120, 156, 75, 76, 28, 5, 200, 0, 0, 248, 66, 103, 17]))
|
|
request.headers.add(name: "Accept-Encoding", value: "deflate")
|
|
|
|
XCTAssertThrowsError(try localClient.execute(request: request).wait()) { error in
|
|
guard case .some(.limit) = error as? NIOHTTPDecompression.DecompressionError else {
|
|
XCTFail("wrong error: \(error)")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func testLoopDetectionRedirectLimit() throws {
|
|
let localHTTPBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none, redirectConfiguration: .follow(max: 5, allowCycles: false)))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/redirect/infinite1").wait(), "Should fail with redirect limit") { error in
|
|
XCTAssertEqual(error as? HTTPClientError, HTTPClientError.redirectCycleDetected)
|
|
}
|
|
}
|
|
|
|
func testCountRedirectLimit() throws {
|
|
let localHTTPBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none, redirectConfiguration: .follow(max: 10, allowCycles: true)))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(localHTTPBin.port)/redirect/infinite1").wait(), "Should fail with redirect limit") { error in
|
|
XCTAssertEqual(error as? HTTPClientError, HTTPClientError.redirectLimitReached)
|
|
}
|
|
}
|
|
|
|
func testMultipleConcurrentRequests() throws {
|
|
let numberOfRequestsPerThread = 1000
|
|
let numberOfParallelWorkers = 5
|
|
|
|
final class HTTPServer: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
if case .end = self.unwrapInboundIn(data) {
|
|
let responseHead = HTTPServerResponsePart.head(.init(version: .init(major: 1, minor: 1),
|
|
status: .ok))
|
|
context.write(self.wrapOutboundOut(responseHead), promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
}
|
|
}
|
|
}
|
|
|
|
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
|
|
defer {
|
|
XCTAssertNoThrow(try group.syncShutdownGracefully())
|
|
}
|
|
|
|
var server: Channel?
|
|
XCTAssertNoThrow(server = try ServerBootstrap(group: group)
|
|
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
|
|
.serverChannelOption(ChannelOptions.backlog, value: .init(numberOfParallelWorkers))
|
|
.childChannelInitializer { channel in
|
|
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false,
|
|
withServerUpgrade: nil,
|
|
withErrorHandling: false).flatMap {
|
|
channel.pipeline.addHandler(HTTPServer())
|
|
}
|
|
}
|
|
.bind(to: .init(ipAddress: "127.0.0.1", port: 0))
|
|
.wait())
|
|
defer {
|
|
XCTAssertNoThrow(try server?.close().wait())
|
|
}
|
|
|
|
let g = DispatchGroup()
|
|
for workerID in 0..<numberOfParallelWorkers {
|
|
DispatchQueue(label: "\(#file):\(#line):worker-\(workerID)").async(group: g) {
|
|
func makeRequest() {
|
|
let url = "http://127.0.0.1:\(server?.localAddress?.port ?? -1)/hello"
|
|
XCTAssertNoThrow(try self.defaultClient.get(url: url).wait())
|
|
}
|
|
for _ in 0..<numberOfRequestsPerThread {
|
|
makeRequest()
|
|
}
|
|
}
|
|
}
|
|
let timeout = DispatchTime.now() + .seconds(180)
|
|
switch g.wait(timeout: timeout) {
|
|
case .success:
|
|
break
|
|
case .timedOut:
|
|
XCTFail("Timed out")
|
|
}
|
|
}
|
|
|
|
func testWorksWith500Error() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try web.stop())
|
|
}
|
|
let result = self.defaultClient.get(url: "http://localhost:\(web.serverPort)/foo")
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1),
|
|
method: .GET,
|
|
uri: "/foo",
|
|
headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 1),
|
|
status: .internalServerError))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
|
|
var response: HTTPClient.Response?
|
|
XCTAssertNoThrow(response = try result.wait())
|
|
XCTAssertEqual(.internalServerError, response?.status)
|
|
XCTAssertNil(response?.body)
|
|
}
|
|
|
|
func testWorksWithHTTP10Response() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try web.stop())
|
|
}
|
|
let result = self.defaultClient.get(url: "http://localhost:\(web.serverPort)/foo")
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1),
|
|
method: .GET,
|
|
uri: "/foo",
|
|
headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 0),
|
|
status: .internalServerError))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
|
|
var response: HTTPClient.Response?
|
|
XCTAssertNoThrow(response = try result.wait())
|
|
XCTAssertEqual(.internalServerError, response?.status)
|
|
XCTAssertNil(response?.body)
|
|
}
|
|
|
|
func testWorksWhenServerClosesConnectionAfterReceivingRequest() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
let result = self.defaultClient.get(url: "http://localhost:\(web.serverPort)/foo")
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1),
|
|
method: .GET,
|
|
uri: "/foo",
|
|
headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(try web.stop())
|
|
|
|
XCTAssertThrowsError(try result.wait()) { error in
|
|
XCTAssertEqual(HTTPClientError.remoteConnectionClosed, error as? HTTPClientError)
|
|
}
|
|
}
|
|
|
|
func testSubsequentRequestsWorkWithServerSendingConnectionClose() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try web.stop())
|
|
}
|
|
|
|
for _ in 0..<10 {
|
|
let result = self.defaultClient.get(url: "http://localhost:\(web.serverPort)/foo")
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1),
|
|
method: .GET,
|
|
uri: "/foo",
|
|
headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 0),
|
|
status: .ok,
|
|
headers: HTTPHeaders([("connection", "close")])))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
|
|
var response: HTTPClient.Response?
|
|
XCTAssertNoThrow(response = try result.wait())
|
|
XCTAssertEqual(.ok, response?.status)
|
|
XCTAssertNil(response?.body)
|
|
}
|
|
}
|
|
|
|
func testSubsequentRequestsWorkWithServerAlternatingBetweenKeepAliveAndClose() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try web.stop())
|
|
}
|
|
|
|
for i in 0..<10 {
|
|
let result = self.defaultClient.get(url: "http://localhost:\(web.serverPort)/foo")
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1),
|
|
method: .GET,
|
|
uri: "/foo",
|
|
headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 0),
|
|
status: .ok,
|
|
headers: HTTPHeaders([("connection",
|
|
i % 2 == 0 ? "close" : "keep-alive")])))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
|
|
var response: HTTPClient.Response?
|
|
XCTAssertNoThrow(response = try result.wait())
|
|
XCTAssertEqual(.ok, response?.status)
|
|
XCTAssertNil(response?.body)
|
|
}
|
|
}
|
|
|
|
func testStressGetHttps() throws {
|
|
let localHTTPBin = HTTPBin(ssl: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
|
|
let eventLoop = localClient.eventLoopGroup.next()
|
|
let requestCount = 200
|
|
var futureResults = [EventLoopFuture<HTTPClient.Response>]()
|
|
for _ in 1...requestCount {
|
|
let req = try HTTPClient.Request(url: "https://localhost:\(localHTTPBin.port)/get", method: .GET, headers: ["X-internal-delay": "100"])
|
|
futureResults.append(localClient.execute(request: req))
|
|
}
|
|
XCTAssertNoThrow(try EventLoopFuture<HTTPClient.Response>.andAllSucceed(futureResults, on: eventLoop).wait())
|
|
}
|
|
|
|
func testStressGetHttpsSSLError() throws {
|
|
let request = try Request(url: "https://localhost:\(self.defaultHTTPBin.port)/wait", method: .GET)
|
|
let tasks = (1...100).map { _ -> HTTPClient.Task<TestHTTPDelegate.Response> in
|
|
self.defaultClient.execute(request: request, delegate: TestHTTPDelegate())
|
|
}
|
|
|
|
let results = try EventLoopFuture<TestHTTPDelegate.Response>.whenAllComplete(tasks.map { $0.futureResult }, on: self.defaultClient.eventLoopGroup.next()).wait()
|
|
|
|
for result in results {
|
|
switch result {
|
|
case .success:
|
|
XCTFail("Shouldn't succeed")
|
|
continue
|
|
case .failure(let error):
|
|
if isTestingNIOTS() {
|
|
#if canImport(Network)
|
|
guard let clientError = error as? HTTPClient.NWTLSError else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
continue
|
|
}
|
|
// We're speaking TLS to a plain text server. This will cause the handshake to fail but given
|
|
// that the bytes "HTTP/1.1" aren't the start of a valid TLS packet, we can also get
|
|
// errSSLPeerProtocolVersion because the first bytes contain the version.
|
|
XCTAssert(clientError.status == errSSLHandshakeFail ||
|
|
clientError.status == errSSLPeerProtocolVersion,
|
|
"unexpected NWTLSError with status \(clientError.status)")
|
|
#endif
|
|
} else {
|
|
guard let clientError = error as? NIOSSLError, case NIOSSLError.handshakeFailed = clientError else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func testFailingConnectionIsReleased() {
|
|
let localHTTPBin = HTTPBin(refusesConnections: true)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
do {
|
|
_ = try localClient.get(url: "http://localhost:\(localHTTPBin.port)/get").timeout(after: .seconds(5)).wait()
|
|
XCTFail("Shouldn't succeed")
|
|
} catch {
|
|
guard !(error is EventLoopFutureTimeoutError) else {
|
|
XCTFail("Timed out but should have failed immediately")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func testResponseDelayGet() throws {
|
|
let req = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get",
|
|
method: .GET,
|
|
headers: ["X-internal-delay": "2000"],
|
|
body: nil)
|
|
let start = Date()
|
|
let response = try! self.defaultClient.execute(request: req).wait()
|
|
XCTAssertGreaterThan(Date().timeIntervalSince(start), 2)
|
|
XCTAssertEqual(response.status, .ok)
|
|
}
|
|
|
|
func testIdleTimeoutNoReuse() throws {
|
|
var req = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .GET)
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: req, deadline: .now() + .seconds(2)).wait())
|
|
req.headers.add(name: "X-internal-delay", value: "2500")
|
|
try self.defaultClient.eventLoopGroup.next().scheduleTask(in: .milliseconds(250)) {}.futureResult.wait()
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: req).timeout(after: .seconds(10)).wait())
|
|
}
|
|
|
|
func testStressGetClose() throws {
|
|
let eventLoop = self.defaultClient.eventLoopGroup.next()
|
|
let requestCount = 200
|
|
var futureResults = [EventLoopFuture<HTTPClient.Response>]()
|
|
for _ in 1...requestCount {
|
|
let req = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get",
|
|
method: .GET,
|
|
headers: ["X-internal-delay": "5", "Connection": "close"])
|
|
futureResults.append(self.defaultClient.execute(request: req))
|
|
}
|
|
XCTAssertNoThrow(try EventLoopFuture<HTTPClient.Response>.andAllComplete(futureResults, on: eventLoop)
|
|
.timeout(after: .seconds(10)).wait())
|
|
}
|
|
|
|
func testManyConcurrentRequestsWork() {
|
|
let numberOfWorkers = 20
|
|
let numberOfRequestsPerWorkers = 20
|
|
let allWorkersReady = DispatchSemaphore(value: 0)
|
|
let allWorkersGo = DispatchSemaphore(value: 0)
|
|
let allDone = DispatchGroup()
|
|
|
|
let url = self.defaultHTTPBinURLPrefix + "get"
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok, try self.defaultClient.get(url: url).wait().status))
|
|
|
|
for w in 0..<numberOfWorkers {
|
|
let q = DispatchQueue(label: "worker \(w)")
|
|
q.async(group: allDone) {
|
|
func go() {
|
|
allWorkersReady.signal() // tell the driver we're ready
|
|
allWorkersGo.wait() // wait for the driver to let us go
|
|
|
|
for _ in 0..<numberOfRequestsPerWorkers {
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok, try self.defaultClient.get(url: url).wait().status))
|
|
}
|
|
}
|
|
go()
|
|
}
|
|
}
|
|
|
|
for _ in 0..<numberOfWorkers {
|
|
allWorkersReady.wait()
|
|
}
|
|
// now all workers should be waiting for the go signal
|
|
|
|
for _ in 0..<numberOfWorkers {
|
|
allWorkersGo.signal()
|
|
}
|
|
// all workers should be running, let's wait for them to finish
|
|
allDone.wait()
|
|
}
|
|
|
|
func testRepeatedRequestsWorkWhenServerAlwaysCloses() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try web.stop())
|
|
}
|
|
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
|
|
for _ in 0..<10 {
|
|
let result = localClient.get(url: "http://localhost:\(web.serverPort)/foo")
|
|
XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1),
|
|
method: .GET,
|
|
uri: "/foo",
|
|
headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil),
|
|
try web.readInbound()))
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 1),
|
|
status: .ok,
|
|
headers: HTTPHeaders([("CoNnEcTiOn", "cLoSe")])))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
|
|
var response: HTTPClient.Response?
|
|
XCTAssertNoThrow(response = try result.wait())
|
|
XCTAssertEqual(.ok, response?.status)
|
|
XCTAssertNil(response?.body)
|
|
}
|
|
}
|
|
|
|
func testShutdownBeforeTasksCompletion() throws {
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
let req = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .GET, headers: ["X-internal-delay": "500"])
|
|
let res = client.execute(request: req)
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
_ = try? res.timeout(after: .seconds(2)).wait()
|
|
}
|
|
|
|
/// This test would cause an assertion failure on `HTTPClient` deinit if client doesn't actually shutdown
|
|
func testUncleanShutdownActuallyShutsDown() throws {
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
let req = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .GET, headers: ["X-internal-delay": "200"])
|
|
_ = client.execute(request: req)
|
|
try? client.syncShutdown()
|
|
}
|
|
|
|
func testUncleanShutdownCancelsTasks() throws {
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
|
|
let responses = (1...100).map { _ in
|
|
client.get(url: self.defaultHTTPBinURLPrefix + "wait")
|
|
}
|
|
|
|
try client.syncShutdown()
|
|
|
|
let results = try EventLoopFuture.whenAllComplete(responses, on: self.clientGroup.next()).timeout(after: .seconds(100)).wait()
|
|
|
|
for result in results {
|
|
switch result {
|
|
case .success:
|
|
XCTFail("Shouldn't succeed")
|
|
case .failure(let error):
|
|
if let clientError = error as? HTTPClientError, clientError == .cancelled {
|
|
continue
|
|
} else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func testDoubleShutdown() {
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
do {
|
|
try client.syncShutdown()
|
|
XCTFail("Shutdown should fail with \(HTTPClientError.alreadyShutdown)")
|
|
} catch {
|
|
guard let clientError = error as? HTTPClientError, clientError == .alreadyShutdown else {
|
|
XCTFail("Unexpected error: \(error) instead of \(HTTPClientError.alreadyShutdown)")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func testTaskFailsWhenClientIsShutdown() {
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
do {
|
|
_ = try client.get(url: "http://localhost/").wait()
|
|
XCTFail("Request shouldn't succeed")
|
|
} catch {
|
|
if let error = error as? HTTPClientError, error == .alreadyShutdown {
|
|
return
|
|
} else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testRaceNewRequestsVsShutdown() {
|
|
let numberOfWorkers = 20
|
|
let allWorkersReady = DispatchSemaphore(value: 0)
|
|
let allWorkersGo = DispatchSemaphore(value: 0)
|
|
let allDone = DispatchGroup()
|
|
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
defer {
|
|
XCTAssertThrowsError(try localClient.syncShutdown()) { error in
|
|
XCTAssertEqual(.alreadyShutdown, error as? HTTPClientError)
|
|
}
|
|
}
|
|
|
|
let url = self.defaultHTTPBinURLPrefix + "get"
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok, try localClient.get(url: url).wait().status))
|
|
|
|
for w in 0..<numberOfWorkers {
|
|
let q = DispatchQueue(label: "worker \(w)")
|
|
q.async(group: allDone) {
|
|
func go() {
|
|
allWorkersReady.signal() // tell the driver we're ready
|
|
allWorkersGo.wait() // wait for the driver to let us go
|
|
|
|
do {
|
|
while true {
|
|
let result = try localClient.get(url: url).wait().status
|
|
XCTAssertEqual(.ok, result)
|
|
}
|
|
} catch {
|
|
// ok, we failed, pool probably shutdown
|
|
if let clientError = error as? HTTPClientError, clientError == .cancelled || clientError == .alreadyShutdown {
|
|
return
|
|
} else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
}
|
|
}
|
|
}
|
|
go()
|
|
}
|
|
}
|
|
|
|
for _ in 0..<numberOfWorkers {
|
|
allWorkersReady.wait()
|
|
}
|
|
// now all workers should be waiting for the go signal
|
|
|
|
for _ in 0..<numberOfWorkers {
|
|
allWorkersGo.signal()
|
|
}
|
|
Thread.sleep(until: .init(timeIntervalSinceNow: 0.2))
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
// all workers should be running, let's wait for them to finish
|
|
allDone.wait()
|
|
}
|
|
|
|
func testVaryingLoopPreference() throws {
|
|
let elg = getDefaultEventLoopGroup(numberOfThreads: 2)
|
|
let first = elg.next()
|
|
let second = elg.next()
|
|
XCTAssert(first !== second)
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(elg))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
XCTAssertNoThrow(try elg.syncShutdownGracefully())
|
|
}
|
|
|
|
var futureResults = [EventLoopFuture<HTTPClient.Response>]()
|
|
for i in 1...100 {
|
|
let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .GET, headers: ["X-internal-delay": "10"])
|
|
let preference: HTTPClient.EventLoopPreference
|
|
if i <= 50 {
|
|
preference = .delegateAndChannel(on: first)
|
|
} else {
|
|
preference = .delegateAndChannel(on: second)
|
|
}
|
|
futureResults.append(client.execute(request: request, eventLoop: preference))
|
|
}
|
|
|
|
let results = try EventLoopFuture.whenAllComplete(futureResults, on: elg.next()).wait()
|
|
|
|
for result in results {
|
|
switch result {
|
|
case .success:
|
|
break
|
|
case .failure(let error):
|
|
XCTFail("Unexpected error: \(error)")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testMakeSecondRequestDuringCancelledCallout() {
|
|
let el = self.clientGroup.next()
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup.next())
|
|
defer {
|
|
// This will throw as we've started the request but haven't fulfilled it.
|
|
XCTAssertThrowsError(try web.stop())
|
|
}
|
|
|
|
let url = "http://127.0.0.1:\(web.serverPort)"
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(el))
|
|
defer {
|
|
XCTAssertThrowsError(try localClient.syncShutdown()) { error in
|
|
XCTAssertEqual(.alreadyShutdown, error as? HTTPClientError)
|
|
}
|
|
}
|
|
|
|
let seenError = DispatchGroup()
|
|
seenError.enter()
|
|
var maybeSecondRequest: EventLoopFuture<HTTPClient.Response>?
|
|
XCTAssertNoThrow(maybeSecondRequest = try el.submit {
|
|
let neverSucceedingRequest = localClient.get(url: url)
|
|
let secondRequest = neverSucceedingRequest.flatMapError { error in
|
|
XCTAssertEqual(.cancelled, error as? HTTPClientError)
|
|
seenError.leave()
|
|
return localClient.get(url: url) // <== this is the main part, during the error callout, we call back in
|
|
}
|
|
return secondRequest
|
|
}.wait())
|
|
|
|
guard let secondRequest = maybeSecondRequest else {
|
|
XCTFail("couldn't get request future")
|
|
return
|
|
}
|
|
|
|
// Let's pull out the request .head so we know the request has started (but nothing else)
|
|
XCTAssertNoThrow(XCTAssertNotNil(try web.readInbound()))
|
|
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
|
|
seenError.wait()
|
|
XCTAssertThrowsError(try secondRequest.wait()) { error in
|
|
XCTAssertEqual(.alreadyShutdown, error as? HTTPClientError)
|
|
}
|
|
}
|
|
|
|
func testMakeSecondRequestDuringSuccessCallout() {
|
|
let el = self.clientGroup.next()
|
|
let url = "http://127.0.0.1:\(self.defaultHTTPBin.port)/get"
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(el))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok,
|
|
try el.flatSubmit { () -> EventLoopFuture<HTTPClient.Response> in
|
|
localClient.get(url: url).flatMap { firstResponse in
|
|
XCTAssertEqual(.ok, firstResponse.status)
|
|
return localClient.get(url: url) // <== interesting bit here
|
|
}
|
|
}.wait().status))
|
|
}
|
|
|
|
func testMakeSecondRequestWhilstFirstIsOngoing() {
|
|
let web = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try web.stop())
|
|
}
|
|
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
defer {
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
}
|
|
|
|
let url = "http://127.0.0.1:\(web.serverPort)"
|
|
let firstRequest = client.get(url: url)
|
|
|
|
XCTAssertNoThrow(XCTAssertNotNil(try web.readInbound())) // first request: .head
|
|
|
|
// Now, the first request is ongoing but not complete, let's start a second one
|
|
let secondRequest = client.get(url: url)
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil), try web.readInbound())) // first request: .end
|
|
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .ok))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok, try firstRequest.wait().status))
|
|
|
|
// Okay, first request done successfully, let's do the second one too.
|
|
XCTAssertNoThrow(XCTAssertNotNil(try web.readInbound())) // first request: .head
|
|
XCTAssertNoThrow(XCTAssertEqual(.end(nil), try web.readInbound())) // first request: .end
|
|
|
|
XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .created))))
|
|
XCTAssertNoThrow(try web.writeOutbound(.end(nil)))
|
|
XCTAssertNoThrow(XCTAssertEqual(.created, try secondRequest.wait().status))
|
|
}
|
|
|
|
func testUDSBasic() {
|
|
// This tests just connecting to a URL where the whole URL is the UNIX domain socket path like
|
|
// unix:///this/is/my/socket.sock
|
|
// We don't really have a path component, so we'll have to use "/"
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let localHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(path))
|
|
defer {
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
let target = "unix://\(path)"
|
|
XCTAssertNoThrow(XCTAssertEqual(["Yes"[...]],
|
|
try self.defaultClient.get(url: target).wait().headers[canonicalForm: "X-Is-This-Slash"]))
|
|
})
|
|
}
|
|
|
|
func testUDSSocketAndPath() {
|
|
// Here, we're testing a URL that's encoding two different paths:
|
|
//
|
|
// 1. a "base path" which is the path to the UNIX domain socket
|
|
// 2. an actual path which is the normal path in a regular URL like https://example.com/this/is/the/path
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let localHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(path))
|
|
defer {
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
guard let target = URL(string: "/echo-uri", relativeTo: URL(string: "unix://\(path)")),
|
|
let request = try? Request(url: target) else {
|
|
XCTFail("couldn't build URL for request")
|
|
return
|
|
}
|
|
XCTAssertNoThrow(XCTAssertEqual(["/echo-uri"[...]],
|
|
try self.defaultClient.execute(request: request).wait().headers[canonicalForm: "X-Calling-URI"]))
|
|
})
|
|
}
|
|
|
|
func testHTTPPlusUNIX() {
|
|
// Here, we're testing a URL where the UNIX domain socket is encoded as the host name
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let localHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(path))
|
|
defer {
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
guard let target = URL(httpURLWithSocketPath: path, uri: "/echo-uri"),
|
|
let request = try? Request(url: target) else {
|
|
XCTFail("couldn't build URL for request")
|
|
return
|
|
}
|
|
XCTAssertNoThrow(XCTAssertEqual(["/echo-uri"[...]],
|
|
try self.defaultClient.execute(request: request).wait().headers[canonicalForm: "X-Calling-URI"]))
|
|
})
|
|
}
|
|
|
|
func testHTTPSPlusUNIX() {
|
|
// Here, we're testing a URL where the UNIX domain socket is encoded as the host name
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let localHTTPBin = HTTPBin(ssl: true, bindTarget: .unixDomainSocket(path))
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
}
|
|
guard let target = URL(httpsURLWithSocketPath: path, uri: "/echo-uri"),
|
|
let request = try? Request(url: target) else {
|
|
XCTFail("couldn't build URL for request")
|
|
return
|
|
}
|
|
XCTAssertNoThrow(XCTAssertEqual(["/echo-uri"[...]],
|
|
try localClient.execute(request: request).wait().headers[canonicalForm: "X-Calling-URI"]))
|
|
})
|
|
}
|
|
|
|
func testUseExistingConnectionOnDifferentEL() throws {
|
|
let threadCount = 16
|
|
let elg = getDefaultEventLoopGroup(numberOfThreads: threadCount)
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(elg))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try elg.syncShutdownGracefully())
|
|
}
|
|
|
|
let eventLoops = (1...threadCount).map { _ in elg.next() }
|
|
let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get")
|
|
let closingRequest = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", headers: ["Connection": "close"])
|
|
|
|
for (index, el) in eventLoops.enumerated() {
|
|
if index.isMultiple(of: 2) {
|
|
XCTAssertNoThrow(try localClient.execute(request: request, eventLoop: .delegateAndChannel(on: el)).wait())
|
|
} else {
|
|
XCTAssertNoThrow(try localClient.execute(request: request, eventLoop: .delegateAndChannel(on: el)).wait())
|
|
XCTAssertNoThrow(try localClient.execute(request: closingRequest, eventLoop: .indifferent).wait())
|
|
}
|
|
}
|
|
}
|
|
|
|
func testWeRecoverFromServerThatClosesTheConnectionOnUs() {
|
|
final class ServerThatAcceptsThenRejects: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
let requestNumber: NIOAtomic<Int>
|
|
let connectionNumber: NIOAtomic<Int>
|
|
|
|
init(requestNumber: NIOAtomic<Int>, connectionNumber: NIOAtomic<Int>) {
|
|
self.requestNumber = requestNumber
|
|
self.connectionNumber = connectionNumber
|
|
}
|
|
|
|
func channelActive(context: ChannelHandlerContext) {
|
|
_ = self.connectionNumber.add(1)
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
let req = self.unwrapInboundIn(data)
|
|
|
|
switch req {
|
|
case .head, .body:
|
|
()
|
|
case .end:
|
|
let last = self.requestNumber.add(1)
|
|
switch last {
|
|
case 0, 2:
|
|
context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))),
|
|
promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
|
case 1:
|
|
context.close(promise: nil)
|
|
default:
|
|
XCTFail("did not expect request \(last + 1)")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let requestNumber = NIOAtomic<Int>.makeAtomic(value: 0)
|
|
let connectionNumber = NIOAtomic<Int>.makeAtomic(value: 0)
|
|
let sharedStateServerHandler = ServerThatAcceptsThenRejects(requestNumber: requestNumber,
|
|
connectionNumber: connectionNumber)
|
|
var maybeServer: Channel?
|
|
XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: self.serverGroup)
|
|
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
|
|
.childChannelInitializer { channel in
|
|
channel.pipeline.configureHTTPServerPipeline().flatMap {
|
|
// We're deliberately adding a handler which is shared between multiple channels. This is normally
|
|
// very verboten but this handler is specially crafted to tolerate this.
|
|
channel.pipeline.addHandler(sharedStateServerHandler)
|
|
}
|
|
}
|
|
.bind(host: "127.0.0.1", port: 0)
|
|
.wait())
|
|
guard let server = maybeServer else {
|
|
XCTFail("couldn't create server")
|
|
return
|
|
}
|
|
defer {
|
|
XCTAssertNoThrow(try server.close().wait())
|
|
}
|
|
|
|
let url = "http://127.0.0.1:\(server.localAddress!.port!)"
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
defer {
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
}
|
|
|
|
XCTAssertEqual(0, sharedStateServerHandler.connectionNumber.load())
|
|
XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load())
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status))
|
|
XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load())
|
|
XCTAssertEqual(1, sharedStateServerHandler.requestNumber.load())
|
|
XCTAssertThrowsError(try client.get(url: url).wait().status) { error in
|
|
XCTAssertEqual(.remoteConnectionClosed, error as? HTTPClientError)
|
|
}
|
|
XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load())
|
|
XCTAssertEqual(2, sharedStateServerHandler.requestNumber.load())
|
|
XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status))
|
|
XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load())
|
|
XCTAssertEqual(3, sharedStateServerHandler.requestNumber.load())
|
|
}
|
|
|
|
func testPoolClosesIdleConnections() {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(connectionPool: .init(idleTimeout: .milliseconds(100))))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
XCTAssertNoThrow(try localClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait())
|
|
Thread.sleep(forTimeInterval: 0.2)
|
|
XCTAssertEqual(self.defaultHTTPBin.activeConnections, 0)
|
|
}
|
|
|
|
func testRacePoolIdleConnectionsAndGet() {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(connectionPool: .init(idleTimeout: .milliseconds(10))))
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
for _ in 1...500 {
|
|
XCTAssertNoThrow(try localClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait())
|
|
Thread.sleep(forTimeInterval: 0.01 + .random(in: -0.05...0.05))
|
|
}
|
|
}
|
|
|
|
func testAvoidLeakingTLSHandshakeCompletionPromise() {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), configuration: .init(timeout: .init(connect: .milliseconds(100))))
|
|
let localHTTPBin = HTTPBin()
|
|
let port = localHTTPBin.port
|
|
XCTAssertNoThrow(try localHTTPBin.shutdown())
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
}
|
|
|
|
XCTAssertThrowsError(try localClient.get(url: "http://localhost:\(port)").wait()) { error in
|
|
if isTestingNIOTS() {
|
|
guard case ChannelError.connectTimeout = error else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
return
|
|
}
|
|
} else {
|
|
guard error is NIOConnectionError else {
|
|
XCTFail("Unexpected error: \(error)")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func testAsyncShutdown() throws {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
let promise = self.clientGroup.next().makePromise(of: Void.self)
|
|
self.clientGroup.next().execute {
|
|
localClient.shutdown(queue: DispatchQueue(label: "testAsyncShutdown")) { error in
|
|
XCTAssertNil(error)
|
|
promise.succeed(())
|
|
}
|
|
}
|
|
XCTAssertNoThrow(try promise.futureResult.wait())
|
|
}
|
|
|
|
func testAsyncShutdownDefaultQueue() throws {
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
let promise = self.clientGroup.next().makePromise(of: Void.self)
|
|
self.clientGroup.next().execute {
|
|
localClient.shutdown { error in
|
|
XCTAssertNil(error)
|
|
promise.succeed(())
|
|
}
|
|
}
|
|
XCTAssertNoThrow(try promise.futureResult.wait())
|
|
}
|
|
|
|
func testValidationErrorsAreSurfaced() throws {
|
|
let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .TRACE, body: .stream { _ in
|
|
self.defaultClient.eventLoopGroup.next().makeSucceededFuture(())
|
|
})
|
|
let runningRequest = self.defaultClient.execute(request: request)
|
|
XCTAssertThrowsError(try runningRequest.wait()) { error in
|
|
XCTAssertEqual(HTTPClientError.traceRequestWithBody, error as? HTTPClientError)
|
|
}
|
|
}
|
|
|
|
func testUploadsReallyStream() {
|
|
final class HTTPServer: ChannelInboundHandler {
|
|
typealias InboundIn = HTTPServerRequestPart
|
|
typealias OutboundOut = HTTPServerResponsePart
|
|
|
|
private let headPromise: EventLoopPromise<HTTPRequestHead>
|
|
private let bodyPromises: [EventLoopPromise<ByteBuffer>]
|
|
private let endPromise: EventLoopPromise<Void>
|
|
private var bodyPartsSeenSoFar = 0
|
|
private var atEnd = false
|
|
|
|
init(headPromise: EventLoopPromise<HTTPRequestHead>,
|
|
bodyPromises: [EventLoopPromise<ByteBuffer>],
|
|
endPromise: EventLoopPromise<Void>) {
|
|
self.headPromise = headPromise
|
|
self.bodyPromises = bodyPromises
|
|
self.endPromise = endPromise
|
|
}
|
|
|
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
|
switch self.unwrapInboundIn(data) {
|
|
case .head(let head):
|
|
XCTAssert(self.bodyPartsSeenSoFar == 0)
|
|
self.headPromise.succeed(head)
|
|
case .body(let bytes):
|
|
let myNumber = self.bodyPartsSeenSoFar
|
|
self.bodyPartsSeenSoFar += 1
|
|
self.bodyPromises.dropFirst(myNumber).first?.succeed(bytes) ?? XCTFail("ouch, too many chunks")
|
|
case .end:
|
|
context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))),
|
|
promise: nil)
|
|
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: self.endPromise)
|
|
self.atEnd = true
|
|
}
|
|
}
|
|
|
|
func handlerRemoved(context: ChannelHandlerContext) {
|
|
guard !self.atEnd else {
|
|
return
|
|
}
|
|
struct NotFulfilledError: Error {}
|
|
|
|
self.headPromise.fail(NotFulfilledError())
|
|
self.bodyPromises.forEach {
|
|
$0.fail(NotFulfilledError())
|
|
}
|
|
self.endPromise.fail(NotFulfilledError())
|
|
}
|
|
}
|
|
|
|
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
|
|
defer {
|
|
XCTAssertNoThrow(try group.syncShutdownGracefully())
|
|
}
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(group))
|
|
defer {
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
}
|
|
let headPromise = group.next().makePromise(of: HTTPRequestHead.self)
|
|
let bodyPromises = (0..<16).map { _ in group.next().makePromise(of: ByteBuffer.self) }
|
|
let endPromise = group.next().makePromise(of: Void.self)
|
|
let sentOffAllBodyPartsPromise = group.next().makePromise(of: Void.self)
|
|
let streamWriterPromise = group.next().makePromise(of: HTTPClient.Body.StreamWriter.self)
|
|
|
|
func makeServer() -> Channel? {
|
|
return try? ServerBootstrap(group: group)
|
|
.childChannelInitializer { channel in
|
|
channel.pipeline.configureHTTPServerPipeline().flatMap {
|
|
channel.pipeline.addHandler(HTTPServer(headPromise: headPromise,
|
|
bodyPromises: bodyPromises,
|
|
endPromise: endPromise))
|
|
}
|
|
}
|
|
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
|
|
.bind(host: "127.0.0.1", port: 0)
|
|
.wait()
|
|
}
|
|
|
|
func makeRequest(server: Channel) -> Request? {
|
|
guard let localAddress = server.localAddress else {
|
|
return nil
|
|
}
|
|
|
|
return try? HTTPClient.Request(url: "http://\(localAddress.ipAddress!):\(localAddress.port!)",
|
|
method: .POST,
|
|
headers: ["transfer-encoding": "chunked"],
|
|
body: .stream { streamWriter in
|
|
streamWriterPromise.succeed(streamWriter)
|
|
return sentOffAllBodyPartsPromise.futureResult
|
|
})
|
|
}
|
|
|
|
guard let server = makeServer(), let request = makeRequest(server: server) else {
|
|
XCTFail("couldn't make a server Channel and a matching Request...")
|
|
return
|
|
}
|
|
defer {
|
|
XCTAssertNoThrow(try server.close().wait())
|
|
}
|
|
|
|
var buffer = ByteBufferAllocator().buffer(capacity: 1)
|
|
let runningRequest = client.execute(request: request)
|
|
guard let streamWriter = try? streamWriterPromise.futureResult.wait() else {
|
|
XCTFail("didn't get StreamWriter")
|
|
return
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.POST, try headPromise.futureResult.wait().method))
|
|
for bodyChunkNumber in 0..<16 {
|
|
buffer.clear()
|
|
buffer.writeString(String(bodyChunkNumber, radix: 16))
|
|
XCTAssertEqual(1, buffer.readableBytes)
|
|
XCTAssertNoThrow(try streamWriter.write(.byteBuffer(buffer)).wait())
|
|
XCTAssertNoThrow(XCTAssertEqual(buffer, try bodyPromises[bodyChunkNumber].futureResult.wait()))
|
|
}
|
|
sentOffAllBodyPartsPromise.succeed(())
|
|
XCTAssertNoThrow(try endPromise.futureResult.wait())
|
|
XCTAssertNoThrow(try runningRequest.wait())
|
|
}
|
|
|
|
func testUploadStreamingCallinToleratedFromOtsideEL() throws {
|
|
let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .POST, body: .stream(length: 4) { writer in
|
|
let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self)
|
|
// We have to toleare callins from any thread
|
|
DispatchQueue(label: "upload-streaming").async {
|
|
writer.write(.byteBuffer(ByteBuffer(string: "1234"))).whenComplete { _ in
|
|
promise.succeed(())
|
|
}
|
|
}
|
|
return promise.futureResult
|
|
})
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: request).wait())
|
|
}
|
|
|
|
func testWeHandleUsSendingACloseHeaderCorrectly() {
|
|
guard let req1 = try? Request(url: self.defaultHTTPBinURLPrefix + "stats",
|
|
method: .GET,
|
|
headers: ["connection": "close"]),
|
|
let statsBytes1 = try? self.defaultClient.execute(request: req1).wait().body,
|
|
let stats1 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes1) else {
|
|
XCTFail("request 1 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes2 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats2 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes2) else {
|
|
XCTFail("request 2 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes3 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats3 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes3) else {
|
|
XCTFail("request 3 didn't work")
|
|
return
|
|
}
|
|
|
|
// req 1 and 2 cannot share the same connection (close header)
|
|
XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
|
|
XCTAssertEqual(stats1.requestNumber, 1)
|
|
XCTAssertEqual(stats2.requestNumber, 1)
|
|
|
|
// req 2 and 3 should share the same connection (keep-alive is default)
|
|
XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
|
|
XCTAssertEqual(stats2.connectionNumber, stats3.connectionNumber)
|
|
}
|
|
|
|
func testWeHandleUsReceivingACloseHeaderCorrectly() {
|
|
guard let req1 = try? Request(url: self.defaultHTTPBinURLPrefix + "stats",
|
|
method: .GET,
|
|
headers: ["X-Send-Back-Header-Connection": "close"]),
|
|
let statsBytes1 = try? self.defaultClient.execute(request: req1).wait().body,
|
|
let stats1 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes1) else {
|
|
XCTFail("request 1 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes2 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats2 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes2) else {
|
|
XCTFail("request 2 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes3 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats3 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes3) else {
|
|
XCTFail("request 3 didn't work")
|
|
return
|
|
}
|
|
|
|
// req 1 and 2 cannot share the same connection (close header)
|
|
XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
|
|
XCTAssertEqual(stats1.requestNumber, 1)
|
|
XCTAssertEqual(stats2.requestNumber, 1)
|
|
|
|
// req 2 and 3 should share the same connection (keep-alive is default)
|
|
XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
|
|
XCTAssertEqual(stats2.connectionNumber, stats3.connectionNumber)
|
|
}
|
|
|
|
func testWeHandleUsSendingACloseHeaderAmongstOtherConnectionHeadersCorrectly() {
|
|
for closeHeader in [("connection", "close"), ("CoNneCTION", "ClOSe")] {
|
|
guard let req1 = try? Request(url: self.defaultHTTPBinURLPrefix + "stats",
|
|
method: .GET,
|
|
headers: ["X-Send-Back-Header-\(closeHeader.0)":
|
|
"foo,\(closeHeader.1),bar"]),
|
|
let statsBytes1 = try? self.defaultClient.execute(request: req1).wait().body,
|
|
let stats1 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes1) else {
|
|
XCTFail("request 1 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes2 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats2 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes2) else {
|
|
XCTFail("request 2 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes3 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats3 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes3) else {
|
|
XCTFail("request 3 didn't work")
|
|
return
|
|
}
|
|
|
|
// req 1 and 2 cannot share the same connection (close header)
|
|
XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
|
|
XCTAssertEqual(stats2.requestNumber, 1)
|
|
|
|
// req 2 and 3 should share the same connection (keep-alive is default)
|
|
XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
|
|
XCTAssertEqual(stats2.connectionNumber, stats3.connectionNumber)
|
|
}
|
|
}
|
|
|
|
func testWeHandleUsReceivingACloseHeaderAmongstOtherConnectionHeadersCorrectly() {
|
|
for closeHeader in [("connection", "close"), ("CoNneCTION", "ClOSe")] {
|
|
guard let req1 = try? Request(url: self.defaultHTTPBinURLPrefix + "stats",
|
|
method: .GET,
|
|
headers: ["X-Send-Back-Header-\(closeHeader.0)":
|
|
"foo,\(closeHeader.1),bar"]),
|
|
let statsBytes1 = try? self.defaultClient.execute(request: req1).wait().body,
|
|
let stats1 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes1) else {
|
|
XCTFail("request 1 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes2 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats2 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes2) else {
|
|
XCTFail("request 2 didn't work")
|
|
return
|
|
}
|
|
guard let statsBytes3 = try? self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "stats").wait().body,
|
|
let stats3 = try? JSONDecoder().decode(RequestInfo.self, from: statsBytes3) else {
|
|
XCTFail("request 3 didn't work")
|
|
return
|
|
}
|
|
|
|
// req 1 and 2 cannot share the same connection (close header)
|
|
XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
|
|
XCTAssertEqual(stats2.requestNumber, 1)
|
|
|
|
// req 2 and 3 should share the same connection (keep-alive is default)
|
|
XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
|
|
XCTAssertEqual(stats2.connectionNumber, stats3.connectionNumber)
|
|
}
|
|
}
|
|
|
|
func testLoggingCorrectlyAttachesRequestInformation() {
|
|
let logStore = CollectEverythingLogHandler.LogStore()
|
|
|
|
var loggerYolo001: Logger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: logStore)
|
|
})
|
|
loggerYolo001.logLevel = .trace
|
|
loggerYolo001[metadataKey: "yolo-request-id"] = "yolo-001"
|
|
var loggerACME002: Logger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: logStore)
|
|
})
|
|
loggerACME002.logLevel = .trace
|
|
loggerACME002[metadataKey: "acme-request-id"] = "acme-002"
|
|
|
|
guard let request1 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get"),
|
|
let request2 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "stats"),
|
|
let request3 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "ok") else {
|
|
XCTFail("bad stuff, can't even make request structures")
|
|
return
|
|
}
|
|
|
|
// === Request 1 (Yolo001)
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: request1,
|
|
eventLoop: .indifferent,
|
|
deadline: nil,
|
|
logger: loggerYolo001).wait())
|
|
let logsAfterReq1 = logStore.allEntries
|
|
logStore.allEntries = []
|
|
|
|
// === Request 2 (Yolo001)
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: request2,
|
|
eventLoop: .indifferent,
|
|
deadline: nil,
|
|
logger: loggerYolo001).wait())
|
|
let logsAfterReq2 = logStore.allEntries
|
|
logStore.allEntries = []
|
|
|
|
// === Request 3 (ACME002)
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: request3,
|
|
eventLoop: .indifferent,
|
|
deadline: nil,
|
|
logger: loggerACME002).wait())
|
|
let logsAfterReq3 = logStore.allEntries
|
|
logStore.allEntries = []
|
|
|
|
// === Assertions
|
|
XCTAssertGreaterThan(logsAfterReq1.count, 0)
|
|
XCTAssertGreaterThan(logsAfterReq2.count, 0)
|
|
XCTAssertGreaterThan(logsAfterReq3.count, 0)
|
|
|
|
XCTAssert(logsAfterReq1.allSatisfy { entry in
|
|
if let httpRequestMetadata = entry.metadata["ahc-request-id"],
|
|
let yoloRequestID = entry.metadata["yolo-request-id"] {
|
|
XCTAssertNil(entry.metadata["acme-request-id"])
|
|
XCTAssertEqual("yolo-001", yoloRequestID)
|
|
XCTAssertNotNil(Int(httpRequestMetadata))
|
|
return true
|
|
} else {
|
|
XCTFail("log message doesn't contain the right IDs: \(entry)")
|
|
return false
|
|
}
|
|
})
|
|
XCTAssert(logsAfterReq1.contains { entry in
|
|
entry.message == "opening fresh connection (no connections to reuse available)"
|
|
})
|
|
|
|
XCTAssert(logsAfterReq2.allSatisfy { entry in
|
|
if let httpRequestMetadata = entry.metadata["ahc-request-id"],
|
|
let yoloRequestID = entry.metadata["yolo-request-id"] {
|
|
XCTAssertNil(entry.metadata["acme-request-id"])
|
|
XCTAssertEqual("yolo-001", yoloRequestID)
|
|
XCTAssertNotNil(Int(httpRequestMetadata))
|
|
return true
|
|
} else {
|
|
XCTFail("log message doesn't contain the right IDs: \(entry)")
|
|
return false
|
|
}
|
|
})
|
|
XCTAssert(logsAfterReq2.contains { entry in
|
|
entry.message.starts(with: "leasing existing connection")
|
|
})
|
|
|
|
XCTAssert(logsAfterReq3.allSatisfy { entry in
|
|
if let httpRequestMetadata = entry.metadata["ahc-request-id"],
|
|
let acmeRequestID = entry.metadata["acme-request-id"] {
|
|
XCTAssertNil(entry.metadata["yolo-request-id"])
|
|
XCTAssertEqual("acme-002", acmeRequestID)
|
|
XCTAssertNotNil(Int(httpRequestMetadata))
|
|
return true
|
|
} else {
|
|
XCTFail("log message doesn't contain the right IDs: \(entry)")
|
|
return false
|
|
}
|
|
})
|
|
XCTAssert(logsAfterReq3.contains { entry in
|
|
entry.message.starts(with: "leasing existing connection")
|
|
})
|
|
}
|
|
|
|
func testNothingIsLoggedAtInfoOrHigher() {
|
|
let logStore = CollectEverythingLogHandler.LogStore()
|
|
|
|
var logger: Logger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: logStore)
|
|
})
|
|
logger.logLevel = .info
|
|
|
|
guard let request1 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get"),
|
|
let request2 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "stats") else {
|
|
XCTFail("bad stuff, can't even make request structures")
|
|
return
|
|
}
|
|
|
|
// === Request 1
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: request1,
|
|
eventLoop: .indifferent,
|
|
deadline: nil,
|
|
logger: logger).wait())
|
|
XCTAssertEqual(0, logStore.allEntries.count)
|
|
|
|
// === Request 2
|
|
XCTAssertNoThrow(try self.defaultClient.execute(request: request2,
|
|
eventLoop: .indifferent,
|
|
deadline: nil,
|
|
logger: logger).wait())
|
|
XCTAssertEqual(0, logStore.allEntries.count)
|
|
|
|
// === Synthesized Request
|
|
XCTAssertNoThrow(try self.defaultClient.execute(.GET,
|
|
url: self.defaultHTTPBinURLPrefix + "get",
|
|
body: nil,
|
|
deadline: nil,
|
|
logger: logger).wait())
|
|
XCTAssertEqual(0, logStore.allEntries.count)
|
|
|
|
XCTAssertEqual(0, self.backgroundLogStore.allEntries.count)
|
|
|
|
// === Synthesized Socket Path Request
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let backgroundLogStore = CollectEverythingLogHandler.LogStore()
|
|
var backgroundLogger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: backgroundLogStore)
|
|
})
|
|
backgroundLogger.logLevel = .trace
|
|
|
|
let localSocketPathHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(path))
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
backgroundActivityLogger: backgroundLogger)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localSocketPathHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(try localClient.execute(.GET,
|
|
socketPath: path,
|
|
urlPath: "get",
|
|
body: nil,
|
|
deadline: nil,
|
|
logger: logger).wait())
|
|
XCTAssertEqual(0, logStore.allEntries.count)
|
|
|
|
XCTAssertEqual(0, backgroundLogStore.allEntries.count)
|
|
})
|
|
|
|
// === Synthesized Secure Socket Path Request
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let backgroundLogStore = CollectEverythingLogHandler.LogStore()
|
|
var backgroundLogger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: backgroundLogStore)
|
|
})
|
|
backgroundLogger.logLevel = .trace
|
|
|
|
let localSocketPathHTTPBin = HTTPBin(ssl: true, bindTarget: .unixDomainSocket(path))
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none),
|
|
backgroundActivityLogger: backgroundLogger)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localSocketPathHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(try localClient.execute(.GET,
|
|
secureSocketPath: path,
|
|
urlPath: "get",
|
|
body: nil,
|
|
deadline: nil,
|
|
logger: logger).wait())
|
|
XCTAssertEqual(0, logStore.allEntries.count)
|
|
|
|
XCTAssertEqual(0, backgroundLogStore.allEntries.count)
|
|
})
|
|
}
|
|
|
|
func testAllMethodsLog() {
|
|
func checkExpectationsWithLogger<T>(type: String, _ body: (Logger, String) throws -> T) throws -> T {
|
|
let logStore = CollectEverythingLogHandler.LogStore()
|
|
|
|
var logger: Logger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: logStore)
|
|
})
|
|
logger.logLevel = .trace
|
|
logger[metadataKey: "req"] = "yo-\(type)"
|
|
|
|
let url = "not-found/request/\(type))"
|
|
let result = try body(logger, url)
|
|
|
|
XCTAssertGreaterThan(logStore.allEntries.count, 0)
|
|
logStore.allEntries.forEach { entry in
|
|
XCTAssertEqual("yo-\(type)", entry.metadata["req"] ?? "n/a")
|
|
XCTAssertNotNil(Int(entry.metadata["ahc-request-id"] ?? "n/a"))
|
|
}
|
|
return result
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "GET") { logger, url in
|
|
try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + url, logger: logger).wait()
|
|
}.status))
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "PUT") { logger, url in
|
|
try self.defaultClient.put(url: self.defaultHTTPBinURLPrefix + url, logger: logger).wait()
|
|
}.status))
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "POST") { logger, url in
|
|
try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + url, logger: logger).wait()
|
|
}.status))
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "DELETE") { logger, url in
|
|
try self.defaultClient.delete(url: self.defaultHTTPBinURLPrefix + url, logger: logger).wait()
|
|
}.status))
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "PATCH") { logger, url in
|
|
try self.defaultClient.patch(url: self.defaultHTTPBinURLPrefix + url, logger: logger).wait()
|
|
}.status))
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "CHECKOUT") { logger, url in
|
|
try self.defaultClient.execute(.CHECKOUT, url: self.defaultHTTPBinURLPrefix + url, logger: logger).wait()
|
|
}.status))
|
|
|
|
// No background activity expected here.
|
|
XCTAssertEqual(0, self.backgroundLogStore.allEntries.count)
|
|
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let backgroundLogStore = CollectEverythingLogHandler.LogStore()
|
|
var backgroundLogger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: backgroundLogStore)
|
|
})
|
|
backgroundLogger.logLevel = .trace
|
|
|
|
let localSocketPathHTTPBin = HTTPBin(bindTarget: .unixDomainSocket(path))
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
backgroundActivityLogger: backgroundLogger)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localSocketPathHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "GET") { logger, url in
|
|
try localClient.execute(socketPath: path, urlPath: url, logger: logger).wait()
|
|
}.status))
|
|
|
|
// No background activity expected here.
|
|
XCTAssertEqual(0, backgroundLogStore.allEntries.count)
|
|
})
|
|
|
|
XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in
|
|
let backgroundLogStore = CollectEverythingLogHandler.LogStore()
|
|
var backgroundLogger = Logger(label: "\(#function)", factory: { _ in
|
|
CollectEverythingLogHandler(logStore: backgroundLogStore)
|
|
})
|
|
backgroundLogger.logLevel = .trace
|
|
|
|
let localSocketPathHTTPBin = HTTPBin(ssl: true, bindTarget: .unixDomainSocket(path))
|
|
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: HTTPClient.Configuration(certificateVerification: .none),
|
|
backgroundActivityLogger: backgroundLogger)
|
|
defer {
|
|
XCTAssertNoThrow(try localClient.syncShutdown())
|
|
XCTAssertNoThrow(try localSocketPathHTTPBin.shutdown())
|
|
}
|
|
|
|
XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "GET") { logger, url in
|
|
try localClient.execute(secureSocketPath: path, urlPath: url, logger: logger).wait()
|
|
}.status))
|
|
|
|
// No background activity expected here.
|
|
XCTAssertEqual(0, backgroundLogStore.allEntries.count)
|
|
})
|
|
}
|
|
|
|
func testClosingIdleConnectionsInPoolLogsInTheBackground() {
|
|
XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "/get").wait())
|
|
|
|
XCTAssertNoThrow(try self.defaultClient.syncShutdown())
|
|
|
|
XCTAssertGreaterThanOrEqual(self.backgroundLogStore.allEntries.count, 0)
|
|
XCTAssert(self.backgroundLogStore.allEntries.contains { entry in
|
|
entry.message == "closing provider"
|
|
})
|
|
XCTAssert(self.backgroundLogStore.allEntries.allSatisfy { entry in
|
|
entry.metadata["ahc-request-id"] == nil &&
|
|
entry.metadata["ahc-request"] == nil &&
|
|
entry.metadata["ahc-provider"] != nil
|
|
})
|
|
|
|
self.defaultClient = nil // so it doesn't get shut down again.
|
|
}
|
|
|
|
func testUploadStreamingNoLength() throws {
|
|
let server = NIOHTTP1TestServer(group: self.serverGroup)
|
|
let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
|
|
defer {
|
|
XCTAssertNoThrow(try client.syncShutdown())
|
|
XCTAssertNoThrow(try server.stop())
|
|
}
|
|
|
|
var request = try HTTPClient.Request(url: "http://localhost:\(server.serverPort)/")
|
|
request.body = .stream { writer in
|
|
writer.write(.byteBuffer(ByteBuffer(string: "1234")))
|
|
}
|
|
|
|
let future = client.execute(request: request)
|
|
|
|
switch try server.readInbound() {
|
|
case .head(let head):
|
|
XCTAssertEqual(head.headers["transfer-encoding"], ["chunked"])
|
|
default:
|
|
XCTFail("Unexpected part")
|
|
}
|
|
|
|
XCTAssertNoThrow(try server.readInbound()) // .body
|
|
XCTAssertNoThrow(try server.readInbound()) // .end
|
|
|
|
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .ok))))
|
|
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
|
|
|
|
XCTAssertNoThrow(try future.wait())
|
|
}
|
|
|
|
func testConnectErrorPropagatedToDelegate() throws {
|
|
class TestDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = Void
|
|
var error: Error?
|
|
func didFinishRequest(task: HTTPClient.Task<Void>) throws {}
|
|
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
|
|
self.error = error
|
|
}
|
|
}
|
|
|
|
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
|
|
configuration: .init(timeout: .init(connect: .milliseconds(10))))
|
|
|
|
defer {
|
|
XCTAssertNoThrow(try httpClient.syncShutdown())
|
|
}
|
|
|
|
// This must throw as 198.51.100.254 is reserved for documentation only
|
|
let request = try HTTPClient.Request(url: "http://198.51.100.254:65535/get")
|
|
let delegate = TestDelegate()
|
|
|
|
XCTAssertThrowsError(try httpClient.execute(request: request, delegate: delegate).wait()) { error in
|
|
XCTAssertEqual(.connectTimeout(.milliseconds(10)), error as? ChannelError)
|
|
XCTAssertEqual(.connectTimeout(.milliseconds(10)), delegate.error as? ChannelError)
|
|
}
|
|
}
|
|
|
|
func testDelegateCallinsTolerateRandomEL() throws {
|
|
class TestDelegate: HTTPClientResponseDelegate {
|
|
typealias Response = Void
|
|
let eventLoop: EventLoop
|
|
|
|
init(eventLoop: EventLoop) {
|
|
self.eventLoop = eventLoop
|
|
}
|
|
|
|
func didReceiveHead(task: HTTPClient.Task<Void>, _: HTTPResponseHead) -> EventLoopFuture<Void> {
|
|
return self.eventLoop.makeSucceededFuture(())
|
|
}
|
|
|
|
func didReceiveBodyPart(task: HTTPClient.Task<Void>, _: ByteBuffer) -> EventLoopFuture<Void> {
|
|
return self.eventLoop.makeSucceededFuture(())
|
|
}
|
|
|
|
func didFinishRequest(task: HTTPClient.Task<Void>) throws {}
|
|
}
|
|
|
|
let elg = getDefaultEventLoopGroup(numberOfThreads: 3)
|
|
let first = elg.next()
|
|
let second = elg.next()
|
|
XCTAssertFalse(first === second)
|
|
|
|
let httpServer = NIOHTTP1TestServer(group: self.serverGroup)
|
|
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(first))
|
|
defer {
|
|
XCTAssertNoThrow(try httpClient.syncShutdown())
|
|
XCTAssertNoThrow(try httpServer.stop())
|
|
XCTAssertNoThrow(try elg.syncShutdownGracefully())
|
|
}
|
|
|
|
let delegate = TestDelegate(eventLoop: second)
|
|
let request = try HTTPClient.Request(url: "http://localhost:\(httpServer.serverPort)/")
|
|
let future = httpClient.execute(request: request, delegate: delegate)
|
|
|
|
XCTAssertNoThrow(try httpServer.readInbound()) // .head
|
|
XCTAssertNoThrow(try httpServer.readInbound()) // .end
|
|
|
|
XCTAssertNoThrow(try httpServer.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .ok))))
|
|
XCTAssertNoThrow(try httpServer.writeOutbound(.body(.byteBuffer(ByteBuffer(string: "1234")))))
|
|
XCTAssertNoThrow(try httpServer.writeOutbound(.end(nil)))
|
|
|
|
XCTAssertNoThrow(try future.wait())
|
|
}
|
|
|
|
func testContentLengthTooLongFails() throws {
|
|
let url = self.defaultHTTPBinURLPrefix + "post"
|
|
XCTAssertThrowsError(
|
|
try self.defaultClient.execute(request:
|
|
Request(url: url,
|
|
body: .stream(length: 10) { streamWriter in
|
|
let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self)
|
|
DispatchQueue(label: "content-length-test").async {
|
|
streamWriter.write(.byteBuffer(ByteBuffer(string: "1"))).cascade(to: promise)
|
|
}
|
|
return promise.futureResult
|
|
})).wait()) { error in
|
|
XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
|
|
}
|
|
// Quickly try another request and check that it works.
|
|
let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
|
|
guard var body = response.body else {
|
|
XCTFail("Body missing: \(response)")
|
|
return
|
|
}
|
|
guard let info = try body.readJSONDecodable(RequestInfo.self, length: body.readableBytes) else {
|
|
XCTFail("Cannot parse body: \(body.readableBytesView.map { $0 })")
|
|
return
|
|
}
|
|
XCTAssertEqual(info.connectionNumber, 1)
|
|
XCTAssertEqual(info.requestNumber, 1)
|
|
}
|
|
|
|
// currently gets stuck because of #250 the server just never replies
|
|
func testContentLengthTooShortFails() throws {
|
|
let url = self.defaultHTTPBinURLPrefix + "post"
|
|
let tooLong = "XBAD BAD BAD NOT HTTP/1.1\r\n\r\n"
|
|
XCTAssertThrowsError(
|
|
try self.defaultClient.execute(request:
|
|
Request(url: url,
|
|
body: .stream(length: 1) { streamWriter in
|
|
streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong)))
|
|
})).wait()) { error in
|
|
XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
|
|
}
|
|
// Quickly try another request and check that it works. If we by accident wrote some extra bytes into the
|
|
// stream (and reuse the connection) that could cause problems.
|
|
let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
|
|
guard var body = response.body else {
|
|
XCTFail("Body missing: \(response)")
|
|
return
|
|
}
|
|
guard let info = try body.readJSONDecodable(RequestInfo.self, length: body.readableBytes) else {
|
|
XCTFail("Cannot parse body: \(body.readableBytesView.map { $0 })")
|
|
return
|
|
}
|
|
XCTAssertEqual(info.connectionNumber, 1)
|
|
XCTAssertEqual(info.requestNumber, 1)
|
|
}
|
|
|
|
func testBodyUploadAfterEndFails() {
|
|
let url = self.defaultHTTPBinURLPrefix + "post"
|
|
|
|
func uploader(_ streamWriter: HTTPClient.Body.StreamWriter) -> EventLoopFuture<Void> {
|
|
let done = streamWriter.write(.byteBuffer(ByteBuffer(string: "X")))
|
|
done.recover { error -> Void in
|
|
XCTFail("unexpected error \(error)")
|
|
}.whenSuccess {
|
|
// This is executed when we have already sent the end of the request.
|
|
done.eventLoop.execute {
|
|
streamWriter.write(.byteBuffer(ByteBuffer(string: "BAD BAD BAD"))).whenComplete { result in
|
|
switch result {
|
|
case .success:
|
|
XCTFail("we succeeded writing bytes after the end!?")
|
|
case .failure(let error):
|
|
XCTAssertEqual(HTTPClientError.writeAfterRequestSent, error as? HTTPClientError)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return done
|
|
}
|
|
|
|
XCTAssertThrowsError(
|
|
try self.defaultClient.execute(request:
|
|
Request(url: url,
|
|
body: .stream(length: 1, uploader))).wait()) { error in
|
|
XCTAssertEqual(HTTPClientError.writeAfterRequestSent, error as? HTTPClientError)
|
|
}
|
|
|
|
// Quickly try another request and check that it works. If we by accident wrote some extra bytes into the
|
|
// stream (and reuse the connection) that could cause problems.
|
|
XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait())
|
|
}
|
|
|
|
func testNoBytesSentOverBodyLimit() throws {
|
|
let server = NIOHTTP1TestServer(group: self.serverGroup)
|
|
defer {
|
|
XCTAssertNoThrow(try server.stop())
|
|
}
|
|
|
|
let tooLong = "XBAD BAD BAD NOT HTTP/1.1\r\n\r\n"
|
|
let future = self.defaultClient.execute(
|
|
request: try Request(url: "http://localhost:\(server.serverPort)",
|
|
body: .stream(length: 1) { streamWriter in
|
|
streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong)))
|
|
}))
|
|
|
|
XCTAssertNoThrow(try server.readInbound()) // .head
|
|
// this should fail if client detects that we are about to send more bytes than body limit and closes the connection
|
|
// We can test that this test actually fails if we remove limit check in `writeBodyPart` - it will send bytes, meaning that the next
|
|
// call will not throw, but the future will still throw body mismatch error
|
|
XCTAssertThrowsError(try server.readInbound()) { error in XCTAssertEqual(error as? HTTPParserError, HTTPParserError.invalidEOFState) }
|
|
|
|
XCTAssertThrowsError(try future.wait())
|
|
}
|
|
|
|
func testDoubleError() throws {
|
|
// This is needed to that connection pool will not get into closed state when we release
|
|
// second connection.
|
|
_ = self.defaultClient.get(url: "http://localhost:\(self.defaultHTTPBin.port)/events/10/1")
|
|
|
|
var request = try HTTPClient.Request(url: "http://localhost:\(self.defaultHTTPBin.port)/wait", method: .POST)
|
|
request.body = .stream { writer in
|
|
// Start writing chunks so tha we will try to write after read timeout is thrown
|
|
for _ in 1...10 {
|
|
_ = writer.write(.byteBuffer(ByteBuffer(string: "1234")))
|
|
}
|
|
|
|
let promise = self.clientGroup.next().makePromise(of: Void.self)
|
|
self.clientGroup.next().scheduleTask(in: .milliseconds(3)) {
|
|
writer.write(.byteBuffer(ByteBuffer(string: "1234"))).cascade(to: promise)
|
|
}
|
|
|
|
return promise.futureResult
|
|
}
|
|
|
|
// We specify a deadline of 2 ms co that request will be timed out before all chunks are writtent,
|
|
// we need to verify that second error on write after timeout does not lead to double-release.
|
|
XCTAssertThrowsError(try self.defaultClient.execute(request: request, deadline: .now() + .milliseconds(2)).wait())
|
|
}
|
|
}
|