Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 88f5a0971b |
@@ -33,7 +33,6 @@ internal final class NetworkDataStream {
|
||||
let error: Error?
|
||||
}
|
||||
|
||||
private var lock = UnfairLock()
|
||||
private var streamCallback: StreamCompletion?
|
||||
|
||||
/// The serial queue for all internal async actions.
|
||||
@@ -67,7 +66,6 @@ internal final class NetworkDataStream {
|
||||
|
||||
@discardableResult
|
||||
func responseStream(completion: @escaping StreamCompletion) -> Self {
|
||||
lock.lock(); defer { lock.unlock() }
|
||||
streamCallback = completion
|
||||
return self
|
||||
}
|
||||
@@ -81,7 +79,6 @@ internal final class NetworkDataStream {
|
||||
}
|
||||
|
||||
func cancel() {
|
||||
lock.lock(); defer { lock.unlock() }
|
||||
guard state.canBecome(.cancelled) else { return }
|
||||
state = .cancelled
|
||||
streamCallback = nil
|
||||
|
||||
@@ -44,6 +44,7 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
}
|
||||
|
||||
internal let underlyingQueue: DispatchQueue
|
||||
internal let streamOperationQueue: OperationQueue
|
||||
internal let netStatusService: NetStatusProvider
|
||||
internal var waitingForNetwork = false
|
||||
internal let retrierTimeout: Retrier
|
||||
@@ -66,7 +67,12 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
supportsSeek = false
|
||||
netStatusService = netStatusProvider
|
||||
self.icycastHeadersProcessor = icycastHeadersProcessor
|
||||
self.underlyingQueue = DispatchQueue(label: "remote.audio.source.queue", target: underlyingQueue)
|
||||
self.underlyingQueue = underlyingQueue
|
||||
streamOperationQueue = OperationQueue()
|
||||
streamOperationQueue.underlyingQueue = underlyingQueue
|
||||
streamOperationQueue.maxConcurrentOperationCount = 1
|
||||
streamOperationQueue.isSuspended = true
|
||||
streamOperationQueue.name = "remote.audio.source.data.stream.queue"
|
||||
retrierTimeout = retrier
|
||||
startNetworkService()
|
||||
}
|
||||
@@ -104,6 +110,7 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
func close() {
|
||||
retrierTimeout.cancel()
|
||||
netStatusService.stop()
|
||||
streamOperationQueue.cancelAllOperations()
|
||||
if let streamTask = streamRequest {
|
||||
streamTask.cancel()
|
||||
networkingClient.remove(task: streamTask)
|
||||
@@ -131,10 +138,12 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
|
||||
func suspend() {
|
||||
streamRequest?.suspend()
|
||||
streamOperationQueue.isSuspended = true
|
||||
}
|
||||
|
||||
func resume() {
|
||||
streamRequest?.resume()
|
||||
streamOperationQueue.isSuspended = false
|
||||
}
|
||||
|
||||
// MARK: Private
|
||||
@@ -156,9 +165,7 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
let request = networkingClient.stream(request: urlRequest)
|
||||
.responseStream { [weak self] event in
|
||||
guard let self = self else { return }
|
||||
self.underlyingQueue.sync {
|
||||
self.handleResponse(event: event)
|
||||
}
|
||||
self.handleResponse(event: event)
|
||||
}
|
||||
.resume()
|
||||
|
||||
@@ -172,13 +179,17 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
switch event {
|
||||
case let .response(urlResponse):
|
||||
parseResponseHeader(response: urlResponse)
|
||||
streamOperationQueue.isSuspended = false
|
||||
case let .stream(event):
|
||||
handleStreamEvent(event: event)
|
||||
case let .complete(event):
|
||||
if let error = event.error {
|
||||
delegate?.errorOccured(source: self, error: error)
|
||||
} else {
|
||||
delegate?.endOfFileOccured(source: self)
|
||||
addCompletionOperation { [weak self] in
|
||||
guard let self = self else { return }
|
||||
self.delegate?.endOfFileOccured(source: self)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -187,23 +198,26 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
switch event {
|
||||
case let .success(value):
|
||||
if let audioData = value.data {
|
||||
if shouldTryParsingIcycastHeaders {
|
||||
let (header, extractedAudio) = icycastHeadersProcessor.proccess(data: audioData)
|
||||
if let header = header {
|
||||
shouldTryParsingIcycastHeaders = false
|
||||
let parser = IcycastHeaderParser()
|
||||
parsedHeaderOutput = parser.parse(input: header)
|
||||
if let metadataStep = parsedHeaderOutput?.metadataStep {
|
||||
metadataStreamProcessor.metadataAvailable(step: metadataStep)
|
||||
}
|
||||
addStreamOperation { [weak self] in
|
||||
guard let self = self else { return }
|
||||
if self.shouldTryParsingIcycastHeaders {
|
||||
let (header, extractedAudio) = self.icycastHeadersProcessor.proccess(data: audioData)
|
||||
if let header = header {
|
||||
self.shouldTryParsingIcycastHeaders = false
|
||||
let parser = IcycastHeaderParser()
|
||||
self.parsedHeaderOutput = parser.parse(input: header)
|
||||
if let metadataStep = self.parsedHeaderOutput?.metadataStep {
|
||||
self.metadataStreamProcessor.metadataAvailable(step: metadataStep)
|
||||
}
|
||||
|
||||
let audioCount = processAudio(data: extractedAudio)
|
||||
relativePosition += audioCount
|
||||
return
|
||||
let audioCount = self.processAudio(data: extractedAudio)
|
||||
self.relativePosition += audioCount
|
||||
return
|
||||
}
|
||||
}
|
||||
let audioCount = self.processAudio(data: audioData)
|
||||
self.relativePosition += audioCount
|
||||
}
|
||||
let audioCount = processAudio(data: audioData)
|
||||
relativePosition += audioCount
|
||||
}
|
||||
case .failure:
|
||||
if !netStatusService.isConnected {
|
||||
@@ -266,7 +280,7 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
var urlRequest = URLRequest(url: url)
|
||||
urlRequest.networkServiceType = .avStreaming
|
||||
urlRequest.cachePolicy = .reloadIgnoringLocalCacheData
|
||||
urlRequest.timeoutInterval = 240
|
||||
urlRequest.timeoutInterval = 60
|
||||
|
||||
for header in additionalRequestHeaders {
|
||||
urlRequest.addValue(header.value, forHTTPHeaderField: header.key)
|
||||
@@ -287,6 +301,25 @@ public class RemoteAudioSource: AudioStreamSource {
|
||||
self.seek(at: self.position)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Network Stream Operation Queue
|
||||
|
||||
/// Schedules the given block on the stream operation queue
|
||||
///
|
||||
/// - Parameter block: A closure to be executed
|
||||
private func addStreamOperation(_ block: @escaping () -> Void) {
|
||||
let operation = BlockOperation(block: block)
|
||||
streamOperationQueue.addOperation(operation)
|
||||
}
|
||||
|
||||
/// Schedules the given block on the stream operation queue as a completion
|
||||
///
|
||||
/// - Parameter block: A closure to be executed
|
||||
private func addCompletionOperation(_ block: @escaping () -> Void) {
|
||||
let operation = BlockOperation(block: block)
|
||||
operation.queuePriority = .veryLow
|
||||
streamOperationQueue.addOperation(operation)
|
||||
}
|
||||
}
|
||||
|
||||
extension RemoteAudioSource: MetadataStreamSourceDelegate {
|
||||
|
||||
@@ -535,11 +535,10 @@ open class AudioPlayer {
|
||||
/// This calls `processSource` method every `500 ms`
|
||||
private func startReadProcessFromSourceIfNeeded() {
|
||||
guard audioReadSource.state != .activated else { return }
|
||||
// TODO: this might be needed after all...
|
||||
// audioReadSource.add { [weak self] in
|
||||
// self?.processSource()
|
||||
// }
|
||||
// audioReadSource.activate()
|
||||
audioReadSource.add { [weak self] in
|
||||
self?.processSource()
|
||||
}
|
||||
audioReadSource.activate()
|
||||
}
|
||||
|
||||
/// Stops and removes the handler from the timer, @see `audioReadSource`
|
||||
|
||||
Reference in New Issue
Block a user