Update for Xcode 14

This commit is contained in:
Sergej Jaskiewicz
2022-11-27 16:30:40 +01:00
committed by Sergej Jaskiewicz
parent ff31c43375
commit 8f8ef5057d
32 changed files with 1077 additions and 368 deletions
+2 -11
View File
@@ -15,14 +15,5 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Run tests against Apple's Combine
# Attempt to run compatibility tests on macOS.
# If they fail, run on iOS.
run: |
make test-compatibility \
|| (set -o pipefail \
&& xcodebuild test \
-scheme OpenCombine-Package \
-destination "name=iPhone 13" \
-xcconfig Combine-Compatibility.xcconfig \
| tee xcodebuild_test.log \
| xcpretty)
run: make test-compatibility
+6 -2
View File
@@ -77,7 +77,7 @@ jobs:
matrix:
include:
- os: macos-10.15
xcode-version: "11.3.1" # Swift 5.3.1
xcode-version: "11.3.1" # Swift 5.1.3
- os: macos-10.15
xcode-version: "11.7" # Swift 5.2.4
- os: macos-11
@@ -86,6 +86,10 @@ jobs:
xcode-version: "12.5.1" # Swift 5.4.2
- os: macos-11
xcode-version: "13.2.1" # Swift 5.5.2
- os: macos-12
xcode-version: "13.4.1" # Swift 5.6.1
- os: macos-12
xcode-version: "14.2" # Swift 5.7.2
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
@@ -107,7 +111,7 @@ jobs:
.build-test-debug/debug/OpenCombinePackageTests.xctest/Contents/MacOS/OpenCombinePackageTests \
> coverage.txt
- name: Build and run tests in debug mode with TSan
if: ${{ matrix.xcode-version != '13.2.1' }} # https://bugs.swift.org/browse/SR-15444
if: ${{ matrix.xcode-version != '13.2.1' && matrix.xcode-version != '13.4.1' }} # https://bugs.swift.org/browse/SR-15444
run: |
swift test \
-c debug \
+1 -1
View File
@@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
swift_version: ["5.0", "5.1", "5.2", "5.3", "5.4", "5.5"]
swift_version: ["5.0", "5.1", "5.2", "5.3", "5.4", "5.5", "5.6", "5.7"]
runs-on: ubuntu-latest
container: swift:${{ matrix.swift_version }}-bionic
steps:
+15
View File
@@ -31,3 +31,18 @@ jobs:
- uses: actions/checkout@v2
- uses: swiftwasm/swiftwasm-action@v5.5
carton_wasmer_test_5_6:
name: "Execute tests on Wasm (Swift 5.6)"
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: swiftwasm/swiftwasm-action@v5.6
carton_wasmer_test_5_7:
name: "Execute tests on Wasm (Swift 5.7)"
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: swiftwasm/swiftwasm-action@v5.7
+15 -4
View File
@@ -12,10 +12,21 @@ jobs:
strategy:
fail-fast: false
matrix:
swift_version: ["5.4.2", "5.5.1"]
runs-on: windows-2019
include:
- os: windows-2019
swift_version: "5.4.2"
- os: windows-2019
swift_version: "5.5.1"
- os: windows-2019
swift_version: "5.6.1"
- os: windows-2019
swift_version: "5.7.2"
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: MaxDesiatov/swift-windows-action@v1
- uses: compnerd/gha-setup-swift@main
with:
swift-version: ${{ matrix.swift_version }}
branch: swift-${{ matrix.swift_version }}-release
tag: ${{ matrix.swift_version }}-RELEASE
- name: Building and running tests in debug mode
run: swift test
+1
View File
@@ -5,6 +5,7 @@ included:
child_config: Tests/.swiftlint.yml
disabled_rules:
- blanket_disable_command
- block_based_kvo
- class_delegate_protocol
- colon
@@ -0,0 +1,24 @@
//
// ConcurrencyHelpers.swift
//
//
// Created by Sergej Jaskiewicz on 14.11.2022.
//
#if canImport(_Concurrency) && compiler(>=5.5)
import _Concurrency
#endif
#if (canImport(_Concurrency) && compiler(>=5.5) || compiler(>=5.5.1)) && swift(<5.7)
/// A polyfill for pre-5.7 Swift versions.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func withTaskCancellationHandler<T>( // swiftlint:disable:this generic_type_name
operation: () async throws -> T,
onCancel handler: @Sendable () -> Void
) async rethrows -> T {
return try await withTaskCancellationHandler(
handler: handler,
operation: operation
)
}
#endif
@@ -12,6 +12,11 @@ import _Concurrency
#if canImport(_Concurrency) && compiler(>=5.5) || compiler(>=5.5.1)
extension Future where Failure == Never {
/// The published value of the future, delivered asynchronously.
///
/// This property subscribes to the `Future` and delivers the value asynchronously
/// when the `Future` publishes it. Use this property when you want to use
/// the `async`-`await` syntax with a `Future`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public var value: Output {
get async {
@@ -22,6 +27,12 @@ extension Future where Failure == Never {
extension Future {
/// The published value of the future or an error, delivered asynchronously.
///
/// This property subscribes to the `Future` and delivers the value asynchronously
/// when the `Future` publishes it. If the `Future` terminates with an error,
/// the awaiting caller receives the error instead. Use this property when you want
/// to the `async`-`await` syntax with a `Future` whose `Failure` type is not `Never`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public var value: Output {
get async throws {
@@ -18,12 +18,25 @@ import _Concurrency
#if canImport(_Concurrency) && compiler(>=5.5) || compiler(>=5.5.1)
extension Publisher where Failure == Never {
/// The elements produced by the publisher, as an asynchronous sequence.
///
/// This property provides an `AsyncPublisher`, which allows you to use
/// the Swift `async`-`await` syntax to receive the publisher's elements.
/// Because `AsyncPublisher` conforms to `AsyncSequence`, you iterate over its
/// elements with a `for`-`await`-`in` loop, rather than attaching a subscriber.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public var values: AsyncPublisher<Self> {
return .init(self)
}
}
/// A publisher that exposes its elements as an asynchronous sequence.
///
/// `AsyncPublisher` conforms to `AsyncSequence`, which allows callers to receive
/// values with the `for`-`await`-`in` syntax, rather than attaching a `Subscriber`.
///
/// Use the `values` property of the `Publisher` protocol to wrap an existing publisher
/// with an instance of this type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct AsyncPublisher<Upstream: Publisher>: AsyncSequence
where Upstream.Failure == Never
@@ -31,28 +44,45 @@ public struct AsyncPublisher<Upstream: Publisher>: AsyncSequence
public typealias Element = Upstream.Output
/// The iterator that produces elements of the asynchronous publisher sequence.
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = Upstream.Output
fileprivate let inner: Inner
/// Produces the next element in the prefix sequence.
///
/// - Returns: The next published element, or `nil` if the publisher finishes
/// normally.
public mutating func next() async -> Element? {
return await withTaskCancellationHandler(
handler: { [inner] in inner.cancel() },
operation: { [inner] in await inner.next() }
operation: { [inner] in await inner.next() },
onCancel: { [inner] in inner.cancel() }
)
}
}
/// The type of asynchronous iterator that produces elements of this
/// asynchronous sequence.
public typealias AsyncIterator = Iterator
private let publisher: Upstream
/// Creates a publisher that exposes elements received from an upstream publisher as
/// a throwing asynchronous sequence.
///
/// - Parameter publisher: An upstream publisher. The asynchronous publisher converts
/// elements received from this publisher into an asynchronous sequence.
public init(_ publisher: Upstream) {
self.publisher = publisher
}
/// Creates the asynchronous iterator that produces elements of this asynchronous
/// sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce elements of
/// the asynchronous sequence.
public func makeAsyncIterator() -> Iterator {
let inner = Iterator.Inner()
publisher.subscribe(inner)
@@ -158,40 +188,76 @@ extension AsyncPublisher.Iterator {
}
extension Publisher {
/// The elements produced by the publisher, as a throwing asynchronous sequence.
///
/// This property provides an `AsyncThrowingPublisher`, which allows you to use
/// the Swift `async`-`await` syntax to receive the publisher's elements.
/// Because `AsyncPublisher` conforms to `AsyncSequence`, you iterate over its
/// elements with a `for`-`await`-`in` loop, rather than attaching a subscriber.
/// If the publisher terminates with an error, the awaiting caller receives the error
/// as a `throw`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public var values: AsyncThrowingPublisher<Self> {
return .init(self)
}
}
/// A publisher that exposes its elements as a throwing asynchronous sequence.
///
/// `AsyncThrowingPublisher` conforms to `AsyncSequence`, which allows callers to receive
/// values with the `for`-`await`-`in` syntax, rather than attaching a `Subscriber`.
/// If the upstream publisher terminates with an error, `AsyncThrowingPublisher` throws
/// the error to the awaiting caller.
///
/// Use the `values` property of the `Publisher` protocol to wrap an existing publisher
/// with an instance of this type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct AsyncThrowingPublisher<Upstream: Publisher>: AsyncSequence
{
public typealias Element = Upstream.Output
/// The iterator that produces elements of the asynchronous publisher sequence.
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = Upstream.Output
fileprivate let inner: Inner
/// Produces the next element in the prefix sequence.
///
/// - Returns: The next published element, or `nil` if the publisher finishes
/// normally.
/// If the publisher terminates with an error, the call point receives
/// the error as a `throw`.
public mutating func next() async throws -> Element? {
return try await withTaskCancellationHandler(
handler: { [inner] in inner.cancel() },
operation: { [inner] in try await inner.next() }
operation: { [inner] in try await inner.next() },
onCancel: { [inner] in inner.cancel() }
)
}
}
/// The type of asynchronous iterator that produces elements of this
/// asynchronous sequence.
public typealias AsyncIterator = Iterator
private let publisher: Upstream
/// Creates a publisher that exposes elements received from an upstream publisher as
/// an asynchronous sequence.
///
/// - Parameter publisher: An upstream publisher. The asynchronous publisher converts
/// elements received from this publisher into an asynchronous sequence.
public init(_ publisher: Upstream) {
self.publisher = publisher
}
/// Creates the asynchronous iterator that produces elements of this asynchronous
/// sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce elements of
/// the asynchronous sequence.
public func makeAsyncIterator() -> Iterator {
let inner = Iterator.Inner()
publisher.subscribe(inner)
@@ -17,12 +17,33 @@ instantiations = [('AsyncPublisher', False), ('AsyncThrowingPublisher', True)]
% for instantiation, throwing in instantiations:
extension Publisher ${'' if throwing else 'where Failure == Never '}{
/// The elements produced by the publisher, as ${'a throwing' if throwing else 'an'} asynchronous sequence.
///
/// This property provides an `${instantiation}`, which allows you to use
/// the Swift `async`-`await` syntax to receive the publisher's elements.
/// Because `AsyncPublisher` conforms to `AsyncSequence`, you iterate over its
/// elements with a `for`-`await`-`in` loop, rather than attaching a subscriber.
% if throwing:
/// If the publisher terminates with an error, the awaiting caller receives the error
/// as a `throw`.
% end
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public var values: ${instantiation}<Self> {
return .init(self)
}
}
/// A publisher that exposes its elements as ${'a throwing' if throwing else 'an'} asynchronous sequence.
///
/// `${instantiation}` conforms to `AsyncSequence`, which allows callers to receive
/// values with the `for`-`await`-`in` syntax, rather than attaching a `Subscriber`.
% if throwing:
/// If the upstream publisher terminates with an error, `${instantiation}` throws
/// the error to the awaiting caller.
% end
///
/// Use the `values` property of the `Publisher` protocol to wrap an existing publisher
/// with an instance of this type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct ${instantiation}<Upstream: Publisher>: AsyncSequence
% if not throwing:
@@ -32,28 +53,53 @@ public struct ${instantiation}<Upstream: Publisher>: AsyncSequence
public typealias Element = Upstream.Output
/// The iterator that produces elements of the asynchronous publisher sequence.
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = Upstream.Output
fileprivate let inner: Inner
/// Produces the next element in the prefix sequence.
///
/// - Returns: The next published element, or `nil` if the publisher finishes
/// normally.
% if throwing:
/// If the publisher terminates with an error, the call point receives
/// the error as a `throw`.
% end
public mutating func next() async ${'throws ' if throwing else ''}-> Element? {
return ${'try ' if throwing else ''}await withTaskCancellationHandler(
handler: { [inner] in inner.cancel() },
operation: { [inner] in ${'try ' if throwing else ''}await inner.next() }
operation: { [inner] in ${'try ' if throwing else ''}await inner.next() },
onCancel: { [inner] in inner.cancel() }
)
}
}
/// The type of asynchronous iterator that produces elements of this
/// asynchronous sequence.
public typealias AsyncIterator = Iterator
private let publisher: Upstream
/// Creates a publisher that exposes elements received from an upstream publisher as
% if throwing:
/// an asynchronous sequence.
% else:
/// a throwing asynchronous sequence.
% end
///
/// - Parameter publisher: An upstream publisher. The asynchronous publisher converts
/// elements received from this publisher into an asynchronous sequence.
public init(_ publisher: Upstream) {
self.publisher = publisher
}
/// Creates the asynchronous iterator that produces elements of this asynchronous
/// sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce elements of
/// the asynchronous sequence.
public func makeAsyncIterator() -> Iterator {
let inner = Iterator.Inner()
publisher.subscribe(inner)
@@ -1,24 +0,0 @@
//
// ConnectablePublisher.swift
//
//
// Created by Sergej Jaskiewicz on 14.06.2019.
//
/// A publisher that provides an explicit means of connecting and canceling publication.
///
/// Use a `ConnectablePublisher` when you need to perform additional configuration or
/// setup prior to producing any elements.
///
/// This publisher doesnt produce any elements until you call its `connect()` method.
///
/// Use `makeConnectable()` to create a `ConnectablePublisher` from any publisher whose
/// failure type is `Never`.
public protocol ConnectablePublisher: Publisher {
/// Connects to the publisher, allowing it to produce elements, and returns
/// an instance with which to cancel publishing.
///
/// - Returns: A `Cancellable` instance that you use to cancel publishing.
func connect() -> Cancellable
}
+3 -1
View File
@@ -153,8 +153,10 @@ extension Future {
lock.unlock()
downstreamLock.lock()
lockedFulfill(downstream: downstream, result: result)
let parent = self.parent.take()
downstreamLock.unlock()
lock.lock()
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
}
+426
View File
@@ -0,0 +1,426 @@
//
//
// Auto-generated from GYB template. DO NOT EDIT!
//
//
//
//
// RootProtocols.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
#if compiler(>=5.7)
/// Declares that a type can transmit a sequence of values over time.
///
/// A publisher delivers elements to one or more `Subscriber` instances.
/// The subscribers `Input` and `Failure` associated types must match the `Output` and
/// `Failure` types declared by the publisher.
/// The publisher implements the `receive(subscriber:)`method to accept a subscriber.
///
/// After this, the publisher can call the following methods on the subscriber:
/// - `receive(subscription:)`: Acknowledges the subscribe request and returns
/// a `Subscription` instance. The subscriber uses the subscription to demand elements
/// from the publisher and can use it to cancel publishing.
/// - `receive(_:)`: Delivers one element from the publisher to the subscriber.
/// - `receive(completion:)`: Informs the subscriber that publishing has ended,
/// either normally or with an error.
///
/// Every `Publisher` must adhere to this contract for downstream subscribers to function
/// correctly.
///
/// Extensions on `Publisher` define a wide variety of _operators_ that you compose to
/// create sophisticated event-processing chains.
/// Each operator returns a type that implements the `Publisher` protocol
/// Most of these types exist as extensions on the `Publishers` enumeration.
/// For example, the `map(_:)` operator returns an instance of `Publishers.Map`.
///
/// # Creating Your Own Publishers
///
/// Rather than implementing the `Publisher` protocol yourself, you can create your own
/// publisher by using one of several types provided by the OpenCombine framework:
///
/// - Use a concrete subclass of `Subject`, such as `PassthroughSubject`, to publish
/// values on-demand by calling its `send(_:)` method.
/// - Use a `CurrentValueSubject` to publish whenever you update the subjects underlying
/// value.
/// - Add the `@Published` annotation to a property of one of your own types. In doing so,
/// the property gains a publisher that emits an event whenever the propertys value
/// changes. See the `Published` type for an example of this approach.
public protocol Publisher<Output, Failure> {
/// The kind of values published by this publisher.
associatedtype Output
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
associatedtype Failure: Error
/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of `receive(subscriber:)`.
/// Adopters of `Publisher` must implement `receive(subscriber:)`. The implementation
/// of `subscribe(_:)` provided by `Publisher` calls through to
/// `receive(subscriber:)`.
///
/// - Parameter subscriber: The subscriber to attach to this publisher. After
/// attaching, the subscriber can start to receive values.
func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
}
/// A publisher that exposes a method for outside callers to publish elements.
///
/// A subject is a publisher that you can use to inject values into a stream, by calling
/// its `send()` method. This can be useful for adapting existing imperative code to the
/// Combine model.
public protocol Subject<Output, Failure>: AnyObject, Publisher {
/// Sends a value to the subscriber.
///
/// - Parameter value: The value to send.
func send(_ value: Output)
/// Sends a completion signal to the subscriber.
///
/// - Parameter completion: A `Completion` instance which indicates whether publishing
/// has finished normally or failed with an error.
func send(completion: Subscribers.Completion<Failure>)
/// Sends a subscription to the subscriber.
///
/// This call provides the `Subject` an opportunity to establish demand for any new
/// upstream subscriptions.
///
/// - Parameter subscription: The subscription instance through which the subscriber
/// can request elements.
func send(subscription: Subscription)
}
/// A publisher that provides an explicit means of connecting and canceling publication.
///
/// Use a `ConnectablePublisher` when you need to perform additional configuration or
/// setup prior to producing any elements.
///
/// This publisher doesnt produce any elements until you call its `connect()` method.
///
/// Use `makeConnectable()` to create a `ConnectablePublisher` from any publisher whose
/// failure type is `Never`.
public protocol ConnectablePublisher<Output, Failure>: Publisher {
/// Connects to the publisher, allowing it to produce elements, and returns
/// an instance with which to cancel publishing.
///
/// - Returns: A `Cancellable` instance that you use to cancel publishing.
func connect() -> Cancellable
}
/// A protocol that declares a type that can receive input from a publisher.
///
/// A `Subscriber` instance receives a stream of elements from a `Publisher`, along with
/// life cycle events describing changes to their relationship. A given subscribers
/// `Input` and `Failure` associated types must match the `Output` and `Failure` of its
/// corresponding publisher.
///
/// You connect a subscriber to a publisher by calling the publishers `subscribe(_:)`
/// method. After making this call, the publisher invokes the subscribers
/// `receive(subscription:)` method. This gives the subscriber a `Subscription` instance,
/// which it uses to demand elements from the publisher, and to optionally cancel
/// the subscription. After the subscriber makes an initial demand, the publisher calls
/// `receive(_:)`, possibly asynchronously, to deliver newly-published elements.
/// If the publisher stops publishing, it calls `receive(completion:)`, using a parameter
/// of type `Subscribers.Completion` to indicate whether publishing completes normally or
/// with an error.
///
/// OpenCombine provides the following subscribers as operators on the `Publisher` type:
///
/// - `sink(receiveCompletion:receiveValue:)` executes arbitrary closures when
/// it receives a completion signal and each time it receives a new element.
/// - `assign(to:on:)` writes each newly-received value to a property identified by
/// a key path on a given instance.
public protocol Subscriber<Input, Failure>: CustomCombineIdentifierConvertible {
/// The kind of values this subscriber receives.
associatedtype Input
/// The kind of errors this subscriber might receive.
///
/// Use `Never` if this `Subscriber` cannot receive errors.
associatedtype Failure: Error
/// Tells the subscriber that it has successfully subscribed to the publisher and may
/// request items.
///
/// Use the received `Subscription` to request items from the publisher.
/// - Parameter subscription: A subscription that represents the connection between
/// publisher and subscriber.
func receive(subscription: Subscription)
/// Tells the subscriber that the publisher has produced an element.
///
/// - Parameter input: The published element.
/// - Returns: A `Subscribers.Demand` instance indicating how many more elements
/// the subscriber expects to receive.
func receive(_ input: Input) -> Subscribers.Demand
/// Tells the subscriber that the publisher has completed publishing, either normally
/// or with an error.
///
/// - Parameter completion: A `Subscribers.Completion` case indicating whether
/// publishing completed normally or with an error.
func receive(completion: Subscribers.Completion<Failure>)
}
/// A protocol that defines when and how to execute a closure.
///
/// You can use a scheduler to execute code as soon as possible, or after a future date.
/// Individual scheduler implementations use whatever time-keeping system makes sense
/// for them. Schedulers express this as their `SchedulerTimeType`. Since this type
/// conforms to `SchedulerTimeIntervalConvertible`, you can always express these times
/// with the convenience functions like `.milliseconds(500)`. Schedulers can accept
/// options to control how they execute the actions passed to them. These options may
/// control factors like which threads or dispatch queues execute the actions.
public protocol Scheduler<SchedulerTimeType> {
/// Describes an instant in time for this scheduler.
associatedtype SchedulerTimeType: Strideable
where SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible
/// A type that defines options accepted by the scheduler.
///
/// This type is freely definable by each `Scheduler`. Typically, operations that
/// take a `Scheduler` parameter will also take `SchedulerOptions`.
associatedtype SchedulerOptions
/// This schedulers definition of the current moment in time.
var now: SchedulerTimeType { get }
/// The minimum tolerance allowed by the scheduler.
var minimumTolerance: SchedulerTimeType.Stride { get }
/// Performs the action at the next possible opportunity.
func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void)
/// Performs the action at some time after the specified date.
func schedule(after date: SchedulerTimeType,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void)
/// Performs the action at some time after the specified date, at the specified
/// frequency, optionally taking into account tolerance if possible.
func schedule(after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void) -> Cancellable
}
#else
/// Declares that a type can transmit a sequence of values over time.
///
/// A publisher delivers elements to one or more `Subscriber` instances.
/// The subscribers `Input` and `Failure` associated types must match the `Output` and
/// `Failure` types declared by the publisher.
/// The publisher implements the `receive(subscriber:)`method to accept a subscriber.
///
/// After this, the publisher can call the following methods on the subscriber:
/// - `receive(subscription:)`: Acknowledges the subscribe request and returns
/// a `Subscription` instance. The subscriber uses the subscription to demand elements
/// from the publisher and can use it to cancel publishing.
/// - `receive(_:)`: Delivers one element from the publisher to the subscriber.
/// - `receive(completion:)`: Informs the subscriber that publishing has ended,
/// either normally or with an error.
///
/// Every `Publisher` must adhere to this contract for downstream subscribers to function
/// correctly.
///
/// Extensions on `Publisher` define a wide variety of _operators_ that you compose to
/// create sophisticated event-processing chains.
/// Each operator returns a type that implements the `Publisher` protocol
/// Most of these types exist as extensions on the `Publishers` enumeration.
/// For example, the `map(_:)` operator returns an instance of `Publishers.Map`.
///
/// # Creating Your Own Publishers
///
/// Rather than implementing the `Publisher` protocol yourself, you can create your own
/// publisher by using one of several types provided by the OpenCombine framework:
///
/// - Use a concrete subclass of `Subject`, such as `PassthroughSubject`, to publish
/// values on-demand by calling its `send(_:)` method.
/// - Use a `CurrentValueSubject` to publish whenever you update the subjects underlying
/// value.
/// - Add the `@Published` annotation to a property of one of your own types. In doing so,
/// the property gains a publisher that emits an event whenever the propertys value
/// changes. See the `Published` type for an example of this approach.
public protocol Publisher {
/// The kind of values published by this publisher.
associatedtype Output
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
associatedtype Failure: Error
/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of `receive(subscriber:)`.
/// Adopters of `Publisher` must implement `receive(subscriber:)`. The implementation
/// of `subscribe(_:)` provided by `Publisher` calls through to
/// `receive(subscriber:)`.
///
/// - Parameter subscriber: The subscriber to attach to this publisher. After
/// attaching, the subscriber can start to receive values.
func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
}
/// A publisher that exposes a method for outside callers to publish elements.
///
/// A subject is a publisher that you can use to inject values into a stream, by calling
/// its `send()` method. This can be useful for adapting existing imperative code to the
/// Combine model.
public protocol Subject: AnyObject, Publisher {
/// Sends a value to the subscriber.
///
/// - Parameter value: The value to send.
func send(_ value: Output)
/// Sends a completion signal to the subscriber.
///
/// - Parameter completion: A `Completion` instance which indicates whether publishing
/// has finished normally or failed with an error.
func send(completion: Subscribers.Completion<Failure>)
/// Sends a subscription to the subscriber.
///
/// This call provides the `Subject` an opportunity to establish demand for any new
/// upstream subscriptions.
///
/// - Parameter subscription: The subscription instance through which the subscriber
/// can request elements.
func send(subscription: Subscription)
}
/// A publisher that provides an explicit means of connecting and canceling publication.
///
/// Use a `ConnectablePublisher` when you need to perform additional configuration or
/// setup prior to producing any elements.
///
/// This publisher doesnt produce any elements until you call its `connect()` method.
///
/// Use `makeConnectable()` to create a `ConnectablePublisher` from any publisher whose
/// failure type is `Never`.
public protocol ConnectablePublisher: Publisher {
/// Connects to the publisher, allowing it to produce elements, and returns
/// an instance with which to cancel publishing.
///
/// - Returns: A `Cancellable` instance that you use to cancel publishing.
func connect() -> Cancellable
}
/// A protocol that declares a type that can receive input from a publisher.
///
/// A `Subscriber` instance receives a stream of elements from a `Publisher`, along with
/// life cycle events describing changes to their relationship. A given subscribers
/// `Input` and `Failure` associated types must match the `Output` and `Failure` of its
/// corresponding publisher.
///
/// You connect a subscriber to a publisher by calling the publishers `subscribe(_:)`
/// method. After making this call, the publisher invokes the subscribers
/// `receive(subscription:)` method. This gives the subscriber a `Subscription` instance,
/// which it uses to demand elements from the publisher, and to optionally cancel
/// the subscription. After the subscriber makes an initial demand, the publisher calls
/// `receive(_:)`, possibly asynchronously, to deliver newly-published elements.
/// If the publisher stops publishing, it calls `receive(completion:)`, using a parameter
/// of type `Subscribers.Completion` to indicate whether publishing completes normally or
/// with an error.
///
/// OpenCombine provides the following subscribers as operators on the `Publisher` type:
///
/// - `sink(receiveCompletion:receiveValue:)` executes arbitrary closures when
/// it receives a completion signal and each time it receives a new element.
/// - `assign(to:on:)` writes each newly-received value to a property identified by
/// a key path on a given instance.
public protocol Subscriber: CustomCombineIdentifierConvertible {
/// The kind of values this subscriber receives.
associatedtype Input
/// The kind of errors this subscriber might receive.
///
/// Use `Never` if this `Subscriber` cannot receive errors.
associatedtype Failure: Error
/// Tells the subscriber that it has successfully subscribed to the publisher and may
/// request items.
///
/// Use the received `Subscription` to request items from the publisher.
/// - Parameter subscription: A subscription that represents the connection between
/// publisher and subscriber.
func receive(subscription: Subscription)
/// Tells the subscriber that the publisher has produced an element.
///
/// - Parameter input: The published element.
/// - Returns: A `Subscribers.Demand` instance indicating how many more elements
/// the subscriber expects to receive.
func receive(_ input: Input) -> Subscribers.Demand
/// Tells the subscriber that the publisher has completed publishing, either normally
/// or with an error.
///
/// - Parameter completion: A `Subscribers.Completion` case indicating whether
/// publishing completed normally or with an error.
func receive(completion: Subscribers.Completion<Failure>)
}
/// A protocol that defines when and how to execute a closure.
///
/// You can use a scheduler to execute code as soon as possible, or after a future date.
/// Individual scheduler implementations use whatever time-keeping system makes sense
/// for them. Schedulers express this as their `SchedulerTimeType`. Since this type
/// conforms to `SchedulerTimeIntervalConvertible`, you can always express these times
/// with the convenience functions like `.milliseconds(500)`. Schedulers can accept
/// options to control how they execute the actions passed to them. These options may
/// control factors like which threads or dispatch queues execute the actions.
public protocol Scheduler {
/// Describes an instant in time for this scheduler.
associatedtype SchedulerTimeType: Strideable
where SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible
/// A type that defines options accepted by the scheduler.
///
/// This type is freely definable by each `Scheduler`. Typically, operations that
/// take a `Scheduler` parameter will also take `SchedulerOptions`.
associatedtype SchedulerOptions
/// This schedulers definition of the current moment in time.
var now: SchedulerTimeType { get }
/// The minimum tolerance allowed by the scheduler.
var minimumTolerance: SchedulerTimeType.Stride { get }
/// Performs the action at the next possible opportunity.
func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void)
/// Performs the action at some time after the specified date.
func schedule(after date: SchedulerTimeType,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void)
/// Performs the action at some time after the specified date, at the specified
/// frequency, optionally taking into account tolerance if possible.
func schedule(after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void) -> Cancellable
}
#endif
@@ -0,0 +1,53 @@
//
// Publisher+Subscribe.swift
//
//
// Created by Sergej Jaskiewicz on 23.04.2023.
//
extension Publisher {
/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of `receive(subscriber:)`.
/// Adopters of `Publisher` must implement `receive(subscriber:)`.
/// The implementation of `subscribe(_:)` in this extension calls through to
/// `receive(subscriber:)`.
/// - SeeAlso: `receive(subscriber:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`. After attaching,
/// the subscriber can start to receive values.
public func subscribe<Subscriber: OpenCombine.Subscriber>(_ subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
{
if let hook = DebugHook.getGlobalHook() {
if var marker = subscriber as? SubscriberTapMarker {
let anySubscriber = marker.inner
as! AnySubscriber<Subscriber.Input, Subscriber.Failure>
hook.willReceive(publisher: self, subscriber: anySubscriber)
receive(subscriber: subscriber)
hook.didReceive(publisher: self, subscriber: anySubscriber)
} else {
let tap = SubscriberTap(subscriber: subscriber)
hook.willReceive(publisher: self, subscriber: subscriber)
receive(subscriber: tap)
hook.didReceive(publisher: self, subscriber: subscriber)
}
} else {
receive(subscriber: subscriber)
}
}
/// Attaches the specified subject to this publisher.
///
/// - Parameter subject: The subject to attach to this publisher.
public func subscribe<Subject: OpenCombine.Subject>(
_ subject: Subject
) -> AnyCancellable
where Failure == Subject.Failure, Output == Subject.Output
{
let subscriber = SubjectSubscriber(subject)
self.subscribe(subscriber)
return AnyCancellable(subscriber)
}
}
-112
View File
@@ -1,112 +0,0 @@
//
// Publisher.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
/// Declares that a type can transmit a sequence of values over time.
///
/// A publisher delivers elements to one or more `Subscriber` instances.
/// The subscribers `Input` and `Failure` associated types must match the `Output` and
/// `Failure` types declared by the publisher.
/// The publisher implements the `receive(subscriber:)`method to accept a subscriber.
///
/// After this, the publisher can call the following methods on the subscriber:
/// - `receive(subscription:)`: Acknowledges the subscribe request and returns
/// a `Subscription` instance. The subscriber uses the subscription to demand elements
/// from the publisher and can use it to cancel publishing.
/// - `receive(_:)`: Delivers one element from the publisher to the subscriber.
/// - `receive(completion:)`: Informs the subscriber that publishing has ended,
/// either normally or with an error.
///
/// Every `Publisher` must adhere to this contract for downstream subscribers to function
/// correctly.
///
/// Extensions on `Publisher` define a wide variety of _operators_ that you compose to
/// create sophisticated event-processing chains.
/// Each operator returns a type that implements the `Publisher` protocol
/// Most of these types exist as extensions on the `Publishers` enumeration.
/// For example, the `map(_:)` operator returns an instance of `Publishers.Map`.
///
/// # Creating Your Own Publishers
///
/// Rather than implementing the `Publisher` protocol yourself, you can create your own
/// publisher by using one of several types provided by the OpenCombine framework:
///
/// - Use a concrete subclass of `Subject`, such as `PassthroughSubject`, to publish
/// values on-demand by calling its `send(_:)` method.
/// - Use a `CurrentValueSubject` to publish whenever you update the subjects underlying
/// value.
/// - Add the `@Published` annotation to a property of one of your own types. In doing so,
/// the property gains a publisher that emits an event whenever the propertys value
/// changes. See the `Published` type for an example of this approach.
public protocol Publisher {
/// The kind of values published by this publisher.
associatedtype Output
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
associatedtype Failure: Error
/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of `receive(subscriber:)`.
/// Adopters of `Publisher` must implement `receive(subscriber:)`. The implementation
/// of `subscribe(_:)` provided by `Publisher` calls through to
/// `receive(subscriber:)`.
///
/// - Parameter subscriber: The subscriber to attach to this publisher. After
/// attaching, the subscriber can start to receive values.
func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
}
extension Publisher {
/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of `receive(subscriber:)`.
/// Adopters of `Publisher` must implement `receive(subscriber:)`.
/// The implementation of `subscribe(_:)` in this extension calls through to
/// `receive(subscriber:)`.
/// - SeeAlso: `receive(subscriber:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`. After attaching,
/// the subscriber can start to receive values.
public func subscribe<Subscriber: OpenCombine.Subscriber>(_ subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
{
if let hook = DebugHook.getGlobalHook() {
if var marker = subscriber as? SubscriberTapMarker {
let anySubscriber = marker.inner
as! AnySubscriber<Subscriber.Input, Subscriber.Failure>
hook.willReceive(publisher: self, subscriber: anySubscriber)
receive(subscriber: subscriber)
hook.didReceive(publisher: self, subscriber: anySubscriber)
} else {
let tap = SubscriberTap(subscriber: subscriber)
hook.willReceive(publisher: self, subscriber: subscriber)
receive(subscriber: tap)
hook.didReceive(publisher: self, subscriber: subscriber)
}
} else {
receive(subscriber: subscriber)
}
}
/// Attaches the specified subject to this publisher.
///
/// - Parameter subject: The subject to attach to this publisher.
public func subscribe<Subject: OpenCombine.Subject>(
_ subject: Subject
) -> AnyCancellable
where Failure == Subject.Failure, Output == Subject.Output
{
let subscriber = SubjectSubscriber(subject)
self.subscribe(subscriber)
return AnyCancellable(subscriber)
}
}
@@ -11,6 +11,8 @@
// Created by Sergej Jaskiewicz on 03/10/2019.
//
// swiftlint:disable large_tuple
extension Publisher {
/// Publishes the value of the key path.
///
@@ -31,7 +33,7 @@ extension Publisher {
/// .sink {
/// print ("Rolled: \($0)")
/// }
/// // Prints "Rolled: 6 (or some other random value).
/// // Prints "Rolled: 4 (or some other random value).
///
/// - Parameters:
/// - keyPath: The key path of a property on `Output`.
@@ -68,7 +70,7 @@ extension Publisher {
/// (total \(values.0 + values.1))
/// """)
/// }
/// // Prints "Rolled: 5, 3 (total: 8)" (or other random values).
/// // Prints "Rolled: 4, 1 (total: 5)" (or other random values).
///
/// - Parameters:
/// - keyPath0: The key path of a property on `Output`.
@@ -110,7 +112,7 @@ extension Publisher {
/// (total \(values.0 + values.1 + values.2))
/// """)
/// }
/// // Prints "Rolled: 2, 4, 3 (total: 9)" (or other random values).
/// // Prints "Rolled: 3, 5, 4 (total: 12)" (or other random values).
///
/// - Parameters:
/// - keyPath0: The key path of a property on `Output`.
@@ -10,10 +10,10 @@ extension Publisher {
/// Raises a fatal error when its upstream publisher fails, and otherwise republishes
/// all received input.
///
/// Use `assertNoFailure()` for internal sanity checks that are active during testing.
/// However, it is important to note that, like its Swift counterpart
/// Use `assertNoFailure()` for internal integrity checks that are active during
/// testing. However, it is important to note that, like its Swift counterpart
/// `fatalError(_:)`, the `assertNoFailure()` operator asserts a fatal exception when
/// triggered in both development/testing _and_ shipping versions of code.
/// triggered during development and testing, _and_ in shipping versions of code.
///
/// In the example below, a `CurrentValueSubject` publishes the initial and second
/// values successfully. The third value, containing a `genericSubjectError`, causes
@@ -57,8 +57,8 @@ extension Publishers {
/// A publisher that raises a fatal error upon receiving any failure, and otherwise
/// republishes all received input.
///
/// Use this function for internal sanity checks that are active during testing but
/// do not impact performance of shipping code.
/// Use this function for internal integrity checks that are active during testing but
/// don't affect performance of shipping code.
public struct AssertNoFailure<Upstream: Publisher>: Publisher {
public typealias Output = Upstream.Output
@@ -139,12 +139,12 @@ extension Publishers.HandleEvents {
private var status = SubscriptionStatus.awaitingSubscription
private let lock = UnfairLock.allocate()
public var receiveSubscription: ((Subscription) -> Void)?
public var receiveOutput: ((Upstream.Output) -> Void)?
public var receiveCompletion:
fileprivate var receiveSubscription: ((Subscription) -> Void)?
fileprivate var receiveOutput: ((Upstream.Output) -> Void)?
fileprivate var receiveCompletion:
((Subscribers.Completion<Upstream.Failure>) -> Void)?
public var receiveCancel: (() -> Void)?
public var receiveRequest: ((Subscribers.Demand) -> Void)?
fileprivate var receiveCancel: (() -> Void)?
fileprivate var receiveRequest: ((Subscribers.Demand) -> Void)?
private let downstream: Downstream
init(_ events: Publishers.HandleEvents<Upstream>, downstream: Downstream) {
@@ -6,6 +6,8 @@ ${template_header}
// Created by Sergej Jaskiewicz on 03/10/2019.
//
// swiftlint:disable large_tuple
%{
from gyb_opencombine_support import (
suffix_variadic,
@@ -4,6 +4,8 @@
// Created by Eric Patey on 29.08.2019.
//
// swiftlint:disable large_tuple
#if canImport(COpenCombineHelpers)
import COpenCombineHelpers
#endif
@@ -85,7 +87,6 @@ extension Publishers {
public func receive<Downstream>(subscriber: Downstream)
where Downstream: Subscriber,
UpstreamC.Failure == Downstream.Failure,
// swiftlint:disable:next large_tuple
Downstream.Input == (UpstreamA.Output, UpstreamB.Output, UpstreamC.Output)
{
_ = Inner<Downstream>(downstream: subscriber, a, b, c)
@@ -140,7 +141,6 @@ extension Publishers {
/// once attached it can begin to receive values.
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where UpstreamD.Failure == Downstream.Failure,
// swiftlint:disable:next large_tuple
Downstream.Input == (
UpstreamA.Output,
UpstreamB.Output,
@@ -659,7 +659,7 @@ private protocol ChildSubscription: AnyObject, Subscription {
var hasValue: Bool { get }
}
fileprivate final class ChildSubscriber<Upstream: Publisher, Downstream: Subscriber>
private final class ChildSubscriber<Upstream: Publisher, Downstream: Subscriber>
where Upstream.Failure == Downstream.Failure
{
typealias Input = Upstream.Output
+220
View File
@@ -0,0 +1,220 @@
${template_header}
//
// RootProtocols.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
%{
variants = [(True, '#if compiler(>=5.7)'), (False, '#else')]
}%
% for primary_associated_types_supported, guard in variants:
${guard}
/// Declares that a type can transmit a sequence of values over time.
///
/// A publisher delivers elements to one or more `Subscriber` instances.
/// The subscribers `Input` and `Failure` associated types must match the `Output` and
/// `Failure` types declared by the publisher.
/// The publisher implements the `receive(subscriber:)`method to accept a subscriber.
///
/// After this, the publisher can call the following methods on the subscriber:
/// - `receive(subscription:)`: Acknowledges the subscribe request and returns
/// a `Subscription` instance. The subscriber uses the subscription to demand elements
/// from the publisher and can use it to cancel publishing.
/// - `receive(_:)`: Delivers one element from the publisher to the subscriber.
/// - `receive(completion:)`: Informs the subscriber that publishing has ended,
/// either normally or with an error.
///
/// Every `Publisher` must adhere to this contract for downstream subscribers to function
/// correctly.
///
/// Extensions on `Publisher` define a wide variety of _operators_ that you compose to
/// create sophisticated event-processing chains.
/// Each operator returns a type that implements the `Publisher` protocol
/// Most of these types exist as extensions on the `Publishers` enumeration.
/// For example, the `map(_:)` operator returns an instance of `Publishers.Map`.
///
/// # Creating Your Own Publishers
///
/// Rather than implementing the `Publisher` protocol yourself, you can create your own
/// publisher by using one of several types provided by the OpenCombine framework:
///
/// - Use a concrete subclass of `Subject`, such as `PassthroughSubject`, to publish
/// values on-demand by calling its `send(_:)` method.
/// - Use a `CurrentValueSubject` to publish whenever you update the subjects underlying
/// value.
/// - Add the `@Published` annotation to a property of one of your own types. In doing so,
/// the property gains a publisher that emits an event whenever the propertys value
/// changes. See the `Published` type for an example of this approach.
public protocol Publisher${'<Output, Failure>' if primary_associated_types_supported else ''} {
/// The kind of values published by this publisher.
associatedtype Output
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
associatedtype Failure: Error
/// Attaches the specified subscriber to this publisher.
///
/// Always call this function instead of `receive(subscriber:)`.
/// Adopters of `Publisher` must implement `receive(subscriber:)`. The implementation
/// of `subscribe(_:)` provided by `Publisher` calls through to
/// `receive(subscriber:)`.
///
/// - Parameter subscriber: The subscriber to attach to this publisher. After
/// attaching, the subscriber can start to receive values.
func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
}
/// A publisher that exposes a method for outside callers to publish elements.
///
/// A subject is a publisher that you can use to inject values into a stream, by calling
/// its `send()` method. This can be useful for adapting existing imperative code to the
/// Combine model.
public protocol Subject${'<Output, Failure>' if primary_associated_types_supported else ''}: AnyObject, Publisher {
/// Sends a value to the subscriber.
///
/// - Parameter value: The value to send.
func send(_ value: Output)
/// Sends a completion signal to the subscriber.
///
/// - Parameter completion: A `Completion` instance which indicates whether publishing
/// has finished normally or failed with an error.
func send(completion: Subscribers.Completion<Failure>)
/// Sends a subscription to the subscriber.
///
/// This call provides the `Subject` an opportunity to establish demand for any new
/// upstream subscriptions.
///
/// - Parameter subscription: The subscription instance through which the subscriber
/// can request elements.
func send(subscription: Subscription)
}
/// A publisher that provides an explicit means of connecting and canceling publication.
///
/// Use a `ConnectablePublisher` when you need to perform additional configuration or
/// setup prior to producing any elements.
///
/// This publisher doesnt produce any elements until you call its `connect()` method.
///
/// Use `makeConnectable()` to create a `ConnectablePublisher` from any publisher whose
/// failure type is `Never`.
public protocol ConnectablePublisher${'<Output, Failure>' if primary_associated_types_supported else ''}: Publisher {
/// Connects to the publisher, allowing it to produce elements, and returns
/// an instance with which to cancel publishing.
///
/// - Returns: A `Cancellable` instance that you use to cancel publishing.
func connect() -> Cancellable
}
/// A protocol that declares a type that can receive input from a publisher.
///
/// A `Subscriber` instance receives a stream of elements from a `Publisher`, along with
/// life cycle events describing changes to their relationship. A given subscribers
/// `Input` and `Failure` associated types must match the `Output` and `Failure` of its
/// corresponding publisher.
///
/// You connect a subscriber to a publisher by calling the publishers `subscribe(_:)`
/// method. After making this call, the publisher invokes the subscribers
/// `receive(subscription:)` method. This gives the subscriber a `Subscription` instance,
/// which it uses to demand elements from the publisher, and to optionally cancel
/// the subscription. After the subscriber makes an initial demand, the publisher calls
/// `receive(_:)`, possibly asynchronously, to deliver newly-published elements.
/// If the publisher stops publishing, it calls `receive(completion:)`, using a parameter
/// of type `Subscribers.Completion` to indicate whether publishing completes normally or
/// with an error.
///
/// OpenCombine provides the following subscribers as operators on the `Publisher` type:
///
/// - `sink(receiveCompletion:receiveValue:)` executes arbitrary closures when
/// it receives a completion signal and each time it receives a new element.
/// - `assign(to:on:)` writes each newly-received value to a property identified by
/// a key path on a given instance.
public protocol Subscriber${'<Input, Failure>' if primary_associated_types_supported else ''}: CustomCombineIdentifierConvertible {
/// The kind of values this subscriber receives.
associatedtype Input
/// The kind of errors this subscriber might receive.
///
/// Use `Never` if this `Subscriber` cannot receive errors.
associatedtype Failure: Error
/// Tells the subscriber that it has successfully subscribed to the publisher and may
/// request items.
///
/// Use the received `Subscription` to request items from the publisher.
/// - Parameter subscription: A subscription that represents the connection between
/// publisher and subscriber.
func receive(subscription: Subscription)
/// Tells the subscriber that the publisher has produced an element.
///
/// - Parameter input: The published element.
/// - Returns: A `Subscribers.Demand` instance indicating how many more elements
/// the subscriber expects to receive.
func receive(_ input: Input) -> Subscribers.Demand
/// Tells the subscriber that the publisher has completed publishing, either normally
/// or with an error.
///
/// - Parameter completion: A `Subscribers.Completion` case indicating whether
/// publishing completed normally or with an error.
func receive(completion: Subscribers.Completion<Failure>)
}
/// A protocol that defines when and how to execute a closure.
///
/// You can use a scheduler to execute code as soon as possible, or after a future date.
/// Individual scheduler implementations use whatever time-keeping system makes sense
/// for them. Schedulers express this as their `SchedulerTimeType`. Since this type
/// conforms to `SchedulerTimeIntervalConvertible`, you can always express these times
/// with the convenience functions like `.milliseconds(500)`. Schedulers can accept
/// options to control how they execute the actions passed to them. These options may
/// control factors like which threads or dispatch queues execute the actions.
public protocol Scheduler${'<SchedulerTimeType>' if primary_associated_types_supported else ''} {
/// Describes an instant in time for this scheduler.
associatedtype SchedulerTimeType: Strideable
where SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible
/// A type that defines options accepted by the scheduler.
///
/// This type is freely definable by each `Scheduler`. Typically, operations that
/// take a `Scheduler` parameter will also take `SchedulerOptions`.
associatedtype SchedulerOptions
/// This schedulers definition of the current moment in time.
var now: SchedulerTimeType { get }
/// The minimum tolerance allowed by the scheduler.
var minimumTolerance: SchedulerTimeType.Stride { get }
/// Performs the action at the next possible opportunity.
func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void)
/// Performs the action at some time after the specified date.
func schedule(after date: SchedulerTimeType,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void)
/// Performs the action at some time after the specified date, at the specified
/// frequency, optionally taking into account tolerance if possible.
func schedule(after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void) -> Cancellable
}
% end
#endif
-45
View File
@@ -29,51 +29,6 @@ public protocol SchedulerTimeIntervalConvertible {
static func nanoseconds(_ ns: Int) -> Self
}
/// A protocol that defines when and how to execute a closure.
///
/// You can use a scheduler to execute code as soon as possible, or after a future date.
/// Individual scheduler implementations use whatever time-keeping system makes sense
/// for them. Schedulers express this as their `SchedulerTimeType`. Since this type
/// conforms to `SchedulerTimeIntervalConvertible`, you can always express these times
/// with the convenience functions like `.milliseconds(500)`. Schedulers can accept
/// options to control how they execute the actions passed to them. These options may
/// control factors like which threads or dispatch queues execute the actions.
public protocol Scheduler {
/// Describes an instant in time for this scheduler.
associatedtype SchedulerTimeType: Strideable
where SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible
/// A type that defines options accepted by the scheduler.
///
/// This type is freely definable by each `Scheduler`. Typically, operations that
/// take a `Scheduler` parameter will also take `SchedulerOptions`.
associatedtype SchedulerOptions
/// This schedulers definition of the current moment in time.
var now: SchedulerTimeType { get }
/// The minimum tolerance allowed by the scheduler.
var minimumTolerance: SchedulerTimeType.Stride { get }
/// Performs the action at the next possible opportunity.
func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void)
/// Performs the action at some time after the specified date.
func schedule(after date: SchedulerTimeType,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void)
/// Performs the action at some time after the specified date, at the specified
/// frequency, optionally taking into account tolerance if possible.
func schedule(after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions?,
_ action: @escaping () -> Void) -> Cancellable
}
extension Scheduler {
/// Performs the action at some time after the specified date, using the schedulers
+17
View File
@@ -0,0 +1,17 @@
//
// Subject+Void.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
extension Subject where Output == Void {
/// Sends a void value to the subscriber.
///
/// Use `Void` inputs and outputs when you want to signal that an event has occurred,
/// but dont need to send the event itself.
public func send() {
send(())
}
}
-45
View File
@@ -1,45 +0,0 @@
//
// Subject.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
/// A publisher that exposes a method for outside callers to publish elements.
///
/// A subject is a publisher that you can use to inject values into a stream, by calling
/// its `send()` method. This can be useful for adapting existing imperative code to the
/// Combine model.
public protocol Subject: AnyObject, Publisher {
/// Sends a value to the subscriber.
///
/// - Parameter value: The value to send.
func send(_ value: Output)
/// Sends a completion signal to the subscriber.
///
/// - Parameter completion: A `Completion` instance which indicates whether publishing
/// has finished normally or failed with an error.
func send(completion: Subscribers.Completion<Failure>)
/// Sends a subscription to the subscriber.
///
/// This call provides the `Subject` an opportunity to establish demand for any new
/// upstream subscriptions.
///
/// - Parameter subscription: The subscription instance through which the subscriber
/// can request elements.
func send(subscription: Subscription)
}
extension Subject where Output == Void {
/// Sends a void value to the subscriber.
///
/// Use `Void` inputs and outputs when you want to signal that an event has occurred,
/// but dont need to send the event itself.
public func send() {
send(())
}
}
+20
View File
@@ -0,0 +1,20 @@
//
// Subscriber+Void.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
extension Subscriber where Input == Void {
/// Tells the subscriber that a publisher of void elements is ready to receive further
/// requests.
///
/// Use `Void` inputs and outputs when you want to signal that an event has occurred,
/// but dont need to send the event itself.
/// - Returns: A `Subscribers.Demand` instance indicating how many more elements
/// the subscriber expects to receive.
public func receive() -> Subscribers.Demand {
return receive(())
}
}
-76
View File
@@ -1,76 +0,0 @@
//
// Subscriber.swift
// OpenCombine
//
// Created by Sergej Jaskiewicz on 10.06.2019.
//
/// A protocol that declares a type that can receive input from a publisher.
///
/// A `Subscriber` instance receives a stream of elements from a `Publisher`, along with
/// life cycle events describing changes to their relationship. A given subscribers
/// `Input` and `Failure` associated types must match the `Output` and `Failure` of its
/// corresponding publisher.
///
/// You connect a subscriber to a publisher by calling the publishers `subscribe(_:)`
/// method. After making this call, the publisher invokes the subscribers
/// `receive(subscription:)` method. This gives the subscriber a `Subscription` instance,
/// which it uses to demand elements from the publisher, and to optionally cancel
/// the subscription. After the subscriber makes an initial demand, the publisher calls
/// `receive(_:)`, possibly asynchronously, to deliver newly-published elements.
/// If the publisher stops publishing, it calls `receive(completion:)`, using a parameter
/// of type `Subscribers.Completion` to indicate whether publishing completes normally or
/// with an error.
///
/// OpenCombine provides the following subscribers as operators on the `Publisher` type:
///
/// - `sink(receiveCompletion:receiveValue:)` executes arbitrary closures when
/// it receives a completion signal and each time it receives a new element.
/// - `assign(to:on:)` writes each newly-received value to a property identified by
/// a key path on a given instance.
public protocol Subscriber: CustomCombineIdentifierConvertible {
/// The kind of values this subscriber receives.
associatedtype Input
/// The kind of errors this subscriber might receive.
///
/// Use `Never` if this `Subscriber` cannot receive errors.
associatedtype Failure: Error
/// Tells the subscriber that it has successfully subscribed to the publisher and may
/// request items.
///
/// Use the received `Subscription` to request items from the publisher.
/// - Parameter subscription: A subscription that represents the connection between
/// publisher and subscriber.
func receive(subscription: Subscription)
/// Tells the subscriber that the publisher has produced an element.
///
/// - Parameter input: The published element.
/// - Returns: A `Subscribers.Demand` instance indicating how many more elements
/// the subscriber expects to receive.
func receive(_ input: Input) -> Subscribers.Demand
/// Tells the subscriber that the publisher has completed publishing, either normally
/// or with an error.
///
/// - Parameter completion: A `Subscribers.Completion` case indicating whether
/// publishing completed normally or with an error.
func receive(completion: Subscribers.Completion<Failure>)
}
extension Subscriber where Input == Void {
/// Tells the subscriber that a publisher of void elements is ready to receive further
/// requests.
///
/// Use `Void` inputs and outputs when you want to signal that an event has occurred,
/// but dont need to send the event itself.
/// - Returns: A `Subscribers.Demand` instance indicating how many more elements
/// the subscriber expects to receive.
public func receive() -> Subscribers.Demand {
return receive(())
}
}
@@ -5,7 +5,7 @@
// Created by Sergej Jaskiewicz on 10.06.2019.
//
// swiftlint:disable shorthand_operator - because of false positives here
// swiftlint:disable attributes
#if canImport(_Concurrency) && compiler(>=5.5)
import _Concurrency
+1
View File
@@ -5,6 +5,7 @@
// Created by Sergej Jaskiewicz on 27.09.2020.
//
// swiftlint:disable:next type_name
public protocol _Introspection: AnyObject {
func willReceive<Upstream: Publisher, Downstream: Subscriber>(
@@ -50,13 +50,7 @@ extension DispatchQueue {
/// - Parameter other: Another dispatch queue time.
/// - Returns: The time interval between this time and the provided time.
public func distance(to other: SchedulerTimeType) -> Stride {
let start = dispatchTime.rawValue
let end = other.dispatchTime.rawValue
return .nanoseconds(
end >= start
? Int(Int64(bitPattern: end) - Int64(bitPattern: start))
: -Int(Int64(bitPattern: start) - Int64(bitPattern: end))
)
return Stride(dispatchTime.polyfillDistance(to: other.dispatchTime))
}
/// Returns a dispatch queue scheduler time calculated by advancing
@@ -415,3 +409,31 @@ private func clampedIntProduct(_ lhs: Int64, _ rhs: Int64) -> Int64 {
}
return result
}
extension DispatchTime {
fileprivate func polyfillDistance(to other: DispatchTime) -> DispatchTimeInterval {
#if canImport(Darwin) && compiler(>=5.1)
if #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) {
return distance(to: other)
}
#endif
let start = rawValue
let end = other.rawValue
if end >= start {
let result = end &- start
if result > UInt64(Int.max) {
return .never
} else {
return .nanoseconds(Int(result))
}
} else {
let result = start &- end
if result > UInt64(Int.max) {
return .never
} else {
return .nanoseconds(-Int(result))
}
}
}
}
@@ -314,7 +314,7 @@ final class CurrentValueSubjectTests: XCTestCase {
receiveSubscription: { subscription in
subscription.request(.unlimited)
},
receiveCompletion: { completion in
receiveCompletion: { _ in
cvs.send(completion: .failure("must not recurse"))
}
)
@@ -31,23 +31,34 @@ final class DispatchQueueSchedulerTests: XCTestCase {
uptimeNanoseconds: DispatchTime.distantFuture.uptimeNanoseconds - 1024
)
)
let int64max = Scheduler.SchedulerTimeType(
DispatchTime(
uptimeNanoseconds: UInt64(Int.max)
)
)
XCTAssertEqual(time1.distance(to: time2), .nanoseconds(431))
XCTAssertEqual(time2.distance(to: time1), .nanoseconds(-431))
XCTAssertEqual(time1.distance(to: distantFuture), .nanoseconds(-10001))
XCTAssertEqual(distantFuture.distance(to: time1), .nanoseconds(10001))
XCTAssertEqual(time2.distance(to: distantFuture), .nanoseconds(-10432))
XCTAssertEqual(distantFuture.distance(to: time2), .nanoseconds(10432))
XCTAssertEqual(time1.distance(to: distantFuture), .nanoseconds(.max))
XCTAssertEqual(distantFuture.distance(to: time1), .nanoseconds(.max))
XCTAssertEqual(time2.distance(to: distantFuture), .nanoseconds(.max))
XCTAssertEqual(distantFuture.distance(to: time2), .nanoseconds(.max))
XCTAssertEqual(time1.distance(to: notSoDistantFuture), .nanoseconds(-11025))
XCTAssertEqual(notSoDistantFuture.distance(to: time1), .nanoseconds(11025))
XCTAssertEqual(time2.distance(to: notSoDistantFuture), .nanoseconds(-11456))
XCTAssertEqual(notSoDistantFuture.distance(to: time2), .nanoseconds(11456))
XCTAssertEqual(time1.distance(to: notSoDistantFuture), .nanoseconds(.max))
XCTAssertEqual(notSoDistantFuture.distance(to: time1), .nanoseconds(.max))
XCTAssertEqual(time2.distance(to: notSoDistantFuture), .nanoseconds(.max))
XCTAssertEqual(notSoDistantFuture.distance(to: time2), .nanoseconds(.max))
XCTAssertEqual(time1.distance(to: int64max), .nanoseconds(.max - 10000))
XCTAssertEqual(int64max.distance(to: time1), .nanoseconds(-(.max - 10000)))
XCTAssertEqual(time2.distance(to: int64max), .nanoseconds(.max - 10431))
XCTAssertEqual(int64max.distance(to: time2), .nanoseconds(-(.max - 10431)))
XCTAssertEqual(distantFuture.distance(to: distantFuture), .nanoseconds(0))
XCTAssertEqual(notSoDistantFuture.distance(to: notSoDistantFuture),
.nanoseconds(0))
XCTAssertEqual(int64max.distance(to: int64max), .nanoseconds(0))
}
func testSchedulerTimeTypeAdvanced() {
@@ -13,22 +13,75 @@ import Combine
import OpenCombine
#endif
// swiftlint:disable generic_type_name
/// See https://forums.swift.org/t/casting-from-any-to-optional/21883
private func dynamicCast<T>(_ value: Any, to: T.Type) -> T? {
if let value = value as? T {
return value
} else {
return nil
}
}
// swiftlint:enable generic_type_name
@available(macOS 10.15, iOS 13.0, *)
final class FutureTests: XCTestCase {
private typealias Sut = Future<Int, TestingError>
func testFutureSuccess() {
private func assertParent(of futureSubscription: Subscription, isNil: Bool) {
let parent: Mirror.Child
do {
parent = try XCTUnwrap(
Mirror(reflecting: futureSubscription)
.children
.first { $0.label == "parent" }
)
} catch {
XCTFail("Missing 'parent' property in \(futureSubscription)")
return
}
let parentAsSut: Sut?
do {
parentAsSut = try XCTUnwrap(dynamicCast(parent.value, to: Sut?.self))
} catch {
XCTFail("Unexpected type of the 'parent' property: \(parent.value)")
return
}
if isNil {
XCTAssertNil(parentAsSut)
} else {
XCTAssertNotNil(parentAsSut)
}
}
func testFutureSuccess() throws {
var promise: Sut.Promise?
let future = Sut { promise = $0 }
var downstreamSubscription: Subscription?
let subscriber = TrackingSubscriber(receiveSubscription: { subscription in
downstreamSubscription = subscription
subscription.request(.unlimited)
})
future.subscribe(subscriber)
let unwrappedDownstreamSubscription = try XCTUnwrap(downstreamSubscription)
self.assertParent(of: unwrappedDownstreamSubscription, isNil: false)
subscriber.onValue = { _ in
self.assertParent(of: unwrappedDownstreamSubscription, isNil: false)
}
promise?(.success(42))
self.assertParent(of: unwrappedDownstreamSubscription, isNil: true)
XCTAssertEqual(subscriber.history, [
.subscription("Future"),
.value(42),
@@ -36,13 +89,15 @@ final class FutureTests: XCTestCase {
])
}
func testFutureFailure() {
func testFutureFailure() throws {
var promise: Sut.Promise?
let future = Sut { promise = $0 }
var downstreamSubscription: Subscription?
let subscriber = TrackingSubscriber(
receiveSubscription: { subscription in
downstreamSubscription = subscription
subscription.request(.unlimited)
}, receiveValue: { _ in
XCTFail("no value should be returned")
@@ -51,9 +106,19 @@ final class FutureTests: XCTestCase {
)
future.subscribe(subscriber)
let unwrappedDownstreamSubscription = try XCTUnwrap(downstreamSubscription)
self.assertParent(of: unwrappedDownstreamSubscription, isNil: false)
subscriber.onFailure = { _ in
self.assertParent(of: unwrappedDownstreamSubscription, isNil: false)
}
let error = TestingError(description: "\(#function)")
promise?(.failure(error))
self.assertParent(of: unwrappedDownstreamSubscription, isNil: true)
XCTAssertEqual(subscriber.history, [
.subscription("Future"),
.completion(.failure(error))
@@ -250,6 +315,12 @@ final class FutureTests: XCTestCase {
let unwrappedDownstreamSubscription = try XCTUnwrap(downstreamSubscription)
self.assertParent(of: unwrappedDownstreamSubscription, isNil: false)
subscriber.onValue = { _ in
self.assertParent(of: unwrappedDownstreamSubscription, isNil: false)
}
unwrappedDownstreamSubscription.request(.max(1))
XCTAssertEqual(subscriber.history, [
@@ -258,12 +329,7 @@ final class FutureTests: XCTestCase {
.completion(.finished)
])
let parent = try XCTUnwrap(
Mirror(reflecting: unwrappedDownstreamSubscription)
.descendant("parent") as? Sut?
)
XCTAssertNotNil(parent)
assertParent(of: unwrappedDownstreamSubscription, isNil: false)
}
func testReleasesEverythingOnTermination() {