Files
http/Sources/HTTP/HTTPStreamingParser.swift
2018-01-08 11:07:29 -06:00

585 lines
22 KiB
Swift

// This source file is part of the Swift.org Server APIs open source project
//
// Copyright (c) 2017 Swift Server API project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
//
import CHTTPParser
import Foundation
import Dispatch
/// Errors that `StreamingParser` reports through the completion handlers of its write methods.
public enum StreamingParserError: Error {
    /// Reported when `abort()` has been called on the parser before a write/finish could be queued.
    case ConnectionAbortedError
}
/// Class that wraps the CHTTPParser and calls the `HTTPRequestHandler` to get the response
/// :nodoc:
public class StreamingParser: HTTPResponseWriter {
    /// Request handler supplied at init. It is called once per (non-upgrade) request when the parser
    /// leaves the headers-complete state, and returns how the request body should be processed.
    let handle: HTTPRequestHandler

    /// Time to leave socket open waiting for next request to start
    public let keepAliveTimeout: TimeInterval

    /// Flag to track if the client wants to send consecutive requests on the same TCP connection
    var clientRequestedKeepAlive = false

    /// Tracks when socket should be closed. Needs to have a lock, since it's updated often
    private let _keepAliveUntilLock = DispatchSemaphore(value: 1)
    private var _keepAliveUntil: TimeInterval?
    /// Keep-alive deadline, stored as seconds since the reference date
    /// (`Date.timeIntervalSinceReferenceDate`). Access is serialized by a semaphore.
    public var keepAliveUntil: TimeInterval? {
        get {
            _keepAliveUntilLock.wait()
            defer { _keepAliveUntilLock.signal() }
            return _keepAliveUntil
        }
        set {
            _keepAliveUntilLock.wait()
            defer { _keepAliveUntilLock.signal() }
            _keepAliveUntil = newValue
        }
    }

    /// Tracks when we've been told socket has been closed. Needs to have a lock, since if we get confused, bad things happen
    private let _abortCalledLock = DispatchSemaphore(value: 1)
    private var _abortCalled: Bool = false
    /// `true` once `abort()` has been called; write methods then fail with `ConnectionAbortedError`.
    internal var abortCalled: Bool {
        get {
            _abortCalledLock.wait()
            defer { _abortCalledLock.signal() }
            return _abortCalled
        }
        set {
            _abortCalledLock.wait()
            defer { _abortCalledLock.signal() }
            _abortCalled = newValue
        }
    }

    /// Optional delegate that can tell us how many connections are in-flight.
    public weak var connectionCounter: CurrentConnectionCounting?

    /// Holds the bytes that come from the CHTTPParser until we have enough of them to do something with it
    var parserBuffer: Data?
    /// Lock for parser buffer.
    private let _parserBufferLock = DispatchSemaphore(value: 1)

    /// HTTP Parser
    var httpParser = http_parser()
    var httpParserSettings = http_parser_settings()

    /// Block that takes a chunk from the HTTPParser as input and writes to a Response as a result
    var httpBodyProcessingCallback: HTTPBodyProcessing?

    //Note: we want this to be strong so it holds onto the connector until it's explicitly cleared
    /// Protocol that we use to send data (and status info) back to the Network layer
    public var parserConnector: ParserConnecting?

    ///Flag to track whether our handler has told us not to call it anymore
    private let _shouldStopProcessingBodyLock = DispatchSemaphore(value: 1)
    private var _shouldStopProcessingBody: Bool = false
    private var shouldStopProcessingBody: Bool {
        get {
            _shouldStopProcessingBodyLock.wait()
            defer { _shouldStopProcessingBodyLock.signal() }
            return _shouldStopProcessingBody
        }
        set {
            _shouldStopProcessingBodyLock.wait()
            defer { _shouldStopProcessingBodyLock.signal() }
            _shouldStopProcessingBody = newValue
        }
    }

    /// The most recent parser callback. Used to detect when a multi-callback field has been fully delivered.
    var lastCallBack = CallbackRecord.idle
    var lastHeaderName: String?
    var parsedHeaders = HTTPHeaders()
    var parsedHTTPMethod: HTTPMethod?
    var parsedHTTPVersion: HTTPVersion?
    var parsedURL: String?

    /// Is the currently parsed request an upgrade request?
    public private(set) var upgradeRequested = false

