// // FPSStreamTask.swift // FileProvider // // Created by Amir Abbas Mousavian. // Copyright © 2016 Mousavian. Distributed under MIT license. // import Foundation private var lasttaskIdAssociated = 1_000_000_000 /// This class is a replica of NSURLSessionStreamTask with same api for iOS 7/8 /// while it can actually fallback to NSURLSessionStreamTask in iOS 9. public class FileProviderStreamTask: URLSessionTask, StreamDelegate { fileprivate var inputStream: InputStream? fileprivate var outputStream: OutputStream? fileprivate var operation_queue: OperationQueue! internal var _underlyingSession: URLSession fileprivate var streamDelegate: FPSStreamDelegate? { return (_underlyingSession.delegate as? FPSStreamDelegate) } fileprivate var _taskIdentifier: Int fileprivate var _taskDescription: String? /// Force using `URLSessionStreamTask` for iOS 9 and later public let useURLSession: Bool @available(iOS 9.0, OSX 10.11, *) fileprivate static var streamTasks = [Int: URLSessionStreamTask]() @available(iOS 9.0, OSX 10.11, *) internal var _underlyingTask: URLSessionStreamTask? { return FileProviderStreamTask.streamTasks[_taskIdentifier] } /** * An identifier uniquely identifies the task within a given session. * * This value is unique only within the context of a single session; * tasks in other sessions may have the same `taskIdentifier` value. */ open override var taskIdentifier: Int { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { return _underlyingTask!.taskIdentifier } } return _taskIdentifier } /// An app-provided description of the current task. /// /// This value may be nil. It is intended to contain human-readable strings that you can /// then display to the user as part of your app’s user interface. open override var taskDescription: String? { get { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { return _underlyingTask!.taskDescription } } return _taskDescription } @objc(setTaskDescription:) set { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.taskDescription = newValue return } } _taskDescription = newValue } } fileprivate var _state: URLSessionTask.State = .suspended /** * The current state of the task—active, suspended, in the process * of being canceled, or completed. */ override open var state: URLSessionTask.State { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { return _underlyingTask!.state } } return _state } /** * The original request object passed when the task was created. * This value is typically the same as the currently active request (`currentRequest`) * except when the server has responded to the initial request with a redirect to a different URL. */ override open var originalRequest: URLRequest? { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { return _underlyingTask!.originalRequest } } return nil } /** * The URL request object currently being handled by the task. * This value is typically the same as the initial request (`originalRequest`) * except when the server has responded to the initial request with a redirect to a different URL. */ override open var currentRequest: URLRequest? { if #available(iOS 9.0, OSX 10.11, *) { return _underlyingTask!.currentRequest } else { return nil } } fileprivate var _countOfBytesSent: Int64 = 0 { willSet { for observer in observers where observer.keyPath == "countOfBytesSent" { observer.observer.observeValue(forKeyPath: observer.keyPath, of: self, change: [.oldKey: _countOfBytesSent, .oldKey: newValue], context: observer.context) } } didSet { for observer in observers where observer.keyPath == "countOfBytesSent" { observer.observer.observeValue(forKeyPath: observer.keyPath, of: self, change: [.oldKey: oldValue, .oldKey: _countOfBytesSent], context: observer.context) } } } fileprivate var _countOfBytesRecieved: Int64 = 0 { willSet { for observer in observers where observer.keyPath == "countOfBytesRecieved" { observer.observer.observeValue(forKeyPath: observer.keyPath, of: self, change: [.oldKey: _countOfBytesRecieved, .oldKey: newValue], context: observer.context) } } didSet { for observer in observers where observer.keyPath == "countOfBytesRecieved" { observer.observer.observeValue(forKeyPath: observer.keyPath, of: self, change: [.oldKey: oldValue, .oldKey: _countOfBytesRecieved], context: observer.context) } } } /** * The number of bytes that the task has sent to the server in the request body. * * This byte count includes only the length of the request body itself, not the request headers. * * To be notified when this value changes, implement the * `urlSession(_:task:didSendBodyData:totalBytesSent:totalBytesExpectedToSend:)` delegate method. */ override open var countOfBytesSent: Int64 { if #available(iOS 9.0, macOS 10.11, *) { if self.useURLSession { return _underlyingTask!.countOfBytesSent } } return _countOfBytesSent } /** * The number of bytes that the task has received from the server in the response body. * * To be notified when this value changes, implement the `urlSession(_:dataTask:didReceive:)` delegate method (for data and upload tasks) * or the `urlSession(_:downloadTask:didWriteData:totalBytesWritten:totalBytesExpectedToWrite:)` method (for download tasks). */ override open var countOfBytesReceived: Int64 { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { return _underlyingTask!.countOfBytesReceived } } return _countOfBytesRecieved } /** * The number of bytes that the task expects to send in the request body. * * The `URL` loading system can determine the length of the upload data in three ways: * - From the length of the `NSData` object provided as the upload body. * - From the length of the file on disk provided as the upload body of an upload task (not a download task). * - From the `Content-Length` in the request object, if you explicitly set it. * * Otherwise, the value is `NSURLSessionTransferSizeUnknown` (`-1`) if you provided a stream or body data object, or zero (`0`) if you did not. */ override open var countOfBytesExpectedToSend: Int64 { if #available(iOS 9.0, OSX 10.11, *) { return _underlyingTask!.countOfBytesExpectedToSend } else { return Int64(dataToBeSent.count) } } /** * The number of bytes that the task expects to receive in the response body. * * This value is determined based on the `Content-Length` header received from the server. * If that header is absent, the value is `NSURLSessionTransferSizeUnknown`. */ override open var countOfBytesExpectedToReceive: Int64 { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { return _underlyingTask!.countOfBytesExpectedToReceive } } return Int64(dataReceived.count) } var observers: [(keyPath: String, observer: NSObject, context: UnsafeMutableRawPointer?)] = [] public override func addObserver(_ observer: NSObject, forKeyPath keyPath: String, options: NSKeyValueObservingOptions = [], context: UnsafeMutableRawPointer?) { if #available(iOS 9.0, macOS 10.11, *) { if self.useURLSession { self._underlyingTask?.addObserver(observer, forKeyPath: keyPath, options: options, context: context) return } } switch keyPath { case #keyPath(countOfBytesSent): fallthrough case #keyPath(countOfBytesReceived): fallthrough case #keyPath(countOfBytesExpectedToSend): fallthrough case #keyPath(countOfBytesExpectedToReceive): observers.append((keyPath: keyPath, observer: observer, context: context)) default: break } super.addObserver(observer, forKeyPath: keyPath, options: options, context: context) } public override func removeObserver(_ observer: NSObject, forKeyPath keyPath: String) { var newObservers: [(keyPath: String, observer: NSObject, context: UnsafeMutableRawPointer?)] = [] for observer in observers where observer.keyPath != keyPath { newObservers.append(observer) } self.observers = newObservers super.removeObserver(observer, forKeyPath: keyPath) } public override func removeObserver(_ observer: NSObject, forKeyPath keyPath: String, context: UnsafeMutableRawPointer?) { var newObservers: [(keyPath: String, observer: NSObject, context: UnsafeMutableRawPointer?)] = [] for observer in observers where observer.keyPath != keyPath || observer.context != context { newObservers.append(observer) } self.observers = newObservers super.removeObserver(observer, forKeyPath: keyPath, context: context) } override public init() { fatalError("Use NSURLSession.fpstreamTask() method") } fileprivate var host: (hostname: String, port: Int)? fileprivate var service: NetService? internal init(session: URLSession, host: String, port: Int, useURLSession: Bool = true) { self._underlyingSession = session self.useURLSession = useURLSession if #available(iOS 9.0, OSX 10.11, *) { if useURLSession { let task = session.streamTask(withHostName: host, port: port) self._taskIdentifier = task.taskIdentifier FileProviderStreamTask.streamTasks[_taskIdentifier] = task return } } lasttaskIdAssociated += 1 self._taskIdentifier = lasttaskIdAssociated self.host = (host, port) self.operation_queue = OperationQueue() self.operation_queue.name = "FileProviderStreamTask" self.operation_queue.maxConcurrentOperationCount = 1 } internal init(session: URLSession, netService: NetService, useURLSession: Bool = true) { self._underlyingSession = session self.useURLSession = useURLSession if #available(iOS 9.0, OSX 10.11, *) { if useURLSession { let task = session.streamTask(with: netService) self._taskIdentifier = task.taskIdentifier FileProviderStreamTask.streamTasks[_taskIdentifier] = task return } } lasttaskIdAssociated += 1 self._taskIdentifier = lasttaskIdAssociated self.service = netService self.operation_queue = OperationQueue() self.operation_queue.name = "FileProviderStreamTask" self.operation_queue.maxConcurrentOperationCount = 1 } /** * Cancels the task. * * This method returns immediately, marking the task as being canceled. Once a task is marked as being canceled, * `urlSession(_:task:didCompleteWithError:)` will be sent to the task delegate, passing an error * in the domain NSURLErrorDomain with the code `NSURLErrorCancelled`. A task may, under some circumstances, * send messages to its delegate before the cancelation is acknowledged. * * This method may be called on a task that is suspended. */ override open func cancel() { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.cancel() return } } self._state = .canceling inputStream?.setValue(kCFBooleanTrue, forKey: kCFStreamPropertyShouldCloseNativeSocket as String) outputStream?.setValue(kCFBooleanTrue, forKey: kCFStreamPropertyShouldCloseNativeSocket as String) self.inputStream?.close() self.outputStream?.close() self.inputStream?.remove(from: RunLoop.main, forMode: .defaultRunLoopMode) self.outputStream?.remove(from: RunLoop.main, forMode: .defaultRunLoopMode) self.inputStream?.delegate = nil self.outputStream?.delegate = nil self.inputStream = nil self.outputStream = nil self._state = .completed self._countOfBytesSent = 0 self._countOfBytesRecieved = 0 } var _error: Error? = nil /** * An error object that indicates why the task failed. * * This value is `NULL` if the task is still active or if the transfer completed successfully. */ override open var error: Error? { if #available(iOS 9.0, OSX 10.11, *) { if useURLSession { return _underlyingTask!.error } } return _error } /** * Temporarily suspends a task. * * A task, while suspended, produces no network traffic and is not subject to timeouts. * A download task can continue transferring data at a later time. * All other tasks must start over when resumed. */ override open func suspend() { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.suspend() return } } self._state = .suspended self.operation_queue.isSuspended = true } // Resumes the task, if it is suspended. override open func resume() { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.resume() return } } var readStream : Unmanaged? var writeStream : Unmanaged? if inputStream == nil || outputStream == nil { if let host = host { CFStreamCreatePairWithSocketToHost(kCFAllocatorDefault, host.hostname as CFString, UInt32(host.port), &readStream, &writeStream) } else if let service = service { let cfnetService = CFNetServiceCreate(kCFAllocatorDefault, service.domain as CFString, service.type as CFString, service.name as CFString, Int32(service.port)) CFStreamCreatePairWithSocketToNetService(kCFAllocatorDefault, cfnetService.takeRetainedValue(), &readStream, &writeStream) } inputStream = readStream?.takeRetainedValue() outputStream = writeStream?.takeRetainedValue() guard let inputStream = inputStream, let outputStream = outputStream else { return } streamDelegate?.urlSession?(self._underlyingSession, streamTask: self, didBecome: inputStream, outputStream: outputStream) } guard let inputStream = inputStream, let outputStream = outputStream else { return } inputStream.delegate = self outputStream.delegate = self operation_queue.addOperation { inputStream.schedule(in: RunLoop.main, forMode: .defaultRunLoopMode) outputStream.schedule(in: RunLoop.main, forMode: .defaultRunLoopMode) } inputStream.open() outputStream.open() operation_queue.isSuspended = false _state = .running } fileprivate var dataToBeSent: Data = Data() fileprivate var dataReceived: Data = Data() /** * Asynchronously reads a number of bytes from the stream, and calls a handler upon completion. * * - Parameter minBytes: The minimum number of bytes to read. * - ParametermaxBytes: The maximum number of bytes to read. * - Parameter timeout: A timeout for reading bytes. If the read is not completed within the specified interval, * the read is canceled and the completionHandler is called with an error. Pass `0` to prevent a read from timing out. * - Parameter completionHandler: The completion handler to call when all bytes are read, or an error occurs. * This handler is executed on the delegate queue. This completion handler takes the following parameters: * - data: The data read from the stream. * - atEOF: Whether or not the stream reached end-of-file (`EOF`), such that no more data can be read. * - error: An error object that indicates why the read failed, or `nil` if the read was successful. */ open func readData(ofMinLength minBytes: Int, maxLength maxBytes: Int, timeout: TimeInterval, completionHandler: @escaping (_ data: Data?, _ atEOF: Bool, _ error :Error?) -> Void) { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.readData(ofMinLength: minBytes, maxLength: maxBytes, timeout: timeout, completionHandler: completionHandler) return } } guard let inputStream = inputStream else { return } let expireDate = Date(timeIntervalSinceNow: timeout) operation_queue.addOperation { var timedOut: Bool = false while (self.dataReceived.count == 0 || self.dataReceived.count < minBytes) && !timedOut { Thread.sleep(forTimeInterval: 0.1) timedOut = expireDate < Date() } var dR: Data? if self.dataReceived.count > maxBytes { let range: Range = 0.. 0 { dR = self.dataReceived self.dataReceived.count = 0 } } let isEOF = inputStream.streamStatus == .atEnd && self.dataReceived.count == 0 completionHandler(dR, isEOF, dR == nil ? inputStream.streamError : nil) } } /** * Asynchronously writes the specified data to the stream, and calls a handler upon completion. * * There is no guarantee that the remote side of the stream has received all of the written bytes * at the time that `completionHandler` is called, only that all of the data has been written to the kernel. * * - Parameter data: The data to be written. * - Parameter timeout: A timeout for writing bytes. If the write is not completed within the specified interval, * the write is canceled and the `completionHandler` is called with an error. * Pass `0` to prevent a write from timing out. * - Parameter completionHandler: The completion handler to call when all bytes are written, or an error occurs. * This handler is executed on the delegate queue. * This completion handler takes the following parameter: * - error: An error object that indicates why the write failed, or nil if the write was successful. */ open func write(_ data: Data, timeout: TimeInterval, completionHandler: @escaping (_ error: Error?) -> Void) { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.write(data, timeout: timeout, completionHandler: completionHandler) return } } guard outputStream != nil else { return } operation_queue.addOperation { self.dataToBeSent.append(data) let result = self.write(timeout: timeout, close: false) if result < 0 { let error = self.outputStream?.streamError ?? NSError(domain: URLError.errorDomain, code: URLError.cannotWriteToFile.rawValue, userInfo: nil) completionHandler(error) } else { completionHandler(nil) } } } /** * Completes any already enqueued reads and writes, and then invokes the * `urlSession(_:streamTask:didBecome:outputStream:)` delegate message. */ open func captureStreams() { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.captureStreams() return } } guard let outputStream = outputStream, let inputStream = inputStream else { return } self.operation_queue.addOperation { _=self.write(close: false) while inputStream.streamStatus != .atEnd || outputStream.streamStatus == .writing { Thread.sleep(forTimeInterval: 0.1) } self.streamDelegate?.urlSession?(self._underlyingSession, streamTask: self, didBecome: inputStream, outputStream: outputStream) } } /** * Completes any enqueued reads and writes, and then closes the write side of the underlying socket. * * You may continue to read data using the `readData(ofMinLength:maxLength:timeout:completionHandler:)` * method after calling this method. Any calls to `write(_:timeout:completionHandler:)` after calling * this method will result in an error. * * Because the server may continue to write bytes to the client, it is recommended that * you continue reading until the stream reaches end-of-file (EOF). */ open func closeWrite() { if #available(iOS 9.0, OSX 10.11, *) { if self.useURLSession { _underlyingTask!.closeWrite() return } } operation_queue.addOperation { _ = self.write(close: true) } } fileprivate func write(timeout: TimeInterval = 0, close: Bool) -> Int { guard let outputStream = outputStream else { return -1 } var byteSent: Int = 0 let expireDate = Date(timeIntervalSinceNow: timeout) while self.dataToBeSent.count > 0 && (timeout == 0 || expireDate > Date()) { let bytesWritten = self.dataToBeSent.withUnsafeBytes { outputStream.write($0, maxLength: self.dataToBeSent.count) } if bytesWritten > 0 { let range = 0.. 0 { dataReceived.append(&buffer, count: len) self._countOfBytesRecieved += Int64(len) } } } } } public extension URLSession { /// Creates a bidirectional stream task to a given host and port. func fpstreamTask(withHostName hostname: String, port: Int) -> FileProviderStreamTask { return FileProviderStreamTask(session: self, host: hostname, port: port) } /** * Creates a bidirectional stream task with an NSNetService to identify the endpoint. * The NSNetService will be resolved before any IO completes. */ func fpstreamTask(withNetService service: NetService) -> FileProviderStreamTask { return FileProviderStreamTask(session: self, netService: service) } } @objc internal protocol FPSStreamDelegate : URLSessionTaskDelegate { /** * Indiciates that the read side of a connection has been closed. Any * outstanding reads complete, but future reads will immediately fail. * This may be sent even when no reads are in progress. However, when * this delegate message is received, there may still be bytes * available. You only know that no more bytes are available when you * are able to read until EOF. */ @objc optional func urlSession(_ session: URLSession, readClosedFor streamTask: FileProviderStreamTask) /** * Indiciates that the write side of a connection has been closed. * Any outstanding writes complete, but future writes will immediately * fail. */ @objc optional func urlSession(_ session: URLSession, writeClosedFor streamTask: FileProviderStreamTask) /** * A notification that the system has determined that a better route * to the host has been detected (eg, a wi-fi interface becoming * available.) This is a hint to the delegate that it may be * desirable to create a new task for subsequent work. Note that * there is no guarantee that the future task will be able to connect * to the host, so callers should should be prepared for failure of * reads and writes over any new interface. */ @objc optional func urlSession(_ session: URLSession, betterRouteDiscoveredFor streamTask: FileProviderStreamTask) /** * The given task has been completed, and unopened NSInputStream and * NSOutputStream objects are created from the underlying network * connection. This will only be invoked after all enqueued IO has * completed (including any necessary handshakes.) The streamTask * will not receive any further delegate messages. */ @objc optional func urlSession(_ session: URLSession, streamTask: FileProviderStreamTask, didBecome inputStream: InputStream, outputStream: OutputStream) } private let ports: [String: Int] = ["http": 80, "https": 443, "smb": 445,"ftp": 21, "telnet": 23, "pop": 110, "smtp": 25, "imap": 143] private let securePorts: [String: Int] = ["ssh": 22, "https": 443, "smb": 445, "smtp": 465, "ftps": 990,"telnet": 992, "imap": 993, "pop": 995]