Compare commits

...

1 Commits

Author SHA1 Message Date
dimitris-c 4e8a3f0289 experimenting with RemoteAudioSource 2021-09-29 13:21:54 +03:00
3 changed files with 28 additions and 57 deletions
@@ -33,6 +33,7 @@ internal final class NetworkDataStream {
let error: Error? let error: Error?
} }
private var lock = UnfairLock()
private var streamCallback: StreamCompletion? private var streamCallback: StreamCompletion?
/// The serial queue for all internal async actions. /// The serial queue for all internal async actions.
@@ -66,6 +67,7 @@ internal final class NetworkDataStream {
@discardableResult @discardableResult
func responseStream(completion: @escaping StreamCompletion) -> Self { func responseStream(completion: @escaping StreamCompletion) -> Self {
lock.lock(); defer { lock.unlock() }
streamCallback = completion streamCallback = completion
return self return self
} }
@@ -79,6 +81,7 @@ internal final class NetworkDataStream {
} }
func cancel() { func cancel() {
lock.lock(); defer { lock.unlock() }
guard state.canBecome(.cancelled) else { return } guard state.canBecome(.cancelled) else { return }
state = .cancelled state = .cancelled
streamCallback = nil streamCallback = nil
@@ -44,7 +44,6 @@ public class RemoteAudioSource: AudioStreamSource {
} }
internal let underlyingQueue: DispatchQueue internal let underlyingQueue: DispatchQueue
internal let streamOperationQueue: OperationQueue
internal let netStatusService: NetStatusProvider internal let netStatusService: NetStatusProvider
internal var waitingForNetwork = false internal var waitingForNetwork = false
internal let retrierTimeout: Retrier internal let retrierTimeout: Retrier
@@ -67,12 +66,7 @@ public class RemoteAudioSource: AudioStreamSource {
supportsSeek = false supportsSeek = false
netStatusService = netStatusProvider netStatusService = netStatusProvider
self.icycastHeadersProcessor = icycastHeadersProcessor self.icycastHeadersProcessor = icycastHeadersProcessor
self.underlyingQueue = underlyingQueue self.underlyingQueue = DispatchQueue(label: "remote.audio.source.queue", target: underlyingQueue)
streamOperationQueue = OperationQueue()
streamOperationQueue.underlyingQueue = underlyingQueue
streamOperationQueue.maxConcurrentOperationCount = 1
streamOperationQueue.isSuspended = true
streamOperationQueue.name = "remote.audio.source.data.stream.queue"
retrierTimeout = retrier retrierTimeout = retrier
startNetworkService() startNetworkService()
} }
@@ -110,7 +104,6 @@ public class RemoteAudioSource: AudioStreamSource {
func close() { func close() {
retrierTimeout.cancel() retrierTimeout.cancel()
netStatusService.stop() netStatusService.stop()
streamOperationQueue.cancelAllOperations()
if let streamTask = streamRequest { if let streamTask = streamRequest {
streamTask.cancel() streamTask.cancel()
networkingClient.remove(task: streamTask) networkingClient.remove(task: streamTask)
@@ -138,12 +131,10 @@ public class RemoteAudioSource: AudioStreamSource {
func suspend() { func suspend() {
streamRequest?.suspend() streamRequest?.suspend()
streamOperationQueue.isSuspended = true
} }
func resume() { func resume() {
streamRequest?.resume() streamRequest?.resume()
streamOperationQueue.isSuspended = false
} }
// MARK: Private // MARK: Private
@@ -165,7 +156,9 @@ public class RemoteAudioSource: AudioStreamSource {
let request = networkingClient.stream(request: urlRequest) let request = networkingClient.stream(request: urlRequest)
.responseStream { [weak self] event in .responseStream { [weak self] event in
guard let self = self else { return } guard let self = self else { return }
self.handleResponse(event: event) self.underlyingQueue.sync {
self.handleResponse(event: event)
}
} }
.resume() .resume()
@@ -179,17 +172,13 @@ public class RemoteAudioSource: AudioStreamSource {
switch event { switch event {
case let .response(urlResponse): case let .response(urlResponse):
parseResponseHeader(response: urlResponse) parseResponseHeader(response: urlResponse)
streamOperationQueue.isSuspended = false
case let .stream(event): case let .stream(event):
handleStreamEvent(event: event) handleStreamEvent(event: event)
case let .complete(event): case let .complete(event):
if let error = event.error { if let error = event.error {
delegate?.errorOccured(source: self, error: error) delegate?.errorOccured(source: self, error: error)
} else { } else {
addCompletionOperation { [weak self] in delegate?.endOfFileOccured(source: self)
guard let self = self else { return }
self.delegate?.endOfFileOccured(source: self)
}
} }
} }
} }
@@ -198,26 +187,23 @@ public class RemoteAudioSource: AudioStreamSource {
switch event { switch event {
case let .success(value): case let .success(value):
if let audioData = value.data { if let audioData = value.data {
addStreamOperation { [weak self] in if shouldTryParsingIcycastHeaders {
guard let self = self else { return } let (header, extractedAudio) = icycastHeadersProcessor.proccess(data: audioData)
if self.shouldTryParsingIcycastHeaders { if let header = header {
let (header, extractedAudio) = self.icycastHeadersProcessor.proccess(data: audioData) shouldTryParsingIcycastHeaders = false
if let header = header { let parser = IcycastHeaderParser()
self.shouldTryParsingIcycastHeaders = false parsedHeaderOutput = parser.parse(input: header)
let parser = IcycastHeaderParser() if let metadataStep = parsedHeaderOutput?.metadataStep {
self.parsedHeaderOutput = parser.parse(input: header) metadataStreamProcessor.metadataAvailable(step: metadataStep)
if let metadataStep = self.parsedHeaderOutput?.metadataStep {
self.metadataStreamProcessor.metadataAvailable(step: metadataStep)
}
let audioCount = self.processAudio(data: extractedAudio)
self.relativePosition += audioCount
return
} }
let audioCount = processAudio(data: extractedAudio)
relativePosition += audioCount
return
} }
let audioCount = self.processAudio(data: audioData)
self.relativePosition += audioCount
} }
let audioCount = processAudio(data: audioData)
relativePosition += audioCount
} }
case .failure: case .failure:
if !netStatusService.isConnected { if !netStatusService.isConnected {
@@ -280,7 +266,7 @@ public class RemoteAudioSource: AudioStreamSource {
var urlRequest = URLRequest(url: url) var urlRequest = URLRequest(url: url)
urlRequest.networkServiceType = .avStreaming urlRequest.networkServiceType = .avStreaming
urlRequest.cachePolicy = .reloadIgnoringLocalCacheData urlRequest.cachePolicy = .reloadIgnoringLocalCacheData
urlRequest.timeoutInterval = 60 urlRequest.timeoutInterval = 240
for header in additionalRequestHeaders { for header in additionalRequestHeaders {
urlRequest.addValue(header.value, forHTTPHeaderField: header.key) urlRequest.addValue(header.value, forHTTPHeaderField: header.key)
@@ -301,25 +287,6 @@ public class RemoteAudioSource: AudioStreamSource {
self.seek(at: self.position) self.seek(at: self.position)
} }
} }
// MARK: - Network Stream Operation Queue
/// Schedules the given block on the stream operation queue
///
/// - Parameter block: A closure to be executed
private func addStreamOperation(_ block: @escaping () -> Void) {
let operation = BlockOperation(block: block)
streamOperationQueue.addOperation(operation)
}
/// Schedules the given block on the stream operation queue as a completion
///
/// - Parameter block: A closure to be executed
private func addCompletionOperation(_ block: @escaping () -> Void) {
let operation = BlockOperation(block: block)
operation.queuePriority = .veryLow
streamOperationQueue.addOperation(operation)
}
} }
extension RemoteAudioSource: MetadataStreamSourceDelegate { extension RemoteAudioSource: MetadataStreamSourceDelegate {
@@ -535,10 +535,11 @@ open class AudioPlayer {
/// This calls `processSource` method every `500 ms` /// This calls `processSource` method every `500 ms`
private func startReadProcessFromSourceIfNeeded() { private func startReadProcessFromSourceIfNeeded() {
guard audioReadSource.state != .activated else { return } guard audioReadSource.state != .activated else { return }
audioReadSource.add { [weak self] in // TODO: this might be needed after all...
self?.processSource() // audioReadSource.add { [weak self] in
} // self?.processSource()
audioReadSource.activate() // }
// audioReadSource.activate()
} }
/// Stops and removes the handler from the timer, @see `audioReadSource` /// Stops and removes the handler from the timer, @see `audioReadSource`