Compare commits

..

1 Commits

Author SHA1 Message Date
dimitris-c 88f5a0971b Bumping version 2021-09-07 15:46:46 +03:00
3 changed files with 57 additions and 28 deletions
@@ -33,7 +33,6 @@ internal final class NetworkDataStream {
let error: Error?
}
private var lock = UnfairLock()
private var streamCallback: StreamCompletion?
/// The serial queue for all internal async actions.
@@ -67,7 +66,6 @@ internal final class NetworkDataStream {
@discardableResult
func responseStream(completion: @escaping StreamCompletion) -> Self {
lock.lock(); defer { lock.unlock() }
streamCallback = completion
return self
}
@@ -81,7 +79,6 @@ internal final class NetworkDataStream {
}
func cancel() {
lock.lock(); defer { lock.unlock() }
guard state.canBecome(.cancelled) else { return }
state = .cancelled
streamCallback = nil
@@ -44,6 +44,7 @@ public class RemoteAudioSource: AudioStreamSource {
}
internal let underlyingQueue: DispatchQueue
internal let streamOperationQueue: OperationQueue
internal let netStatusService: NetStatusProvider
internal var waitingForNetwork = false
internal let retrierTimeout: Retrier
@@ -66,7 +67,12 @@ public class RemoteAudioSource: AudioStreamSource {
supportsSeek = false
netStatusService = netStatusProvider
self.icycastHeadersProcessor = icycastHeadersProcessor
self.underlyingQueue = DispatchQueue(label: "remote.audio.source.queue", target: underlyingQueue)
self.underlyingQueue = underlyingQueue
streamOperationQueue = OperationQueue()
streamOperationQueue.underlyingQueue = underlyingQueue
streamOperationQueue.maxConcurrentOperationCount = 1
streamOperationQueue.isSuspended = true
streamOperationQueue.name = "remote.audio.source.data.stream.queue"
retrierTimeout = retrier
startNetworkService()
}
@@ -104,6 +110,7 @@ public class RemoteAudioSource: AudioStreamSource {
func close() {
retrierTimeout.cancel()
netStatusService.stop()
streamOperationQueue.cancelAllOperations()
if let streamTask = streamRequest {
streamTask.cancel()
networkingClient.remove(task: streamTask)
@@ -131,10 +138,12 @@ public class RemoteAudioSource: AudioStreamSource {
func suspend() {
streamRequest?.suspend()
streamOperationQueue.isSuspended = true
}
func resume() {
streamRequest?.resume()
streamOperationQueue.isSuspended = false
}
// MARK: Private
@@ -156,9 +165,7 @@ public class RemoteAudioSource: AudioStreamSource {
let request = networkingClient.stream(request: urlRequest)
.responseStream { [weak self] event in
guard let self = self else { return }
self.underlyingQueue.sync {
self.handleResponse(event: event)
}
self.handleResponse(event: event)
}
.resume()
@@ -172,13 +179,17 @@ public class RemoteAudioSource: AudioStreamSource {
switch event {
case let .response(urlResponse):
parseResponseHeader(response: urlResponse)
streamOperationQueue.isSuspended = false
case let .stream(event):
handleStreamEvent(event: event)
case let .complete(event):
if let error = event.error {
delegate?.errorOccured(source: self, error: error)
} else {
delegate?.endOfFileOccured(source: self)
addCompletionOperation { [weak self] in
guard let self = self else { return }
self.delegate?.endOfFileOccured(source: self)
}
}
}
}
@@ -187,23 +198,26 @@ public class RemoteAudioSource: AudioStreamSource {
switch event {
case let .success(value):
if let audioData = value.data {
if shouldTryParsingIcycastHeaders {
let (header, extractedAudio) = icycastHeadersProcessor.proccess(data: audioData)
if let header = header {
shouldTryParsingIcycastHeaders = false
let parser = IcycastHeaderParser()
parsedHeaderOutput = parser.parse(input: header)
if let metadataStep = parsedHeaderOutput?.metadataStep {
metadataStreamProcessor.metadataAvailable(step: metadataStep)
}
addStreamOperation { [weak self] in
guard let self = self else { return }
if self.shouldTryParsingIcycastHeaders {
let (header, extractedAudio) = self.icycastHeadersProcessor.proccess(data: audioData)
if let header = header {
self.shouldTryParsingIcycastHeaders = false
let parser = IcycastHeaderParser()
self.parsedHeaderOutput = parser.parse(input: header)
if let metadataStep = self.parsedHeaderOutput?.metadataStep {
self.metadataStreamProcessor.metadataAvailable(step: metadataStep)
}
let audioCount = processAudio(data: extractedAudio)
relativePosition += audioCount
return
let audioCount = self.processAudio(data: extractedAudio)
self.relativePosition += audioCount
return
}
}
let audioCount = self.processAudio(data: audioData)
self.relativePosition += audioCount
}
let audioCount = processAudio(data: audioData)
relativePosition += audioCount
}
case .failure:
if !netStatusService.isConnected {
@@ -266,7 +280,7 @@ public class RemoteAudioSource: AudioStreamSource {
var urlRequest = URLRequest(url: url)
urlRequest.networkServiceType = .avStreaming
urlRequest.cachePolicy = .reloadIgnoringLocalCacheData
urlRequest.timeoutInterval = 240
urlRequest.timeoutInterval = 60
for header in additionalRequestHeaders {
urlRequest.addValue(header.value, forHTTPHeaderField: header.key)
@@ -287,6 +301,25 @@ public class RemoteAudioSource: AudioStreamSource {
self.seek(at: self.position)
}
}
// MARK: - Network Stream Operation Queue
/// Schedules the given block on the stream operation queue
///
/// - Parameter block: A closure to be executed
private func addStreamOperation(_ block: @escaping () -> Void) {
let operation = BlockOperation(block: block)
streamOperationQueue.addOperation(operation)
}
/// Schedules the given block on the stream operation queue as a completion
///
/// - Parameter block: A closure to be executed
private func addCompletionOperation(_ block: @escaping () -> Void) {
let operation = BlockOperation(block: block)
operation.queuePriority = .veryLow
streamOperationQueue.addOperation(operation)
}
}
extension RemoteAudioSource: MetadataStreamSourceDelegate {
@@ -535,11 +535,10 @@ open class AudioPlayer {
/// This calls `processSource` method every `500 ms`
private func startReadProcessFromSourceIfNeeded() {
guard audioReadSource.state != .activated else { return }
// TODO: this might be needed after all...
// audioReadSource.add { [weak self] in
// self?.processSource()
// }
// audioReadSource.activate()
audioReadSource.add { [weak self] in
self?.processSource()
}
audioReadSource.activate()
}
/// Stops and removes the handler from the timer, @see `audioReadSource`