[Xcode 13] Implement async/await support for publishers (no tests yet)
This commit is contained in:
committed by
Sergej Jaskiewicz
parent
f823f7b18c
commit
c911862a24
@@ -32,6 +32,7 @@ let package = Package(
|
||||
condition: .when(platforms: supportedPlatforms.except([.wasi])))
|
||||
],
|
||||
exclude: [
|
||||
"Concurrency/Publisher+Concurrency.swift.gyb",
|
||||
"Publishers/Publishers.Encode.swift.gyb",
|
||||
"Publishers/Publishers.MapKeyPath.swift.gyb",
|
||||
"Publishers/Publishers.Catch.swift.gyb"
|
||||
|
||||
@@ -2,104 +2,6 @@
|
||||
// Please remove the corresponding piece from this file if you implement something,
|
||||
// and complement this file as features are added in Apple's Combine
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public struct AsyncPublisher<P> : AsyncSequence where P : Publisher, P.Failure == Never {
|
||||
|
||||
/// The type of element produced by this asynchronous sequence.
|
||||
public typealias Element = P.Output
|
||||
|
||||
public struct Iterator : AsyncIteratorProtocol {
|
||||
|
||||
/// Asynchronously advances to the next element and returns it, or ends the
|
||||
/// sequence if there is no next element.
|
||||
///
|
||||
/// - Returns: The next element, if it exists, or `nil` to signal the end of
|
||||
/// the sequence.
|
||||
public mutating func next() async -> P.Output?
|
||||
|
||||
public typealias Element = P.Output
|
||||
}
|
||||
|
||||
public init(_ publisher: P)
|
||||
|
||||
/// Creates the asynchronous iterator that produces elements of this
|
||||
/// asynchronous sequence.
|
||||
///
|
||||
/// - Returns: An instance of the `AsyncIterator` type used to produce
|
||||
/// elements of the asynchronous sequence.
|
||||
public func makeAsyncIterator() -> AsyncPublisher<P>.Iterator
|
||||
|
||||
/// The type of asynchronous iterator that produces elements of this
|
||||
/// asynchronous sequence.
|
||||
public typealias AsyncIterator = AsyncPublisher<P>.Iterator
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public struct AsyncThrowingPublisher<P> : AsyncSequence where P : Publisher {
|
||||
|
||||
/// The type of element produced by this asynchronous sequence.
|
||||
public typealias Element = P.Output
|
||||
|
||||
public struct Iterator : AsyncIteratorProtocol {
|
||||
|
||||
/// Asynchronously advances to the next element and returns it, or ends the
|
||||
/// sequence if there is no next element.
|
||||
///
|
||||
/// - Returns: The next element, if it exists, or `nil` to signal the end of
|
||||
/// the sequence.
|
||||
public mutating func next() async throws -> P.Output?
|
||||
|
||||
public typealias Element = P.Output
|
||||
}
|
||||
|
||||
public init(_ publisher: P)
|
||||
|
||||
/// Creates the asynchronous iterator that produces elements of this
|
||||
/// asynchronous sequence.
|
||||
///
|
||||
/// - Returns: An instance of the `AsyncIterator` type used to produce
|
||||
/// elements of the asynchronous sequence.
|
||||
public func makeAsyncIterator() -> AsyncThrowingPublisher<P>.Iterator
|
||||
|
||||
/// The type of asynchronous iterator that produces elements of this
|
||||
/// asynchronous sequence.
|
||||
public typealias AsyncIterator = AsyncThrowingPublisher<P>.Iterator
|
||||
}
|
||||
|
||||
|
||||
extension Future where Failure == Never {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
final public var value: Output { get async }
|
||||
}
|
||||
|
||||
extension Future {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
final public var value: Output { get async throws }
|
||||
}
|
||||
|
||||
|
||||
extension Publisher where Self.Failure == Never {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var values: AsyncPublisher<Self> { get }
|
||||
}
|
||||
|
||||
extension Publisher {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var values: AsyncThrowingPublisher<Self> { get }
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||||
extension Subscribers.Completion : Sendable {
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
|
||||
extension Subscribers.Demand : Sendable {
|
||||
}
|
||||
|
||||
extension Publishers {
|
||||
|
||||
/// A publisher that receives and combines the latest elements from two publishers.
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
//
|
||||
// Future+Concurrency.swift
|
||||
//
|
||||
//
|
||||
// Created by Sergej Jaskiewicz on 28.08.2021.
|
||||
//
|
||||
|
||||
// async/await is only available since Swift 5.5
|
||||
#if compiler(>=5.5)
|
||||
extension Future where Failure == Never {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var value: Output {
|
||||
get async { // swiftlint:disable:this implicit_getter
|
||||
await ContinuationSubscriber.withUnsafeSubscription(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Future {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var value: Output {
|
||||
get async throws { // swiftlint:disable:this implicit_getter
|
||||
try await ContinuationSubscriber.withUnsafeThrowingSubscription(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
private final class ContinuationSubscriber<Input,
|
||||
UpstreamFailure: Error,
|
||||
ErrorOrNever: Error>
|
||||
: Subscriber
|
||||
{
|
||||
typealias Failure = UpstreamFailure
|
||||
|
||||
private var continuation: UnsafeContinuation<Input, ErrorOrNever>?
|
||||
private var subscription: Subscription?
|
||||
private let lock = UnfairLock.allocate()
|
||||
|
||||
private init(_ continuation: UnsafeContinuation<Input, ErrorOrNever>) {
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
lock.deallocate()
|
||||
}
|
||||
|
||||
func receive(subscription: Subscription) {
|
||||
lock.lock()
|
||||
guard self.subscription == nil else {
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
return
|
||||
}
|
||||
self.subscription = subscription
|
||||
lock.unlock()
|
||||
subscription.request(.max(1))
|
||||
}
|
||||
|
||||
func receive(_ input: Input) -> Subscribers.Demand {
|
||||
lock.lock()
|
||||
if let continuation = self.continuation.take() {
|
||||
lock.unlock()
|
||||
continuation.resume(returning: input)
|
||||
} else {
|
||||
lock.unlock()
|
||||
}
|
||||
return .none
|
||||
}
|
||||
|
||||
func receive(completion: Subscribers.Completion<Failure>) {
|
||||
lock.lock()
|
||||
subscription = nil
|
||||
lock.unlock()
|
||||
completion.failure.map(handleFailure)
|
||||
}
|
||||
|
||||
private func handleFailure(_ error: Failure) {
|
||||
lock.lock()
|
||||
if let continuation = self.continuation.take() {
|
||||
lock.unlock()
|
||||
continuation.resume(throwing: error as! ErrorOrNever)
|
||||
} else {
|
||||
lock.unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension ContinuationSubscriber where ErrorOrNever == Error {
|
||||
fileprivate static func withUnsafeThrowingSubscription<Upstream: Publisher>(
|
||||
_ upstream: Upstream
|
||||
) async throws -> Input
|
||||
where Upstream.Output == Input,
|
||||
Upstream.Failure == UpstreamFailure
|
||||
{
|
||||
try await withUnsafeThrowingContinuation { continuation in
|
||||
upstream.subscribe(ContinuationSubscriber(continuation))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension ContinuationSubscriber where UpstreamFailure == Never, ErrorOrNever == Never {
|
||||
fileprivate static func withUnsafeSubscription<Upstream: Publisher>(
|
||||
_ upstream: Upstream
|
||||
) async -> Input
|
||||
where Upstream.Output == Input,
|
||||
Upstream.Failure == Never
|
||||
{
|
||||
await withUnsafeContinuation { continuation in
|
||||
upstream.subscribe(ContinuationSubscriber(continuation))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // compiler(>=5.5)
|
||||
@@ -0,0 +1,327 @@
|
||||
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
|
||||
// ┃ ┃
|
||||
// ┃ Auto-generated from GYB template. DO NOT EDIT! ┃
|
||||
// ┃ ┃
|
||||
// ┃ ┃
|
||||
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
|
||||
//
|
||||
// Publisher+Concurrency.swift
|
||||
//
|
||||
//
|
||||
// Created by Sergej Jaskiewicz on 28.08.2021.
|
||||
//
|
||||
|
||||
// async/await is only available since Swift 5.5
|
||||
#if compiler(>=5.5)
|
||||
extension Publisher where Failure == Never {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var values: AsyncPublisher<Self> {
|
||||
return .init(self)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public struct AsyncPublisher<Upstream: Publisher>: AsyncSequence
|
||||
where Upstream.Failure == Never
|
||||
{
|
||||
|
||||
public typealias Element = Upstream.Output
|
||||
|
||||
public struct Iterator: AsyncIteratorProtocol {
|
||||
|
||||
public typealias Element = Upstream.Output
|
||||
|
||||
fileprivate let inner: Inner
|
||||
|
||||
public func next() async -> Element? {
|
||||
return await withTaskCancellationHandler(
|
||||
handler: { [inner] in inner.cancel() },
|
||||
operation: { [inner] in await inner.next() }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
public typealias AsyncIterator = Iterator
|
||||
|
||||
private let publisher: Upstream
|
||||
|
||||
public init(_ publisher: Upstream) {
|
||||
self.publisher = publisher
|
||||
}
|
||||
|
||||
public func makeAsyncIterator() -> Iterator {
|
||||
let inner = Iterator.Inner()
|
||||
publisher.subscribe(inner)
|
||||
return Iterator(inner: inner)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension AsyncPublisher.Iterator {
|
||||
|
||||
// TODO: Test if it's really cancellable
|
||||
fileprivate final class Inner: Subscriber, Cancellable {
|
||||
typealias Input = Upstream.Output
|
||||
typealias Failure = Upstream.Failure
|
||||
|
||||
private enum State {
|
||||
case awaitingSubscription
|
||||
case subscribed(Subscription)
|
||||
case terminal
|
||||
}
|
||||
|
||||
private let lock = UnfairLock.allocate()
|
||||
private var pending: [UnsafeContinuation<Input?, Never>] = []
|
||||
private var state = State.awaitingSubscription
|
||||
private var pendingDemand = Subscribers.Demand.none
|
||||
|
||||
deinit {
|
||||
lock.deallocate()
|
||||
}
|
||||
|
||||
func receive(subscription: Subscription) {
|
||||
lock.lock()
|
||||
guard case .awaitingSubscription = state else {
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
return
|
||||
}
|
||||
state = .subscribed(subscription)
|
||||
let pendingDemand = self.pendingDemand
|
||||
self.pendingDemand = .none
|
||||
lock.unlock()
|
||||
if pendingDemand != .none {
|
||||
subscription.request(pendingDemand)
|
||||
}
|
||||
}
|
||||
|
||||
func receive(_ input: Input) -> Subscribers.Demand {
|
||||
lock.lock()
|
||||
guard case .subscribed = state else {
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
return .none
|
||||
}
|
||||
precondition(!pending.isEmpty, "Received an output without requesting demand")
|
||||
let continuation = pending.removeFirst()
|
||||
lock.unlock()
|
||||
continuation.resume(returning: input)
|
||||
return .none
|
||||
}
|
||||
|
||||
func receive(completion: Subscribers.Completion<Failure>) {
|
||||
lock.lock()
|
||||
state = .terminal
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
}
|
||||
|
||||
func cancel() {
|
||||
lock.lock()
|
||||
let pending = self.pending.take()
|
||||
guard case .subscribed(let subscription) = state else {
|
||||
state = .terminal
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
return
|
||||
}
|
||||
state = .terminal
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
pending.resumeAllWithNil()
|
||||
}
|
||||
|
||||
fileprivate func next() async -> Input? {
|
||||
return await withUnsafeContinuation { continuation in
|
||||
lock.lock()
|
||||
switch state {
|
||||
case .awaitingSubscription:
|
||||
pending.append(continuation)
|
||||
pendingDemand += 1
|
||||
lock.unlock()
|
||||
case .subscribed(let subscription):
|
||||
pending.append(continuation)
|
||||
lock.unlock()
|
||||
subscription.request(.max(1))
|
||||
case .terminal:
|
||||
lock.unlock()
|
||||
continuation.resume(returning: nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
extension Publisher {
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var values: AsyncThrowingPublisher<Self> {
|
||||
return .init(self)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public struct AsyncThrowingPublisher<Upstream: Publisher>: AsyncSequence
|
||||
{
|
||||
|
||||
public typealias Element = Upstream.Output
|
||||
|
||||
public struct Iterator: AsyncIteratorProtocol {
|
||||
|
||||
public typealias Element = Upstream.Output
|
||||
|
||||
fileprivate let inner: Inner
|
||||
|
||||
public func next() async throws -> Element? {
|
||||
return try await withTaskCancellationHandler(
|
||||
handler: { [inner] in inner.cancel() },
|
||||
operation: { [inner] in try await inner.next() }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
public typealias AsyncIterator = Iterator
|
||||
|
||||
private let publisher: Upstream
|
||||
|
||||
public init(_ publisher: Upstream) {
|
||||
self.publisher = publisher
|
||||
}
|
||||
|
||||
public func makeAsyncIterator() -> Iterator {
|
||||
let inner = Iterator.Inner()
|
||||
publisher.subscribe(inner)
|
||||
return Iterator(inner: inner)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension AsyncThrowingPublisher.Iterator {
|
||||
|
||||
// TODO: Test if it's really cancellable
|
||||
fileprivate final class Inner: Subscriber, Cancellable {
|
||||
typealias Input = Upstream.Output
|
||||
typealias Failure = Upstream.Failure
|
||||
|
||||
private enum State {
|
||||
case awaitingSubscription
|
||||
case subscribed(Subscription)
|
||||
case terminal(Error?)
|
||||
}
|
||||
|
||||
private let lock = UnfairLock.allocate()
|
||||
private var pending: [UnsafeContinuation<Input?, Error>] = []
|
||||
private var state = State.awaitingSubscription
|
||||
private var pendingDemand = Subscribers.Demand.none
|
||||
|
||||
deinit {
|
||||
lock.deallocate()
|
||||
}
|
||||
|
||||
func receive(subscription: Subscription) {
|
||||
lock.lock()
|
||||
guard case .awaitingSubscription = state else {
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
return
|
||||
}
|
||||
state = .subscribed(subscription)
|
||||
let pendingDemand = self.pendingDemand
|
||||
self.pendingDemand = .none
|
||||
lock.unlock()
|
||||
if pendingDemand != .none {
|
||||
subscription.request(pendingDemand)
|
||||
}
|
||||
}
|
||||
|
||||
func receive(_ input: Input) -> Subscribers.Demand {
|
||||
lock.lock()
|
||||
guard case .subscribed = state else {
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
return .none
|
||||
}
|
||||
precondition(!pending.isEmpty, "Received an output without requesting demand")
|
||||
let continuation = pending.removeFirst()
|
||||
lock.unlock()
|
||||
continuation.resume(returning: input)
|
||||
return .none
|
||||
}
|
||||
|
||||
func receive(completion: Subscribers.Completion<Failure>) {
|
||||
lock.lock()
|
||||
switch state {
|
||||
case .awaitingSubscription, .subscribed:
|
||||
if let continuation = pending.first {
|
||||
// TODO: Test that it's nil even if the publisher fails
|
||||
state = .terminal(nil)
|
||||
let remaining = pending.take().dropFirst()
|
||||
lock.unlock()
|
||||
switch completion {
|
||||
case .finished:
|
||||
continuation.resume(returning: nil)
|
||||
case .failure(let error):
|
||||
continuation.resume(throwing: error)
|
||||
}
|
||||
remaining.resumeAllWithNil()
|
||||
} else {
|
||||
state = .terminal(completion.failure)
|
||||
lock.unlock()
|
||||
}
|
||||
case .terminal:
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
}
|
||||
}
|
||||
|
||||
func cancel() {
|
||||
lock.lock()
|
||||
let pending = self.pending.take()
|
||||
guard case .subscribed(let subscription) = state else {
|
||||
state = .terminal(nil)
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
return
|
||||
}
|
||||
state = .terminal(nil)
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
pending.resumeAllWithNil()
|
||||
}
|
||||
|
||||
fileprivate func next() async throws -> Input? {
|
||||
return try await withUnsafeThrowingContinuation { continuation in
|
||||
lock.lock()
|
||||
switch state {
|
||||
case .awaitingSubscription:
|
||||
pending.append(continuation)
|
||||
pendingDemand += 1
|
||||
lock.unlock()
|
||||
case .subscribed(let subscription):
|
||||
pending.append(continuation)
|
||||
lock.unlock()
|
||||
subscription.request(.max(1))
|
||||
case .terminal:
|
||||
lock.unlock()
|
||||
continuation.resume(returning: nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension Sequence {
|
||||
fileprivate func resumeAllWithNil<Output, Failure: Error>()
|
||||
where Element == UnsafeContinuation<Output?, Failure>
|
||||
{
|
||||
for continuation in self {
|
||||
continuation.resume(returning: nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif // compiler(>=5.5)
|
||||
@@ -0,0 +1,196 @@
|
||||
${template_header}
|
||||
//
|
||||
// Publisher+Concurrency.swift
|
||||
//
|
||||
//
|
||||
// Created by Sergej Jaskiewicz on 28.08.2021.
|
||||
//
|
||||
|
||||
// async/await is only available since Swift 5.5
|
||||
#if compiler(>=5.5)
|
||||
%{
|
||||
instantiations = [('AsyncPublisher', False), ('AsyncThrowingPublisher', True)]
|
||||
}%
|
||||
% for instantiation, throwing in instantiations:
|
||||
extension Publisher ${'' if throwing else 'where Failure == Never '}{
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public var values: ${instantiation}<Self> {
|
||||
return .init(self)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
public struct ${instantiation}<Upstream: Publisher>: AsyncSequence
|
||||
% if not throwing:
|
||||
where Upstream.Failure == Never
|
||||
% end
|
||||
{
|
||||
|
||||
public typealias Element = Upstream.Output
|
||||
|
||||
public struct Iterator: AsyncIteratorProtocol {
|
||||
|
||||
public typealias Element = Upstream.Output
|
||||
|
||||
fileprivate let inner: Inner
|
||||
|
||||
public func next() async ${'throws ' if throwing else ''}-> Element? {
|
||||
return ${'try ' if throwing else ''}await withTaskCancellationHandler(
|
||||
handler: { [inner] in inner.cancel() },
|
||||
operation: { [inner] in ${'try ' if throwing else ''}await inner.next() }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
public typealias AsyncIterator = Iterator
|
||||
|
||||
private let publisher: Upstream
|
||||
|
||||
public init(_ publisher: Upstream) {
|
||||
self.publisher = publisher
|
||||
}
|
||||
|
||||
public func makeAsyncIterator() -> Iterator {
|
||||
let inner = Iterator.Inner()
|
||||
publisher.subscribe(inner)
|
||||
return Iterator(inner: inner)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension ${instantiation}.Iterator {
|
||||
|
||||
// TODO: Test if it's really cancellable
|
||||
fileprivate final class Inner: Subscriber, Cancellable {
|
||||
typealias Input = Upstream.Output
|
||||
typealias Failure = Upstream.Failure
|
||||
|
||||
private enum State {
|
||||
case awaitingSubscription
|
||||
case subscribed(Subscription)
|
||||
case terminal${'(Error?)' if throwing else ''}
|
||||
}
|
||||
|
||||
private let lock = UnfairLock.allocate()
|
||||
private var pending: [UnsafeContinuation<Input?, ${'Error' if throwing else 'Never'}>] = []
|
||||
private var state = State.awaitingSubscription
|
||||
private var pendingDemand = Subscribers.Demand.none
|
||||
|
||||
deinit {
|
||||
lock.deallocate()
|
||||
}
|
||||
|
||||
func receive(subscription: Subscription) {
|
||||
lock.lock()
|
||||
guard case .awaitingSubscription = state else {
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
return
|
||||
}
|
||||
state = .subscribed(subscription)
|
||||
let pendingDemand = self.pendingDemand
|
||||
self.pendingDemand = .none
|
||||
lock.unlock()
|
||||
if pendingDemand != .none {
|
||||
subscription.request(pendingDemand)
|
||||
}
|
||||
}
|
||||
|
||||
func receive(_ input: Input) -> Subscribers.Demand {
|
||||
lock.lock()
|
||||
guard case .subscribed = state else {
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
return .none
|
||||
}
|
||||
precondition(!pending.isEmpty, "Received an output without requesting demand")
|
||||
let continuation = pending.removeFirst()
|
||||
lock.unlock()
|
||||
continuation.resume(returning: input)
|
||||
return .none
|
||||
}
|
||||
|
||||
func receive(completion: Subscribers.Completion<Failure>) {
|
||||
lock.lock()
|
||||
% if throwing:
|
||||
switch state {
|
||||
case .awaitingSubscription, .subscribed:
|
||||
if let continuation = pending.first {
|
||||
// TODO: Test that it's nil even if the publisher fails
|
||||
state = .terminal(nil)
|
||||
let remaining = pending.take().dropFirst()
|
||||
lock.unlock()
|
||||
switch completion {
|
||||
case .finished:
|
||||
continuation.resume(returning: nil)
|
||||
case .failure(let error):
|
||||
continuation.resume(throwing: error)
|
||||
}
|
||||
remaining.resumeAllWithNil()
|
||||
} else {
|
||||
state = .terminal(completion.failure)
|
||||
lock.unlock()
|
||||
}
|
||||
case .terminal:
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
}
|
||||
% else:
|
||||
state = .terminal
|
||||
let pending = self.pending.take()
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
% end
|
||||
}
|
||||
|
||||
func cancel() {
|
||||
lock.lock()
|
||||
let pending = self.pending.take()
|
||||
guard case .subscribed(let subscription) = state else {
|
||||
state = .terminal${'(nil)' if throwing else ''}
|
||||
lock.unlock()
|
||||
pending.resumeAllWithNil()
|
||||
return
|
||||
}
|
||||
state = .terminal${'(nil)' if throwing else ''}
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
pending.resumeAllWithNil()
|
||||
}
|
||||
|
||||
fileprivate func next() async ${'throws ' if throwing else ''}-> Input? {
|
||||
return ${'try ' if throwing else ''}await withUnsafe${'Throwing' if throwing else ''}Continuation { continuation in
|
||||
lock.lock()
|
||||
switch state {
|
||||
case .awaitingSubscription:
|
||||
pending.append(continuation)
|
||||
pendingDemand += 1
|
||||
lock.unlock()
|
||||
case .subscribed(let subscription):
|
||||
pending.append(continuation)
|
||||
lock.unlock()
|
||||
subscription.request(.max(1))
|
||||
case .terminal:
|
||||
lock.unlock()
|
||||
continuation.resume(returning: nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
% end
|
||||
|
||||
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
|
||||
extension Sequence {
|
||||
fileprivate func resumeAllWithNil<Output, Failure: Error>()
|
||||
where Element == UnsafeContinuation<Output?, Failure>
|
||||
{
|
||||
for continuation in self {
|
||||
continuation.resume(returning: nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif // compiler(>=5.5)
|
||||
@@ -23,6 +23,10 @@ extension Subscribers.Completion: Equatable where Failure: Equatable {}
|
||||
|
||||
extension Subscribers.Completion: Hashable where Failure: Hashable {}
|
||||
|
||||
#if compiler(>=5.5)
|
||||
extension Subscribers.Completion: Sendable {}
|
||||
#endif
|
||||
|
||||
extension Subscribers.Completion {
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case success = "success"
|
||||
@@ -70,4 +74,13 @@ extension Subscribers.Completion {
|
||||
return .failure(error)
|
||||
}
|
||||
}
|
||||
|
||||
internal var failure: Failure? {
|
||||
switch self {
|
||||
case .finished:
|
||||
return nil
|
||||
case .failure(let failure):
|
||||
return failure
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -466,3 +466,7 @@ extension Subscribers {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#if compiler(>=5.5)
|
||||
extension Subscribers.Demand: Sendable {}
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user