Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4e8a3f0289 |
@@ -33,6 +33,7 @@ internal final class NetworkDataStream {
|
|||||||
let error: Error?
|
let error: Error?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private var lock = UnfairLock()
|
||||||
private var streamCallback: StreamCompletion?
|
private var streamCallback: StreamCompletion?
|
||||||
|
|
||||||
/// The serial queue for all internal async actions.
|
/// The serial queue for all internal async actions.
|
||||||
@@ -66,6 +67,7 @@ internal final class NetworkDataStream {
|
|||||||
|
|
||||||
@discardableResult
|
@discardableResult
|
||||||
func responseStream(completion: @escaping StreamCompletion) -> Self {
|
func responseStream(completion: @escaping StreamCompletion) -> Self {
|
||||||
|
lock.lock(); defer { lock.unlock() }
|
||||||
streamCallback = completion
|
streamCallback = completion
|
||||||
return self
|
return self
|
||||||
}
|
}
|
||||||
@@ -79,6 +81,7 @@ internal final class NetworkDataStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func cancel() {
|
func cancel() {
|
||||||
|
lock.lock(); defer { lock.unlock() }
|
||||||
guard state.canBecome(.cancelled) else { return }
|
guard state.canBecome(.cancelled) else { return }
|
||||||
state = .cancelled
|
state = .cancelled
|
||||||
streamCallback = nil
|
streamCallback = nil
|
||||||
|
|||||||
@@ -44,7 +44,6 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
internal let underlyingQueue: DispatchQueue
|
internal let underlyingQueue: DispatchQueue
|
||||||
internal let streamOperationQueue: OperationQueue
|
|
||||||
internal let netStatusService: NetStatusProvider
|
internal let netStatusService: NetStatusProvider
|
||||||
internal var waitingForNetwork = false
|
internal var waitingForNetwork = false
|
||||||
internal let retrierTimeout: Retrier
|
internal let retrierTimeout: Retrier
|
||||||
@@ -67,12 +66,7 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
supportsSeek = false
|
supportsSeek = false
|
||||||
netStatusService = netStatusProvider
|
netStatusService = netStatusProvider
|
||||||
self.icycastHeadersProcessor = icycastHeadersProcessor
|
self.icycastHeadersProcessor = icycastHeadersProcessor
|
||||||
self.underlyingQueue = underlyingQueue
|
self.underlyingQueue = DispatchQueue(label: "remote.audio.source.queue", target: underlyingQueue)
|
||||||
streamOperationQueue = OperationQueue()
|
|
||||||
streamOperationQueue.underlyingQueue = underlyingQueue
|
|
||||||
streamOperationQueue.maxConcurrentOperationCount = 1
|
|
||||||
streamOperationQueue.isSuspended = true
|
|
||||||
streamOperationQueue.name = "remote.audio.source.data.stream.queue"
|
|
||||||
retrierTimeout = retrier
|
retrierTimeout = retrier
|
||||||
startNetworkService()
|
startNetworkService()
|
||||||
}
|
}
|
||||||
@@ -110,7 +104,6 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
func close() {
|
func close() {
|
||||||
retrierTimeout.cancel()
|
retrierTimeout.cancel()
|
||||||
netStatusService.stop()
|
netStatusService.stop()
|
||||||
streamOperationQueue.cancelAllOperations()
|
|
||||||
if let streamTask = streamRequest {
|
if let streamTask = streamRequest {
|
||||||
streamTask.cancel()
|
streamTask.cancel()
|
||||||
networkingClient.remove(task: streamTask)
|
networkingClient.remove(task: streamTask)
|
||||||
@@ -138,12 +131,10 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
|
|
||||||
func suspend() {
|
func suspend() {
|
||||||
streamRequest?.suspend()
|
streamRequest?.suspend()
|
||||||
streamOperationQueue.isSuspended = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func resume() {
|
func resume() {
|
||||||
streamRequest?.resume()
|
streamRequest?.resume()
|
||||||
streamOperationQueue.isSuspended = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: Private
|
// MARK: Private
|
||||||
@@ -165,7 +156,9 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
let request = networkingClient.stream(request: urlRequest)
|
let request = networkingClient.stream(request: urlRequest)
|
||||||
.responseStream { [weak self] event in
|
.responseStream { [weak self] event in
|
||||||
guard let self = self else { return }
|
guard let self = self else { return }
|
||||||
self.handleResponse(event: event)
|
self.underlyingQueue.sync {
|
||||||
|
self.handleResponse(event: event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
.resume()
|
.resume()
|
||||||
|
|
||||||
@@ -179,17 +172,13 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
switch event {
|
switch event {
|
||||||
case let .response(urlResponse):
|
case let .response(urlResponse):
|
||||||
parseResponseHeader(response: urlResponse)
|
parseResponseHeader(response: urlResponse)
|
||||||
streamOperationQueue.isSuspended = false
|
|
||||||
case let .stream(event):
|
case let .stream(event):
|
||||||
handleStreamEvent(event: event)
|
handleStreamEvent(event: event)
|
||||||
case let .complete(event):
|
case let .complete(event):
|
||||||
if let error = event.error {
|
if let error = event.error {
|
||||||
delegate?.errorOccured(source: self, error: error)
|
delegate?.errorOccured(source: self, error: error)
|
||||||
} else {
|
} else {
|
||||||
addCompletionOperation { [weak self] in
|
delegate?.endOfFileOccured(source: self)
|
||||||
guard let self = self else { return }
|
|
||||||
self.delegate?.endOfFileOccured(source: self)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -198,26 +187,23 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
switch event {
|
switch event {
|
||||||
case let .success(value):
|
case let .success(value):
|
||||||
if let audioData = value.data {
|
if let audioData = value.data {
|
||||||
addStreamOperation { [weak self] in
|
if shouldTryParsingIcycastHeaders {
|
||||||
guard let self = self else { return }
|
let (header, extractedAudio) = icycastHeadersProcessor.proccess(data: audioData)
|
||||||
if self.shouldTryParsingIcycastHeaders {
|
if let header = header {
|
||||||
let (header, extractedAudio) = self.icycastHeadersProcessor.proccess(data: audioData)
|
shouldTryParsingIcycastHeaders = false
|
||||||
if let header = header {
|
let parser = IcycastHeaderParser()
|
||||||
self.shouldTryParsingIcycastHeaders = false
|
parsedHeaderOutput = parser.parse(input: header)
|
||||||
let parser = IcycastHeaderParser()
|
if let metadataStep = parsedHeaderOutput?.metadataStep {
|
||||||
self.parsedHeaderOutput = parser.parse(input: header)
|
metadataStreamProcessor.metadataAvailable(step: metadataStep)
|
||||||
if let metadataStep = self.parsedHeaderOutput?.metadataStep {
|
|
||||||
self.metadataStreamProcessor.metadataAvailable(step: metadataStep)
|
|
||||||
}
|
|
||||||
|
|
||||||
let audioCount = self.processAudio(data: extractedAudio)
|
|
||||||
self.relativePosition += audioCount
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let audioCount = processAudio(data: extractedAudio)
|
||||||
|
relativePosition += audioCount
|
||||||
|
return
|
||||||
}
|
}
|
||||||
let audioCount = self.processAudio(data: audioData)
|
|
||||||
self.relativePosition += audioCount
|
|
||||||
}
|
}
|
||||||
|
let audioCount = processAudio(data: audioData)
|
||||||
|
relativePosition += audioCount
|
||||||
}
|
}
|
||||||
case .failure:
|
case .failure:
|
||||||
if !netStatusService.isConnected {
|
if !netStatusService.isConnected {
|
||||||
@@ -280,7 +266,7 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
var urlRequest = URLRequest(url: url)
|
var urlRequest = URLRequest(url: url)
|
||||||
urlRequest.networkServiceType = .avStreaming
|
urlRequest.networkServiceType = .avStreaming
|
||||||
urlRequest.cachePolicy = .reloadIgnoringLocalCacheData
|
urlRequest.cachePolicy = .reloadIgnoringLocalCacheData
|
||||||
urlRequest.timeoutInterval = 60
|
urlRequest.timeoutInterval = 240
|
||||||
|
|
||||||
for header in additionalRequestHeaders {
|
for header in additionalRequestHeaders {
|
||||||
urlRequest.addValue(header.value, forHTTPHeaderField: header.key)
|
urlRequest.addValue(header.value, forHTTPHeaderField: header.key)
|
||||||
@@ -301,25 +287,6 @@ public class RemoteAudioSource: AudioStreamSource {
|
|||||||
self.seek(at: self.position)
|
self.seek(at: self.position)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - Network Stream Operation Queue
|
|
||||||
|
|
||||||
/// Schedules the given block on the stream operation queue
|
|
||||||
///
|
|
||||||
/// - Parameter block: A closure to be executed
|
|
||||||
private func addStreamOperation(_ block: @escaping () -> Void) {
|
|
||||||
let operation = BlockOperation(block: block)
|
|
||||||
streamOperationQueue.addOperation(operation)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules the given block on the stream operation queue as a completion
|
|
||||||
///
|
|
||||||
/// - Parameter block: A closure to be executed
|
|
||||||
private func addCompletionOperation(_ block: @escaping () -> Void) {
|
|
||||||
let operation = BlockOperation(block: block)
|
|
||||||
operation.queuePriority = .veryLow
|
|
||||||
streamOperationQueue.addOperation(operation)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extension RemoteAudioSource: MetadataStreamSourceDelegate {
|
extension RemoteAudioSource: MetadataStreamSourceDelegate {
|
||||||
|
|||||||
@@ -535,10 +535,11 @@ open class AudioPlayer {
|
|||||||
/// This calls `processSource` method every `500 ms`
|
/// This calls `processSource` method every `500 ms`
|
||||||
private func startReadProcessFromSourceIfNeeded() {
|
private func startReadProcessFromSourceIfNeeded() {
|
||||||
guard audioReadSource.state != .activated else { return }
|
guard audioReadSource.state != .activated else { return }
|
||||||
audioReadSource.add { [weak self] in
|
// TODO: this might be needed after all...
|
||||||
self?.processSource()
|
// audioReadSource.add { [weak self] in
|
||||||
}
|
// self?.processSource()
|
||||||
audioReadSource.activate()
|
// }
|
||||||
|
// audioReadSource.activate()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stops and removes the handler from the timer, @see `audioReadSource`
|
/// Stops and removes the handler from the timer, @see `audioReadSource`
|
||||||
|
|||||||
Reference in New Issue
Block a user