mirror of
https://github.com/HaishinKit/HaishinKit.swift.git
synced 2026-05-07 20:12:28 +00:00
ea4d6b20a6
Replace unstructured Task {} in MediaMixerOutput and StreamOutput
conformances with AsyncStream channels that preserve FIFO ordering.
The previous pattern created a new Task for each audio/video callback,
which provides no ordering guarantee when entering an actor's serial
executor. This caused adjacent frames to arrive out of order, resulting
in RTMPTimestamp.invalidSequence errors (silent frame drops) and
AVAssetWriter failures in StreamRecorder.
849 lines
32 KiB
Swift
849 lines
32 KiB
Swift
@preconcurrency import AVFAudio
|
|
import AVFoundation
|
|
import Combine
|
|
import HaishinKit
|
|
|
|
#if canImport(UIKit)
|
|
import UIKit
|
|
typealias View = UIView
|
|
#endif
|
|
|
|
#if canImport(AppKit) && !targetEnvironment(macCatalyst)
|
|
import AppKit
|
|
typealias View = NSView
|
|
#endif
|
|
|
|
/// An object that provides the interface to control a one-way channel over an RTMPConnection.
|
|
public actor RTMPStream {
|
|
/// The error domain code.
|
|
public enum Error: Swift.Error {
|
|
/// An invalid internal stare.
|
|
case invalidState
|
|
/// The requested operation timed out.
|
|
case requestTimedOut
|
|
/// A request fails.
|
|
case requestFailed(response: RTMPResponse)
|
|
/// An unsupported codec.
|
|
case unsupportedCodec
|
|
}
|
|
|
|
/// NetStatusEvent#info.code for NetStream
|
|
/// - seealso: https://help.adobe.com/en_US/air/reference/html/flash/events/NetStatusEvent.html#NET_STATUS
|
|
public enum Code: String {
|
|
case bufferEmpty = "NetStream.Buffer.Empty"
|
|
case bufferFlush = "NetStream.Buffer.Flush"
|
|
case bufferFull = "NetStream.Buffer.Full"
|
|
case connectClosed = "NetStream.Connect.Closed"
|
|
case connectFailed = "NetStream.Connect.Failed"
|
|
case connectRejected = "NetStream.Connect.Rejected"
|
|
case connectSuccess = "NetStream.Connect.Success"
|
|
case drmUpdateNeeded = "NetStream.DRM.UpdateNeeded"
|
|
case failed = "NetStream.Failed"
|
|
case multicastStreamReset = "NetStream.MulticastStream.Reset"
|
|
case pauseNotify = "NetStream.Pause.Notify"
|
|
case playFailed = "NetStream.Play.Failed"
|
|
case playFileStructureInvalid = "NetStream.Play.FileStructureInvalid"
|
|
case playInsufficientBW = "NetStream.Play.InsufficientBW"
|
|
case playNoSupportedTrackFound = "NetStream.Play.NoSupportedTrackFound"
|
|
case playReset = "NetStream.Play.Reset"
|
|
case playStart = "NetStream.Play.Start"
|
|
case playStop = "NetStream.Play.Stop"
|
|
case playStreamNotFound = "NetStream.Play.StreamNotFound"
|
|
case playTransition = "NetStream.Play.Transition"
|
|
case playUnpublishNotify = "NetStream.Play.UnpublishNotify"
|
|
case publishBadName = "NetStream.Publish.BadName"
|
|
case publishIdle = "NetStream.Publish.Idle"
|
|
case publishStart = "NetStream.Publish.Start"
|
|
case recordAlreadyExists = "NetStream.Record.AlreadyExists"
|
|
case recordFailed = "NetStream.Record.Failed"
|
|
case recordNoAccess = "NetStream.Record.NoAccess"
|
|
case recordStart = "NetStream.Record.Start"
|
|
case recordStop = "NetStream.Record.Stop"
|
|
case recordDiskQuotaExceeded = "NetStream.Record.DiskQuotaExceeded"
|
|
case secondScreenStart = "NetStream.SecondScreen.Start"
|
|
case secondScreenStop = "NetStream.SecondScreen.Stop"
|
|
case seekFailed = "NetStream.Seek.Failed"
|
|
case seekInvalidTime = "NetStream.Seek.InvalidTime"
|
|
case seekNotify = "NetStream.Seek.Notify"
|
|
case stepNotify = "NetStream.Step.Notify"
|
|
case unpauseNotify = "NetStream.Unpause.Notify"
|
|
case unpublishSuccess = "NetStream.Unpublish.Success"
|
|
case videoDimensionChange = "NetStream.Video.DimensionChange"
|
|
|
|
public var level: String {
|
|
switch self {
|
|
case .bufferEmpty:
|
|
return "status"
|
|
case .bufferFlush:
|
|
return "status"
|
|
case .bufferFull:
|
|
return "status"
|
|
case .connectClosed:
|
|
return "status"
|
|
case .connectFailed:
|
|
return "error"
|
|
case .connectRejected:
|
|
return "error"
|
|
case .connectSuccess:
|
|
return "status"
|
|
case .drmUpdateNeeded:
|
|
return "status"
|
|
case .failed:
|
|
return "error"
|
|
case .multicastStreamReset:
|
|
return "status"
|
|
case .pauseNotify:
|
|
return "status"
|
|
case .playFailed:
|
|
return "error"
|
|
case .playFileStructureInvalid:
|
|
return "error"
|
|
case .playInsufficientBW:
|
|
return "warning"
|
|
case .playNoSupportedTrackFound:
|
|
return "status"
|
|
case .playReset:
|
|
return "status"
|
|
case .playStart:
|
|
return "status"
|
|
case .playStop:
|
|
return "status"
|
|
case .playStreamNotFound:
|
|
return "error"
|
|
case .playTransition:
|
|
return "status"
|
|
case .playUnpublishNotify:
|
|
return "status"
|
|
case .publishBadName:
|
|
return "error"
|
|
case .publishIdle:
|
|
return "status"
|
|
case .publishStart:
|
|
return "status"
|
|
case .recordAlreadyExists:
|
|
return "status"
|
|
case .recordFailed:
|
|
return "error"
|
|
case .recordNoAccess:
|
|
return "error"
|
|
case .recordStart:
|
|
return "status"
|
|
case .recordStop:
|
|
return "status"
|
|
case .recordDiskQuotaExceeded:
|
|
return "error"
|
|
case .secondScreenStart:
|
|
return "status"
|
|
case .secondScreenStop:
|
|
return "status"
|
|
case .seekFailed:
|
|
return "error"
|
|
case .seekInvalidTime:
|
|
return "error"
|
|
case .seekNotify:
|
|
return "status"
|
|
case .stepNotify:
|
|
return "status"
|
|
case .unpauseNotify:
|
|
return "status"
|
|
case .unpublishSuccess:
|
|
return "status"
|
|
case .videoDimensionChange:
|
|
return "status"
|
|
}
|
|
}
|
|
|
|
func status(_ description: String) -> RTMPStatus {
|
|
return .init(code: rawValue, level: level, description: description)
|
|
}
|
|
}
|
|
|
|
/// The type of publish options.
|
|
public enum HowToPublish: String, Sendable {
|
|
/// Publish with server-side recording.
|
|
case record
|
|
/// Publish with server-side recording which is to append file if exists.
|
|
case append
|
|
/// Publish with server-side recording which is to append and ajust time file if exists.
|
|
case appendWithGap
|
|
/// Publish.
|
|
case live
|
|
}
|
|
|
|
static let defaultID: UInt32 = 0
|
|
static let supportedAudioCodecs: [AudioCodecSettings.Format] = [.aac, .opus]
|
|
static let supportedVideoCodecs: [VideoCodecSettings.Format] = VideoCodecSettings.Format.allCases
|
|
|
|
/// The RTMPStream metadata.
|
|
public private(set) var metadata: AMFArray = .init(count: 0)
|
|
/// The RTMPStreamInfo object whose properties contain data.
|
|
public private(set) var info = RTMPStreamInfo()
|
|
/// The object encoding (AMF). Framework supports AMF0 only.
|
|
public private(set) var objectEncoding = RTMPConnection.defaultObjectEncoding
|
|
/// The boolean value that indicates audio samples allow access or not.
|
|
public private(set) var audioSampleAccess = true
|
|
/// The boolean value that indicates video samples allow access or not.
|
|
public private(set) var videoSampleAccess = true
|
|
/// The number of video frames per seconds.
|
|
@Published public private(set) var currentFPS: UInt16 = 0
|
|
/// The ready state of stream.
|
|
@Published public private(set) var readyState: StreamReadyState = .idle
|
|
/// The stream of events you receive RTMP status events from a service.
|
|
public var status: AsyncStream<RTMPStatus> {
|
|
AsyncStream { continuation in
|
|
statusContinuation = continuation
|
|
}
|
|
}
|
|
/// The stream's name used for FMLE-compatible sequences.
|
|
public private(set) var fcPublishName: String?
|
|
|
|
public private(set) var videoTrackId: UInt8? = UInt8.max
|
|
public private(set) var audioTrackId: UInt8? = UInt8.max
|
|
|
|
private var isPaused = false
|
|
private var startedAt = Date() {
|
|
didSet {
|
|
dataTimestamps.removeAll()
|
|
}
|
|
}
|
|
package var outputs: [any StreamOutput] = []
|
|
private var frameCount: UInt16 = 0
|
|
private var audioBuffer: AVAudioCompressedBuffer?
|
|
private var howToPublish: RTMPStream.HowToPublish = .live
|
|
private var continuation: CheckedContinuation<RTMPResponse, any Swift.Error>? {
|
|
didSet {
|
|
if continuation == nil {
|
|
expectedResponse = nil
|
|
}
|
|
}
|
|
}
|
|
private var dataTimestamps: [String: Date] = .init()
|
|
private var audioTimestamp: RTMPTimestamp<AVAudioTime> = .init()
|
|
private var videoTimestamp: RTMPTimestamp<CMTime> = .init()
|
|
private var requestTimeout = RTMPConnection.defaultRequestTimeout
|
|
private var expectedResponse: Code?
|
|
package var bitRateStrategy: (any StreamBitRateStrategy)?
|
|
private var statusContinuation: AsyncStream<RTMPStatus>.Continuation?
|
|
nonisolated(unsafe) private var mixerAudioContinuation: AsyncStream<(AVAudioPCMBuffer, AVAudioTime)>.Continuation?
|
|
nonisolated(unsafe) private var mixerVideoContinuation: AsyncStream<CMSampleBuffer>.Continuation?
|
|
private(set) var id: UInt32 = RTMPStream.defaultID
|
|
package lazy var incoming = IncomingStream(self)
|
|
package lazy var outgoing = OutgoingStream()
|
|
private weak var connection: RTMPConnection?
|
|
|
|
private var audioFormat: AVAudioFormat? {
|
|
didSet {
|
|
guard audioFormat != oldValue else {
|
|
return
|
|
}
|
|
switch readyState {
|
|
case .publishing:
|
|
guard let message = RTMPAudioMessage(streamId: id, timestamp: 0, formatDescription: audioFormat?.formatDescription) else {
|
|
return
|
|
}
|
|
doOutput(oldValue == nil ? .zero : .one, chunkStreamId: .audio, message: message)
|
|
case .playing:
|
|
if let audioFormat {
|
|
audioBuffer = AVAudioCompressedBuffer(format: audioFormat, packetCapacity: 1, maximumPacketSize: 1024 * Int(audioFormat.channelCount))
|
|
} else {
|
|
audioBuffer = nil
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
private var videoFormat: CMFormatDescription? {
|
|
didSet {
|
|
guard videoFormat != oldValue else {
|
|
return
|
|
}
|
|
switch readyState {
|
|
case .publishing:
|
|
guard let message = RTMPVideoMessage(streamId: id, timestamp: 0, formatDescription: videoFormat) else {
|
|
return
|
|
}
|
|
doOutput(oldValue == nil ? .zero : .one, chunkStreamId: .video, message: message)
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Creates a new stream.
|
|
public init(connection: RTMPConnection, fcPublishName: String? = nil) {
|
|
self.connection = connection
|
|
self.fcPublishName = fcPublishName
|
|
self.requestTimeout = connection.requestTimeout
|
|
Task {
|
|
await self.startMixerInputConsumers()
|
|
await connection.addStream(self)
|
|
if await connection.connected {
|
|
await createStream()
|
|
}
|
|
}
|
|
}
|
|
|
|
deinit {
|
|
mixerAudioContinuation?.finish()
|
|
mixerVideoContinuation?.finish()
|
|
outputs.removeAll()
|
|
}
|
|
|
|
/// Plays a live stream from a server.
|
|
public func play(_ arguments: (any Sendable)?...) async throws -> RTMPResponse {
|
|
guard let name = arguments.first as? String else {
|
|
switch readyState {
|
|
case .playing:
|
|
info.resourceName = nil
|
|
return try await close()
|
|
default:
|
|
throw Error.invalidState
|
|
}
|
|
}
|
|
do {
|
|
audioFormat = nil
|
|
videoFormat = nil
|
|
let response = try await withCheckedThrowingContinuation { continuation in
|
|
readyState = .play
|
|
expectedResponse = Code.playStart
|
|
self.continuation = continuation
|
|
Task {
|
|
await incoming.startRunning()
|
|
try? await Task.sleep(nanoseconds: requestTimeout * 1_000_000)
|
|
self.continuation.map {
|
|
$0.resume(throwing: Error.requestTimedOut)
|
|
}
|
|
self.continuation = nil
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "play",
|
|
commandObject: nil,
|
|
arguments: arguments
|
|
))
|
|
}
|
|
startedAt = .init()
|
|
readyState = .playing
|
|
info.resourceName = name
|
|
return response
|
|
} catch {
|
|
Task { await incoming.stopRunning() }
|
|
outgoing.stopRunning()
|
|
readyState = .idle
|
|
throw error
|
|
}
|
|
}
|
|
|
|
/// Seeks the keyframe.
|
|
public func seek(_ offset: Double) async throws {
|
|
guard readyState == .playing else {
|
|
throw Error.invalidState
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "seek",
|
|
commandObject: nil,
|
|
arguments: [offset]
|
|
))
|
|
}
|
|
|
|
/// Sends streaming audio, vidoe and data message from client.
|
|
public func publish(_ name: String?, type: RTMPStream.HowToPublish = .live) async throws -> RTMPResponse {
|
|
guard let name else {
|
|
switch readyState {
|
|
case .publishing:
|
|
return try await close()
|
|
default:
|
|
throw Error.invalidState
|
|
}
|
|
}
|
|
do {
|
|
audioFormat = nil
|
|
videoFormat = nil
|
|
let response = try await withCheckedThrowingContinuation { continuation in
|
|
readyState = .publish
|
|
expectedResponse = Code.publishStart
|
|
self.continuation = continuation
|
|
Task {
|
|
try? await Task.sleep(nanoseconds: requestTimeout * 1_000_000)
|
|
self.continuation.map {
|
|
$0.resume(throwing: Error.requestTimedOut)
|
|
}
|
|
self.continuation = nil
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "publish",
|
|
commandObject: nil,
|
|
arguments: [name, type.rawValue]
|
|
))
|
|
}
|
|
info.resourceName = name
|
|
howToPublish = type
|
|
startedAt = .init()
|
|
metadata = makeMetadata()
|
|
readyState = .publishing
|
|
try? send("@setDataFrame", arguments: "onMetaData", metadata)
|
|
outgoing.startRunning()
|
|
stopMixerInputConsumers()
|
|
startMixerInputConsumers()
|
|
Task {
|
|
for await audio in outgoing.audioOutputStream {
|
|
append(audio.0, when: audio.1)
|
|
}
|
|
}
|
|
Task {
|
|
for await video in outgoing.videoOutputStream {
|
|
append(video)
|
|
}
|
|
}
|
|
Task {
|
|
for await video in outgoing.videoInputStream {
|
|
outgoing.append(video: video)
|
|
}
|
|
}
|
|
return response
|
|
} catch {
|
|
readyState = .idle
|
|
throw error
|
|
}
|
|
}
|
|
|
|
/// Stops playing or publishing and makes available other uses.
|
|
public func close() async throws -> RTMPResponse {
|
|
guard readyState == .playing || readyState == .publishing else {
|
|
throw Error.invalidState
|
|
}
|
|
stopMixerInputConsumers()
|
|
startMixerInputConsumers()
|
|
outgoing.stopRunning()
|
|
return try await withCheckedThrowingContinuation { continutation in
|
|
self.continuation = continutation
|
|
switch readyState {
|
|
case .playing:
|
|
expectedResponse = Code.playStop
|
|
case .publishing:
|
|
expectedResponse = Code.unpublishSuccess
|
|
default:
|
|
break
|
|
}
|
|
Task {
|
|
await incoming.stopRunning()
|
|
try? await Task.sleep(nanoseconds: requestTimeout * 1_000_000)
|
|
self.continuation.map {
|
|
$0.resume(throwing: Error.requestTimedOut)
|
|
}
|
|
self.continuation = nil
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "closeStream",
|
|
commandObject: nil,
|
|
arguments: []
|
|
))
|
|
readyState = .idle
|
|
}
|
|
}
|
|
|
|
/// Sends a message on a published stream to all subscribing clients.
|
|
///
|
|
/// ```
|
|
/// // To add a metadata to a live stream sent to an RTMP Service.
|
|
/// stream.send("@setDataFrame", "onMetaData", metaData)
|
|
/// // To clear a metadata that has already been set in the stream.
|
|
/// stream.send("@clearDataFrame", "onMetaData");
|
|
/// ```
|
|
///
|
|
/// - Parameters:
|
|
/// - handlerName: The message to send.
|
|
/// - arguments: Optional arguments.
|
|
/// - isResetTimestamp: A workaround option for sending timestamps as 0 in some services.
|
|
public func send(_ handlerName: String, arguments: (any Sendable)?..., isResetTimestamp: Bool = false) throws {
|
|
guard readyState == .publishing else {
|
|
throw Error.invalidState
|
|
}
|
|
if isResetTimestamp {
|
|
dataTimestamps[handlerName] = nil
|
|
}
|
|
let dataWasSent = dataTimestamps[handlerName] == nil ? false : true
|
|
let timestmap: UInt32 = dataWasSent ? UInt32((dataTimestamps[handlerName]?.timeIntervalSinceNow ?? 0) * -1000) : UInt32(startedAt.timeIntervalSinceNow * -1000)
|
|
doOutput(
|
|
dataWasSent ? RTMPChunkType.one : RTMPChunkType.zero,
|
|
chunkStreamId: .data,
|
|
message: RTMPDataMessage(
|
|
streamId: id,
|
|
objectEncoding: objectEncoding,
|
|
timestamp: timestmap,
|
|
handlerName: handlerName,
|
|
arguments: arguments
|
|
)
|
|
)
|
|
dataTimestamps[handlerName] = .init()
|
|
}
|
|
|
|
/// Incoming audio plays on a stream or not.
|
|
public func receiveAudio(_ receiveAudio: Bool) async throws {
|
|
guard readyState == .playing else {
|
|
throw Error.invalidState
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "receiveAudio",
|
|
commandObject: nil,
|
|
arguments: [receiveAudio]
|
|
))
|
|
}
|
|
|
|
/// Incoming video plays on a stream or not.
|
|
public func receiveVideo(_ receiveVideo: Bool) async throws {
|
|
guard readyState == .playing else {
|
|
throw Error.invalidState
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "receiveVideo",
|
|
commandObject: nil,
|
|
arguments: [receiveVideo]
|
|
))
|
|
}
|
|
|
|
/// Pauses playback a stream or not.
|
|
public func pause(_ paused: Bool) async throws -> RTMPResponse {
|
|
guard readyState == .playing else {
|
|
throw Error.invalidState
|
|
}
|
|
let response = try await withCheckedThrowingContinuation { continuation in
|
|
expectedResponse = isPaused ? Code.pauseNotify : Code.unpauseNotify
|
|
self.continuation = continuation
|
|
Task {
|
|
try? await Task.sleep(nanoseconds: requestTimeout * 1_000_000)
|
|
self.continuation.map {
|
|
$0.resume(throwing: Error.requestTimedOut)
|
|
}
|
|
self.continuation = nil
|
|
}
|
|
doOutput(.zero, chunkStreamId: .command, message: RTMPCommandMessage(
|
|
streamId: id,
|
|
transactionId: 0,
|
|
objectEncoding: objectEncoding,
|
|
commandName: "pause",
|
|
commandObject: nil,
|
|
arguments: [paused, floor(startedAt.timeIntervalSinceNow * -1000)]
|
|
))
|
|
}
|
|
isPaused = paused
|
|
return response
|
|
}
|
|
|
|
/// Pauses or resumes playback of a stream.
|
|
public func togglePause() async throws -> RTMPResponse {
|
|
try await pause(!isPaused)
|
|
}
|
|
|
|
func doOutput(_ type: RTMPChunkType, chunkStreamId: RTMPChunkStreamId, message: some RTMPMessage) {
|
|
Task {
|
|
let length = await connection?.doOutput(type, chunkStreamId: chunkStreamId, message: message) ?? 0
|
|
info.byteCount += length
|
|
}
|
|
}
|
|
|
|
func dispatch(_ message: some RTMPMessage, type: RTMPChunkType) {
|
|
info.byteCount += message.payload.count
|
|
switch message {
|
|
case let message as RTMPCommandMessage:
|
|
let response = RTMPResponse(message)
|
|
switch message.commandName {
|
|
case "onStatus":
|
|
switch response.status?.level {
|
|
case "status":
|
|
// During playback, only NetStream.Play.Start is awaited, as it follows the next sequence.
|
|
// 1. NetStream.Play.Rest
|
|
// 2. NetStream.Play.Start
|
|
if let code = response.status?.code, expectedResponse?.rawValue == code {
|
|
continuation?.resume(returning: response)
|
|
continuation = nil
|
|
}
|
|
default:
|
|
continuation?.resume(throwing: Error.requestFailed(response: response))
|
|
continuation = nil
|
|
}
|
|
_ = response.status.map {
|
|
statusContinuation?.yield($0)
|
|
}
|
|
default:
|
|
logger.info(message)
|
|
}
|
|
case let message as RTMPAudioMessage:
|
|
append(message, type: type)
|
|
case let message as RTMPVideoMessage:
|
|
append(message, type: type)
|
|
case let message as RTMPDataMessage:
|
|
switch message.handlerName {
|
|
case "onMetaData":
|
|
metadata = message.arguments[0] as? AMFArray ?? .init(count: 0)
|
|
case "|RtmpSampleAccess":
|
|
audioSampleAccess = message.arguments[0] as? Bool ?? true
|
|
videoSampleAccess = message.arguments[1] as? Bool ?? true
|
|
default:
|
|
break
|
|
}
|
|
case let message as RTMPUserControlMessage:
|
|
switch message.event {
|
|
case .bufferEmpty:
|
|
statusContinuation?.yield(Code.bufferEmpty.status(""))
|
|
case .bufferFull:
|
|
statusContinuation?.yield(Code.bufferFull.status(""))
|
|
default:
|
|
break
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
func createStream() async {
|
|
if let fcPublishName {
|
|
// FMLE-compatible sequences
|
|
async let _ = connection?.call("releaseStream", arguments: fcPublishName)
|
|
async let _ = connection?.call("FCPublish", arguments: fcPublishName)
|
|
}
|
|
do {
|
|
let response = try await connection?.call("createStream")
|
|
guard let first = response?.arguments.first as? Double else {
|
|
return
|
|
}
|
|
id = UInt32(first)
|
|
readyState = .idle
|
|
} catch {
|
|
logger.error(error)
|
|
}
|
|
}
|
|
|
|
func deleteStream() async {
|
|
guard let fcPublishName, readyState == .publishing else {
|
|
return
|
|
}
|
|
outgoing.stopRunning()
|
|
async let _ = try? connection?.call("FCUnpublish", arguments: fcPublishName)
|
|
async let _ = try? connection?.call("deleteStream", arguments: id)
|
|
}
|
|
|
|
private func append(_ message: RTMPAudioMessage, type: RTMPChunkType) {
|
|
audioTimestamp.update(message, chunkType: type)
|
|
guard message.codec.isSupported else {
|
|
return
|
|
}
|
|
switch message.payload[1] {
|
|
case RTMPAACPacketType.seq.rawValue:
|
|
audioFormat = message.makeAudioFormat()
|
|
case RTMPAACPacketType.raw.rawValue:
|
|
if audioFormat == nil {
|
|
audioFormat = message.makeAudioFormat()
|
|
}
|
|
if let audioBuffer {
|
|
message.copyMemory(audioBuffer)
|
|
Task { await incoming.append(audioBuffer, when: audioTimestamp.value) }
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
private func append(_ message: RTMPVideoMessage, type: RTMPChunkType) {
|
|
videoTimestamp.update(message, chunkType: type)
|
|
guard RTMPTagType.video.headerSize <= message.payload.count && message.isSupported else {
|
|
return
|
|
}
|
|
if message.isExHeader {
|
|
// IsExHeader for Enhancing RTMP, FLV
|
|
switch message.packetType {
|
|
case RTMPVideoPacketType.sequenceStart.rawValue:
|
|
videoFormat = message.makeFormatDescription()
|
|
case RTMPVideoPacketType.codedFrames.rawValue:
|
|
Task { await incoming.append(message, presentationTimeStamp: videoTimestamp.value, formatDesciption: videoFormat) }
|
|
case RTMPVideoPacketType.codedFramesX.rawValue:
|
|
Task { await incoming.append(message, presentationTimeStamp: videoTimestamp.value, formatDesciption: videoFormat) }
|
|
default:
|
|
break
|
|
}
|
|
} else {
|
|
switch message.packetType {
|
|
case RTMPAVCPacketType.seq.rawValue:
|
|
videoFormat = message.makeFormatDescription()
|
|
case RTMPAVCPacketType.nal.rawValue:
|
|
Task { await incoming.append(message, presentationTimeStamp: videoTimestamp.value, formatDesciption: videoFormat) }
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
private func startMixerInputConsumers() {
|
|
let (audioStream, audioContinuation) = AsyncStream.makeStream(of: (AVAudioPCMBuffer, AVAudioTime).self)
|
|
let (videoStream, videoContinuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
|
|
mixerAudioContinuation = audioContinuation
|
|
mixerVideoContinuation = videoContinuation
|
|
Task {
|
|
for await (buffer, when) in audioStream {
|
|
append(buffer, when: when)
|
|
}
|
|
}
|
|
Task {
|
|
for await sampleBuffer in videoStream {
|
|
append(sampleBuffer)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func stopMixerInputConsumers() {
|
|
mixerAudioContinuation?.finish()
|
|
mixerAudioContinuation = nil
|
|
mixerVideoContinuation?.finish()
|
|
mixerVideoContinuation = nil
|
|
}
|
|
|
|
/// Creates flv metadata for a stream.
|
|
private func makeMetadata() -> AMFArray {
|
|
// https://github.com/shogo4405/HaishinKit.swift/issues/1410
|
|
var metadata: AMFObject = ["duration": 0]
|
|
if outgoing.videoInputFormat != nil {
|
|
metadata["width"] = outgoing.videoSettings.videoSize.width
|
|
metadata["height"] = outgoing.videoSettings.videoSize.height
|
|
metadata["videocodecid"] = outgoing.videoSettings.format.codecid
|
|
metadata["videodatarate"] = outgoing.videoSettings.bitRate / 1000
|
|
if let expectedFrameRate = outgoing.videoSettings.expectedFrameRate {
|
|
metadata["framerate"] = expectedFrameRate
|
|
}
|
|
}
|
|
if let audioFormat = outgoing.audioInputFormat?.audioStreamBasicDescription {
|
|
metadata["audiocodecid"] = outgoing.audioSettings.format.codecid
|
|
metadata["audiodatarate"] = outgoing.audioSettings.bitRate / 1000
|
|
metadata["audiosamplerate"] = outgoing.audioSettings.format.makeSampleRate(
|
|
audioFormat.mSampleRate,
|
|
output: outgoing.audioSettings.sampleRate
|
|
)
|
|
}
|
|
return AMFArray(metadata)
|
|
}
|
|
}
|
|
|
|
extension RTMPStream: _Stream {
|
|
public func setAudioSettings(_ audioSettings: AudioCodecSettings) throws {
|
|
guard Self.supportedAudioCodecs.contains(audioSettings.format) else {
|
|
throw Error.unsupportedCodec
|
|
}
|
|
outgoing.audioSettings = audioSettings
|
|
}
|
|
|
|
public func setVideoSettings(_ videoSettings: VideoCodecSettings) throws {
|
|
guard Self.supportedVideoCodecs.contains(videoSettings.format) else {
|
|
throw Error.unsupportedCodec
|
|
}
|
|
outgoing.videoSettings = videoSettings
|
|
}
|
|
|
|
public func append(_ sampleBuffer: CMSampleBuffer) {
|
|
switch sampleBuffer.formatDescription?.mediaType {
|
|
case .video:
|
|
if sampleBuffer.formatDescription?.isCompressed == true {
|
|
do {
|
|
let decodeTimeStamp = sampleBuffer.decodeTimeStamp.isValid ? sampleBuffer.decodeTimeStamp : sampleBuffer.presentationTimeStamp
|
|
let timedelta = try videoTimestamp.update(decodeTimeStamp)
|
|
frameCount += 1
|
|
videoFormat = sampleBuffer.formatDescription
|
|
guard let message = RTMPVideoMessage(streamId: id, timestamp: timedelta, sampleBuffer: sampleBuffer) else {
|
|
return
|
|
}
|
|
doOutput(.one, chunkStreamId: .video, message: message)
|
|
} catch {
|
|
logger.warn(error)
|
|
}
|
|
} else {
|
|
outgoing.append(sampleBuffer)
|
|
if sampleBuffer.formatDescription?.isCompressed == false {
|
|
outputs.forEach {
|
|
switch sampleBuffer.formatDescription?.mediaType {
|
|
case .audio:
|
|
if audioSampleAccess {
|
|
$0.stream(self, didOutput: sampleBuffer)
|
|
}
|
|
case .video:
|
|
if videoSampleAccess || ($0 is View) {
|
|
$0.stream(self, didOutput: sampleBuffer)
|
|
}
|
|
default:
|
|
$0.stream(self, didOutput: sampleBuffer)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
public func append(_ audioBuffer: AVAudioBuffer, when: AVAudioTime) {
|
|
switch audioBuffer {
|
|
case let audioBuffer as AVAudioCompressedBuffer:
|
|
do {
|
|
let timedelta = try audioTimestamp.update(when)
|
|
audioFormat = audioBuffer.format
|
|
guard let message = RTMPAudioMessage(streamId: id, timestamp: timedelta, audioBuffer: audioBuffer) else {
|
|
return
|
|
}
|
|
doOutput(.one, chunkStreamId: .audio, message: message)
|
|
} catch {
|
|
logger.warn(error)
|
|
}
|
|
default:
|
|
outgoing.append(audioBuffer, when: when)
|
|
if audioBuffer is AVAudioPCMBuffer && audioSampleAccess {
|
|
outputs.forEach { $0.stream(self, didOutput: audioBuffer, when: when) }
|
|
}
|
|
}
|
|
}
|
|
|
|
public func dispatch(_ event: NetworkMonitorEvent) async {
|
|
await bitRateStrategy?.adjustBitrate(event, stream: self)
|
|
currentFPS = frameCount
|
|
frameCount = 0
|
|
info.update()
|
|
}
|
|
}
|
|
|
|
extension RTMPStream: MediaMixerOutput {
|
|
// MARK: MediaMixerOutput
|
|
public func selectTrack(_ id: UInt8?, mediaType: CMFormatDescription.MediaType) {
|
|
switch mediaType {
|
|
case .audio:
|
|
audioTrackId = id
|
|
case .video:
|
|
videoTrackId = id
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
nonisolated public func mixer(_ mixer: MediaMixer, didOutput sampleBuffer: CMSampleBuffer) {
|
|
mixerVideoContinuation?.yield(sampleBuffer)
|
|
}
|
|
|
|
nonisolated public func mixer(_ mixer: MediaMixer, didOutput buffer: AVAudioPCMBuffer, when: AVAudioTime) {
|
|
mixerAudioContinuation?.yield((buffer, when))
|
|
}
|
|
}
|