Implement CombineLatest
This commit is contained in:
@@ -2,165 +2,6 @@
|
||||
// Please remove the corresponding piece from this file if you implement something,
|
||||
// and complement this file as features are added in Apple's Combine
|
||||
|
||||
extension Publishers {
|
||||
|
||||
/// A publisher that receives and combines the latest elements from two publishers.
|
||||
public struct CombineLatest<A, B> : Publisher where A : Publisher, B : Publisher, A.Failure == B.Failure {
|
||||
|
||||
/// The kind of values published by this publisher.
|
||||
public typealias Output = (A.Output, B.Output)
|
||||
|
||||
/// The kind of errors this publisher might publish.
|
||||
///
|
||||
/// Use `Never` if this `Publisher` does not publish errors.
|
||||
public typealias Failure = A.Failure
|
||||
|
||||
public let a: A
|
||||
|
||||
public let b: B
|
||||
|
||||
public init(_ a: A, _ b: B)
|
||||
|
||||
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
|
||||
///
|
||||
/// - SeeAlso: `subscribe(_:)`
|
||||
/// - Parameters:
|
||||
/// - subscriber: The subscriber to attach to this `Publisher`.
|
||||
/// once attached it can begin to receive values.
|
||||
public func receive<S>(subscriber: S) where S : Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output)
|
||||
}
|
||||
|
||||
/// A publisher that receives and combines the latest elements from three publishers.
|
||||
public struct CombineLatest3<A, B, C> : Publisher where A : Publisher, B : Publisher, C : Publisher, A.Failure == B.Failure, B.Failure == C.Failure {
|
||||
|
||||
/// The kind of values published by this publisher.
|
||||
public typealias Output = (A.Output, B.Output, C.Output)
|
||||
|
||||
/// The kind of errors this publisher might publish.
|
||||
///
|
||||
/// Use `Never` if this `Publisher` does not publish errors.
|
||||
public typealias Failure = A.Failure
|
||||
|
||||
public let a: A
|
||||
|
||||
public let b: B
|
||||
|
||||
public let c: C
|
||||
|
||||
public init(_ a: A, _ b: B, _ c: C)
|
||||
|
||||
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
|
||||
///
|
||||
/// - SeeAlso: `subscribe(_:)`
|
||||
/// - Parameters:
|
||||
/// - subscriber: The subscriber to attach to this `Publisher`.
|
||||
/// once attached it can begin to receive values.
|
||||
public func receive<S>(subscriber: S) where S : Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output)
|
||||
}
|
||||
|
||||
/// A publisher that receives and combines the latest elements from four publishers.
|
||||
public struct CombineLatest4<A, B, C, D> : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure {
|
||||
|
||||
/// The kind of values published by this publisher.
|
||||
public typealias Output = (A.Output, B.Output, C.Output, D.Output)
|
||||
|
||||
/// The kind of errors this publisher might publish.
|
||||
///
|
||||
/// Use `Never` if this `Publisher` does not publish errors.
|
||||
public typealias Failure = A.Failure
|
||||
|
||||
public let a: A
|
||||
|
||||
public let b: B
|
||||
|
||||
public let c: C
|
||||
|
||||
public let d: D
|
||||
|
||||
public init(_ a: A, _ b: B, _ c: C, _ d: D)
|
||||
|
||||
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
|
||||
///
|
||||
/// - SeeAlso: `subscribe(_:)`
|
||||
/// - Parameters:
|
||||
/// - subscriber: The subscriber to attach to this `Publisher`.
|
||||
/// once attached it can begin to receive values.
|
||||
public func receive<S>(subscriber: S) where S : Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output)
|
||||
}
|
||||
}
|
||||
|
||||
extension Publisher {
|
||||
|
||||
/// Subscribes to an additional publisher and publishes a tuple upon receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
||||
/// - Parameters:
|
||||
/// - other: Another publisher to combine with this one.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another publisher.
|
||||
public func combineLatest<P>(_ other: P) -> Publishers.CombineLatest<Self, P> where P : Publisher, Self.Failure == P.Failure
|
||||
|
||||
/// Subscribes to an additional publisher and invokes a closure upon receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
||||
/// - Parameters:
|
||||
/// - other: Another publisher to combine with this one.
|
||||
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another publisher.
|
||||
public func combineLatest<P, T>(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map<Publishers.CombineLatest<Self, P>, T> where P : Publisher, Self.Failure == P.Failure
|
||||
|
||||
/// Subscribes to two additional publishers and publishes a tuple upon receiving output from any of the publishers.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - Returns: A publisher that receives and combines elements from this publisher and two other publishers.
|
||||
public func combineLatest<P, Q>(_ publisher1: P, _ publisher2: Q) -> Publishers.CombineLatest3<Self, P, Q> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure
|
||||
|
||||
/// Subscribes to two additional publishers and invokes a closure upon receiving output from any of the publishers.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this publisher and two other publishers.
|
||||
public func combineLatest<P, Q, T>(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map<Publishers.CombineLatest3<Self, P, Q>, T> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure
|
||||
|
||||
/// Subscribes to three additional publishers and publishes a tuple upon receiving output from any of the publishers.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - publisher3: A fourth publisher to combine with this one.
|
||||
/// - Returns: A publisher that receives and combines elements from this publisher and three other publishers.
|
||||
public func combineLatest<P, Q, R>(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.CombineLatest4<Self, P, Q, R> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
||||
|
||||
/// Subscribes to three additional publishers and invokes a closure upon receiving output from any of the publishers.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - publisher3: A fourth publisher to combine with this one.
|
||||
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this publisher and three other publishers.
|
||||
public func combineLatest<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.CombineLatest4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
||||
}
|
||||
|
||||
extension Publishers {
|
||||
|
||||
/// A strategy for collecting received elements.
|
||||
@@ -797,43 +638,6 @@ extension Publisher {
|
||||
public func zip<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.Zip4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
||||
}
|
||||
|
||||
extension Publishers.CombineLatest : Equatable where A : Equatable, B : Equatable {
|
||||
|
||||
/// Returns a Boolean value that indicates whether two publishers are equivalent.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - lhs: A combineLatest publisher to compare for equality.
|
||||
/// - rhs: Another combineLatest publisher to compare for equality.
|
||||
/// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal, `false` otherwise.
|
||||
public static func == (lhs: Publishers.CombineLatest<A, B>, rhs: Publishers.CombineLatest<A, B>) -> Bool
|
||||
}
|
||||
|
||||
extension Publishers.CombineLatest3 : Equatable where A : Equatable, B : Equatable, C : Equatable {
|
||||
|
||||
/// Returns a Boolean value indicating whether two values are equal.
|
||||
///
|
||||
/// Equality is the inverse of inequality. For any values `a` and `b`,
|
||||
/// `a == b` implies that `a != b` is `false`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - lhs: A value to compare.
|
||||
/// - rhs: Another value to compare.
|
||||
public static func == (lhs: Publishers.CombineLatest3<A, B, C>, rhs: Publishers.CombineLatest3<A, B, C>) -> Bool
|
||||
}
|
||||
|
||||
extension Publishers.CombineLatest4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable {
|
||||
|
||||
/// Returns a Boolean value indicating whether two values are equal.
|
||||
///
|
||||
/// Equality is the inverse of inequality. For any values `a` and `b`,
|
||||
/// `a == b` implies that `a != b` is `false`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - lhs: A value to compare.
|
||||
/// - rhs: Another value to compare.
|
||||
public static func == (lhs: Publishers.CombineLatest4<A, B, C, D>, rhs: Publishers.CombineLatest4<A, B, C, D>) -> Bool
|
||||
}
|
||||
|
||||
extension Publishers.Merge : Equatable where A : Equatable, B : Equatable {
|
||||
|
||||
/// Returns a Boolean value that indicates whether two publishers are equivalent.
|
||||
|
||||
@@ -0,0 +1,205 @@
|
||||
//
|
||||
// AbstractCombineLatest.swift
|
||||
//
|
||||
//
|
||||
// Created by Sergej Jaskiewicz on 10.12.2019.
|
||||
//
|
||||
|
||||
internal class AbstractCombineLatest<Output, Failure, Downstream: Subscriber>
|
||||
where Downstream.Input == Output, Downstream.Failure == Failure
|
||||
{
|
||||
private let downstream: Downstream
|
||||
|
||||
// TODO: The size of these arrays always stays the same.
|
||||
// Maybe we can leverage ManagedBuffer/ManagedBufferPointer here
|
||||
// to avoid additional allocations.
|
||||
private var buffers: [Any?] // 0x78
|
||||
private var subscriptions: [Subscription?] // 0x80
|
||||
|
||||
private var demand = Subscribers.Demand.none // 0x88
|
||||
|
||||
private var recursion = false // 0x90
|
||||
|
||||
private var finished = false // 0x98
|
||||
|
||||
private var errored = false // 0xA0
|
||||
|
||||
private var cancelled = false // 0xA8
|
||||
|
||||
private let upstreamCount: Int // 0xB0
|
||||
|
||||
private var finishCount = 0 // 0xB8
|
||||
|
||||
private let lock = UnfairLock.allocate() // 0xC0
|
||||
|
||||
private let downstreamLock = UnfairRecursiveLock.allocate() // 0xC8
|
||||
|
||||
internal init(downstream: Downstream, upstreamCount: Int) {
|
||||
self.downstream = downstream
|
||||
self.buffers = Array(repeating: nil, count: upstreamCount)
|
||||
self.subscriptions = Array(repeating: nil, count: upstreamCount)
|
||||
self.upstreamCount = upstreamCount
|
||||
}
|
||||
|
||||
deinit {
|
||||
lock.deallocate()
|
||||
downstreamLock.deallocate()
|
||||
}
|
||||
|
||||
// TODO: There should be more type-safe (and faster) way.
|
||||
// E. g. what if we store `buffers` in subclasses?
|
||||
internal func convert(values: [Any?]) -> Output {
|
||||
abstractMethod()
|
||||
}
|
||||
|
||||
fileprivate final func receive(subscription: Subscription, index: Int) {
|
||||
lock.lock()
|
||||
guard !cancelled && subscriptions[index] == nil else {
|
||||
lock.unlock()
|
||||
subscription.cancel()
|
||||
return
|
||||
}
|
||||
subscriptions[index] = subscription
|
||||
lock.unlock()
|
||||
}
|
||||
|
||||
fileprivate final func receive(_ input: Any, index: Int) -> Subscribers.Demand {
|
||||
lock.lock()
|
||||
if cancelled || finished {
|
||||
lock.unlock()
|
||||
return .none
|
||||
}
|
||||
buffers[index] = input
|
||||
guard !recursion && demand > 0 && buffers.allSatisfy({ $0 != nil }) else {
|
||||
lock.unlock()
|
||||
return .none
|
||||
}
|
||||
demand -= 1
|
||||
recursion = true
|
||||
lock.unlock()
|
||||
downstreamLock.lock()
|
||||
let newDemand = downstream.receive(convert(values: buffers))
|
||||
downstreamLock.unlock()
|
||||
lock.lock()
|
||||
recursion = false
|
||||
demand += newDemand
|
||||
lock.unlock()
|
||||
return .none
|
||||
}
|
||||
|
||||
fileprivate final func receive(completion: Subscribers.Completion<Failure>,
|
||||
index: Int) {
|
||||
switch completion {
|
||||
case .finished:
|
||||
lock.lock()
|
||||
if finished {
|
||||
lock.unlock()
|
||||
return
|
||||
}
|
||||
finishCount += 1
|
||||
subscriptions[index] = nil
|
||||
if finishCount == upstreamCount {
|
||||
finished = true
|
||||
buffers = Array(repeating: nil, count: upstreamCount)
|
||||
lock.unlock()
|
||||
downstreamLock.lock()
|
||||
downstream.receive(completion: completion)
|
||||
downstreamLock.unlock()
|
||||
} else {
|
||||
lock.unlock()
|
||||
}
|
||||
case .failure:
|
||||
lock.lock()
|
||||
finished = true
|
||||
errored = true
|
||||
let subscriptions = self.subscriptions
|
||||
self.subscriptions = Array(repeating: nil, count: upstreamCount)
|
||||
buffers = Array(repeating: nil, count: upstreamCount)
|
||||
lock.unlock()
|
||||
for (i, subscription) in subscriptions.enumerated() where i != index {
|
||||
subscription?.cancel()
|
||||
}
|
||||
downstreamLock.lock()
|
||||
downstream.receive(completion: completion)
|
||||
downstreamLock.unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension AbstractCombineLatest: Subscription {
|
||||
internal func request(_ demand: Subscribers.Demand) {
|
||||
demand.assertNonZero() // TODO: Test this
|
||||
lock.lock()
|
||||
guard !cancelled && !finished else {
|
||||
lock.unlock()
|
||||
return
|
||||
}
|
||||
self.demand += demand
|
||||
lock.unlock()
|
||||
for subscription in subscriptions {
|
||||
subscription?.request(demand)
|
||||
}
|
||||
}
|
||||
|
||||
internal func cancel() {
|
||||
lock.lock()
|
||||
cancelled = true
|
||||
let subscriptions = self.subscriptions
|
||||
self.subscriptions = Array(repeating: nil, count: upstreamCount)
|
||||
buffers = Array(repeating: nil, count: upstreamCount)
|
||||
lock.unlock()
|
||||
for subscription in subscriptions {
|
||||
subscription?.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension AbstractCombineLatest: CustomStringConvertible {
|
||||
internal var description: String { return "CombineLatest" }
|
||||
}
|
||||
|
||||
extension AbstractCombineLatest: CustomReflectable {
|
||||
internal var customMirror: Mirror {
|
||||
lock.lock()
|
||||
defer { lock.unlock() }
|
||||
let children: [Mirror.Child] = [
|
||||
("downstream", downstream),
|
||||
("upstreamSubscriptions", subscriptions),
|
||||
("demand", demand),
|
||||
("buffers", buffers)
|
||||
]
|
||||
return Mirror(self, children: children)
|
||||
}
|
||||
}
|
||||
|
||||
extension AbstractCombineLatest: CustomPlaygroundDisplayConvertible {
|
||||
internal final var playgroundDescription: Any { return description }
|
||||
}
|
||||
|
||||
extension AbstractCombineLatest {
|
||||
internal struct Side<Input>: Subscriber, CustomStringConvertible {
|
||||
private let index: Int
|
||||
private let combiner: AbstractCombineLatest
|
||||
|
||||
internal let combineIdentifier = CombineIdentifier()
|
||||
|
||||
internal init(index: Int, combiner: AbstractCombineLatest) {
|
||||
self.index = index
|
||||
self.combiner = combiner
|
||||
}
|
||||
|
||||
internal func receive(subscription: Subscription) {
|
||||
combiner.receive(subscription: subscription, index: index)
|
||||
}
|
||||
|
||||
internal func receive(_ input: Input) -> Subscribers.Demand {
|
||||
return combiner.receive(input, index: index)
|
||||
}
|
||||
|
||||
internal func receive(completion: Subscribers.Completion<Failure>) {
|
||||
combiner.receive(completion: completion, index: index)
|
||||
}
|
||||
|
||||
internal var description: String { return "CombineLatest" }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,408 @@
|
||||
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
|
||||
// ┃ ┃
|
||||
// ┃ Auto-generated from GYB template. DO NOT EDIT! ┃
|
||||
// ┃ ┃
|
||||
// ┃ ┃
|
||||
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
|
||||
//
|
||||
// Publishers.CombineLatest.swift.gyb
|
||||
//
|
||||
//
|
||||
// Created by Sergej Jaskiewicz on 10.12.2019.
|
||||
//
|
||||
|
||||
// swiftlint:disable generic_type_name
|
||||
// swiftlint:disable large_tuple
|
||||
|
||||
// MARK: - CombineLatest methods on Publisher
|
||||
|
||||
extension Publisher {
|
||||
|
||||
/// Subscribes to an additional publisher and publishes a tuple upon
|
||||
/// receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - other: Another publisher to combine with this one.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
public func combineLatest<P: Publisher>(
|
||||
_ other: P
|
||||
) -> Publishers.CombineLatest<Self, P>
|
||||
where Failure == P.Failure
|
||||
{
|
||||
return .init(self, other)
|
||||
}
|
||||
|
||||
/// Subscribes to an additional publisher and invokes a closure
|
||||
/// upon receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - other: Another publisher to combine with this one.
|
||||
/// - transform: A closure that receives the most recent value from each publisher
|
||||
/// and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
public func combineLatest<P: Publisher, Result>(
|
||||
_ other: P,
|
||||
_ transform: @escaping (Output, P.Output) -> Result
|
||||
) -> Publishers.Map<Publishers.CombineLatest<Self, P>, Result>
|
||||
where Failure == P.Failure
|
||||
{
|
||||
return Publishers.CombineLatest(self, other).map {
|
||||
transform($0, $1)
|
||||
}
|
||||
}
|
||||
/// Subscribes to two additional publishers and publishes a tuple upon
|
||||
/// receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
public func combineLatest<P: Publisher, Q: Publisher>(
|
||||
_ publisher1: P,
|
||||
_ publisher2: Q
|
||||
) -> Publishers.CombineLatest3<Self, P, Q>
|
||||
where Failure == P.Failure,
|
||||
P.Failure == Q.Failure
|
||||
{
|
||||
return .init(self, publisher1, publisher2)
|
||||
}
|
||||
|
||||
/// Subscribes to two additional publishers and invokes a closure
|
||||
/// upon receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - transform: A closure that receives the most recent value from each publisher
|
||||
/// and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
public func combineLatest<P: Publisher, Q: Publisher, Result>(
|
||||
_ publisher1: P,
|
||||
_ publisher2: Q,
|
||||
_ transform: @escaping (Output, P.Output, Q.Output) -> Result
|
||||
) -> Publishers.Map<Publishers.CombineLatest3<Self, P, Q>, Result>
|
||||
where Failure == P.Failure,
|
||||
P.Failure == Q.Failure
|
||||
{
|
||||
return Publishers.CombineLatest3(self, publisher1, publisher2).map {
|
||||
transform($0, $1, $2)
|
||||
}
|
||||
}
|
||||
/// Subscribes to three additional publishers and publishes a tuple upon
|
||||
/// receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - publisher3: A fourth publisher to combine with this one.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
public func combineLatest<P: Publisher, Q: Publisher, R: Publisher>(
|
||||
_ publisher1: P,
|
||||
_ publisher2: Q,
|
||||
_ publisher3: R
|
||||
) -> Publishers.CombineLatest4<Self, P, Q, R>
|
||||
where Failure == P.Failure,
|
||||
P.Failure == Q.Failure,
|
||||
Q.Failure == R.Failure
|
||||
{
|
||||
return .init(self, publisher1, publisher2, publisher3)
|
||||
}
|
||||
|
||||
/// Subscribes to three additional publishers and invokes a closure
|
||||
/// upon receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - publisher1: A second publisher to combine with this one.
|
||||
/// - publisher2: A third publisher to combine with this one.
|
||||
/// - publisher3: A fourth publisher to combine with this one.
|
||||
/// - transform: A closure that receives the most recent value from each publisher
|
||||
/// and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
public func combineLatest<P: Publisher, Q: Publisher, R: Publisher, Result>(
|
||||
_ publisher1: P,
|
||||
_ publisher2: Q,
|
||||
_ publisher3: R,
|
||||
_ transform: @escaping (Output, P.Output, Q.Output, R.Output) -> Result
|
||||
) -> Publishers.Map<Publishers.CombineLatest4<Self, P, Q, R>, Result>
|
||||
where Failure == P.Failure,
|
||||
P.Failure == Q.Failure,
|
||||
Q.Failure == R.Failure
|
||||
{
|
||||
return Publishers.CombineLatest4(self, publisher1, publisher2, publisher3).map {
|
||||
transform($0, $1, $2, $3)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - CombineLatest publishers
|
||||
|
||||
extension Publishers {
|
||||
|
||||
/// A publisher that receives and combines the latest elements from two
|
||||
/// publishers.
|
||||
public struct CombineLatest<A: Publisher, B: Publisher>
|
||||
: Publisher
|
||||
where A.Failure == B.Failure
|
||||
{
|
||||
public typealias Output = (A.Output, B.Output)
|
||||
|
||||
public typealias Failure = A.Failure
|
||||
|
||||
public let a: A
|
||||
|
||||
public let b: B
|
||||
|
||||
public init(
|
||||
_ a: A,
|
||||
_ b: B
|
||||
) {
|
||||
self.a = a
|
||||
self.b = b
|
||||
}
|
||||
|
||||
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||
where Downstream.Failure == Failure,
|
||||
Downstream.Input == Output
|
||||
{
|
||||
typealias Inner = CombineLatest2Inner<A.Output,
|
||||
B.Output,
|
||||
Failure,
|
||||
Downstream>
|
||||
let inner = Inner(downstream: subscriber, upstreamCount: 2)
|
||||
a.subscribe(Inner.Side(index: 0, combiner: inner))
|
||||
b.subscribe(Inner.Side(index: 1, combiner: inner))
|
||||
subscriber.receive(subscription: inner)
|
||||
}
|
||||
}
|
||||
|
||||
/// A publisher that receives and combines the latest elements from three
|
||||
/// publishers.
|
||||
public struct CombineLatest3<A: Publisher, B: Publisher, C: Publisher>
|
||||
: Publisher
|
||||
where A.Failure == B.Failure,
|
||||
B.Failure == C.Failure
|
||||
{
|
||||
public typealias Output = (A.Output, B.Output, C.Output)
|
||||
|
||||
public typealias Failure = A.Failure
|
||||
|
||||
public let a: A
|
||||
|
||||
public let b: B
|
||||
|
||||
public let c: C
|
||||
|
||||
public init(
|
||||
_ a: A,
|
||||
_ b: B,
|
||||
_ c: C
|
||||
) {
|
||||
self.a = a
|
||||
self.b = b
|
||||
self.c = c
|
||||
}
|
||||
|
||||
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||
where Downstream.Failure == Failure,
|
||||
Downstream.Input == Output
|
||||
{
|
||||
typealias Inner = CombineLatest3Inner<A.Output,
|
||||
B.Output,
|
||||
C.Output,
|
||||
Failure,
|
||||
Downstream>
|
||||
let inner = Inner(downstream: subscriber, upstreamCount: 3)
|
||||
a.subscribe(Inner.Side(index: 0, combiner: inner))
|
||||
b.subscribe(Inner.Side(index: 1, combiner: inner))
|
||||
c.subscribe(Inner.Side(index: 2, combiner: inner))
|
||||
subscriber.receive(subscription: inner)
|
||||
}
|
||||
}
|
||||
|
||||
/// A publisher that receives and combines the latest elements from four
|
||||
/// publishers.
|
||||
public struct CombineLatest4<A: Publisher, B: Publisher, C: Publisher, D: Publisher>
|
||||
: Publisher
|
||||
where A.Failure == B.Failure,
|
||||
B.Failure == C.Failure,
|
||||
C.Failure == D.Failure
|
||||
{
|
||||
public typealias Output = (A.Output, B.Output, C.Output, D.Output)
|
||||
|
||||
public typealias Failure = A.Failure
|
||||
|
||||
public let a: A
|
||||
|
||||
public let b: B
|
||||
|
||||
public let c: C
|
||||
|
||||
public let d: D
|
||||
|
||||
public init(
|
||||
_ a: A,
|
||||
_ b: B,
|
||||
_ c: C,
|
||||
_ d: D
|
||||
) {
|
||||
self.a = a
|
||||
self.b = b
|
||||
self.c = c
|
||||
self.d = d
|
||||
}
|
||||
|
||||
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||
where Downstream.Failure == Failure,
|
||||
Downstream.Input == Output
|
||||
{
|
||||
typealias Inner = CombineLatest4Inner<A.Output,
|
||||
B.Output,
|
||||
C.Output,
|
||||
D.Output,
|
||||
Failure,
|
||||
Downstream>
|
||||
let inner = Inner(downstream: subscriber, upstreamCount: 4)
|
||||
a.subscribe(Inner.Side(index: 0, combiner: inner))
|
||||
b.subscribe(Inner.Side(index: 1, combiner: inner))
|
||||
c.subscribe(Inner.Side(index: 2, combiner: inner))
|
||||
d.subscribe(Inner.Side(index: 3, combiner: inner))
|
||||
subscriber.receive(subscription: inner)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Equatable conformances
|
||||
|
||||
extension Publishers.CombineLatest: Equatable
|
||||
where
|
||||
A: Equatable,
|
||||
B: Equatable {}
|
||||
|
||||
extension Publishers.CombineLatest3: Equatable
|
||||
where
|
||||
A: Equatable,
|
||||
B: Equatable,
|
||||
C: Equatable {}
|
||||
|
||||
extension Publishers.CombineLatest4: Equatable
|
||||
where
|
||||
A: Equatable,
|
||||
B: Equatable,
|
||||
C: Equatable,
|
||||
D: Equatable {}
|
||||
|
||||
// MARK: - Inners
|
||||
|
||||
private final class CombineLatest2Inner<Input0,
|
||||
Input1,
|
||||
Failure,
|
||||
Downstream: Subscriber>
|
||||
: AbstractCombineLatest<(Input0, Input1), Failure, Downstream>
|
||||
where Downstream.Input == (Input0, Input1),
|
||||
Downstream.Failure == Failure
|
||||
{
|
||||
override func convert(values: [Any?]) -> (Input0, Input1) {
|
||||
return (values[0] as! Input0,
|
||||
values[1] as! Input1)
|
||||
}
|
||||
}
|
||||
|
||||
private final class CombineLatest3Inner<Input0,
|
||||
Input1,
|
||||
Input2,
|
||||
Failure,
|
||||
Downstream: Subscriber>
|
||||
: AbstractCombineLatest<(Input0, Input1, Input2), Failure, Downstream>
|
||||
where Downstream.Input == (Input0, Input1, Input2),
|
||||
Downstream.Failure == Failure
|
||||
{
|
||||
override func convert(values: [Any?]) -> (Input0, Input1, Input2) {
|
||||
return (values[0] as! Input0,
|
||||
values[1] as! Input1,
|
||||
values[2] as! Input2)
|
||||
}
|
||||
}
|
||||
|
||||
private final class CombineLatest4Inner<Input0,
|
||||
Input1,
|
||||
Input2,
|
||||
Input3,
|
||||
Failure,
|
||||
Downstream: Subscriber>
|
||||
: AbstractCombineLatest<(Input0, Input1, Input2, Input3), Failure, Downstream>
|
||||
where Downstream.Input == (Input0, Input1, Input2, Input3),
|
||||
Downstream.Failure == Failure
|
||||
{
|
||||
override func convert(values: [Any?]) -> (Input0, Input1, Input2, Input3) {
|
||||
return (values[0] as! Input0,
|
||||
values[1] as! Input1,
|
||||
values[2] as! Input2,
|
||||
values[3] as! Input3)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,268 @@
|
||||
${template_header}
|
||||
//
|
||||
// Publishers.CombineLatest.swift.gyb
|
||||
//
|
||||
//
|
||||
// Created by Sergej Jaskiewicz on 10.12.2019.
|
||||
//
|
||||
%{
|
||||
from gyb_opencombine_support import (
|
||||
suffix_variadic,
|
||||
list_with_suffix_variadic,
|
||||
indent
|
||||
)
|
||||
|
||||
import string
|
||||
|
||||
instantiations = [(2, 'two', 'A second'),
|
||||
(3, 'three', 'A third'),
|
||||
(4, 'four', 'A fourth')]
|
||||
|
||||
def make_publisher_name(arity):
|
||||
return suffix_variadic('CombineLatest', arity, arity - 1)
|
||||
|
||||
def make_upstream_types(arity, start=0):
|
||||
return [str(c) for c in string.ascii_uppercase[start:(start + arity)]]
|
||||
|
||||
def make_upstream_generic_constraints(upstream_types, first_is_self=False):
|
||||
|
||||
format_string = '{0}Failure == {1}.Failure'
|
||||
|
||||
def format(i):
|
||||
return format_string.format(upstream_types[i] + '.',
|
||||
upstream_types[i + 1])
|
||||
|
||||
result = [format(i) for i in range(len(upstream_types) - 1)]
|
||||
|
||||
if first_is_self:
|
||||
result.insert(0, format_string.format('', upstream_types[0]))
|
||||
|
||||
return result
|
||||
|
||||
def declare_combine_latest_method(arity, transform):
|
||||
arg_count = arity - 1
|
||||
declaration_format = """\
|
||||
public func combineLatest<{}>(
|
||||
{}
|
||||
) -> {}
|
||||
where {}\
|
||||
"""
|
||||
upstream_types = make_upstream_types(arg_count, 15)
|
||||
method_generic_params = \
|
||||
[upstream_type + ': Publisher' for upstream_type in upstream_types]
|
||||
if transform:
|
||||
method_generic_params.append('Result')
|
||||
cs_method_generic_params = ', '.join(method_generic_params)
|
||||
method_args = ['_ other: P'] if arg_count == 1 \
|
||||
else ['_ publisher{}: {}'.format(i + 1, upstream_types[i]) \
|
||||
for i in range(arg_count)]
|
||||
if transform:
|
||||
output_types = ['Output'] + ['{}.Output'.format(upstream_type) \
|
||||
for upstream_type in upstream_types]
|
||||
cs_output_types = ', '.join(output_types)
|
||||
method_args \
|
||||
.append('_ transform: @escaping ({}) -> Result'.format(cs_output_types))
|
||||
cs_method_args = ',\n '.join(method_args)
|
||||
|
||||
publisher_generic_params = ['Self'] + upstream_types
|
||||
|
||||
cs_publisher_generic_params = ', '.join(publisher_generic_params)
|
||||
|
||||
publisher_name = 'Publishers.{}<{}>'.format(make_publisher_name(arity),
|
||||
cs_publisher_generic_params)
|
||||
|
||||
if transform:
|
||||
publisher_name = 'Publishers.Map<{}, Result>'.format(publisher_name)
|
||||
|
||||
generic_constraints = make_upstream_generic_constraints(upstream_types,
|
||||
first_is_self=True)
|
||||
|
||||
cs_generic_constraints = ',\n '.join(generic_constraints)
|
||||
|
||||
declaration = declaration_format.format(cs_method_generic_params,
|
||||
cs_method_args,
|
||||
publisher_name,
|
||||
cs_generic_constraints)
|
||||
|
||||
return indent(declaration, 4)
|
||||
}%
|
||||
|
||||
// swiftlint:disable generic_type_name
|
||||
// swiftlint:disable large_tuple
|
||||
|
||||
// MARK: - CombineLatest methods on Publisher
|
||||
|
||||
extension Publisher {
|
||||
|
||||
% for arity, _, _ in instantiations:
|
||||
%
|
||||
% argument_names = ['other'] \
|
||||
% if arity == 2 else ['publisher{}'.format(i) for i in range(1, arity)]
|
||||
% doc_cardinal = 'an additional publisher' if arity == 2 \
|
||||
% else '{} additional publishers'.format(instantiations[arity - 3][1])
|
||||
/// Subscribes to ${doc_cardinal} and publishes a tuple upon
|
||||
/// receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
% for i in range(arity - 1):
|
||||
% param_doc = 'Another' if arity == 2 else instantiations[i][2]
|
||||
/// - ${argument_names[i]}: ${param_doc} publisher to combine with this one.
|
||||
% end
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
${declare_combine_latest_method(arity, transform=False)}
|
||||
{
|
||||
return .init(self, ${', '.join(argument_names)})
|
||||
}
|
||||
|
||||
/// Subscribes to ${doc_cardinal} and invokes a closure
|
||||
/// upon receiving output from either publisher.
|
||||
///
|
||||
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||
/// holds the most recent value in each buffer.
|
||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||
/// publisher never publishes a value, this publisher never finishes.
|
||||
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||
/// fails.
|
||||
///
|
||||
/// - Parameters:
|
||||
% for i in range(arity - 1):
|
||||
% param_doc = 'Another' if arity == 2 else instantiations[i][2]
|
||||
/// - ${argument_names[i]}: ${param_doc} publisher to combine with this one.
|
||||
% end
|
||||
/// - transform: A closure that receives the most recent value from each publisher
|
||||
/// and returns a new value to publish.
|
||||
/// - Returns: A publisher that receives and combines elements from this and another
|
||||
/// publisher.
|
||||
${declare_combine_latest_method(arity, transform=True)}
|
||||
{
|
||||
% publisher_name = make_publisher_name(arity)
|
||||
return Publishers.${publisher_name}(self, ${', '.join(argument_names)}).map {
|
||||
transform(${', '.join(['${}'.format(i) for i in range(arity)])})
|
||||
}
|
||||
}
|
||||
% end
|
||||
}
|
||||
|
||||
// MARK: - CombineLatest publishers
|
||||
|
||||
extension Publishers {
|
||||
% for arity, cardinal, _ in instantiations:
|
||||
% publisher_name = make_publisher_name(arity)
|
||||
% upstream_types = make_upstream_types(arity)
|
||||
%
|
||||
% upstream_generic_params = \
|
||||
% [upstream_type + ': Publisher' for upstream_type in upstream_types]
|
||||
%
|
||||
% cs_upstream_generic_params = ', '.join(upstream_generic_params)
|
||||
%
|
||||
% output_types = [upstream_type + '.Output' for upstream_type in upstream_types]
|
||||
%
|
||||
% cs_output_types = ', '.join(output_types)
|
||||
%
|
||||
% upstream_generic_constraints = \
|
||||
% make_upstream_generic_constraints(upstream_types)
|
||||
%
|
||||
% cs_upstream_generic_constraints = \
|
||||
% ',\n '.join(upstream_generic_constraints)
|
||||
%
|
||||
% init_args = ['_ {}: {}'.format(upstream_type.lower(), upstream_type) \
|
||||
% for upstream_type in upstream_types]
|
||||
% cs_init_args = ',\n '.join(init_args)
|
||||
%
|
||||
% self_fields = [upstream_type.lower() for upstream_type in upstream_types]
|
||||
|
||||
/// A publisher that receives and combines the latest elements from ${cardinal}
|
||||
/// publishers.
|
||||
public struct ${publisher_name}<${cs_upstream_generic_params}>
|
||||
: Publisher
|
||||
where ${cs_upstream_generic_constraints}
|
||||
{
|
||||
public typealias Output = (${cs_output_types})
|
||||
|
||||
public typealias Failure = ${upstream_types[0]}.Failure
|
||||
% for upstream_type in upstream_types:
|
||||
|
||||
public let ${upstream_type.lower()}: ${upstream_type}
|
||||
% end
|
||||
|
||||
public init(
|
||||
${cs_init_args}
|
||||
) {
|
||||
% for self_field in self_fields:
|
||||
self.${self_field} = ${self_field}
|
||||
% end
|
||||
}
|
||||
|
||||
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||
where Downstream.Failure == Failure,
|
||||
Downstream.Input == Output
|
||||
{
|
||||
% cs_indented_output_types = (',\n' + (50 * ' ')).join(output_types)
|
||||
typealias Inner = CombineLatest${arity}Inner<${cs_indented_output_types},
|
||||
Failure,
|
||||
Downstream>
|
||||
let inner = Inner(downstream: subscriber, upstreamCount: ${arity})
|
||||
% for i in range(arity):
|
||||
${self_fields[i]}.subscribe(Inner.Side(index: ${i}, combiner: inner))
|
||||
% end
|
||||
subscriber.receive(subscription: inner)
|
||||
}
|
||||
}
|
||||
% end
|
||||
}
|
||||
|
||||
// MARK: - Equatable conformances
|
||||
% for arity, _, _ in instantiations:
|
||||
%
|
||||
% publisher_name = make_publisher_name(arity)
|
||||
%
|
||||
% upstream_types = make_upstream_types(arity)
|
||||
%
|
||||
% constraints = [upstream_type + ': Equatable' for upstream_type in upstream_types]
|
||||
% cs_constraints = ',\n'.join(constraints)
|
||||
% cs_constraints = indent(cs_constraints, 8)
|
||||
%
|
||||
|
||||
extension Publishers.${publisher_name}: Equatable
|
||||
where
|
||||
${cs_constraints} {}
|
||||
% end
|
||||
|
||||
// MARK: - Inners
|
||||
% for arity, _, _ in instantiations:
|
||||
%
|
||||
% publisher_name = make_publisher_name(arity)
|
||||
%
|
||||
% upstream_types = make_upstream_types(arity)
|
||||
%
|
||||
% input_types = ['Input{}'.format(i) for i in range(arity)]
|
||||
%
|
||||
% converters = ['values[{}] as! {}'.format(i, input_types[i]) for i in range(arity)]
|
||||
% output_type = '({})'.format(', '.join(input_types))
|
||||
|
||||
private final class CombineLatest${arity}Inner<${(',\n' + (40 * ' ')).join(input_types)},
|
||||
Failure,
|
||||
Downstream: Subscriber>
|
||||
: AbstractCombineLatest<${output_type}, Failure, Downstream>
|
||||
where Downstream.Input == ${output_type},
|
||||
Downstream.Failure == Failure
|
||||
{
|
||||
override func convert(values: [Any?]) -> (${', '.join(input_types)}) {
|
||||
return (${',\n '.join(converters)})
|
||||
}
|
||||
}
|
||||
% end
|
||||
@@ -3,3 +3,7 @@ def suffix_variadic(name, index, arity):
|
||||
|
||||
def list_with_suffix_variadic(name, arity):
|
||||
return [suffix_variadic(name, i, arity) for i in range(arity)]
|
||||
|
||||
def indent(input, space_count):
|
||||
padding = space_count * ' '
|
||||
return ''.join(padding + line for line in input.splitlines(True))
|
||||
|
||||
Reference in New Issue
Block a user