// This source file is part of the Swift.org Server APIs open source project
//
// Copyright (c) 2017 Swift Server API project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
//

import CHTTPParser
import Foundation
import Dispatch

/// Errors reported through `Result` completion handlers by `StreamingParser`.
public enum StreamingParserError: Error {
    /// `abort()` was called, so the requested write/completion was not performed.
    case ConnectionAbortedError
}

/// Class that wraps the CHTTPParser and calls the `HTTPRequestHandler` to get the response
/// :nodoc:
public class StreamingParser: HTTPResponseWriter {

    let handle: HTTPRequestHandler

    /// Time to leave socket open waiting for next request to start
    public let keepAliveTimeout: TimeInterval

    /// Flag to track if the client wants to send consecutive requests on the same TCP connection
    var clientRequestedKeepAlive = false

    /// Tracks when socket should be closed. Needs to have a lock, since it's updated often
    private let _keepAliveUntilLock = DispatchSemaphore(value: 1)
    private var _keepAliveUntil: TimeInterval?
    public var keepAliveUntil: TimeInterval? {
        get {
            _keepAliveUntilLock.wait()
            defer { _keepAliveUntilLock.signal() }
            return _keepAliveUntil
        }
        set {
            _keepAliveUntilLock.wait()
            defer { _keepAliveUntilLock.signal() }
            _keepAliveUntil = newValue
        }
    }

    /// Tracks when we've been told socket has been closed. Needs to have a lock, since if we get confused, bad things happen
    private let _abortCalledLock = DispatchSemaphore(value: 1)
    private var _abortCalled: Bool = false
    internal var abortCalled: Bool {
        get {
            _abortCalledLock.wait()
            defer { _abortCalledLock.signal() }
            return _abortCalled
        }
        set {
            _abortCalledLock.wait()
            defer { _abortCalledLock.signal() }
            _abortCalled = newValue
        }
    }

    /// Optional delegate that can tell us how many connections are in-flight.
    public weak var connectionCounter: CurrentConnectionCounting?

    /// Holds the bytes that come from the CHTTPParser until we have enough of them to do something with it
    var parserBuffer: Data?

    /// Lock for parser buffer.
    private let _parserBufferLock = DispatchSemaphore(value: 1)

    /// HTTP Parser
    var httpParser = http_parser()
    var httpParserSettings = http_parser_settings()

    /// Block that takes a chunk from the HTTPParser as input and writes to a Response as a result
    var httpBodyProcessingCallback: HTTPBodyProcessing?

    //Note: we want this to be strong so it holds onto the connector until it's explicitly cleared
    /// Protocol that we use to send data (and status info) back to the Network layer
    public var parserConnector: ParserConnecting?

    ///Flag to track whether our handler has told us not to call it anymore
    private let _shouldStopProcessingBodyLock = DispatchSemaphore(value: 1)
    private var _shouldStopProcessingBody: Bool = false
    private var shouldStopProcessingBody: Bool {
        get {
            _shouldStopProcessingBodyLock.wait()
            defer { _shouldStopProcessingBodyLock.signal() }
            return _shouldStopProcessingBody
        }
        set {
            _shouldStopProcessingBodyLock.wait()
            defer { _shouldStopProcessingBodyLock.signal() }
            _shouldStopProcessingBody = newValue
        }
    }

    var lastCallBack = CallbackRecord.idle
    var lastHeaderName: String?
    var parsedHeaders = HTTPHeaders()
    var parsedHTTPMethod: HTTPMethod?
    var parsedHTTPVersion: HTTPVersion?
    var parsedURL: String?

    /// Is the currently parsed request an upgrade request?
    public private(set) var upgradeRequested = false

    /// Class that wraps the CHTTPParser and calls the `HTTPRequestHandler` to get the response
    ///
    /// - Parameters:
    ///   - handler: function that is used to create the response
    ///   - connectionCounter: optional delegate reporting the number of in-flight connections
    ///   - keepAliveTimeout: seconds to keep the socket open waiting for the next request
    public init(handler: @escaping HTTPRequestHandler, connectionCounter: CurrentConnectionCounting? = nil, keepAliveTimeout: Double = 5.0) {
        self.handle = handler
        self.connectionCounter = connectionCounter
        self.keepAliveTimeout = keepAliveTimeout

        //Set up all the callbacks for the CHTTPParser library.
        // These are C function pointers and cannot capture context, so each one recovers
        // `self` from `parser.data` via `getSelf(parser:)`.
        httpParserSettings.on_message_begin = { parser -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return Int32(0)
            }
            return listener.messageBegan()
        }

