|
|
|
@@ -167,6 +167,11 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
public init(url: URL, protocols: [String]? = nil) {
|
|
|
|
|
self.url = url
|
|
|
|
|
self.origin = url.absoluteString
|
|
|
|
|
if let hostUrl = URL (string: "/", relativeTo: url) {
|
|
|
|
|
var origin = hostUrl.absoluteString
|
|
|
|
|
origin.remove(at: origin.index(before: origin.endIndex))
|
|
|
|
|
self.origin = origin
|
|
|
|
|
}
|
|
|
|
|
writeQueue.maxConcurrentOperationCount = 1
|
|
|
|
|
optionalProtocols = protocols
|
|
|
|
|
}
|
|
|
|
@@ -180,12 +185,11 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
/**
|
|
|
|
|
Connect to the WebSocket server on a background thread.
|
|
|
|
|
*/
|
|
|
|
|
public func connect() {
|
|
|
|
|
open func connect() {
|
|
|
|
|
guard !isConnecting else { return }
|
|
|
|
|
didDisconnect = false
|
|
|
|
|
isConnecting = true
|
|
|
|
|
createHTTPRequest()
|
|
|
|
|
isConnecting = false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@@ -198,7 +202,8 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
- Parameter forceTimeout: Maximum time to wait for the server to close the socket.
|
|
|
|
|
- Parameter closeCode: The code to send on disconnect. The default is the normal close code for cleanly disconnecting a webSocket.
|
|
|
|
|
*/
|
|
|
|
|
public func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) {
|
|
|
|
|
open func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) {
|
|
|
|
|
guard isConnected else { return }
|
|
|
|
|
switch forceTimeout {
|
|
|
|
|
case .some(let seconds) where seconds > 0:
|
|
|
|
|
let milliseconds = Int(seconds * 1_000)
|
|
|
|
@@ -222,7 +227,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
- parameter string: The string to write.
|
|
|
|
|
- parameter completion: The (optional) completion handler.
|
|
|
|
|
*/
|
|
|
|
|
public func write(string: String, completion: (() -> ())? = nil) {
|
|
|
|
|
open func write(string: String, completion: (() -> ())? = nil) {
|
|
|
|
|
guard isConnected else { return }
|
|
|
|
|
dequeueWrite(string.data(using: String.Encoding.utf8)!, code: .textFrame, writeCompletion: completion)
|
|
|
|
|
}
|
|
|
|
@@ -235,7 +240,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
- parameter data: The data to write.
|
|
|
|
|
- parameter completion: The (optional) completion handler.
|
|
|
|
|
*/
|
|
|
|
|
public func write(data: Data, completion: (() -> ())? = nil) {
|
|
|
|
|
open func write(data: Data, completion: (() -> ())? = nil) {
|
|
|
|
|
guard isConnected else { return }
|
|
|
|
|
dequeueWrite(data, code: .binaryFrame, writeCompletion: completion)
|
|
|
|
|
}
|
|
|
|
@@ -244,7 +249,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
Write a ping to the websocket. This sends it as a control frame.
|
|
|
|
|
Yodel a sound to the planet. This sends it as an astroid. http://youtu.be/Eu5ZJELRiJ8?t=42s
|
|
|
|
|
*/
|
|
|
|
|
public func write(ping: Data, completion: (() -> ())? = nil) {
|
|
|
|
|
open func write(ping: Data, completion: (() -> ())? = nil) {
|
|
|
|
|
guard isConnected else { return }
|
|
|
|
|
dequeueWrite(ping, code: .ping, writeCompletion: completion)
|
|
|
|
|
}
|
|
|
|
@@ -253,7 +258,6 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
Private method that starts the connection.
|
|
|
|
|
*/
|
|
|
|
|
private func createHTTPRequest() {
|
|
|
|
|
|
|
|
|
|
let urlRequest = CFHTTPMessageCreateRequest(kCFAllocatorDefault, "GET" as CFString,
|
|
|
|
|
url as CFURL, kCFHTTPVersion1_1).takeRetainedValue()
|
|
|
|
|
|
|
|
|
@@ -314,6 +318,9 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
//higher level API we will cut over to at some point
|
|
|
|
|
//NSStream.getStreamsToHostWithName(url.host, port: url.port.integerValue, inputStream: &inputStream, outputStream: &outputStream)
|
|
|
|
|
|
|
|
|
|
// Disconnect and clean up any existing streams before setting up a new pair
|
|
|
|
|
disconnectStream(nil, runDelegate: false)
|
|
|
|
|
|
|
|
|
|
var readStream: Unmanaged<CFReadStream>?
|
|
|
|
|
var writeStream: Unmanaged<CFWriteStream>?
|
|
|
|
|
let h = url.host! as NSString
|
|
|
|
@@ -324,6 +331,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
inStream.delegate = self
|
|
|
|
|
outStream.delegate = self
|
|
|
|
|
if supportedSSLSchemes.contains(url.scheme!) {
|
|
|
|
|
certValidated = false
|
|
|
|
|
inStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
|
|
|
|
|
outStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
|
|
|
|
|
if disableSSLCertValidation {
|
|
|
|
@@ -367,37 +375,46 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
|
|
|
|
|
let bytes = UnsafeRawPointer((data as NSData).bytes).assumingMemoryBound(to: UInt8.self)
|
|
|
|
|
var out = timeout * 1_000_000 // wait 5 seconds before giving up
|
|
|
|
|
writeQueue.addOperation { [weak self] in
|
|
|
|
|
while !outStream.hasSpaceAvailable {
|
|
|
|
|
let operation = BlockOperation()
|
|
|
|
|
operation.addExecutionBlock { [weak self, weak operation] in
|
|
|
|
|
guard let sOperation = operation else { return }
|
|
|
|
|
while !outStream.hasSpaceAvailable && !sOperation.isCancelled {
|
|
|
|
|
usleep(100) // wait until the socket is ready
|
|
|
|
|
guard !sOperation.isCancelled else { return }
|
|
|
|
|
out -= 100
|
|
|
|
|
if out < 0 {
|
|
|
|
|
self?.cleanupStream()
|
|
|
|
|
WebSocket.sharedWorkQueue.async {
|
|
|
|
|
self?.cleanupStream()
|
|
|
|
|
}
|
|
|
|
|
self?.doDisconnect(self?.errorWithDetail("write wait timed out", code: 2))
|
|
|
|
|
return
|
|
|
|
|
} else if outStream.streamError != nil {
|
|
|
|
|
return // disconnectStream will be called.
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
guard !sOperation.isCancelled, let s = self else { return }
|
|
|
|
|
// Do the pinning now if needed
|
|
|
|
|
if let sec = s.security, !s.certValidated {
|
|
|
|
|
let trust = outStream.property(forKey: kCFStreamPropertySSLPeerTrust as Stream.PropertyKey) as! SecTrust
|
|
|
|
|
let domain = outStream.property(forKey: kCFStreamSSLPeerName as Stream.PropertyKey) as? String
|
|
|
|
|
s.certValidated = sec.isValid(trust, domain: domain)
|
|
|
|
|
if !s.certValidated {
|
|
|
|
|
WebSocket.sharedWorkQueue.async {
|
|
|
|
|
let error = s.errorWithDetail("Invalid SSL certificate", code: 1)
|
|
|
|
|
s.disconnectStream(error)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
outStream.write(bytes, maxLength: data.count)
|
|
|
|
|
}
|
|
|
|
|
writeQueue.addOperation(operation)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
Delegate for the stream methods. Processes incoming bytes
|
|
|
|
|
*/
|
|
|
|
|
public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
|
|
|
|
|
if let sec = security, !certValidated && [.hasBytesAvailable, .hasSpaceAvailable].contains(eventCode) {
|
|
|
|
|
let trust = aStream.property(forKey: kCFStreamPropertySSLPeerTrust as Stream.PropertyKey) as! SecTrust
|
|
|
|
|
let domain = aStream.property(forKey: kCFStreamSSLPeerName as Stream.PropertyKey) as? String
|
|
|
|
|
if sec.isValid(trust, domain: domain) {
|
|
|
|
|
certValidated = true
|
|
|
|
|
} else {
|
|
|
|
|
let error = errorWithDetail("Invalid SSL certificate", code: 1)
|
|
|
|
|
disconnectStream(error)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
open func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
|
|
|
|
|
if eventCode == .hasBytesAvailable {
|
|
|
|
|
if aStream == inputStream {
|
|
|
|
|
processInputStream()
|
|
|
|
@@ -412,14 +429,17 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
/**
|
|
|
|
|
Disconnect the stream object and notifies the delegate.
|
|
|
|
|
*/
|
|
|
|
|
private func disconnectStream(_ error: NSError?) {
|
|
|
|
|
private func disconnectStream(_ error: NSError?, runDelegate: Bool = true) {
|
|
|
|
|
if error == nil {
|
|
|
|
|
writeQueue.waitUntilAllOperationsAreFinished()
|
|
|
|
|
} else {
|
|
|
|
|
writeQueue.cancelAllOperations()
|
|
|
|
|
}
|
|
|
|
|
cleanupStream()
|
|
|
|
|
doDisconnect(error)
|
|
|
|
|
connected = false
|
|
|
|
|
if runDelegate {
|
|
|
|
|
doDisconnect(error)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@@ -438,6 +458,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
}
|
|
|
|
|
outputStream = nil
|
|
|
|
|
inputStream = nil
|
|
|
|
|
fragBuffer = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@@ -463,22 +484,24 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
*/
|
|
|
|
|
private func dequeueInput() {
|
|
|
|
|
while !inputQueue.isEmpty {
|
|
|
|
|
let data = inputQueue[0]
|
|
|
|
|
var work = data
|
|
|
|
|
if let fragBuffer = fragBuffer {
|
|
|
|
|
var combine = NSData(data: fragBuffer) as Data
|
|
|
|
|
combine.append(data)
|
|
|
|
|
work = combine
|
|
|
|
|
self.fragBuffer = nil
|
|
|
|
|
autoreleasepool {
|
|
|
|
|
let data = inputQueue[0]
|
|
|
|
|
var work = data
|
|
|
|
|
if let buffer = fragBuffer {
|
|
|
|
|
var combine = NSData(data: buffer) as Data
|
|
|
|
|
combine.append(data)
|
|
|
|
|
work = combine
|
|
|
|
|
fragBuffer = nil
|
|
|
|
|
}
|
|
|
|
|
let buffer = UnsafeRawPointer((work as NSData).bytes).assumingMemoryBound(to: UInt8.self)
|
|
|
|
|
let length = work.count
|
|
|
|
|
if !connected {
|
|
|
|
|
processTCPHandshake(buffer, bufferLen: length)
|
|
|
|
|
} else {
|
|
|
|
|
processRawMessagesInBuffer(buffer, bufferLen: length)
|
|
|
|
|
}
|
|
|
|
|
inputQueue = inputQueue.filter{ $0 != data }
|
|
|
|
|
}
|
|
|
|
|
let buffer = UnsafeRawPointer((work as NSData).bytes).assumingMemoryBound(to: UInt8.self)
|
|
|
|
|
let length = work.count
|
|
|
|
|
if !connected {
|
|
|
|
|
processTCPHandshake(buffer, bufferLen: length)
|
|
|
|
|
} else {
|
|
|
|
|
processRawMessagesInBuffer(buffer, bufferLen: length)
|
|
|
|
|
}
|
|
|
|
|
inputQueue = inputQueue.filter{ $0 != data }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -489,17 +512,10 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
let code = processHTTP(buffer, bufferLen: bufferLen)
|
|
|
|
|
switch code {
|
|
|
|
|
case 0:
|
|
|
|
|
connected = true
|
|
|
|
|
guard canDispatch else {return}
|
|
|
|
|
callbackQueue.async { [weak self] in
|
|
|
|
|
guard let s = self else { return }
|
|
|
|
|
s.onConnect?()
|
|
|
|
|
s.delegate?.websocketDidConnect(socket: s)
|
|
|
|
|
s.notificationCenter.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
case -1:
|
|
|
|
|
fragBuffer = Data(bytes: buffer, count: bufferLen)
|
|
|
|
|
break // do nothing, we are going to collect more data
|
|
|
|
|
break // do nothing, we are going to collect more data
|
|
|
|
|
default:
|
|
|
|
|
doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code)))
|
|
|
|
|
}
|
|
|
|
@@ -528,6 +544,17 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
if code != 0 {
|
|
|
|
|
return code
|
|
|
|
|
}
|
|
|
|
|
isConnecting = false
|
|
|
|
|
connected = true
|
|
|
|
|
didDisconnect = false
|
|
|
|
|
if canDispatch {
|
|
|
|
|
callbackQueue.async { [weak self] in
|
|
|
|
|
guard let s = self else { return }
|
|
|
|
|
s.onConnect?()
|
|
|
|
|
s.delegate?.websocketDidConnect(socket: s)
|
|
|
|
|
s.notificationCenter.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
totalSize += 1 //skip the last \n
|
|
|
|
|
let restSize = bufferLen - totalSize
|
|
|
|
|
if restSize > 0 {
|
|
|
|
@@ -618,7 +645,8 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
return buffer.fromOffset(bufferLen - extra)
|
|
|
|
|
} else {
|
|
|
|
|
let isFin = (FinMask & baseAddress[0])
|
|
|
|
|
let receivedOpcode = OpCode(rawValue: (OpCodeMask & baseAddress[0]))
|
|
|
|
|
let receivedOpcodeRawValue = (OpCodeMask & baseAddress[0])
|
|
|
|
|
let receivedOpcode = OpCode(rawValue: receivedOpcodeRawValue)
|
|
|
|
|
let isMasked = (MaskMask & baseAddress[1])
|
|
|
|
|
let payloadLen = (PayloadLenMask & baseAddress[1])
|
|
|
|
|
var offset = 2
|
|
|
|
@@ -632,7 +660,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
if !isControlFrame && (receivedOpcode != .binaryFrame && receivedOpcode != .continueFrame &&
|
|
|
|
|
receivedOpcode != .textFrame && receivedOpcode != .pong) {
|
|
|
|
|
let errCode = CloseCode.protocolError.rawValue
|
|
|
|
|
doDisconnect(errorWithDetail("unknown opcode: \(receivedOpcode)", code: errCode))
|
|
|
|
|
doDisconnect(errorWithDetail("unknown opcode: \(receivedOpcodeRawValue)", code: errCode))
|
|
|
|
|
writeError(errCode)
|
|
|
|
|
return emptyBuffer
|
|
|
|
|
}
|
|
|
|
@@ -677,18 +705,13 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
if dataLength > UInt64(bufferLen) {
|
|
|
|
|
len = UInt64(bufferLen-offset)
|
|
|
|
|
}
|
|
|
|
|
let data: Data
|
|
|
|
|
if len < 0 {
|
|
|
|
|
len = 0
|
|
|
|
|
data = Data()
|
|
|
|
|
} else {
|
|
|
|
|
if receivedOpcode == .connectionClose && len > 0 {
|
|
|
|
|
let size = MemoryLayout<UInt16>.size
|
|
|
|
|
offset += size
|
|
|
|
|
len -= UInt64(size)
|
|
|
|
|
}
|
|
|
|
|
data = Data(bytes: baseAddress+offset, count: Int(len))
|
|
|
|
|
if receivedOpcode == .connectionClose && len > 0 {
|
|
|
|
|
let size = MemoryLayout<UInt16>.size
|
|
|
|
|
offset += size
|
|
|
|
|
len -= UInt64(size)
|
|
|
|
|
}
|
|
|
|
|
let data = Data(bytes: baseAddress+offset, count: Int(len))
|
|
|
|
|
|
|
|
|
|
if receivedOpcode == .connectionClose {
|
|
|
|
|
var closeReason = "connection closed by server"
|
|
|
|
|
if let customCloseReason = String(data: data, encoding: .utf8) {
|
|
|
|
@@ -835,9 +858,11 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
Used to write things to the stream
|
|
|
|
|
*/
|
|
|
|
|
private func dequeueWrite(_ data: Data, code: OpCode, writeCompletion: (() -> ())? = nil) {
|
|
|
|
|
writeQueue.addOperation { [weak self] in
|
|
|
|
|
let operation = BlockOperation()
|
|
|
|
|
operation.addExecutionBlock { [weak self, weak operation] in
|
|
|
|
|
//stream isn't ready, let's wait
|
|
|
|
|
guard let s = self else { return }
|
|
|
|
|
guard let sOperation = operation else { return }
|
|
|
|
|
var offset = 2
|
|
|
|
|
let dataLength = data.count
|
|
|
|
|
let frame = NSMutableData(capacity: dataLength + s.MaxFrameSize)
|
|
|
|
@@ -864,7 +889,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
offset += 1
|
|
|
|
|
}
|
|
|
|
|
var total = 0
|
|
|
|
|
while true {
|
|
|
|
|
while !sOperation.isCancelled {
|
|
|
|
|
guard let outStream = s.outputStream else { break }
|
|
|
|
|
let writeBuffer = UnsafeRawPointer(frame!.bytes+total).assumingMemoryBound(to: UInt8.self)
|
|
|
|
|
let len = outStream.write(writeBuffer, maxLength: offset-total)
|
|
|
|
@@ -891,8 +916,8 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
writeQueue.addOperation(operation)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@@ -901,6 +926,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
private func doDisconnect(_ error: NSError?) {
|
|
|
|
|
guard !didDisconnect else { return }
|
|
|
|
|
didDisconnect = true
|
|
|
|
|
isConnecting = false
|
|
|
|
|
connected = false
|
|
|
|
|
guard canDispatch else {return}
|
|
|
|
|
callbackQueue.async { [weak self] in
|
|
|
|
@@ -919,6 +945,7 @@ open class WebSocket : NSObject, StreamDelegate {
|
|
|
|
|
readyToWrite = false
|
|
|
|
|
mutex.unlock()
|
|
|
|
|
cleanupStream()
|
|
|
|
|
writeQueue.cancelAllOperations()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|