Compare commits

..

2 Commits

Author SHA1 Message Date
dimitris-c 4e8a3f0289 experimenting with RemoteAudioSource 2021-09-29 13:21:54 +03:00
Dimitris C 30b4189778 Bumping version (#30) 2021-09-07 15:47:59 +03:00
3 changed files with 28 additions and 57 deletions
@@ -33,6 +33,7 @@ internal final class NetworkDataStream {
let error: Error?
}
private var lock = UnfairLock()
private var streamCallback: StreamCompletion?
/// The serial queue for all internal async actions.
@@ -66,6 +67,7 @@ internal final class NetworkDataStream {
@discardableResult
func responseStream(completion: @escaping StreamCompletion) -> Self {
lock.lock(); defer { lock.unlock() }
streamCallback = completion
return self
}
@@ -79,6 +81,7 @@ internal final class NetworkDataStream {
}
func cancel() {
lock.lock(); defer { lock.unlock() }
guard state.canBecome(.cancelled) else { return }
state = .cancelled
streamCallback = nil
@@ -44,7 +44,6 @@ public class RemoteAudioSource: AudioStreamSource {
}
internal let underlyingQueue: DispatchQueue
internal let streamOperationQueue: OperationQueue
internal let netStatusService: NetStatusProvider
internal var waitingForNetwork = false
internal let retrierTimeout: Retrier
@@ -67,12 +66,7 @@ public class RemoteAudioSource: AudioStreamSource {
supportsSeek = false
netStatusService = netStatusProvider
self.icycastHeadersProcessor = icycastHeadersProcessor
self.underlyingQueue = underlyingQueue
streamOperationQueue = OperationQueue()
streamOperationQueue.underlyingQueue = underlyingQueue
streamOperationQueue.maxConcurrentOperationCount = 1
streamOperationQueue.isSuspended = true
streamOperationQueue.name = "remote.audio.source.data.stream.queue"
self.underlyingQueue = DispatchQueue(label: "remote.audio.source.queue", target: underlyingQueue)
retrierTimeout = retrier
startNetworkService()
}
@@ -110,7 +104,6 @@ public class RemoteAudioSource: AudioStreamSource {
func close() {
retrierTimeout.cancel()
netStatusService.stop()
streamOperationQueue.cancelAllOperations()
if let streamTask = streamRequest {
streamTask.cancel()
networkingClient.remove(task: streamTask)
@@ -138,12 +131,10 @@ public class RemoteAudioSource: AudioStreamSource {
func suspend() {
streamRequest?.suspend()
streamOperationQueue.isSuspended = true
}
func resume() {
streamRequest?.resume()
streamOperationQueue.isSuspended = false
}
// MARK: Private
@@ -165,7 +156,9 @@ public class RemoteAudioSource: AudioStreamSource {
let request = networkingClient.stream(request: urlRequest)
.responseStream { [weak self] event in
guard let self = self else { return }
self.handleResponse(event: event)
self.underlyingQueue.sync {
self.handleResponse(event: event)
}
}
.resume()
@@ -179,17 +172,13 @@ public class RemoteAudioSource: AudioStreamSource {
switch event {
case let .response(urlResponse):
parseResponseHeader(response: urlResponse)
streamOperationQueue.isSuspended = false
case let .stream(event):
handleStreamEvent(event: event)
case let .complete(event):
if let error = event.error {
delegate?.errorOccured(source: self, error: error)
} else {
addCompletionOperation { [weak self] in
guard let self = self else { return }
self.delegate?.endOfFileOccured(source: self)
}
delegate?.endOfFileOccured(source: self)
}
}
}
@@ -198,26 +187,23 @@ public class RemoteAudioSource: AudioStreamSource {
switch event {
case let .success(value):
if let audioData = value.data {
addStreamOperation { [weak self] in
guard let self = self else { return }
if self.shouldTryParsingIcycastHeaders {
let (header, extractedAudio) = self.icycastHeadersProcessor.proccess(data: audioData)
if let header = header {
self.shouldTryParsingIcycastHeaders = false
let parser = IcycastHeaderParser()
self.parsedHeaderOutput = parser.parse(input: header)
if let metadataStep = self.parsedHeaderOutput?.metadataStep {
self.metadataStreamProcessor.metadataAvailable(step: metadataStep)
}
let audioCount = self.processAudio(data: extractedAudio)
self.relativePosition += audioCount
return
if shouldTryParsingIcycastHeaders {
let (header, extractedAudio) = icycastHeadersProcessor.proccess(data: audioData)
if let header = header {
shouldTryParsingIcycastHeaders = false
let parser = IcycastHeaderParser()
parsedHeaderOutput = parser.parse(input: header)
if let metadataStep = parsedHeaderOutput?.metadataStep {
metadataStreamProcessor.metadataAvailable(step: metadataStep)
}
let audioCount = processAudio(data: extractedAudio)
relativePosition += audioCount
return
}
let audioCount = self.processAudio(data: audioData)
self.relativePosition += audioCount
}
let audioCount = processAudio(data: audioData)
relativePosition += audioCount
}
case .failure:
if !netStatusService.isConnected {
@@ -280,7 +266,7 @@ public class RemoteAudioSource: AudioStreamSource {
var urlRequest = URLRequest(url: url)
urlRequest.networkServiceType = .avStreaming
urlRequest.cachePolicy = .reloadIgnoringLocalCacheData
urlRequest.timeoutInterval = 60
urlRequest.timeoutInterval = 240
for header in additionalRequestHeaders {
urlRequest.addValue(header.value, forHTTPHeaderField: header.key)
@@ -301,25 +287,6 @@ public class RemoteAudioSource: AudioStreamSource {
self.seek(at: self.position)
}
}
// MARK: - Network Stream Operation Queue
/// Schedules the given block on the stream operation queue
///
/// - Parameter block: A closure to be executed
private func addStreamOperation(_ block: @escaping () -> Void) {
let operation = BlockOperation(block: block)
streamOperationQueue.addOperation(operation)
}
/// Schedules the given block on the stream operation queue as a completion
///
/// - Parameter block: A closure to be executed
private func addCompletionOperation(_ block: @escaping () -> Void) {
let operation = BlockOperation(block: block)
operation.queuePriority = .veryLow
streamOperationQueue.addOperation(operation)
}
}
extension RemoteAudioSource: MetadataStreamSourceDelegate {
@@ -535,10 +535,11 @@ open class AudioPlayer {
/// This calls `processSource` method every `500 ms`
private func startReadProcessFromSourceIfNeeded() {
guard audioReadSource.state != .activated else { return }
audioReadSource.add { [weak self] in
self?.processSource()
}
audioReadSource.activate()
// TODO: this might be needed after all...
// audioReadSource.add { [weak self] in
// self?.processSource()
// }
// audioReadSource.activate()
}
/// Stops and removes the handler from the timer, @see `audioReadSource`