        httpParserSettings.on_message_complete = { parser -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.messageCompleted()
        }

        httpParserSettings.on_headers_complete = { parser -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            let methodId = parser?.pointee.method
            let methodName = String(validatingUTF8: http_method_str(http_method(rawValue: methodId ?? 0))) ?? "GET"
            let major = Int(parser?.pointee.http_major ?? 0)
            let minor = Int(parser?.pointee.http_minor ?? 0)

            //This needs to be set here and not messageCompleted if it's going to work here
            let keepAlive = http_should_keep_alive(parser) == 1
            let upgradeRequested = parser?.pointee.upgrade == 1

            return listener.headersCompleted(methodName: methodName,
                                             majorVersion: major,
                                             minorVersion: minor,
                                             keepAlive: keepAlive,
                                             upgrade: upgradeRequested)
        }

        httpParserSettings.on_header_field = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.headerFieldReceived(data: chunk, length: length)
        }

        httpParserSettings.on_header_value = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.headerValueReceived(data: chunk, length: length)
        }

        httpParserSettings.on_body = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.bodyReceived(data: chunk, length: length)
        }

        httpParserSettings.on_url = { (parser, chunk, length) -> Int32 in
            guard let listener = StreamingParser.getSelf(parser: parser) else {
                return 0
            }
            return listener.urlReceived(data: chunk, length: length)
        }

        http_parser_init(&httpParser, HTTP_REQUEST)

        // Unretained: `deinit` clears `httpParser.data`, and the parser is owned by `self`.
        self.httpParser.data = Unmanaged.passUnretained(self).toOpaque()
    }

    /// Read a stream from the network, pass it to the parser and return number of bytes consumed
    ///
    /// - Parameter data: data coming from network
    /// - Returns: number of bytes that we sent to the parser
    public func readStream(data: Data) -> Int {
        return data.withUnsafeBytes { (ptr) -> Int in
            return http_parser_execute(&self.httpParser, &self.httpParserSettings, ptr, data.count)
        }
    }

    /// States to track where we are in parsing the HTTP Stream from the client
    enum CallbackRecord {
        case idle, messageBegan, messageCompleted, headersCompleted, headerFieldReceived, headerValueReceived, bodyReceived, urlReceived
    }

    /// Atomically removes and returns whatever bytes are currently held in `parserBuffer`.
    private func takeParserBuffer() -> Data? {
        _parserBufferLock.wait()
        defer { _parserBufferLock.signal() }
        let bytes = self.parserBuffer
        self.parserBuffer = nil
        return bytes
    }

    /// Appends one data callback's bytes to `parserBuffer`, creating it if needed.
    ///
    /// CHTTPParser may deliver a single URL, header field or header value across several
    /// callbacks (e.g. when it spans two reads), so the pieces must accumulate in the stored
    /// property rather than in a local copy.
    private func appendToParserBuffer(_ data: UnsafePointer<CChar>, length: Int) {
        _parserBufferLock.wait()
        defer { _parserBufferLock.signal() }
        data.withMemoryRebound(to: UInt8.self, capacity: length) { (bytes) -> Void in
            if self.parserBuffer == nil {
                self.parserBuffer = Data(bytes: bytes, count: length)
            } else {
                self.parserBuffer?.append(bytes, count: length)
            }
        }
    }

    /// Process change of state as we get more and more parser callbacks
    ///
    /// Consumes the bytes buffered while in the state being left (`lastCallBack`): it stores a
    /// header name, a header value or the URL, or, when leaving `.headersCompleted`, invokes the
    /// request handler.
    ///
    /// - Parameter currentCallBack: state we are entering, as specified by the CHTTPParser
    /// - Returns: Whether or not the state actually changed
    @discardableResult
    func processCurrentCallback(_ currentCallBack: CallbackRecord) -> Bool {
        if lastCallBack == currentCallBack {
            return false
        }

        // Whatever is buffered belongs to the state we are leaving. Take it under the lock, and
        // release the lock before anything below runs user code.
        let bufferedBytes = takeParserBuffer()

        switch lastCallBack {
        case .headerFieldReceived:
            if let bufferedBytes = bufferedBytes {
                self.lastHeaderName = String(data: bufferedBytes, encoding: .utf8)
            } else {
                print("Missing parserBuffer after \(lastCallBack)")
            }
        case .headerValueReceived:
            if let bufferedBytes = bufferedBytes,
                let lastHeaderName = self.lastHeaderName,
                let headerValue = String(data: bufferedBytes, encoding: .utf8) {
                self.parsedHeaders.append([HTTPHeaders.Name(lastHeaderName): headerValue])
            } else {
                print("Missing parserBuffer after \(lastCallBack)")
            }
            self.lastHeaderName = nil
        case .headersCompleted:
            // The handler is user code and may call `done()` synchronously, which takes
            // `_parserBufferLock`. It must therefore run without that lock held.
            if !upgradeRequested {
                self.httpBodyProcessingCallback = self.handle(self.createRequest(), self)
            }
        case .urlReceived:
            if let bufferedBytes = bufferedBytes {
                //Under heaptrack, this may appear to leak via _CFGetTSDCreateIfNeeded,
                // apparently, that's because it triggers thread metadata to be created
                self.parsedURL = String(data: bufferedBytes, encoding: .utf8)
            } else {
                print("Missing parserBuffer after \(lastCallBack)")
            }
        case .idle, .messageBegan, .messageCompleted, .bodyReceived:
            break
        }
        lastCallBack = currentCallBack
        return true
    }

    /// Parser callback: a new message started. Notifies the connector so it does not time out the connection.
    func messageBegan() -> Int32 {
        processCurrentCallback(.messageBegan)
        self.parserConnector?.responseBeginning()
        return 0
    }

    /// Parser callback: the whole message (headers and body) was parsed.
    ///
    /// Sends `.end` to a `.processBody` handler, or calls `done()` for `.discardBody`.
    func messageCompleted() -> Int32 {
        let didChangeState = processCurrentCallback(.messageCompleted)
        if let chunkHandler = self.httpBodyProcessingCallback, didChangeState {
            var dummy = false //We're sending `.end`, which means processing is stopping anyway, so the bool here is pointless
            switch chunkHandler {
            case .processBody(let handler):
                handler(.end, &dummy)
            case .discardBody:
                done()
            }
        }
        return 0
    }

    /// Parser callback: the request line and all headers were parsed.
    ///
    /// Records method, version, keep-alive and upgrade flags. The request handler itself is
    /// invoked on the next state transition (see `processCurrentCallback`).
    func headersCompleted(methodName: String,
                          majorVersion: Int,
                          minorVersion: Int,
                          keepAlive: Bool,
                          upgrade: Bool) -> Int32 {
        processCurrentCallback(.headersCompleted)
        self.parsedHTTPMethod = HTTPMethod(methodName)
        self.parsedHTTPVersion = HTTPVersion(major: majorVersion, minor: minorVersion)

        //This needs to be set here and not messageCompleted if it's going to work here
        self.clientRequestedKeepAlive = keepAlive
        self.keepAliveUntil = Date(timeIntervalSinceNow: keepAliveTimeout).timeIntervalSinceReferenceDate
        self.upgradeRequested = upgrade
        return 0
    }

    /// Parser callback: (part of) a header name. May be invoked several times per name.
    func headerFieldReceived(data: UnsafePointer<CChar>?, length: Int) -> Int32 {
        processCurrentCallback(.headerFieldReceived)
        guard let data = data else { return 0 }
        appendToParserBuffer(data, length: length)
        return 0
    }

    /// Parser callback: (part of) a header value. May be invoked several times per value.
    func headerValueReceived(data: UnsafePointer<CChar>?, length: Int) -> Int32 {
        processCurrentCallback(.headerValueReceived)
        guard let data = data else { return 0 }
        appendToParserBuffer(data, length: length)
        return 0
    }

    /// Parser callback: a chunk of the request body.
    ///
    /// Forwards the chunk to a `.processBody` handler. The `_shouldStopProcessingBodyLock` stays
    /// held until the handler calls `finishedProcessing`. The next chunk (and any access to
    /// `shouldStopProcessingBody`, including from `done()`) blocks until then.
    func bodyReceived(data: UnsafePointer<CChar>?, length: Int) -> Int32 {
        processCurrentCallback(.bodyReceived)
        guard let data = data else { return 0 }
        if shouldStopProcessingBody {
            return 0
        }
        data.withMemoryRebound(to: UInt8.self, capacity: length) { (ptr) -> Void in
            #if swift(>=4.0)
                let buff = UnsafeRawBufferPointer(start: ptr, count: length)
            #else
                let buff = UnsafeBufferPointer(start: ptr, count: length)
            #endif
            let chunk = DispatchData(bytes: buff)
            if let chunkHandler = self.httpBodyProcessingCallback {
                switch chunkHandler {
                case .processBody(let handler):
                    //OK, this sucks.  We can't access the value of the `inout` inside this block
                    // due to exclusivity. Which means that if we were to pass a local variable, we'd
                    // have to put a semaphore or something up here to wait for the block to be done before
                    // we could get its value and pass that on to the instance variable. So instead, we're
                    // just passing in a pointer to the internal ivar. But that ivar can't be modified in
                    // more than one place, so we have to put a semaphore around it to prevent that.
                    _shouldStopProcessingBodyLock.wait()
                    handler(.chunk(data: chunk, finishedProcessing: {
                        self._shouldStopProcessingBodyLock.signal()
                    }), &_shouldStopProcessingBody)
                case .discardBody:
                    break
                }
            }
        }
        return 0
    }

    /// Parser callback: (part of) the request target. May be invoked several times per URL.
    func urlReceived(data: UnsafePointer<CChar>?, length: Int) -> Int32 {
        processCurrentCallback(.urlReceived)
        guard let data = data else { return 0 }
        appendToParserBuffer(data, length: length)
        return 0
    }

    /// Recovers the `StreamingParser` stored (unretained) in `parser.data`, or nil if it was cleared.
    static func getSelf(parser: UnsafeMutablePointer<http_parser>?) -> StreamingParser? {
        guard let pointee = parser?.pointee.data else { return nil }
        return Unmanaged<StreamingParser>.fromOpaque(pointee).takeUnretainedValue()
    }

    var headersWritten = false
    var isChunked = false

    /// Create a `HTTPRequest` struct from the parsed information
    ///
    /// Method and version are set in `headersCompleted` before the handler can run. The target
    /// falls back to an empty string when the URL bytes were not valid UTF-8. This avoids a
    /// remotely triggerable crash.
    public func createRequest() -> HTTPRequest {
        return HTTPRequest(method: parsedHTTPMethod!,
                           target: parsedURL ?? "",
                           httpVersion: parsedHTTPVersion!,
                           headers: parsedHeaders)
    }

    /// Serializes the status line and headers and queues them for writing.
    ///
    /// Informational (1xx) responses skip header adjustment and do not mark headers as written,
    /// so a final response can follow.
    /// NOTE(review): a second non-informational call returns without invoking `completion`.
    public func writeHeader(status: HTTPResponseStatus, headers: HTTPHeaders, completion: @escaping (Result) -> Void) {

        guard !headersWritten else {
            return
        }

        var header = "HTTP/1.1 \(status.code) \(status.reasonPhrase)\r\n"

        let isInformational = status.class == .informational

        var headers = headers
        if !isInformational {
            adjustHeaders(status: status, headers: &headers)
        }

        for (key, value) in headers {
            // TODO encode value using [RFC5987]
            header += "\(key): \(value)\r\n"
        }
        header.append("\r\n")

        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }

        // FIXME headers are US-ASCII, anything else should be encoded using [RFC5987] some lines above
        // TODO use requested encoding if specified
        if let data = header.data(using: .utf8) {
            self.parserConnector?.queueSocketWrite(data, completion: completion)
            if !isInformational {
                headersWritten = true
            }
        } else {
            //TODO handle encoding error
        }
    }

    /// Decides body framing and the `Connection` header for a final (non-1xx) response.
    ///
    /// - Content-Length present: no Transfer-Encoding is sent, because RFC 7230 §3.3.2 forbids
    ///   sending both.
    /// - HTTP/1.1+ without Content-Length: chunked, unless the caller asked for `identity`.
    ///   In that case keep-alive is disabled and the body is delimited by closing the connection.
    /// - HTTP/1.0 (or unknown version): no chunking, keep-alive disabled.
    func adjustHeaders(status: HTTPResponseStatus, headers: inout HTTPHeaders) {
        for header in status.suppressedHeaders {
            headers[header] = nil
        }

        // Framing is decided per response; never inherit it from a previous response on this connection.
        isChunked = false

        if headers[.contentLength] != nil {
            headers[.transferEncoding] = nil
        } else if let version = parsedHTTPVersion, version >= HTTPVersion(major: 1, minor: 1) {
            switch headers[.transferEncoding] {
            case .some("identity"): // identity without content-length
                clientRequestedKeepAlive = false
            case .some("chunked"):
                isChunked = true
            default:
                isChunked = true
                headers[.transferEncoding] = "chunked"
            }
        } else {
            // HTTP 1.0 does not support chunked
            clientRequestedKeepAlive = false
            headers[.transferEncoding] = nil
        }

        if clientRequestedKeepAlive {
            headers[.connection] = "Keep-Alive"
        } else {
            headers[.connection] = "Close"
        }
    }

    public func writeTrailer(_ trailers: HTTPHeaders, completion: @escaping (Result) -> Void) {
        fatalError("Not implemented")
    }

    /// Queues a body chunk for writing, applying chunked framing when the response is chunked.
    ///
    /// Empty bodies complete immediately with `.ok`. Calls made before headers are written are ignored.
    public func writeBody(_ data: UnsafeHTTPResponseBody, completion: @escaping (Result) -> Void) {
        guard headersWritten else {
            //TODO error or default headers?
            return
        }

        guard data.withUnsafeBytes({ $0.count > 0 }) else {
            completion(.ok)
            return
        }

        let dataToWrite: Data
        if isChunked {
            dataToWrite = data.withUnsafeBytes {
                let chunkStart = (String($0.count, radix: 16) + "\r\n").data(using: .utf8)!
                var dataToWrite = chunkStart
                dataToWrite.append(UnsafeBufferPointer(start: $0.baseAddress?.assumingMemoryBound(to: UInt8.self), count: $0.count))
                let chunkEnd = "\r\n".data(using: .utf8)!
                dataToWrite.append(chunkEnd)
                return dataToWrite
            }
        } else if let data = data as? Data {
            dataToWrite = data
        } else {
            dataToWrite = data.withUnsafeBytes { Data($0) }
        }

        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }

        self.parserConnector?.queueSocketWrite(dataToWrite, completion: completion)
    }

    /// Clears all per-request parsing and response state so the connection can handle another request.
    private func resetRequestState() {
        self.parsedHTTPMethod = nil
        self.parsedURL = nil
        self.parsedHTTPVersion = nil
        self.parsedHeaders = HTTPHeaders()
        self.lastHeaderName = nil
        _ = takeParserBuffer()
        self.lastCallBack = .idle
        self.headersWritten = false
        self.isChunked = false
        self.httpBodyProcessingCallback = nil
        self.upgradeRequested = false
        self.shouldStopProcessingBody = false
    }

    /// Finishes the current response and resets per-request state.
    ///
    /// A chunked body is terminated with a zero-length chunk. The connector is then told either
    /// to keep the connection alive or to close it.
    /// `completion` is invoked exactly once:
    /// - with the result of the terminator write, for chunked responses;
    /// - with `.ok` otherwise;
    /// - with `ConnectionAbortedError` if `abort()` was called.
    public func done(completion: @escaping (Result) -> Void) {
        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }

        // Capture before resetting: a chunked body still needs its terminating zero-length chunk.
        let needsChunkTerminator = isChunked

        resetRequestState()

        //Note: This used to be passed into the completion block that `Result` used to have
        // But since that block was removed, we're calling it directly
        if self.clientRequestedKeepAlive {
            self.keepAliveUntil = Date(timeIntervalSinceNow: keepAliveTimeout).timeIntervalSinceReferenceDate
        }

        guard !abortCalled else {
            completion(.error(StreamingParserError.ConnectionAbortedError))
            return
        }

        // Queue the terminator before telling the connector the response is complete,
        // because `responseCompleteCloseWriter()` may close the writer.
        var completionHandedToWrite = false
        if needsChunkTerminator, let connector = self.parserConnector {
            let chunkTerminate = "0\r\n\r\n".data(using: .utf8)!
            connector.queueSocketWrite(chunkTerminate, completion: completion)
            completionHandedToWrite = true
        }

        if self.clientRequestedKeepAlive {
            self.parserConnector?.responseComplete()
        } else {
            self.parserConnector?.responseCompleteCloseWriter()
        }

        if !completionHandedToWrite {
            completion(.ok)
        }
    }

    /// Marks the connection as aborted; subsequent writes and `done` report `ConnectionAbortedError`.
    public func abort() {
        abortCalled = true
    }

    deinit {
        // Prevent late C callbacks from resolving a dangling unretained `self`.
        httpParser.data = nil
    }
}

/// Protocol implemented by the thing that sits in between us and the network layer
/// :nodoc:
public protocol ParserConnecting: class {
    /// Send data to the network to be written to the client
    func queueSocketWrite(_ from: Data, completion: @escaping (Result) -> Void)

    /// Let the network know that a response has started to avoid closing a connection during a slow write
    func responseBeginning()

    /// Let the network know that a response is complete, so it can be closed after timeout
    func responseComplete()

    /// Let the network know that a response is complete and we're ready to close the connection
    func responseCompleteCloseWriter()

    /// Used to let the network know we're ready to close the connection
    func closeWriter()
}

/// Delegate that can tell us how many connections are in-flight so we can set the Keep-Alive header
///  to the correct number of available connections
/// :nodoc:
public protocol CurrentConnectionCounting: class {
    /// Current number of active connections
    var connectionCount: Int { get }
}