mirror of
https://github.com/HaishinKit/HaishinKit.swift.git
synced 2026-05-07 20:12:28 +00:00
113 lines
3.4 KiB
Swift
113 lines
3.4 KiB
Swift
import Foundation
|
|
import HaishinKit
|
|
|
|
actor RTMPSession: StreamSession {
|
|
var connected: Bool {
|
|
get async {
|
|
await connection.connected
|
|
}
|
|
}
|
|
|
|
@AsyncStreamed(.closed)
|
|
private(set) var readyState: AsyncStream<StreamSessionReadyState>
|
|
|
|
var stream: any StreamConvertible {
|
|
_stream
|
|
}
|
|
|
|
private let uri: RTMPURL
|
|
private let mode: StreamSessionMode
|
|
private var retryCount: Int = 0
|
|
private var maxRetryCount = kStreamSession_maxRetryCount
|
|
private lazy var connection: RTMPConnection = {
|
|
switch mode {
|
|
case .publish:
|
|
return RTMPConnection()
|
|
case .playback:
|
|
return RTMPConnection(flashVer: "MAC 9,0,124,2")
|
|
}
|
|
}()
|
|
private lazy var _stream: RTMPStream = {
|
|
switch mode {
|
|
case .publish:
|
|
return RTMPStream(connection: connection, fcPublishName: uri.streamName)
|
|
case .playback:
|
|
return RTMPStream(connection: connection)
|
|
}
|
|
}()
|
|
private var disconnctedTask: Task<Void, any Error>? {
|
|
didSet {
|
|
oldValue?.cancel()
|
|
}
|
|
}
|
|
|
|
init(uri: URL, mode: StreamSessionMode, configuration: (any StreamSessionConfiguration)?) {
|
|
self.uri = RTMPURL(url: uri)
|
|
self.mode = mode
|
|
}
|
|
|
|
func setMaxRetryCount(_ maxRetryCount: Int) {
|
|
self.maxRetryCount = maxRetryCount
|
|
}
|
|
|
|
func connect(_ disconnected: @Sendable @escaping () -> Void) async throws {
|
|
guard await connection.connected == false else {
|
|
return
|
|
}
|
|
_readyState.value = .connecting
|
|
disconnctedTask = nil
|
|
// Retry handling at the TCP/IP level and during RTMP connection.
|
|
do {
|
|
_ = try await connection.connect(uri.command)
|
|
} catch {
|
|
guard retryCount < maxRetryCount else {
|
|
retryCount = 0
|
|
_readyState.value = .closed
|
|
throw error
|
|
}
|
|
// It is being delayed using backoff for congestion control.
|
|
try await Task.sleep(nanoseconds: UInt64(pow(2.0, Double(retryCount))) * 1_000_000_000)
|
|
retryCount += 1
|
|
try await connect(disconnected)
|
|
}
|
|
do {
|
|
retryCount = 0
|
|
switch mode {
|
|
case .publish:
|
|
_ = try await _stream.publish(uri.streamName)
|
|
case .playback:
|
|
_ = try await _stream.play(uri.streamName)
|
|
}
|
|
_readyState.value = .open
|
|
} catch {
|
|
// Errors at the NetStream layer, such as incorrect stream names,
|
|
// cannot be resolved by retrying, so they are thrown as exceptions.
|
|
try await connection.close()
|
|
_readyState.value = .closed
|
|
throw error
|
|
}
|
|
disconnctedTask = Task {
|
|
for await event in await connection.status {
|
|
switch event.code {
|
|
case RTMPConnection.Code.connectClosed.rawValue:
|
|
_readyState.value = .closed
|
|
disconnected()
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func close() async throws {
|
|
guard await connection.connected else {
|
|
return
|
|
}
|
|
_readyState.value = .closing
|
|
disconnctedTask = nil
|
|
try await connection.close()
|
|
retryCount = 0
|
|
_readyState.value = .closed
|
|
}
|
|
}
|