Files
FileProvider/Sources/FPSStreamTask.swift
2016-08-10 12:35:11 +04:30

523 lines
21 KiB
Swift

//
// FPSStreamTask.swift
// FileProvider
//
// Created by Amir Abbas Mousavian.
// Copyright © 2016 Mousavian. Distributed under MIT license.
//
import Foundation
private var lasttaskIdAssociated = 1_000_000_000
/// This class is a replica of NSURLSessionStreamTask with same api for iOS 7/8
/// while it will fallback to NSURLSessionStreamTask in iOS 9.
@objc
public class FPSStreamTask: NSURLSessionTask, NSStreamDelegate {
private var inputStream: NSInputStream?
private var outputStream: NSOutputStream?
private var dispatch_queue: dispatch_queue_t!
internal var _underlyingSession: NSURLSession
private var streamDelegate: FPSStreamDelegate? {
return (_underlyingSession.delegate as? FPSStreamDelegate)
}
private var _taskIdentifier: Int
@available(iOS 9.0, OSX 10.11, *)
static var streamTasks = [Int: NSURLSessionStreamTask]()
@available(iOS 9.0, OSX 10.11, *)
internal var _underlyingTask: NSURLSessionStreamTask? {
return FPSStreamTask.streamTasks[_taskIdentifier]
}
public override var taskIdentifier: Int {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.taskIdentifier
} else {
return _taskIdentifier
}
}
private var _state: NSURLSessionTaskState = .Suspended
override public var state: NSURLSessionTaskState {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.state
} else {
return _state
}
}
override public var originalRequest: NSURLRequest? {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.originalRequest
} else {
return nil
}
}
override public var currentRequest: NSURLRequest? {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.currentRequest
} else {
return nil
}
}
private var _countOfBytesSent: Int64 = 0
private var _countOfBytesRecieved: Int64 = 0
override public var countOfBytesSent: Int64 {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.countOfBytesSent
} else {
return _countOfBytesSent
}
}
override public var countOfBytesReceived: Int64 {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.countOfBytesReceived
} else {
return _countOfBytesRecieved
}
}
override public var countOfBytesExpectedToSend: Int64 {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.countOfBytesExpectedToSend
} else {
return Int64(dataToBeSent.length)
}
}
override public var countOfBytesExpectedToReceive: Int64 {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.countOfBytesExpectedToReceive
} else {
return Int64(dataReceived.length)
}
}
override public init() {
fatalError("Use NSURLSession.fpstreamTask() method")
}
var host: (hostname: String, port: Int)?
var service: NSNetService?
internal init(session: NSURLSession, host: String, port: Int) {
self._underlyingSession = session
if #available(iOS 9.0, OSX 10.11, *) {
let task = session.streamTaskWithHostName(host, port: port)
self._taskIdentifier = task.taskIdentifier
FPSStreamTask.streamTasks[_taskIdentifier] = task
} else {
lasttaskIdAssociated += 1
self._taskIdentifier = lasttaskIdAssociated
self.host = (host, port)
self.dispatch_queue = dispatch_queue_create("FSPStreamTask", DISPATCH_QUEUE_CONCURRENT)
}
}
internal init(session: NSURLSession, netService: NSNetService) {
self._underlyingSession = session
if #available(iOS 9.0, OSX 10.11, *) {
let task = session.streamTaskWithNetService(netService)
self._taskIdentifier = task.taskIdentifier
FPSStreamTask.streamTasks[_taskIdentifier] = task
} else {
lasttaskIdAssociated += 1
self._taskIdentifier = lasttaskIdAssociated
self.service = netService
self.dispatch_queue = dispatch_queue_create("FSPStreamTask", DISPATCH_QUEUE_CONCURRENT)
}
}
override public func cancel() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.cancel()
} else {
self._state = .Canceling
inputStream?.setValue(kCFBooleanTrue, forKey: kCFStreamPropertyShouldCloseNativeSocket as String)
outputStream?.setValue(kCFBooleanTrue, forKey: kCFStreamPropertyShouldCloseNativeSocket as String)
self.inputStream?.close()
self.outputStream?.close()
self.inputStream?.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
self.outputStream?.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
self.inputStream?.delegate = nil
self.outputStream?.delegate = nil
self.inputStream = nil
self.outputStream = nil
self._state = .Completed
self._countOfBytesSent = 0
self._countOfBytesRecieved = 0
}
}
var _error: NSError? = nil
override public var error: NSError? {
if #available(iOS 9.0, OSX 10.11, *) {
return _underlyingTask!.error
} else {
return _error
}
}
override public func suspend() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.suspend()
} else {
inputStream?.close()
outputStream?.close()
streamDelegate?.URLSession?(_underlyingSession, readClosedForStreamTask: self)
streamDelegate?.URLSession?(_underlyingSession, writeClosedForStreamTask: self)
self._state = .Suspended
}
}
override public func resume() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.resume()
} else {
var readStream : Unmanaged<CFReadStream>?
var writeStream : Unmanaged<CFWriteStream>?
if inputStream == nil || outputStream == nil {
if let host = host {
let hostRef: CFString = NSString(string: host.hostname)
CFStreamCreatePairWithSocketToHost(kCFAllocatorDefault, hostRef, UInt32(host.port), &readStream, &writeStream)
} else if let service = service {
let cfnetService = CFNetServiceCreate(kCFAllocatorDefault, service.domain, service.type, service.name, Int32(service.port))
CFStreamCreatePairWithSocketToNetService(kCFAllocatorDefault, cfnetService.takeRetainedValue(), &readStream, &writeStream)
}
inputStream = readStream?.takeRetainedValue()
outputStream = writeStream?.takeRetainedValue()
guard let inputStream = inputStream, outputStream = outputStream else {
return
}
streamDelegate?.URLSession?(self._underlyingSession, streamTask: self, didBecomeInputStream: inputStream, outputStream: outputStream)
}
guard let inputStream = inputStream, outputStream = outputStream else {
return
}
inputStream.delegate = self
outputStream.delegate = self
dispatch_sync(dispatch_queue, {
inputStream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
outputStream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
})
inputStream.open()
outputStream.open()
_state = .Running
}
}
private let dataToBeSent: NSMutableData = NSMutableData()
private let dataReceived: NSMutableData = NSMutableData()
/* Read minBytes, or at most maxBytes bytes and invoke the completion
* handler on the sessions delegate queue with the data or an error.
* If an error occurs, any outstanding reads will also fail, and new
* read requests will error out immediately.
*/
public func readDataOfMinLength(minBytes: Int, maxLength maxBytes: Int, timeout: NSTimeInterval, completionHandler: (NSData?, Bool, NSError?) -> Void) {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.readDataOfMinLength(minBytes, maxLength: maxBytes, timeout: timeout, completionHandler: completionHandler)
} else {
guard let inputStream = inputStream else {
return
}
var timedOut: Bool = false
dispatch_async(dispatch_queue) {
if timeout > 0 {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, Int64(timeout * 1_000_000_000)), self.dispatch_queue, {
timedOut = true
completionHandler(nil, inputStream.streamStatus == .AtEnd, inputStream.streamError)
})
}
while (self.dataReceived.length == 0 || self.dataReceived.length < minBytes) && !timedOut {
NSRunLoop.currentRunLoop().runUntilDate(NSDate(timeIntervalSinceNow: 0.1));
NSThread.sleepForTimeInterval(0.1)
}
let dR = NSMutableData()
if self.dataReceived.length > maxBytes {
let range = NSRange(location: 0, length: maxBytes - 1)
dR.appendData(self.dataReceived.subdataWithRange(range))
self.dataReceived.replaceBytesInRange(range, withBytes: nil, length: 0)
} else {
dR.appendData(self.dataReceived)
self.dataReceived.length = 0
}
completionHandler(dR, inputStream.streamStatus == .AtEnd, inputStream.streamError)
}
}
}
/* Write the data completely to the underlying socket. If all the
* bytes have not been written by the timeout, a timeout error will
* occur. Note that invocation of the completion handler does not
* guarantee that the remote side has received all the bytes, only
* that they have been written to the kernel. */
public func writeData(data: NSData, timeout: NSTimeInterval, completionHandler: (NSError?) -> Void) {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.writeData(data, timeout: timeout, completionHandler: completionHandler)
} else {
guard let outputStream = outputStream else {
return
}
var timedOut: Bool = false
dispatch_async(dispatch_queue) {
if timeout > 0 {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, Int64(timeout * 1_000_000_000)), self.dispatch_queue, {
timedOut = true
completionHandler(self._error)
})
}
self.dataToBeSent.appendData(data)
while !outputStream.hasSpaceAvailable && !timedOut {
NSRunLoop.currentRunLoop().runUntilDate(NSDate(timeIntervalSinceNow: 0.1));
NSThread.sleepForTimeInterval(0.1)
}
if self.dataToBeSent.length > 0 {
let bytesWritten = outputStream.write(UnsafePointer(self.dataToBeSent.bytes), maxLength: self.dataToBeSent.length) ?? -1
if bytesWritten > 0 {
let range = NSRange(location: 0, length: bytesWritten)
self.dataToBeSent.replaceBytesInRange(range, withBytes: nil, length: 0)
self._countOfBytesSent += bytesWritten
completionHandler(nil)
} else {
self._error = outputStream.streamError
completionHandler(outputStream.streamError)
}
}
}
}
}
/* -captureStreams completes any already enqueued reads
* and writes, and then invokes the
* URLSession:streamTask:didBecomeInputStream:outputStream: delegate
* message. When that message is received, the task object is
* considered completed and will not receive any more delegate
* messages. */
public func captureStreams() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.captureStreams()
} else {
guard let outputStream = outputStream, let inputStream = inputStream else {
return
}
dispatch_async(dispatch_queue) {
self.write(false)
while inputStream.streamStatus != .AtEnd {
NSRunLoop.currentRunLoop().runUntilDate(NSDate(timeIntervalSinceNow: 0.1));
NSThread.sleepForTimeInterval(0.1)
}
self.streamDelegate?.URLSession?(self._underlyingSession, streamTask: self, didBecomeInputStream: inputStream, outputStream: outputStream)
}
}
}
/* Enqueue a request to close the write end of the underlying socket.
* All outstanding IO will complete before the write side of the
* socket is closed. The server, however, may continue to write bytes
* back to the client, so best practice is to continue reading from
* the server until you receive EOF.
*/
public func closeWrite() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.closeWrite()
} else {
dispatch_async(dispatch_queue, {
self.write(true)
})
}
}
private func write(close: Bool) {
guard let outputStream = outputStream else {
return
}
while self.dataToBeSent.length > 0 {
let bytesWritten = outputStream.write(UnsafePointer(self.dataToBeSent.bytes), maxLength: self.dataToBeSent.length) ?? -1
if bytesWritten > 0 {
let range = NSRange(location: 0, length: bytesWritten)
self.dataToBeSent.replaceBytesInRange(range, withBytes: nil, length: 0)
self._countOfBytesSent += bytesWritten
} else {
self._error = outputStream.streamError
}
if self.dataToBeSent.length == 0 {
break
}
NSRunLoop.currentRunLoop().runUntilDate(NSDate(timeIntervalSinceNow: 0.1));
NSThread.sleepForTimeInterval(0.1)
}
if close {
outputStream.close()
self.streamDelegate?.URLSession?(self._underlyingSession, writeClosedForStreamTask: self)
}
}
/* Enqueue a request to close the read side of the underlying socket.
* All outstanding IO will complete before the read side is closed.
* You may continue writing to the server.
*/
public func closeRead() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.closeRead()
} else {
guard let inputStream = inputStream else {
return
}
dispatch_async(dispatch_queue) {
while inputStream.streamStatus != .AtEnd {
NSRunLoop.currentRunLoop().runUntilDate(NSDate(timeIntervalSinceNow: 0.1));
NSThread.sleepForTimeInterval(0.1)
}
inputStream.close()
self.streamDelegate?.URLSession?(self._underlyingSession, readClosedForStreamTask: self)
}
}
}
/*
* Begin encrypted handshake. The hanshake begins after all pending
* IO has completed. TLS authentication callbacks are sent to the
* session's -URLSession:task:didReceiveChallenge:completionHandler:
*/
public func startSecureConnection() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.startSecureConnection()
} else {
inputStream!.setProperty(NSStreamSocketSecurityLevelNegotiatedSSL, forKey: NSStreamSocketSecurityLevelKey)
outputStream!.setProperty(NSStreamSocketSecurityLevelNegotiatedSSL, forKey: NSStreamSocketSecurityLevelKey)
}
}
/*
* Cleanly close a secure connection after all pending secure IO has
* completed.
*/
public func stopSecureConnection() {
if #available(iOS 9.0, OSX 10.11, *) {
_underlyingTask!.stopSecureConnection()
} else {
inputStream!.setProperty(NSStreamSocketSecurityLevelNone, forKey: NSStreamSocketSecurityLevelKey)
outputStream!.setProperty(NSStreamSocketSecurityLevelNone, forKey: NSStreamSocketSecurityLevelKey)
}
}
public func stream(aStream: NSStream, handleEvent eventCode: NSStreamEvent) {
switch (eventCode) {
case NSStreamEvent.ErrorOccurred:
self._error = aStream.streamError
streamDelegate?.URLSession?(_underlyingSession, task: self, didCompleteWithError: error)
case NSStreamEvent.EndEncountered:
break
case NSStreamEvent.None:
break
case NSStreamEvent.OpenCompleted:
break
case NSStreamEvent.HasBytesAvailable:
var buffer = [UInt8](count: 2048, repeatedValue: 0)
if (aStream == inputStream) {
while (inputStream!.hasBytesAvailable ?? false) {
let len = inputStream!.read(&buffer, maxLength: buffer.count)
if len > 0 {
dataReceived.appendBytes(&buffer, length: len)
self._countOfBytesRecieved += len
}
}
}
case NSStreamEvent.HasSpaceAvailable:
break
default:
break
}
}
}
extension NSURLSession {
/* Creates a bidirectional stream task to a given host and port.
*/
public func fpstreamTaskWithHostName(hostname: String, port: Int) -> FPSStreamTask {
return FPSStreamTask(session: self, host: hostname, port: port)
}
/* Creates a bidirectional stream task with an NSNetService to identify the endpoint.
* The NSNetService will be resolved before any IO completes.
*/
public func fpstreamTaskWithNetService(service: NSNetService) -> FPSStreamTask {
return fpstreamTaskWithNetService(service)
}
}
@objc
public protocol FPSStreamDelegate : NSURLSessionTaskDelegate {
/* Indiciates that the read side of a connection has been closed. Any
* outstanding reads complete, but future reads will immediately fail.
* This may be sent even when no reads are in progress. However, when
* this delegate message is received, there may still be bytes
* available. You only know that no more bytes are available when you
* are able to read until EOF. */
optional func URLSession(session: NSURLSession, readClosedForStreamTask streamTask: FPSStreamTask)
/* Indiciates that the write side of a connection has been closed.
* Any outstanding writes complete, but future writes will immediately
* fail.
*/
optional func URLSession(session: NSURLSession, writeClosedForStreamTask streamTask: FPSStreamTask)
/* A notification that the system has determined that a better route
* to the host has been detected (eg, a wi-fi interface becoming
* available.) This is a hint to the delegate that it may be
* desirable to create a new task for subsequent work. Note that
* there is no guarantee that the future task will be able to connect
* to the host, so callers should should be prepared for failure of
* reads and writes over any new interface. */
optional func URLSession(session: NSURLSession, betterRouteDiscoveredForStreamTask streamTask: FPSStreamTask)
/* The given task has been completed, and unopened NSInputStream and
* NSOutputStream objects are created from the underlying network
* connection. This will only be invoked after all enqueued IO has
* completed (including any necessary handshakes.) The streamTask
* will not receive any further delegate messages.
*/
optional func URLSession(session: NSURLSession, streamTask: FPSStreamTask, didBecomeInputStream inputStream: NSInputStream, outputStream: NSOutputStream)
}
private let ports = ["http": 80,
"https": 443,
"smb": 445,
"ftp": 21,
"sftp": 22,
"sftp": 2121,
"telnet": 23,
"pop": 110,
"smtp": 25,
"imap": 143]
private let securePorts = ["https": 443,
"smb": 445,
"sftp": 22,
"sftp": 2121,
"telnet": 992,
"pop": 995,
"smtp": 465,
"imap": 993]