1 Commits

Author SHA1 Message Date
Sergej Jaskiewicz 26e86a9905 Implement CombineLatest 2021-06-21 19:09:04 +03:00
5 changed files with 885 additions and 196 deletions
-196
View File
@@ -2,165 +2,6 @@
// Please remove the corresponding piece from this file if you implement something,
// and complement this file as features are added in Apple's Combine
extension Publishers {
/// A publisher that receives and combines the latest elements from two publishers.
public struct CombineLatest<A, B> : Publisher where A : Publisher, B : Publisher, A.Failure == B.Failure {
/// The kind of values published by this publisher.
public typealias Output = (A.Output, B.Output)
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
public typealias Failure = A.Failure
public let a: A
public let b: B
public init(_ a: A, _ b: B)
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
///
/// - SeeAlso: `subscribe(_:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<S>(subscriber: S) where S : Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output)
}
/// A publisher that receives and combines the latest elements from three publishers.
public struct CombineLatest3<A, B, C> : Publisher where A : Publisher, B : Publisher, C : Publisher, A.Failure == B.Failure, B.Failure == C.Failure {
/// The kind of values published by this publisher.
public typealias Output = (A.Output, B.Output, C.Output)
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
public typealias Failure = A.Failure
public let a: A
public let b: B
public let c: C
public init(_ a: A, _ b: B, _ c: C)
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
///
/// - SeeAlso: `subscribe(_:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<S>(subscriber: S) where S : Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output)
}
/// A publisher that receives and combines the latest elements from four publishers.
public struct CombineLatest4<A, B, C, D> : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure {
/// The kind of values published by this publisher.
public typealias Output = (A.Output, B.Output, C.Output, D.Output)
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
public typealias Failure = A.Failure
public let a: A
public let b: B
public let c: C
public let d: D
public init(_ a: A, _ b: B, _ c: C, _ d: D)
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
///
/// - SeeAlso: `subscribe(_:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<S>(subscriber: S) where S : Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output)
}
}
extension Publisher {
/// Subscribes to an additional publisher and publishes a tuple upon receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isnt `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also fails.
/// - Parameters:
/// - other: Another publisher to combine with this one.
/// - Returns: A publisher that receives and combines elements from this and another publisher.
public func combineLatest<P>(_ other: P) -> Publishers.CombineLatest<Self, P> where P : Publisher, Self.Failure == P.Failure
/// Subscribes to an additional publisher and invokes a closure upon receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isnt `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also fails.
/// - Parameters:
/// - other: Another publisher to combine with this one.
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this and another publisher.
public func combineLatest<P, T>(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map<Publishers.CombineLatest<Self, P>, T> where P : Publisher, Self.Failure == P.Failure
/// Subscribes to two additional publishers and publishes a tuple upon receiving output from any of the publishers.
///
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isnt `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also fails.
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - Returns: A publisher that receives and combines elements from this publisher and two other publishers.
public func combineLatest<P, Q>(_ publisher1: P, _ publisher2: Q) -> Publishers.CombineLatest3<Self, P, Q> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure
/// Subscribes to two additional publishers and invokes a closure upon receiving output from any of the publishers.
///
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isnt `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also fails.
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this publisher and two other publishers.
public func combineLatest<P, Q, T>(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map<Publishers.CombineLatest3<Self, P, Q>, T> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure
/// Subscribes to three additional publishers and publishes a tuple upon receiving output from any of the publishers.
///
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isnt `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also fails.
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - publisher3: A fourth publisher to combine with this one.
/// - Returns: A publisher that receives and combines elements from this publisher and three other publishers.
public func combineLatest<P, Q, R>(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.CombineLatest4<Self, P, Q, R> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
/// Subscribes to three additional publishers and invokes a closure upon receiving output from any of the publishers.
///
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isnt `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also fails.
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - publisher3: A fourth publisher to combine with this one.
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this publisher and three other publishers.
public func combineLatest<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.CombineLatest4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
}
extension Publishers {
/// A strategy for collecting received elements.
@@ -797,43 +638,6 @@ extension Publisher {
public func zip<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.Zip4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
}
extension Publishers.CombineLatest : Equatable where A : Equatable, B : Equatable {
/// Returns a Boolean value that indicates whether two publishers are equivalent.
///
/// - Parameters:
/// - lhs: A combineLatest publisher to compare for equality.
/// - rhs: Another combineLatest publisher to compare for equality.
/// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal, `false` otherwise.
public static func == (lhs: Publishers.CombineLatest<A, B>, rhs: Publishers.CombineLatest<A, B>) -> Bool
}
extension Publishers.CombineLatest3 : Equatable where A : Equatable, B : Equatable, C : Equatable {
/// Returns a Boolean value indicating whether two values are equal.
///
/// Equality is the inverse of inequality. For any values `a` and `b`,
/// `a == b` implies that `a != b` is `false`.
///
/// - Parameters:
/// - lhs: A value to compare.
/// - rhs: Another value to compare.
public static func == (lhs: Publishers.CombineLatest3<A, B, C>, rhs: Publishers.CombineLatest3<A, B, C>) -> Bool
}
extension Publishers.CombineLatest4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable {
/// Returns a Boolean value indicating whether two values are equal.
///
/// Equality is the inverse of inequality. For any values `a` and `b`,
/// `a == b` implies that `a != b` is `false`.
///
/// - Parameters:
/// - lhs: A value to compare.
/// - rhs: Another value to compare.
public static func == (lhs: Publishers.CombineLatest4<A, B, C, D>, rhs: Publishers.CombineLatest4<A, B, C, D>) -> Bool
}
extension Publishers.Merge : Equatable where A : Equatable, B : Equatable {
/// Returns a Boolean value that indicates whether two publishers are equivalent.
@@ -0,0 +1,205 @@
//
// AbstractCombineLatest.swift
//
//
// Created by Sergej Jaskiewicz on 10.12.2019.
//
internal class AbstractCombineLatest<Output, Failure, Downstream: Subscriber>
where Downstream.Input == Output, Downstream.Failure == Failure
{
private let downstream: Downstream
// TODO: The size of these arrays always stays the same.
// Maybe we can leverage ManagedBuffer/ManagedBufferPointer here
// to avoid additional allocations.
private var buffers: [Any?] // 0x78
private var subscriptions: [Subscription?] // 0x80
private var demand = Subscribers.Demand.none // 0x88
private var recursion = false // 0x90
private var finished = false // 0x98
private var errored = false // 0xA0
private var cancelled = false // 0xA8
private let upstreamCount: Int // 0xB0
private var finishCount = 0 // 0xB8
private let lock = UnfairLock.allocate() // 0xC0
private let downstreamLock = UnfairRecursiveLock.allocate() // 0xC8
internal init(downstream: Downstream, upstreamCount: Int) {
self.downstream = downstream
self.buffers = Array(repeating: nil, count: upstreamCount)
self.subscriptions = Array(repeating: nil, count: upstreamCount)
self.upstreamCount = upstreamCount
}
deinit {
lock.deallocate()
downstreamLock.deallocate()
}
// TODO: There should be more type-safe (and faster) way.
// E. g. what if we store `buffers` in subclasses?
internal func convert(values: [Any?]) -> Output {
abstractMethod()
}
fileprivate final func receive(subscription: Subscription, index: Int) {
lock.lock()
guard !cancelled && subscriptions[index] == nil else {
lock.unlock()
subscription.cancel()
return
}
subscriptions[index] = subscription
lock.unlock()
}
fileprivate final func receive(_ input: Any, index: Int) -> Subscribers.Demand {
lock.lock()
if cancelled || finished {
lock.unlock()
return .none
}
buffers[index] = input
guard !recursion && demand > 0 && buffers.allSatisfy({ $0 != nil }) else {
lock.unlock()
return .none
}
demand -= 1
recursion = true
lock.unlock()
downstreamLock.lock()
let newDemand = downstream.receive(convert(values: buffers))
downstreamLock.unlock()
lock.lock()
recursion = false
demand += newDemand
lock.unlock()
return .none
}
fileprivate final func receive(completion: Subscribers.Completion<Failure>,
index: Int) {
switch completion {
case .finished:
lock.lock()
if finished {
lock.unlock()
return
}
finishCount += 1
subscriptions[index] = nil
if finishCount == upstreamCount {
finished = true
buffers = Array(repeating: nil, count: upstreamCount)
lock.unlock()
downstreamLock.lock()
downstream.receive(completion: completion)
downstreamLock.unlock()
} else {
lock.unlock()
}
case .failure:
lock.lock()
finished = true
errored = true
let subscriptions = self.subscriptions
self.subscriptions = Array(repeating: nil, count: upstreamCount)
buffers = Array(repeating: nil, count: upstreamCount)
lock.unlock()
for (i, subscription) in subscriptions.enumerated() where i != index {
subscription?.cancel()
}
downstreamLock.lock()
downstream.receive(completion: completion)
downstreamLock.unlock()
}
}
}
extension AbstractCombineLatest: Subscription {
internal func request(_ demand: Subscribers.Demand) {
demand.assertNonZero() // TODO: Test this
lock.lock()
guard !cancelled && !finished else {
lock.unlock()
return
}
self.demand += demand
lock.unlock()
for subscription in subscriptions {
subscription?.request(demand)
}
}
internal func cancel() {
lock.lock()
cancelled = true
let subscriptions = self.subscriptions
self.subscriptions = Array(repeating: nil, count: upstreamCount)
buffers = Array(repeating: nil, count: upstreamCount)
lock.unlock()
for subscription in subscriptions {
subscription?.cancel()
}
}
}
extension AbstractCombineLatest: CustomStringConvertible {
internal var description: String { return "CombineLatest" }
}
extension AbstractCombineLatest: CustomReflectable {
internal var customMirror: Mirror {
lock.lock()
defer { lock.unlock() }
let children: [Mirror.Child] = [
("downstream", downstream),
("upstreamSubscriptions", subscriptions),
("demand", demand),
("buffers", buffers)
]
return Mirror(self, children: children)
}
}
extension AbstractCombineLatest: CustomPlaygroundDisplayConvertible {
internal final var playgroundDescription: Any { return description }
}
extension AbstractCombineLatest {
internal struct Side<Input>: Subscriber, CustomStringConvertible {
private let index: Int
private let combiner: AbstractCombineLatest
internal let combineIdentifier = CombineIdentifier()
internal init(index: Int, combiner: AbstractCombineLatest) {
self.index = index
self.combiner = combiner
}
internal func receive(subscription: Subscription) {
combiner.receive(subscription: subscription, index: index)
}
internal func receive(_ input: Input) -> Subscribers.Demand {
return combiner.receive(input, index: index)
}
internal func receive(completion: Subscribers.Completion<Failure>) {
combiner.receive(completion: completion, index: index)
}
internal var description: String { return "CombineLatest" }
}
}
@@ -0,0 +1,408 @@
//
//
// Auto-generated from GYB template. DO NOT EDIT!
//
//
//
//
// Publishers.CombineLatest.swift.gyb
//
//
// Created by Sergej Jaskiewicz on 10.12.2019.
//
// swiftlint:disable generic_type_name
// swiftlint:disable large_tuple
// MARK: - CombineLatest methods on Publisher
extension Publisher {
/// Subscribes to an additional publisher and publishes a tuple upon
/// receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
/// - other: Another publisher to combine with this one.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
public func combineLatest<P: Publisher>(
_ other: P
) -> Publishers.CombineLatest<Self, P>
where Failure == P.Failure
{
return .init(self, other)
}
/// Subscribes to an additional publisher and invokes a closure
/// upon receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
/// - other: Another publisher to combine with this one.
/// - transform: A closure that receives the most recent value from each publisher
/// and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
public func combineLatest<P: Publisher, Result>(
_ other: P,
_ transform: @escaping (Output, P.Output) -> Result
) -> Publishers.Map<Publishers.CombineLatest<Self, P>, Result>
where Failure == P.Failure
{
return Publishers.CombineLatest(self, other).map {
transform($0, $1)
}
}
/// Subscribes to two additional publishers and publishes a tuple upon
/// receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
public func combineLatest<P: Publisher, Q: Publisher>(
_ publisher1: P,
_ publisher2: Q
) -> Publishers.CombineLatest3<Self, P, Q>
where Failure == P.Failure,
P.Failure == Q.Failure
{
return .init(self, publisher1, publisher2)
}
/// Subscribes to two additional publishers and invokes a closure
/// upon receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - transform: A closure that receives the most recent value from each publisher
/// and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
public func combineLatest<P: Publisher, Q: Publisher, Result>(
_ publisher1: P,
_ publisher2: Q,
_ transform: @escaping (Output, P.Output, Q.Output) -> Result
) -> Publishers.Map<Publishers.CombineLatest3<Self, P, Q>, Result>
where Failure == P.Failure,
P.Failure == Q.Failure
{
return Publishers.CombineLatest3(self, publisher1, publisher2).map {
transform($0, $1, $2)
}
}
/// Subscribes to three additional publishers and publishes a tuple upon
/// receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - publisher3: A fourth publisher to combine with this one.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
public func combineLatest<P: Publisher, Q: Publisher, R: Publisher>(
_ publisher1: P,
_ publisher2: Q,
_ publisher3: R
) -> Publishers.CombineLatest4<Self, P, Q, R>
where Failure == P.Failure,
P.Failure == Q.Failure,
Q.Failure == R.Failure
{
return .init(self, publisher1, publisher2, publisher3)
}
/// Subscribes to three additional publishers and invokes a closure
/// upon receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
/// - publisher1: A second publisher to combine with this one.
/// - publisher2: A third publisher to combine with this one.
/// - publisher3: A fourth publisher to combine with this one.
/// - transform: A closure that receives the most recent value from each publisher
/// and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
public func combineLatest<P: Publisher, Q: Publisher, R: Publisher, Result>(
_ publisher1: P,
_ publisher2: Q,
_ publisher3: R,
_ transform: @escaping (Output, P.Output, Q.Output, R.Output) -> Result
) -> Publishers.Map<Publishers.CombineLatest4<Self, P, Q, R>, Result>
where Failure == P.Failure,
P.Failure == Q.Failure,
Q.Failure == R.Failure
{
return Publishers.CombineLatest4(self, publisher1, publisher2, publisher3).map {
transform($0, $1, $2, $3)
}
}
}
// MARK: - CombineLatest publishers
extension Publishers {
/// A publisher that receives and combines the latest elements from two
/// publishers.
public struct CombineLatest<A: Publisher, B: Publisher>
: Publisher
where A.Failure == B.Failure
{
public typealias Output = (A.Output, B.Output)
public typealias Failure = A.Failure
public let a: A
public let b: B
public init(
_ a: A,
_ b: B
) {
self.a = a
self.b = b
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Failure == Failure,
Downstream.Input == Output
{
typealias Inner = CombineLatest2Inner<A.Output,
B.Output,
Failure,
Downstream>
let inner = Inner(downstream: subscriber, upstreamCount: 2)
a.subscribe(Inner.Side(index: 0, combiner: inner))
b.subscribe(Inner.Side(index: 1, combiner: inner))
subscriber.receive(subscription: inner)
}
}
/// A publisher that receives and combines the latest elements from three
/// publishers.
public struct CombineLatest3<A: Publisher, B: Publisher, C: Publisher>
: Publisher
where A.Failure == B.Failure,
B.Failure == C.Failure
{
public typealias Output = (A.Output, B.Output, C.Output)
public typealias Failure = A.Failure
public let a: A
public let b: B
public let c: C
public init(
_ a: A,
_ b: B,
_ c: C
) {
self.a = a
self.b = b
self.c = c
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Failure == Failure,
Downstream.Input == Output
{
typealias Inner = CombineLatest3Inner<A.Output,
B.Output,
C.Output,
Failure,
Downstream>
let inner = Inner(downstream: subscriber, upstreamCount: 3)
a.subscribe(Inner.Side(index: 0, combiner: inner))
b.subscribe(Inner.Side(index: 1, combiner: inner))
c.subscribe(Inner.Side(index: 2, combiner: inner))
subscriber.receive(subscription: inner)
}
}
/// A publisher that receives and combines the latest elements from four
/// publishers.
public struct CombineLatest4<A: Publisher, B: Publisher, C: Publisher, D: Publisher>
: Publisher
where A.Failure == B.Failure,
B.Failure == C.Failure,
C.Failure == D.Failure
{
public typealias Output = (A.Output, B.Output, C.Output, D.Output)
public typealias Failure = A.Failure
public let a: A
public let b: B
public let c: C
public let d: D
public init(
_ a: A,
_ b: B,
_ c: C,
_ d: D
) {
self.a = a
self.b = b
self.c = c
self.d = d
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Failure == Failure,
Downstream.Input == Output
{
typealias Inner = CombineLatest4Inner<A.Output,
B.Output,
C.Output,
D.Output,
Failure,
Downstream>
let inner = Inner(downstream: subscriber, upstreamCount: 4)
a.subscribe(Inner.Side(index: 0, combiner: inner))
b.subscribe(Inner.Side(index: 1, combiner: inner))
c.subscribe(Inner.Side(index: 2, combiner: inner))
d.subscribe(Inner.Side(index: 3, combiner: inner))
subscriber.receive(subscription: inner)
}
}
}
// MARK: - Equatable conformances
extension Publishers.CombineLatest: Equatable
where
A: Equatable,
B: Equatable {}
extension Publishers.CombineLatest3: Equatable
where
A: Equatable,
B: Equatable,
C: Equatable {}
extension Publishers.CombineLatest4: Equatable
where
A: Equatable,
B: Equatable,
C: Equatable,
D: Equatable {}
// MARK: - Inners
private final class CombineLatest2Inner<Input0,
Input1,
Failure,
Downstream: Subscriber>
: AbstractCombineLatest<(Input0, Input1), Failure, Downstream>
where Downstream.Input == (Input0, Input1),
Downstream.Failure == Failure
{
override func convert(values: [Any?]) -> (Input0, Input1) {
return (values[0] as! Input0,
values[1] as! Input1)
}
}
private final class CombineLatest3Inner<Input0,
Input1,
Input2,
Failure,
Downstream: Subscriber>
: AbstractCombineLatest<(Input0, Input1, Input2), Failure, Downstream>
where Downstream.Input == (Input0, Input1, Input2),
Downstream.Failure == Failure
{
override func convert(values: [Any?]) -> (Input0, Input1, Input2) {
return (values[0] as! Input0,
values[1] as! Input1,
values[2] as! Input2)
}
}
private final class CombineLatest4Inner<Input0,
Input1,
Input2,
Input3,
Failure,
Downstream: Subscriber>
: AbstractCombineLatest<(Input0, Input1, Input2, Input3), Failure, Downstream>
where Downstream.Input == (Input0, Input1, Input2, Input3),
Downstream.Failure == Failure
{
override func convert(values: [Any?]) -> (Input0, Input1, Input2, Input3) {
return (values[0] as! Input0,
values[1] as! Input1,
values[2] as! Input2,
values[3] as! Input3)
}
}
@@ -0,0 +1,268 @@
${template_header}
//
// Publishers.CombineLatest.swift.gyb
//
//
// Created by Sergej Jaskiewicz on 10.12.2019.
//
%{
from gyb_opencombine_support import (
suffix_variadic,
list_with_suffix_variadic,
indent
)
import string
instantiations = [(2, 'two', 'A second'),
(3, 'three', 'A third'),
(4, 'four', 'A fourth')]
def make_publisher_name(arity):
return suffix_variadic('CombineLatest', arity, arity - 1)
def make_upstream_types(arity, start=0):
return [str(c) for c in string.ascii_uppercase[start:(start + arity)]]
def make_upstream_generic_constraints(upstream_types, first_is_self=False):
format_string = '{0}Failure == {1}.Failure'
def format(i):
return format_string.format(upstream_types[i] + '.',
upstream_types[i + 1])
result = [format(i) for i in range(len(upstream_types) - 1)]
if first_is_self:
result.insert(0, format_string.format('', upstream_types[0]))
return result
def declare_combine_latest_method(arity, transform):
arg_count = arity - 1
declaration_format = """\
public func combineLatest<{}>(
{}
) -> {}
where {}\
"""
upstream_types = make_upstream_types(arg_count, 15)
method_generic_params = \
[upstream_type + ': Publisher' for upstream_type in upstream_types]
if transform:
method_generic_params.append('Result')
cs_method_generic_params = ', '.join(method_generic_params)
method_args = ['_ other: P'] if arg_count == 1 \
else ['_ publisher{}: {}'.format(i + 1, upstream_types[i]) \
for i in range(arg_count)]
if transform:
output_types = ['Output'] + ['{}.Output'.format(upstream_type) \
for upstream_type in upstream_types]
cs_output_types = ', '.join(output_types)
method_args \
.append('_ transform: @escaping ({}) -> Result'.format(cs_output_types))
cs_method_args = ',\n '.join(method_args)
publisher_generic_params = ['Self'] + upstream_types
cs_publisher_generic_params = ', '.join(publisher_generic_params)
publisher_name = 'Publishers.{}<{}>'.format(make_publisher_name(arity),
cs_publisher_generic_params)
if transform:
publisher_name = 'Publishers.Map<{}, Result>'.format(publisher_name)
generic_constraints = make_upstream_generic_constraints(upstream_types,
first_is_self=True)
cs_generic_constraints = ',\n '.join(generic_constraints)
declaration = declaration_format.format(cs_method_generic_params,
cs_method_args,
publisher_name,
cs_generic_constraints)
return indent(declaration, 4)
}%
// swiftlint:disable generic_type_name
// swiftlint:disable large_tuple
// MARK: - CombineLatest methods on Publisher
extension Publisher {
% for arity, _, _ in instantiations:
%
% argument_names = ['other'] \
% if arity == 2 else ['publisher{}'.format(i) for i in range(1, arity)]
% doc_cardinal = 'an additional publisher' if arity == 2 \
% else '{} additional publishers'.format(instantiations[arity - 3][1])
/// Subscribes to ${doc_cardinal} and publishes a tuple upon
/// receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
% for i in range(arity - 1):
% param_doc = 'Another' if arity == 2 else instantiations[i][2]
/// - ${argument_names[i]}: ${param_doc} publisher to combine with this one.
% end
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
${declare_combine_latest_method(arity, transform=False)}
{
return .init(self, ${', '.join(argument_names)})
}
/// Subscribes to ${doc_cardinal} and invokes a closure
/// upon receiving output from either publisher.
///
/// The combined publisher passes through any requests to *all* upstream publishers.
/// However, it still obeys the demand-fulfilling rule of only sending the request
/// amount downstream. If the demand isnt `.unlimited`, it drops values from upstream
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
/// holds the most recent value in each buffer.
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
/// publisher never publishes a value, this publisher never finishes.
/// If any of the combined publishers terminates with a failure, this publisher also
/// fails.
///
/// - Parameters:
% for i in range(arity - 1):
% param_doc = 'Another' if arity == 2 else instantiations[i][2]
/// - ${argument_names[i]}: ${param_doc} publisher to combine with this one.
% end
/// - transform: A closure that receives the most recent value from each publisher
/// and returns a new value to publish.
/// - Returns: A publisher that receives and combines elements from this and another
/// publisher.
${declare_combine_latest_method(arity, transform=True)}
{
% publisher_name = make_publisher_name(arity)
return Publishers.${publisher_name}(self, ${', '.join(argument_names)}).map {
transform(${', '.join(['${}'.format(i) for i in range(arity)])})
}
}
% end
}
// MARK: - CombineLatest publishers
extension Publishers {
% for arity, cardinal, _ in instantiations:
% publisher_name = make_publisher_name(arity)
% upstream_types = make_upstream_types(arity)
%
% upstream_generic_params = \
% [upstream_type + ': Publisher' for upstream_type in upstream_types]
%
% cs_upstream_generic_params = ', '.join(upstream_generic_params)
%
% output_types = [upstream_type + '.Output' for upstream_type in upstream_types]
%
% cs_output_types = ', '.join(output_types)
%
% upstream_generic_constraints = \
% make_upstream_generic_constraints(upstream_types)
%
% cs_upstream_generic_constraints = \
% ',\n '.join(upstream_generic_constraints)
%
% init_args = ['_ {}: {}'.format(upstream_type.lower(), upstream_type) \
% for upstream_type in upstream_types]
% cs_init_args = ',\n '.join(init_args)
%
% self_fields = [upstream_type.lower() for upstream_type in upstream_types]
/// A publisher that receives and combines the latest elements from ${cardinal}
/// publishers.
public struct ${publisher_name}<${cs_upstream_generic_params}>
: Publisher
where ${cs_upstream_generic_constraints}
{
public typealias Output = (${cs_output_types})
public typealias Failure = ${upstream_types[0]}.Failure
% for upstream_type in upstream_types:
public let ${upstream_type.lower()}: ${upstream_type}
% end
public init(
${cs_init_args}
) {
% for self_field in self_fields:
self.${self_field} = ${self_field}
% end
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Failure == Failure,
Downstream.Input == Output
{
% cs_indented_output_types = (',\n' + (50 * ' ')).join(output_types)
typealias Inner = CombineLatest${arity}Inner<${cs_indented_output_types},
Failure,
Downstream>
let inner = Inner(downstream: subscriber, upstreamCount: ${arity})
% for i in range(arity):
${self_fields[i]}.subscribe(Inner.Side(index: ${i}, combiner: inner))
% end
subscriber.receive(subscription: inner)
}
}
% end
}
// MARK: - Equatable conformances
% for arity, _, _ in instantiations:
%
% publisher_name = make_publisher_name(arity)
%
% upstream_types = make_upstream_types(arity)
%
% constraints = [upstream_type + ': Equatable' for upstream_type in upstream_types]
% cs_constraints = ',\n'.join(constraints)
% cs_constraints = indent(cs_constraints, 8)
%
extension Publishers.${publisher_name}: Equatable
where
${cs_constraints} {}
% end
// MARK: - Inners
% for arity, _, _ in instantiations:
%
% publisher_name = make_publisher_name(arity)
%
% upstream_types = make_upstream_types(arity)
%
% input_types = ['Input{}'.format(i) for i in range(arity)]
%
% converters = ['values[{}] as! {}'.format(i, input_types[i]) for i in range(arity)]
% output_type = '({})'.format(', '.join(input_types))
private final class CombineLatest${arity}Inner<${(',\n' + (40 * ' ')).join(input_types)},
Failure,
Downstream: Subscriber>
: AbstractCombineLatest<${output_type}, Failure, Downstream>
where Downstream.Input == ${output_type},
Downstream.Failure == Failure
{
override func convert(values: [Any?]) -> (${', '.join(input_types)}) {
return (${',\n '.join(converters)})
}
}
% end
+4
View File
@@ -3,3 +3,7 @@ def suffix_variadic(name, index, arity):
def list_with_suffix_variadic(name, arity):
return [suffix_variadic(name, i, arity) for i in range(arity)]
def indent(input, space_count):
padding = space_count * ' '
return ''.join(padding + line for line in input.splitlines(True))