Fixed a truncated uploaded file

This commit is contained in:
T.Sasaki
2018-11-16 19:42:22 +09:00
parent 3f59178b9e
commit 933ef9b5d4
2 changed files with 132 additions and 62 deletions
+18 -7
View File
@@ -522,6 +522,9 @@ public class FileProviderStreamTask: URLSessionTask, StreamDelegate {
}
guard outputStream != nil else {
if self.state == .canceling || self.state == .completed {
completionHandler(URLError(.cancelled))
}
return
}
if data.count > 4096 {
@@ -613,8 +616,10 @@ public class FileProviderStreamTask: URLSessionTask, StreamDelegate {
return
}
dispatch_queue.async {
while inputStream.streamStatus != .atEnd {
Thread.sleep(forTimeInterval: 0.1)
if !self.isUploadTask {
while inputStream.streamStatus != .atEnd {
Thread.sleep(forTimeInterval: 0.1)
}
}
inputStream.close()
self.streamDelegate?.urlSession?(self._underlyingSession, readClosedFor: self)
@@ -679,6 +684,16 @@ public class FileProviderStreamTask: URLSessionTask, StreamDelegate {
inputStream.setProperty(StreamSocketSecurityLevel.none.rawValue, forKey: .socketSecurityLevelKey)
outputStream.setProperty(StreamSocketSecurityLevel.none.rawValue, forKey: .socketSecurityLevelKey)
if let sslContext = inputStream.property(forKey: kCFStreamPropertySSLContext as Stream.PropertyKey) {
while SSLClose(sslContext as! SSLContext) == errSSLWouldBlock {
guard inputStream.streamError == nil || inputStream.streamError?._code == Int(errSSLClosedGraceful),
outputStream.streamError == nil || outputStream.streamError?._code == Int(errSSLClosedGraceful) else {
break
}
Thread.sleep(forTimeInterval: 0.1)
}
}
}
}
}
@@ -757,6 +772,7 @@ extension FileProviderStreamTask {
if close {
DispatchQueue.main.sync {
outputStream.close()
shouldCloseWrite = true
}
self.streamDelegate?.urlSession?(self._underlyingSession, writeClosedFor: self)
@@ -772,11 +788,6 @@ extension FileProviderStreamTask {
inputStream?.remove(from: RunLoop.main, forMode: .init("kCFRunLoopDefaultMode"))
inputStream = nil
// TOFIX: This sleep is a workaround for truncated file uploading
if isUploadTask {
Thread.sleep(forTimeInterval: _underlyingSession.configuration.timeoutIntervalForRequest * 2)
}
outputStream?.close()
outputStream?.remove(from: RunLoop.main, forMode: .init("kCFRunLoopDefaultMode"))
outputStream = nil
+114 -55
View File
@@ -19,23 +19,35 @@ internal extension FTPFileProvider {
completionHandler(nil, error)
return
}
afterSend?(error)
if task.state == .suspended {
task.resume()
self.readData(on: task, minLength: minLength, maxLength: 4096, timeout: timeout, afterSend: afterSend, completionHandler: completionHandler)
}
}
func readData(on task: FileProviderStreamTask,
minLength: Int = 4, maxLength: Int = 4096, timeout: TimeInterval,
afterSend: ((_ error: Error?) -> Void)? = nil,
completionHandler: @escaping (_ response: String?, _ error: Error?) -> Void) {
task.readData(ofMinLength: minLength, maxLength: maxLength, timeout: timeout) { (data, eof, error) in
if let error = error {
completionHandler(nil, error)
return
}
task.readData(ofMinLength: minLength, maxLength: 4096, timeout: timeout) { (data, eof, error) in
if let error = error {
completionHandler(nil, error)
if let data = data, let response = String(data: data, encoding: .utf8) {
let lines = response.components(separatedBy: "\n").compactMap { $0.isEmpty ? nil : $0.trimmingCharacters(in: .whitespacesAndNewlines) }
if let last = lines.last, last.hasPrefix("1") {
// 1XX: Need to wait for some other response
let timeout = self.session.configuration.timeoutIntervalForResource
self.readData(on: task, minLength: minLength, maxLength: maxLength, timeout: timeout, afterSend: afterSend, completionHandler: completionHandler)
// Call afterSend
afterSend?(error)
return
}
if let data = data, let response = String(data: data, encoding: .utf8) {
completionHandler(response.trimmingCharacters(in: .whitespacesAndNewlines), nil)
} else {
completionHandler(nil, URLError(.cannotParseResponse, url: self.url(of: "")))
}
completionHandler(response.trimmingCharacters(in: .whitespacesAndNewlines), nil)
} else {
completionHandler(nil, URLError(.cannotParseResponse, url: self.url(of: "")))
}
}
}
@@ -323,14 +335,28 @@ internal extension FTPFileProvider {
let success_lock = NSLock()
var success = false
let command = useMLST ? "MLSD \(path)" : "LIST \(path)"
self.execute(command: command, on: task, afterSend: { error in
let timeout = self.session.configuration.timeoutIntervalForRequest
var finalData = Data()
var eof = false
let error_lock = NSLock()
var error: Error?
self.execute(command: command, on: task) { (response, error) in
do {
if let error = error {
throw error
}
guard let response = response else {
throw URLError(.cannotParseResponse, url: self.url(of: path))
}
if response.hasPrefix("500") && useMLST {
dataTask.cancel()
self.supportsRFC3659 = false
throw URLError(.unsupportedURL, url: self.url(of: path))
}
let timeout = self.session.configuration.timeoutIntervalForRequest
var finalData = Data()
var eof = false
let error_lock = NSLock()
var error: Error?
while !eof {
let group = DispatchGroup()
group.enter()
@@ -361,34 +387,16 @@ internal extension FTPFileProvider {
}
}
guard let response = String(data: finalData, encoding: .utf8) else {
guard let dataResponse = String(data: finalData, encoding: .utf8) else {
throw URLError(.badServerResponse, url: self.url(of: path))
}
let contents: [String] = response.components(separatedBy: "\n")
let contents: [String] = dataResponse.components(separatedBy: "\n")
.compactMap({ $0.trimmingCharacters(in: .whitespacesAndNewlines) })
success_lock.try()
success = true
success_lock.unlock()
completionHandler(contents, nil)
} catch {
completionHandler([], error)
}
}) { (response, error) in
do {
if let error = error {
throw error
}
guard let response = response else {
throw URLError(.cannotParseResponse, url: self.url(of: path))
}
if response.hasPrefix("500") && useMLST {
dataTask.cancel()
self.supportsRFC3659 = false
throw URLError(.unsupportedURL, url: self.url(of: path))
}
success_lock.try()
if !success && !(response.hasPrefix("25") || response.hasPrefix("15")) {
@@ -480,7 +488,7 @@ internal extension FTPFileProvider {
}
// Send retreive command
self.execute(command: "TYPE I" + "\r\n" + "REST \(position)" + "\r\n" + "RETR \(filePath)", on: task, afterSend: { error in
self.execute(command: "TYPE I" + "\r\n" + "REST \(position)" + "\r\n" + "RETR \(filePath)", on: task) { (response, error) in
// starting passive task
onTask?(dataTask)
@@ -549,7 +557,7 @@ internal extension FTPFileProvider {
}
completionHandler?(nil)
}) { (response, error) in
do {
if let error = error {
throw error
@@ -695,6 +703,20 @@ internal extension FTPFileProvider {
}
let success_lock = NSLock()
var success = false
let completed_lock = NSLock()
var completed = false
func completionOnce(completion: () -> ()) {
completed_lock.lock()
guard !completed else {
completed_lock.unlock()
return
}
completion()
completed = true
completed_lock.unlock()
}
self.execute(command: "STOR \(filePath)", on: task, afterSend: { error in
onTask?(dataTask)
@@ -708,6 +730,10 @@ internal extension FTPFileProvider {
stream.open()
repeat {
guard !completed else {
return
}
lock.lock()
var subdata = Data.init(count: chunkSize)
let count = subdata.withUnsafeMutableBytes { buffer in
@@ -715,7 +741,9 @@ internal extension FTPFileProvider {
}
if count < 0 {
lock.unlock()
completionHandler(stream.streamError ?? URLError(.requestBodyStreamExhausted, url: self.url(of: filePath)))
completionOnce {
completionHandler(stream.streamError ?? URLError(.requestBodyStreamExhausted, url: self.url(of: filePath)))
}
return
}
subdata.count = count
@@ -741,15 +769,20 @@ internal extension FTPFileProvider {
let waitResult = group.wait(timeout: .now() + timeout)
lock.lock()
if let error = error {
lock.unlock()
completionHandler(error)
completionOnce {
completionHandler(error)
}
return
}
if waitResult == .timedOut {
lock.unlock()
completionHandler(URLError(.timedOut, url: self.url(of: filePath)))
completionOnce {
completionHandler(URLError(.timedOut, url: self.url(of: filePath)))
}
return
}
lock.unlock()
@@ -758,15 +791,14 @@ internal extension FTPFileProvider {
success_lock.lock()
success = true
success_lock.unlock()
completionHandler(nil)
}) { (response, error) in
success_lock.lock()
guard success else {
success_lock.unlock()
return
}
success_lock.unlock()
if self.securedDataConnection {
dataTask.stopSecureConnection()
}
// TOFIX: Close read/write stream for receive a FTP response from the server
dataTask.closeRead()
dataTask.closeWrite()
}) { (response, error) in
do {
if let error = error {
throw error
@@ -776,11 +808,38 @@ internal extension FTPFileProvider {
throw URLError(.cannotParseResponse, url: self.url(of: filePath))
}
if !(response.hasPrefix("1") || response.hasPrefix("2")) {
throw FileProviderFTPError(message: response)
let lines = response.components(separatedBy: "\n").compactMap { $0.isEmpty ? nil : $0.trimmingCharacters(in: .whitespacesAndNewlines) }
if lines.count > 0 {
for line in lines {
if !(line.hasPrefix("1") || line.hasPrefix("2")) {
// FTP Error Response
throw FileProviderFTPError(message: response)
}
}
}
success_lock.lock()
if success, let last = lines.last, last.hasPrefix("2") {
success_lock.unlock()
// File successfully transferred.
completionOnce {
completionHandler(nil)
}
return
} else {
success_lock.unlock()
throw URLError(.cannotCreateFile, url: self.url(of: filePath))
}
} catch {
completionHandler(error)
success_lock.lock()
if !success {
dataTask.cancel()
}
success_lock.unlock()
completionOnce {
completionHandler(error)
}
}
}
}