Serialize mixer output frame ordering via AsyncStream.

Replace unstructured Task {} in MediaMixerOutput and StreamOutput
conformances with AsyncStream channels that preserve FIFO ordering.

The previous pattern created a new Task for each audio/video callback,
which provides no ordering guarantee when entering an actor's serial
executor. This caused adjacent frames to arrive out of order, resulting
in RTMPTimestamp.invalidSequence errors (silent frame drops) and
AVAssetWriter failures in StreamRecorder.
This commit is contained in:
William Porter
2026-03-09 22:15:11 +10:00
parent fe04d3ec6f
commit ea4d6b20a6
3 changed files with 98 additions and 13 deletions
+25 -8
View File
@@ -126,6 +126,8 @@ public actor StreamRecorder {
private var audioPresentationTime: CMTime = .zero
private var videoPresentationTime: CMTime = .zero
private var dimensions: CMVideoDimensions = .init(width: 0, height: 0)
nonisolated(unsafe) private var inputContinuation: AsyncStream<CMSampleBuffer>.Continuation?
private var inputConsumerTask: Task<Void, Never>?
/// Creates a new recorder.
public init() {
@@ -191,6 +193,7 @@ public actor StreamRecorder {
videoPresentationTime = .zero
audioPresentationTime = .zero
self.settings = settings
startInputConsumer()
isRecording = true
}
@@ -215,6 +218,7 @@ public actor StreamRecorder {
throw Error.invalidState
}
defer {
stopInputConsumer()
isRecording = false
continuation = nil
self.writer = nil
@@ -254,6 +258,23 @@ public actor StreamRecorder {
return url.pathExtension.isEmpty ? url.appendingPathComponent(UUID().uuidString).appendingPathExtension(Self.defaultPathExtension) : url
}
private func startInputConsumer() {
let (stream, continuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
inputContinuation = continuation
inputConsumerTask = Task {
for await sampleBuffer in stream {
append(sampleBuffer)
}
}
}
private func stopInputConsumer() {
inputContinuation?.finish()
inputContinuation = nil
inputConsumerTask?.cancel()
inputConsumerTask = nil
}
private func append(_ sampleBuffer: CMSampleBuffer) {
guard isRecording else {
return
@@ -352,31 +373,27 @@ public actor StreamRecorder {
extension StreamRecorder: StreamOutput {
// MARK: HKStreamOutput
nonisolated public func stream(_ stream: some StreamConvertible, didOutput video: CMSampleBuffer) {
Task { await append(video) }
inputContinuation?.yield(video)
}
nonisolated public func stream(_ stream: some StreamConvertible, didOutput audio: AVAudioBuffer, when: AVAudioTime) {
guard let sampleBuffer = (audio as? AVAudioPCMBuffer)?.makeSampleBuffer(when) else {
return
}
Task { await append(sampleBuffer) }
inputContinuation?.yield(sampleBuffer)
}
}
extension StreamRecorder: MediaMixerOutput {
// MARK: MediaMixerOutput
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
Task {
await append(sampleBuffer)
}
inputContinuation?.yield(sampleBuffer)
}
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
guard let sampleBuffer = buffer.makeSampleBuffer(when) else {
return
}
Task {
await append(sampleBuffer)
}
inputContinuation?.yield(sampleBuffer)
}
}
+35 -2
View File
@@ -224,6 +224,8 @@ public actor RTMPStream {
private var expectedResponse: Code?
package var bitRateStrategy: (any StreamBitRateStrategy)?
private var statusContinuation: AsyncStream<RTMPStatus>.Continuation?
nonisolated(unsafe) private var mixerAudioContinuation: AsyncStream<(AVAudioPCMBuffer, AVAudioTime)>.Continuation?
nonisolated(unsafe) private var mixerVideoContinuation: AsyncStream<CMSampleBuffer>.Continuation?
private(set) var id: UInt32 = RTMPStream.defaultID
package lazy var incoming = IncomingStream(self)
package lazy var outgoing = OutgoingStream()
@@ -275,6 +277,7 @@ public actor RTMPStream {
self.fcPublishName = fcPublishName
self.requestTimeout = connection.requestTimeout
Task {
await self.startMixerInputConsumers()
await connection.addStream(self)
if await connection.connected {
await createStream()
@@ -283,6 +286,8 @@ public actor RTMPStream {
}
deinit {
mixerAudioContinuation?.finish()
mixerVideoContinuation?.finish()
outputs.removeAll()
}
@@ -388,6 +393,8 @@ public actor RTMPStream {
readyState = .publishing
try? send("@setDataFrame", arguments: "onMetaData", metadata)
outgoing.startRunning()
stopMixerInputConsumers()
startMixerInputConsumers()
Task {
for await audio in outgoing.audioOutputStream {
append(audio.0, when: audio.1)
@@ -415,6 +422,8 @@ public actor RTMPStream {
guard readyState == .playing || readyState == .publishing else {
throw Error.invalidState
}
stopMixerInputConsumers()
startMixerInputConsumers()
outgoing.stopRunning()
return try await withCheckedThrowingContinuation { continutation in
self.continuation = continutation
@@ -683,6 +692,30 @@ public actor RTMPStream {
}
}
private func startMixerInputConsumers() {
let (audioStream, audioContinuation) = AsyncStream.makeStream(of: (AVAudioPCMBuffer, AVAudioTime).self)
let (videoStream, videoContinuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
mixerAudioContinuation = audioContinuation
mixerVideoContinuation = videoContinuation
Task {
for await (buffer, when) in audioStream {
append(buffer, when: when)
}
}
Task {
for await sampleBuffer in videoStream {
append(sampleBuffer)
}
}
}
private func stopMixerInputConsumers() {
mixerAudioContinuation?.finish()
mixerAudioContinuation = nil
mixerVideoContinuation?.finish()
mixerVideoContinuation = nil
}
/// Creates flv metadata for a stream.
private func makeMetadata() -> AMFArray {
// https://github.com/shogo4405/HaishinKit.swift/issues/1410
@@ -806,10 +839,10 @@ extension RTMPStream: MediaMixerOutput {
}
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
Task { await append(sampleBuffer) }
mixerVideoContinuation?.yield(sampleBuffer)
}
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
Task { await append(buffer, when: when) }
mixerAudioContinuation?.yield((buffer, when))
}
}
+38 -3
View File
@@ -24,6 +24,8 @@ public actor SRTStream {
package lazy var incoming = IncomingStream(self)
package lazy var outgoing = OutgoingStream()
private weak var connection: SRTConnection?
nonisolated(unsafe) private var mixerAudioContinuation: AsyncStream<(AVAudioPCMBuffer, AVAudioTime)>.Continuation?
nonisolated(unsafe) private var mixerVideoContinuation: AsyncStream<CMSampleBuffer>.Continuation?
/// The error domain codes.
public enum Error: Swift.Error {
@@ -34,10 +36,15 @@ public actor SRTStream {
/// Creates a new stream object.
public init(connection: SRTConnection) {
self.connection = connection
Task { await connection.addStream(self) }
Task {
await self.startMixerInputConsumers()
await connection.addStream(self)
}
}
deinit {
mixerAudioContinuation?.finish()
mixerVideoContinuation?.finish()
outputs.removeAll()
}
@@ -58,6 +65,8 @@ public actor SRTStream {
return
}
readyState = .publishing
stopMixerInputConsumers()
startMixerInputConsumers()
outgoing.startRunning()
if outgoing.videoInputFormat != nil {
writer.expectedMedias.insert(.video)
@@ -121,6 +130,8 @@ public actor SRTStream {
guard readyState != .idle else {
return
}
stopMixerInputConsumers()
startMixerInputConsumers()
writer.clear()
reader.clear()
outgoing.stopRunning()
@@ -139,6 +150,30 @@ public actor SRTStream {
func doInput(_ data: Data) {
_ = reader.read(data)
}
private func startMixerInputConsumers() {
let (audioStream, audioContinuation) = AsyncStream.makeStream(of: (AVAudioPCMBuffer, AVAudioTime).self)
let (videoStream, videoContinuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
mixerAudioContinuation = audioContinuation
mixerVideoContinuation = videoContinuation
Task {
for await (buffer, when) in audioStream {
append(buffer, when: when)
}
}
Task {
for await sampleBuffer in videoStream {
append(sampleBuffer)
}
}
}
private func stopMixerInputConsumers() {
mixerAudioContinuation?.finish()
mixerAudioContinuation = nil
mixerVideoContinuation?.finish()
mixerVideoContinuation = nil
}
}
extension SRTStream: _Stream {
@@ -203,10 +238,10 @@ extension SRTStream: MediaMixerOutput {
}
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
Task { await append(sampleBuffer) }
mixerVideoContinuation?.yield(sampleBuffer)
}
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
Task { await append(buffer, when: when) }
mixerAudioContinuation?.yield((buffer, when))
}
}