    /// Class that wraps the CHTTPParser and calls the `HTTPRequestHandler` to get the response
    ///
    /// - Parameters:
    ///   - handler: function that is used to create the response
    ///   - connectionCounter: optional delegate reporting in-flight connections (held weakly)
    ///   - keepAliveTimeout: seconds an idle keep-alive connection is kept open
    public init(handler: @escaping HTTPRequestHandler, connectionCounter: CurrentConnectionCounting? = nil, keepAliveTimeout: Double = 5.0) {
        self.handle = handler
        self.connectionCounter = connectionCounter
        self.keepAliveTimeout = keepAliveTimeout

        //Set up all the callbacks for the CHTTPParser library.
        // Each C callback recovers `self` from `parser.data` (set below) and returns 0 to let parsing continue.
        httpParserSettings.on_message_begin = { parser -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return Int32(0)
            }
            return listener.messageBegan()
        }

        httpParserSettings.on_message_complete = { parser -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.messageCompleted()
        }

        httpParserSettings.on_headers_complete = { parser -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            let methodId = parser?.pointee.method
            let methodName = String(validatingUTF8: http_method_str(http_method(rawValue: methodId ?? 0))) ?? "GET"
            let major = Int(parser?.pointee.http_major ?? 0)
            let minor = Int(parser?.pointee.http_minor ?? 0)

            //This needs to be set here and not messageCompleted if it's going to work here
            let keepAlive = http_should_keep_alive(parser) == 1
            let upgradeRequested = parser?.pointee.upgrade == 1

            return listener.headersCompleted(methodName: methodName,
                                             majorVersion: major,
                                             minorVersion: minor,
                                             keepAlive: keepAlive,
                                             upgrade: upgradeRequested)
        }

        httpParserSettings.on_header_field = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.headerFieldReceived(data: chunk, length: length)
        }

        httpParserSettings.on_header_value = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.headerValueReceived(data: chunk, length: length)
        }

        httpParserSettings.on_body = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.bodyReceived(data: chunk, length: length)
        }

        httpParserSettings.on_url = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.urlReceived(data: chunk, length: length)
        }

        http_parser_init(&httpParser, HTTP_REQUEST)

        // Unretained: the parser does not own itself; `deinit` clears this pointer.
        self.httpParser.data = Unmanaged.passUnretained(self).toOpaque()
    }

    /// Read a stream from the network, pass it to the parser and return number of bytes consumed
    ///
    /// - Parameter data: data coming from network
    /// - Returns: number of bytes that we sent to the parser
    public func readStream(data: Data) -> Int {
        return data.withUnsafeBytes { (ptr) -> Int in
            return http_parser_execute(&self.httpParser, &self.httpParserSettings, ptr, data.count)
        }
    }

    /// States to track where we are in parsing the HTTP Stream from the client
    enum CallbackRecord {
        case idle, messageBegan, messageCompleted, headersCompleted, headerFieldReceived, headerValueReceived, bodyReceived, urlReceived
    }

    /// Process change of state as we get more and more parser callbacks.
    ///
    /// When the parser moves on from a state whose data arrives in pieces (URL, header field,
    /// header value), the accumulated `parserBuffer` is decoded and stored. Leaving
    /// `.headersCompleted` invokes the request handler (unless an upgrade was requested).
    ///
    /// - Parameter currentCallBack: state we are entering, as specified by the CHTTPParser
    /// - Returns: Whether or not the state actually changed
    @discardableResult
    func processCurrentCallback(_ currentCallBack: CallbackRecord) -> Bool {
        if lastCallBack == currentCallBack {
            return false
        }
        _parserBufferLock.wait()
        defer { _parserBufferLock.signal() }
        switch lastCallBack {
        case .headerFieldReceived:
            if let parserBuffer = self.parserBuffer {
                self.lastHeaderName = String(data: parserBuffer, encoding: .utf8)
                self.parserBuffer = nil
            } else {
                print("Missing parserBuffer after \(lastCallBack)")
            }
        case .headerValueReceived:
            if let parserBuffer = self.parserBuffer,
                let lastHeaderName = self.lastHeaderName,
                let headerValue = String(data: parserBuffer, encoding: .utf8) {
                self.parsedHeaders.append([HTTPHeaders.Name(lastHeaderName): headerValue])
                self.lastHeaderName = nil
                self.parserBuffer = nil
            } else {
                print("Missing parserBuffer after \(lastCallBack)")
            }
        case .headersCompleted:
            self.parserBuffer = nil

            if !upgradeRequested {
                self.httpBodyProcessingCallback = self.handle(self.createRequest(), self)
            }
        case .urlReceived:
            if let parserBuffer = self.parserBuffer {
                //Under heaptrack, this may appear to leak via _CFGetTSDCreateIfNeeded,
                // apparently, that's because it triggers thread metadata to be created
                self.parsedURL = String(data: parserBuffer, encoding: .utf8)
                self.parserBuffer = nil
            } else {
                print("Missing parserBuffer after \(lastCallBack)")
            }
        case .idle:
            break
        case .messageBegan:
            break
        case .messageCompleted:
            break
        case .bodyReceived:
            break
        }
        lastCallBack = currentCallBack
        return true
    }

    /// `on_message_begin`: tells the connector a response has started.
    func messageBegan() -> Int32 {
        processCurrentCallback(.messageBegan)
        self.parserConnector?.responseBeginning()
        return 0
    }

    /// `on_message_complete`: sends `.end` to the body handler, or finishes the response if the body is discarded.
    func messageCompleted() -> Int32 {
        let didChangeState = processCurrentCallback(.messageCompleted)
        if let chunkHandler = self.httpBodyProcessingCallback, didChangeState {
            var dummy = false //We're sending `.end`, which means processing is stopping anyway, so the bool here is pointless
            switch chunkHandler {
            case .processBody(let handler):
                handler(.end, &dummy)
            case .discardBody:
                done()
            }
        }
        return 0
    }

    /// `on_headers_complete`: records method, version, keep-alive and upgrade flags for this request.
    func headersCompleted(methodName: String,
                          majorVersion: Int,
                          minorVersion: Int,
                          keepAlive: Bool,
                          upgrade: Bool) -> Int32 {
        processCurrentCallback(.headersCompleted)
        self.parsedHTTPMethod = HTTPMethod(methodName)
        self.parsedHTTPVersion = HTTPVersion(major: majorVersion, minor: minorVersion)

        //This needs to be set here and not messageCompleted if it's going to work here
        self.clientRequestedKeepAlive = keepAlive
        self.keepAliveUntil = Date(timeIntervalSinceNow: keepAliveTimeout).timeIntervalSinceReferenceDate
        self.upgradeRequested = upgrade
        return 0
    }

    /// `on_header_field`: accumulates (possibly fragmented) header-name bytes.
    func headerFieldReceived(data: UnsafePointer<Int8>?, length: Int) -> Int32 {
        processCurrentCallback(.headerFieldReceived)
        guard let data = data else { return 0 }
        appendToParserBuffer(data, length: length)
        return 0
    }

    /// `on_header_value`: accumulates (possibly fragmented) header-value bytes.
    func headerValueReceived(data: UnsafePointer<Int8>?, length: Int) -> Int32 {
        processCurrentCallback(.headerValueReceived)
        guard let data = data else { return 0 }
        appendToParserBuffer(data, length: length)
        return 0
    }

    /// `on_body`: forwards a body chunk to the handler unless it asked to stop.
    func bodyReceived(data: UnsafePointer<Int8>?, length: Int) -> Int32 {
        processCurrentCallback(.bodyReceived)
        guard let data = data else { return 0 }
        if shouldStopProcessingBody {
            return 0
        }
        data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in
            #if swift(>=4.0)
                let buff = UnsafeRawBufferPointer(start: ptr, count: length)
            #else
                let buff = UnsafeBufferPointer<UInt8>(start: ptr, count: length)
            #endif
            let chunk = DispatchData(bytes: buff)
            if let chunkHandler = self.httpBodyProcessingCallback {
                switch chunkHandler {
                case .processBody(let handler):
                    //OK, this sucks. We can't access the value of the `inout` inside this block
                    // due to exclusivity. Which means that if we were to pass a local variable, we'd
                    // have to put a semaphore or something up here to wait for the block to be done before
                    // we could get its value and pass that on to the instance variable. So instead, we're
                    // just passing in a pointer to the internal ivar. But that ivar can't be modified in
                    // more than one place, so we have to put a semaphore around it to prevent that.
                    // NOTE: the lock is released only when the handler calls `finishedProcessing`.
                    _shouldStopProcessingBodyLock.wait()
                    handler(.chunk(data: chunk, finishedProcessing: { self._shouldStopProcessingBodyLock.signal() }), &_shouldStopProcessingBody)
                case .discardBody:
                    break
                }
            }
        }
        return 0
    }

    /// `on_url`: accumulates (possibly fragmented) request-target bytes.
    func urlReceived(data: UnsafePointer<Int8>?, length: Int) -> Int32 {
        processCurrentCallback(.urlReceived)
        guard let data = data else { return 0 }
        appendToParserBuffer(data, length: length)
        return 0
    }

    /// Appends `length` bytes to `parserBuffer` under its lock, creating the buffer if needed.
    ///
    /// http_parser may deliver one URL, header name or header value across several callbacks
    /// (for example when it is split between two network reads). The buffer must therefore be
    /// mutated in place. Appending to a local `var` copy, as an earlier version did, silently
    /// dropped every fragment after the first.
    private func appendToParserBuffer(_ data: UnsafePointer<Int8>, length: Int) {
        _parserBufferLock.wait()
        defer { _parserBufferLock.signal() }
        data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in
            if self.parserBuffer == nil {
                self.parserBuffer = Data(bytes: ptr, count: length)
            } else {
                self.parserBuffer?.append(ptr, count: length)
            }
        }
    }

    /// Recovers the `StreamingParser` stored (unretained) in `parser.data`, or `nil` if it was cleared.
    static func getSelf(parser: UnsafeMutablePointer<http_parser>?) -> StreamingParser? {
        guard let pointee = parser?.pointee.data else { return nil }
        return Unmanaged<StreamingParser>.fromOpaque(pointee).takeUnretainedValue()
    }

    /// Whether the final (non-informational) response header has been queued for the current request.
    var headersWritten = false
    /// Whether the current response body is framed with chunked transfer coding.
    var isChunked = false

    /// Create a `HTTPRequest` struct from the parsed information.
    /// Traps if called before the method, URL and version have been parsed.
    public func createRequest() -> HTTPRequest {
        return HTTPRequest(method: parsedHTTPMethod!,
                           target: parsedURL!,
                           httpVersion: parsedHTTPVersion!,
                           headers: parsedHeaders)
    }

    /// Serializes the status line and headers and queues them for writing.
    /// Informational (1xx) responses are written as-is and may precede the final header.
    /// NOTE(review): if the final header was already written, this returns without calling `completion`.
    public func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, completion: @escaping (Result) -> Void) {
        guard !headersWritten else {
            return
        }

        var header = "HTTP/1.1 \(status.code) \(status.reasonPhrase)\r\n"

        let isInformational = status.class == .informational

        var headers = headers
        if !isInformational {
            adjustHeaders(status: status, headers: &headers)
        }

        for (key, value) in headers {
            // TODO encode value using [RFC5987]
            header += "\(key): \(value)\r\n"
        }
        header.append("\r\n")

        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }
        // FIXME headers are US-ASCII, anything else should be encoded using [RFC5987] some lines above
        // TODO use requested encoding if specified
        if let data = header.data(using: .utf8) {
            self.parserConnector?.queueSocketWrite(data, completion: completion)
            if !isInformational {
                headersWritten = true
            }
        } else {
            //TODO handle encoding error
        }
    }

    /// Applies framing and connection headers for a final response, and decides `isChunked` for this response.
    func adjustHeaders(status: HTTPResponseStatus, headers: inout HTTPHeaders) {
        for header in status.suppressedHeaders {
            headers[header] = nil
        }

        // Decide framing afresh for every response; a previous chunked response on this
        // keep-alive connection must not leak chunk framing into this one.
        isChunked = false

        if headers[.contentLength] != nil {
            headers[.transferEncoding] = "identity"
        } else if parsedHTTPVersion! >= HTTPVersion(major: 1, minor: 1) {
            switch headers[.transferEncoding] {
            case .some("identity"): // identity without content-length
                clientRequestedKeepAlive = false
            case .some("chunked"):
                isChunked = true
            default:
                isChunked = true
                headers[.transferEncoding] = "chunked"
            }
        } else {
            // HTTP 1.0 does not support chunked
            clientRequestedKeepAlive = false
            headers[.transferEncoding] = nil
        }

        if clientRequestedKeepAlive {
            headers[.connection] = "Keep-Alive"
        } else {
            headers[.connection] = "Close"
        }
    }

    /// Trailers are not supported; calling this traps.
    public func writeTrailer(_ trailers: HTTPHeaders, completion: @escaping (Result) -> Void) {
        fatalError("Not implemented")
    }

    /// Queues a body fragment, wrapping it in chunk framing when `isChunked` is set.
    /// An empty fragment completes immediately with `.ok`.
    /// NOTE(review): if headers have not been written, this returns without calling `completion`.
    public func writeBody(_ data: UnsafeHTTPResponseBody, completion: @escaping (Result) -> Void) {
        guard headersWritten else {
            //TODO error or default headers?
            return
        }

        guard data.withUnsafeBytes({ $0.count > 0 }) else {
            completion(.ok)
            return
        }

        let dataToWrite: Data
        if isChunked {
            dataToWrite = data.withUnsafeBytes {
                let chunkStart = (String($0.count, radix: 16) + "\r\n").data(using: .utf8)!
                var dataToWrite = chunkStart
                dataToWrite.append(UnsafeBufferPointer(start: $0.baseAddress?.assumingMemoryBound(to: UInt8.self), count: $0.count))
                let chunkEnd = "\r\n".data(using: .utf8)!
                dataToWrite.append(chunkEnd)
                return dataToWrite
            }
        } else if let data = data as? Data {
            dataToWrite = data
        } else {
            dataToWrite = data.withUnsafeBytes { Data($0) }
        }

        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }

        self.parserConnector?.queueSocketWrite(dataToWrite, completion: completion)
    }

    /// Finishes the current response. It writes the chunked terminator if needed, resets per-request
    /// state for the next request on this connection, and tells the connector whether to keep the
    /// connection alive. `completion` is invoked exactly once.
    public func done(completion: @escaping (Result) -> Void) {
        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }

        if isChunked {
            // Terminating zero-length chunk. Its write result is deliberately not forwarded:
            // `completion` is reported once, below. Forwarding it here as well notified callers
            // twice for every chunked response.
            let chunkTerminate = "0\r\n\r\n".data(using: .utf8)!
            self.parserConnector?.queueSocketWrite(chunkTerminate, completion: { _ in })
        }

        resetForNextRequest()

        //Note: This used to be passed into the completion block that `Result` used to have
        // But since that block was removed, we're calling it directly
        if self.clientRequestedKeepAlive {
            self.keepAliveUntil = Date(timeIntervalSinceNow: keepAliveTimeout).timeIntervalSinceReferenceDate
        }
        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }
        if self.clientRequestedKeepAlive {
            self.parserConnector?.responseComplete()
        } else {
            self.parserConnector?.responseCompleteCloseWriter()
        }
        completion(.ok)
    }

    /// Clears all per-request parser and response state so the next request on this connection starts clean.
    private func resetForNextRequest() {
        parsedHTTPMethod = nil
        parsedURL = nil
        parsedHeaders = HTTPHeaders()
        lastHeaderName = nil
        _parserBufferLock.wait()
        parserBuffer = nil
        _parserBufferLock.signal()
        parsedHTTPVersion = nil
        lastCallBack = .idle
        headersWritten = false
        isChunked = false
        httpBodyProcessingCallback = nil
        upgradeRequested = false
        shouldStopProcessingBody = false
    }

    /// Marks the connection as aborted; subsequent writes fail with `ConnectionAbortedError`.
    public func abort() {
        abortCalled = true
    }

    deinit {
        // Prevent any late C callback from resolving a dangling `self`.
        httpParser.data = nil
    }
}
/// Protocol implemented by the thing that sits in between us and the network layer
/// :nodoc:
public protocol ParserConnecting: class {
    /// Signals to the network layer that a response has begun, so a slow write does not cause the connection to be closed.
    func responseBeginning()

    /// Hands `data` to the network layer to be written to the client; `completion` reports the outcome.
    func queueSocketWrite(_ data: Data, completion: @escaping (Result) -> Void)

    /// Signals that the response is finished and the connection may be closed once its keep-alive timeout elapses.
    func responseComplete()

    /// Signals that the response is finished and the connection should now be closed.
    func responseCompleteCloseWriter()

    /// Signals that the network layer may close the connection.
    func closeWriter()
}
/// Delegate that can tell us how many connections are in-flight so we can set the Keep-Alive header
/// to the correct number of available connections
/// :nodoc:
public protocol CurrentConnectionCounting: class {
    /// How many connections are currently active (read-only).
    var connectionCount: Int { get }
}