Compare commits
1 Commits
master
...
combine-latest
| Author | SHA1 | Date | |
|---|---|---|---|
| 26e86a9905 |
@@ -2,165 +2,6 @@
|
|||||||
// Please remove the corresponding piece from this file if you implement something,
|
// Please remove the corresponding piece from this file if you implement something,
|
||||||
// and complement this file as features are added in Apple's Combine
|
// and complement this file as features are added in Apple's Combine
|
||||||
|
|
||||||
extension Publishers {
|
|
||||||
|
|
||||||
/// A publisher that receives and combines the latest elements from two publishers.
|
|
||||||
public struct CombineLatest<A, B> : Publisher where A : Publisher, B : Publisher, A.Failure == B.Failure {
|
|
||||||
|
|
||||||
/// The kind of values published by this publisher.
|
|
||||||
public typealias Output = (A.Output, B.Output)
|
|
||||||
|
|
||||||
/// The kind of errors this publisher might publish.
|
|
||||||
///
|
|
||||||
/// Use `Never` if this `Publisher` does not publish errors.
|
|
||||||
public typealias Failure = A.Failure
|
|
||||||
|
|
||||||
public let a: A
|
|
||||||
|
|
||||||
public let b: B
|
|
||||||
|
|
||||||
public init(_ a: A, _ b: B)
|
|
||||||
|
|
||||||
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
|
|
||||||
///
|
|
||||||
/// - SeeAlso: `subscribe(_:)`
|
|
||||||
/// - Parameters:
|
|
||||||
/// - subscriber: The subscriber to attach to this `Publisher`.
|
|
||||||
/// once attached it can begin to receive values.
|
|
||||||
public func receive<S>(subscriber: S) where S : Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A publisher that receives and combines the latest elements from three publishers.
|
|
||||||
public struct CombineLatest3<A, B, C> : Publisher where A : Publisher, B : Publisher, C : Publisher, A.Failure == B.Failure, B.Failure == C.Failure {
|
|
||||||
|
|
||||||
/// The kind of values published by this publisher.
|
|
||||||
public typealias Output = (A.Output, B.Output, C.Output)
|
|
||||||
|
|
||||||
/// The kind of errors this publisher might publish.
|
|
||||||
///
|
|
||||||
/// Use `Never` if this `Publisher` does not publish errors.
|
|
||||||
public typealias Failure = A.Failure
|
|
||||||
|
|
||||||
public let a: A
|
|
||||||
|
|
||||||
public let b: B
|
|
||||||
|
|
||||||
public let c: C
|
|
||||||
|
|
||||||
public init(_ a: A, _ b: B, _ c: C)
|
|
||||||
|
|
||||||
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
|
|
||||||
///
|
|
||||||
/// - SeeAlso: `subscribe(_:)`
|
|
||||||
/// - Parameters:
|
|
||||||
/// - subscriber: The subscriber to attach to this `Publisher`.
|
|
||||||
/// once attached it can begin to receive values.
|
|
||||||
public func receive<S>(subscriber: S) where S : Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A publisher that receives and combines the latest elements from four publishers.
|
|
||||||
public struct CombineLatest4<A, B, C, D> : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure {
|
|
||||||
|
|
||||||
/// The kind of values published by this publisher.
|
|
||||||
public typealias Output = (A.Output, B.Output, C.Output, D.Output)
|
|
||||||
|
|
||||||
/// The kind of errors this publisher might publish.
|
|
||||||
///
|
|
||||||
/// Use `Never` if this `Publisher` does not publish errors.
|
|
||||||
public typealias Failure = A.Failure
|
|
||||||
|
|
||||||
public let a: A
|
|
||||||
|
|
||||||
public let b: B
|
|
||||||
|
|
||||||
public let c: C
|
|
||||||
|
|
||||||
public let d: D
|
|
||||||
|
|
||||||
public init(_ a: A, _ b: B, _ c: C, _ d: D)
|
|
||||||
|
|
||||||
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
|
|
||||||
///
|
|
||||||
/// - SeeAlso: `subscribe(_:)`
|
|
||||||
/// - Parameters:
|
|
||||||
/// - subscriber: The subscriber to attach to this `Publisher`.
|
|
||||||
/// once attached it can begin to receive values.
|
|
||||||
public func receive<S>(subscriber: S) where S : Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
extension Publisher {
|
|
||||||
|
|
||||||
/// Subscribes to an additional publisher and publishes a tuple upon receiving output from either publisher.
|
|
||||||
///
|
|
||||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
|
||||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes.
|
|
||||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
|
||||||
/// - Parameters:
|
|
||||||
/// - other: Another publisher to combine with this one.
|
|
||||||
/// - Returns: A publisher that receives and combines elements from this and another publisher.
|
|
||||||
public func combineLatest<P>(_ other: P) -> Publishers.CombineLatest<Self, P> where P : Publisher, Self.Failure == P.Failure
|
|
||||||
|
|
||||||
/// Subscribes to an additional publisher and invokes a closure upon receiving output from either publisher.
|
|
||||||
///
|
|
||||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
|
||||||
/// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes.
|
|
||||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
|
||||||
/// - Parameters:
|
|
||||||
/// - other: Another publisher to combine with this one.
|
|
||||||
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
|
|
||||||
/// - Returns: A publisher that receives and combines elements from this and another publisher.
|
|
||||||
public func combineLatest<P, T>(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map<Publishers.CombineLatest<Self, P>, T> where P : Publisher, Self.Failure == P.Failure
|
|
||||||
|
|
||||||
/// Subscribes to two additional publishers and publishes a tuple upon receiving output from any of the publishers.
|
|
||||||
///
|
|
||||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
|
||||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
|
||||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
|
||||||
/// - Parameters:
|
|
||||||
/// - publisher1: A second publisher to combine with this one.
|
|
||||||
/// - publisher2: A third publisher to combine with this one.
|
|
||||||
/// - Returns: A publisher that receives and combines elements from this publisher and two other publishers.
|
|
||||||
public func combineLatest<P, Q>(_ publisher1: P, _ publisher2: Q) -> Publishers.CombineLatest3<Self, P, Q> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure
|
|
||||||
|
|
||||||
/// Subscribes to two additional publishers and invokes a closure upon receiving output from any of the publishers.
|
|
||||||
///
|
|
||||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
|
||||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
|
||||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
|
||||||
/// - Parameters:
|
|
||||||
/// - publisher1: A second publisher to combine with this one.
|
|
||||||
/// - publisher2: A third publisher to combine with this one.
|
|
||||||
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
|
|
||||||
/// - Returns: A publisher that receives and combines elements from this publisher and two other publishers.
|
|
||||||
public func combineLatest<P, Q, T>(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map<Publishers.CombineLatest3<Self, P, Q>, T> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure
|
|
||||||
|
|
||||||
/// Subscribes to three additional publishers and publishes a tuple upon receiving output from any of the publishers.
|
|
||||||
///
|
|
||||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
|
||||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
|
||||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
|
||||||
/// - Parameters:
|
|
||||||
/// - publisher1: A second publisher to combine with this one.
|
|
||||||
/// - publisher2: A third publisher to combine with this one.
|
|
||||||
/// - publisher3: A fourth publisher to combine with this one.
|
|
||||||
/// - Returns: A publisher that receives and combines elements from this publisher and three other publishers.
|
|
||||||
public func combineLatest<P, Q, R>(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.CombineLatest4<Self, P, Q, R> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
|
||||||
|
|
||||||
/// Subscribes to three additional publishers and invokes a closure upon receiving output from any of the publishers.
|
|
||||||
///
|
|
||||||
/// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer.
|
|
||||||
/// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes.
|
|
||||||
/// If any of the combined publishers terminates with a failure, this publisher also fails.
|
|
||||||
/// - Parameters:
|
|
||||||
/// - publisher1: A second publisher to combine with this one.
|
|
||||||
/// - publisher2: A third publisher to combine with this one.
|
|
||||||
/// - publisher3: A fourth publisher to combine with this one.
|
|
||||||
/// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish.
|
|
||||||
/// - Returns: A publisher that receives and combines elements from this publisher and three other publishers.
|
|
||||||
public func combineLatest<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.CombineLatest4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
|
||||||
}
|
|
||||||
|
|
||||||
extension Publishers {
|
extension Publishers {
|
||||||
|
|
||||||
/// A strategy for collecting received elements.
|
/// A strategy for collecting received elements.
|
||||||
@@ -797,43 +638,6 @@ extension Publisher {
|
|||||||
public func zip<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.Zip4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
public func zip<P, Q, R, T>(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map<Publishers.Zip4<Self, P, Q, R>, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure
|
||||||
}
|
}
|
||||||
|
|
||||||
extension Publishers.CombineLatest : Equatable where A : Equatable, B : Equatable {
|
|
||||||
|
|
||||||
/// Returns a Boolean value that indicates whether two publishers are equivalent.
|
|
||||||
///
|
|
||||||
/// - Parameters:
|
|
||||||
/// - lhs: A combineLatest publisher to compare for equality.
|
|
||||||
/// - rhs: Another combineLatest publisher to compare for equality.
|
|
||||||
/// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal, `false` otherwise.
|
|
||||||
public static func == (lhs: Publishers.CombineLatest<A, B>, rhs: Publishers.CombineLatest<A, B>) -> Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
extension Publishers.CombineLatest3 : Equatable where A : Equatable, B : Equatable, C : Equatable {
|
|
||||||
|
|
||||||
/// Returns a Boolean value indicating whether two values are equal.
|
|
||||||
///
|
|
||||||
/// Equality is the inverse of inequality. For any values `a` and `b`,
|
|
||||||
/// `a == b` implies that `a != b` is `false`.
|
|
||||||
///
|
|
||||||
/// - Parameters:
|
|
||||||
/// - lhs: A value to compare.
|
|
||||||
/// - rhs: Another value to compare.
|
|
||||||
public static func == (lhs: Publishers.CombineLatest3<A, B, C>, rhs: Publishers.CombineLatest3<A, B, C>) -> Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
extension Publishers.CombineLatest4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable {
|
|
||||||
|
|
||||||
/// Returns a Boolean value indicating whether two values are equal.
|
|
||||||
///
|
|
||||||
/// Equality is the inverse of inequality. For any values `a` and `b`,
|
|
||||||
/// `a == b` implies that `a != b` is `false`.
|
|
||||||
///
|
|
||||||
/// - Parameters:
|
|
||||||
/// - lhs: A value to compare.
|
|
||||||
/// - rhs: Another value to compare.
|
|
||||||
public static func == (lhs: Publishers.CombineLatest4<A, B, C, D>, rhs: Publishers.CombineLatest4<A, B, C, D>) -> Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
extension Publishers.Merge : Equatable where A : Equatable, B : Equatable {
|
extension Publishers.Merge : Equatable where A : Equatable, B : Equatable {
|
||||||
|
|
||||||
/// Returns a Boolean value that indicates whether two publishers are equivalent.
|
/// Returns a Boolean value that indicates whether two publishers are equivalent.
|
||||||
|
|||||||
@@ -0,0 +1,205 @@
|
|||||||
|
//
|
||||||
|
// AbstractCombineLatest.swift
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// Created by Sergej Jaskiewicz on 10.12.2019.
|
||||||
|
//
|
||||||
|
|
||||||
|
internal class AbstractCombineLatest<Output, Failure, Downstream: Subscriber>
|
||||||
|
where Downstream.Input == Output, Downstream.Failure == Failure
|
||||||
|
{
|
||||||
|
private let downstream: Downstream
|
||||||
|
|
||||||
|
// TODO: The size of these arrays always stays the same.
|
||||||
|
// Maybe we can leverage ManagedBuffer/ManagedBufferPointer here
|
||||||
|
// to avoid additional allocations.
|
||||||
|
private var buffers: [Any?] // 0x78
|
||||||
|
private var subscriptions: [Subscription?] // 0x80
|
||||||
|
|
||||||
|
private var demand = Subscribers.Demand.none // 0x88
|
||||||
|
|
||||||
|
private var recursion = false // 0x90
|
||||||
|
|
||||||
|
private var finished = false // 0x98
|
||||||
|
|
||||||
|
private var errored = false // 0xA0
|
||||||
|
|
||||||
|
private var cancelled = false // 0xA8
|
||||||
|
|
||||||
|
private let upstreamCount: Int // 0xB0
|
||||||
|
|
||||||
|
private var finishCount = 0 // 0xB8
|
||||||
|
|
||||||
|
private let lock = UnfairLock.allocate() // 0xC0
|
||||||
|
|
||||||
|
private let downstreamLock = UnfairRecursiveLock.allocate() // 0xC8
|
||||||
|
|
||||||
|
internal init(downstream: Downstream, upstreamCount: Int) {
|
||||||
|
self.downstream = downstream
|
||||||
|
self.buffers = Array(repeating: nil, count: upstreamCount)
|
||||||
|
self.subscriptions = Array(repeating: nil, count: upstreamCount)
|
||||||
|
self.upstreamCount = upstreamCount
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
lock.deallocate()
|
||||||
|
downstreamLock.deallocate()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: There should be more type-safe (and faster) way.
|
||||||
|
// E. g. what if we store `buffers` in subclasses?
|
||||||
|
internal func convert(values: [Any?]) -> Output {
|
||||||
|
abstractMethod()
|
||||||
|
}
|
||||||
|
|
||||||
|
fileprivate final func receive(subscription: Subscription, index: Int) {
|
||||||
|
lock.lock()
|
||||||
|
guard !cancelled && subscriptions[index] == nil else {
|
||||||
|
lock.unlock()
|
||||||
|
subscription.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
subscriptions[index] = subscription
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
fileprivate final func receive(_ input: Any, index: Int) -> Subscribers.Demand {
|
||||||
|
lock.lock()
|
||||||
|
if cancelled || finished {
|
||||||
|
lock.unlock()
|
||||||
|
return .none
|
||||||
|
}
|
||||||
|
buffers[index] = input
|
||||||
|
guard !recursion && demand > 0 && buffers.allSatisfy({ $0 != nil }) else {
|
||||||
|
lock.unlock()
|
||||||
|
return .none
|
||||||
|
}
|
||||||
|
demand -= 1
|
||||||
|
recursion = true
|
||||||
|
lock.unlock()
|
||||||
|
downstreamLock.lock()
|
||||||
|
let newDemand = downstream.receive(convert(values: buffers))
|
||||||
|
downstreamLock.unlock()
|
||||||
|
lock.lock()
|
||||||
|
recursion = false
|
||||||
|
demand += newDemand
|
||||||
|
lock.unlock()
|
||||||
|
return .none
|
||||||
|
}
|
||||||
|
|
||||||
|
fileprivate final func receive(completion: Subscribers.Completion<Failure>,
|
||||||
|
index: Int) {
|
||||||
|
switch completion {
|
||||||
|
case .finished:
|
||||||
|
lock.lock()
|
||||||
|
if finished {
|
||||||
|
lock.unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
finishCount += 1
|
||||||
|
subscriptions[index] = nil
|
||||||
|
if finishCount == upstreamCount {
|
||||||
|
finished = true
|
||||||
|
buffers = Array(repeating: nil, count: upstreamCount)
|
||||||
|
lock.unlock()
|
||||||
|
downstreamLock.lock()
|
||||||
|
downstream.receive(completion: completion)
|
||||||
|
downstreamLock.unlock()
|
||||||
|
} else {
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
case .failure:
|
||||||
|
lock.lock()
|
||||||
|
finished = true
|
||||||
|
errored = true
|
||||||
|
let subscriptions = self.subscriptions
|
||||||
|
self.subscriptions = Array(repeating: nil, count: upstreamCount)
|
||||||
|
buffers = Array(repeating: nil, count: upstreamCount)
|
||||||
|
lock.unlock()
|
||||||
|
for (i, subscription) in subscriptions.enumerated() where i != index {
|
||||||
|
subscription?.cancel()
|
||||||
|
}
|
||||||
|
downstreamLock.lock()
|
||||||
|
downstream.receive(completion: completion)
|
||||||
|
downstreamLock.unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension AbstractCombineLatest: Subscription {
|
||||||
|
internal func request(_ demand: Subscribers.Demand) {
|
||||||
|
demand.assertNonZero() // TODO: Test this
|
||||||
|
lock.lock()
|
||||||
|
guard !cancelled && !finished else {
|
||||||
|
lock.unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
self.demand += demand
|
||||||
|
lock.unlock()
|
||||||
|
for subscription in subscriptions {
|
||||||
|
subscription?.request(demand)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal func cancel() {
|
||||||
|
lock.lock()
|
||||||
|
cancelled = true
|
||||||
|
let subscriptions = self.subscriptions
|
||||||
|
self.subscriptions = Array(repeating: nil, count: upstreamCount)
|
||||||
|
buffers = Array(repeating: nil, count: upstreamCount)
|
||||||
|
lock.unlock()
|
||||||
|
for subscription in subscriptions {
|
||||||
|
subscription?.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension AbstractCombineLatest: CustomStringConvertible {
|
||||||
|
internal var description: String { return "CombineLatest" }
|
||||||
|
}
|
||||||
|
|
||||||
|
extension AbstractCombineLatest: CustomReflectable {
|
||||||
|
internal var customMirror: Mirror {
|
||||||
|
lock.lock()
|
||||||
|
defer { lock.unlock() }
|
||||||
|
let children: [Mirror.Child] = [
|
||||||
|
("downstream", downstream),
|
||||||
|
("upstreamSubscriptions", subscriptions),
|
||||||
|
("demand", demand),
|
||||||
|
("buffers", buffers)
|
||||||
|
]
|
||||||
|
return Mirror(self, children: children)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension AbstractCombineLatest: CustomPlaygroundDisplayConvertible {
|
||||||
|
internal final var playgroundDescription: Any { return description }
|
||||||
|
}
|
||||||
|
|
||||||
|
extension AbstractCombineLatest {
|
||||||
|
internal struct Side<Input>: Subscriber, CustomStringConvertible {
|
||||||
|
private let index: Int
|
||||||
|
private let combiner: AbstractCombineLatest
|
||||||
|
|
||||||
|
internal let combineIdentifier = CombineIdentifier()
|
||||||
|
|
||||||
|
internal init(index: Int, combiner: AbstractCombineLatest) {
|
||||||
|
self.index = index
|
||||||
|
self.combiner = combiner
|
||||||
|
}
|
||||||
|
|
||||||
|
internal func receive(subscription: Subscription) {
|
||||||
|
combiner.receive(subscription: subscription, index: index)
|
||||||
|
}
|
||||||
|
|
||||||
|
internal func receive(_ input: Input) -> Subscribers.Demand {
|
||||||
|
return combiner.receive(input, index: index)
|
||||||
|
}
|
||||||
|
|
||||||
|
internal func receive(completion: Subscribers.Completion<Failure>) {
|
||||||
|
combiner.receive(completion: completion, index: index)
|
||||||
|
}
|
||||||
|
|
||||||
|
internal var description: String { return "CombineLatest" }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,408 @@
|
|||||||
|
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
|
||||||
|
// ┃ ┃
|
||||||
|
// ┃ Auto-generated from GYB template. DO NOT EDIT! ┃
|
||||||
|
// ┃ ┃
|
||||||
|
// ┃ ┃
|
||||||
|
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
|
||||||
|
//
|
||||||
|
// Publishers.CombineLatest.swift.gyb
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// Created by Sergej Jaskiewicz on 10.12.2019.
|
||||||
|
//
|
||||||
|
|
||||||
|
// swiftlint:disable generic_type_name
|
||||||
|
// swiftlint:disable large_tuple
|
||||||
|
|
||||||
|
// MARK: - CombineLatest methods on Publisher
|
||||||
|
|
||||||
|
extension Publisher {
|
||||||
|
|
||||||
|
/// Subscribes to an additional publisher and publishes a tuple upon
|
||||||
|
/// receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - other: Another publisher to combine with this one.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
public func combineLatest<P: Publisher>(
|
||||||
|
_ other: P
|
||||||
|
) -> Publishers.CombineLatest<Self, P>
|
||||||
|
where Failure == P.Failure
|
||||||
|
{
|
||||||
|
return .init(self, other)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribes to an additional publisher and invokes a closure
|
||||||
|
/// upon receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - other: Another publisher to combine with this one.
|
||||||
|
/// - transform: A closure that receives the most recent value from each publisher
|
||||||
|
/// and returns a new value to publish.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
public func combineLatest<P: Publisher, Result>(
|
||||||
|
_ other: P,
|
||||||
|
_ transform: @escaping (Output, P.Output) -> Result
|
||||||
|
) -> Publishers.Map<Publishers.CombineLatest<Self, P>, Result>
|
||||||
|
where Failure == P.Failure
|
||||||
|
{
|
||||||
|
return Publishers.CombineLatest(self, other).map {
|
||||||
|
transform($0, $1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Subscribes to two additional publishers and publishes a tuple upon
|
||||||
|
/// receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - publisher1: A second publisher to combine with this one.
|
||||||
|
/// - publisher2: A third publisher to combine with this one.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
public func combineLatest<P: Publisher, Q: Publisher>(
|
||||||
|
_ publisher1: P,
|
||||||
|
_ publisher2: Q
|
||||||
|
) -> Publishers.CombineLatest3<Self, P, Q>
|
||||||
|
where Failure == P.Failure,
|
||||||
|
P.Failure == Q.Failure
|
||||||
|
{
|
||||||
|
return .init(self, publisher1, publisher2)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribes to two additional publishers and invokes a closure
|
||||||
|
/// upon receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - publisher1: A second publisher to combine with this one.
|
||||||
|
/// - publisher2: A third publisher to combine with this one.
|
||||||
|
/// - transform: A closure that receives the most recent value from each publisher
|
||||||
|
/// and returns a new value to publish.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
public func combineLatest<P: Publisher, Q: Publisher, Result>(
|
||||||
|
_ publisher1: P,
|
||||||
|
_ publisher2: Q,
|
||||||
|
_ transform: @escaping (Output, P.Output, Q.Output) -> Result
|
||||||
|
) -> Publishers.Map<Publishers.CombineLatest3<Self, P, Q>, Result>
|
||||||
|
where Failure == P.Failure,
|
||||||
|
P.Failure == Q.Failure
|
||||||
|
{
|
||||||
|
return Publishers.CombineLatest3(self, publisher1, publisher2).map {
|
||||||
|
transform($0, $1, $2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Subscribes to three additional publishers and publishes a tuple upon
|
||||||
|
/// receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - publisher1: A second publisher to combine with this one.
|
||||||
|
/// - publisher2: A third publisher to combine with this one.
|
||||||
|
/// - publisher3: A fourth publisher to combine with this one.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
public func combineLatest<P: Publisher, Q: Publisher, R: Publisher>(
|
||||||
|
_ publisher1: P,
|
||||||
|
_ publisher2: Q,
|
||||||
|
_ publisher3: R
|
||||||
|
) -> Publishers.CombineLatest4<Self, P, Q, R>
|
||||||
|
where Failure == P.Failure,
|
||||||
|
P.Failure == Q.Failure,
|
||||||
|
Q.Failure == R.Failure
|
||||||
|
{
|
||||||
|
return .init(self, publisher1, publisher2, publisher3)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribes to three additional publishers and invokes a closure
|
||||||
|
/// upon receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - publisher1: A second publisher to combine with this one.
|
||||||
|
/// - publisher2: A third publisher to combine with this one.
|
||||||
|
/// - publisher3: A fourth publisher to combine with this one.
|
||||||
|
/// - transform: A closure that receives the most recent value from each publisher
|
||||||
|
/// and returns a new value to publish.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
public func combineLatest<P: Publisher, Q: Publisher, R: Publisher, Result>(
|
||||||
|
_ publisher1: P,
|
||||||
|
_ publisher2: Q,
|
||||||
|
_ publisher3: R,
|
||||||
|
_ transform: @escaping (Output, P.Output, Q.Output, R.Output) -> Result
|
||||||
|
) -> Publishers.Map<Publishers.CombineLatest4<Self, P, Q, R>, Result>
|
||||||
|
where Failure == P.Failure,
|
||||||
|
P.Failure == Q.Failure,
|
||||||
|
Q.Failure == R.Failure
|
||||||
|
{
|
||||||
|
return Publishers.CombineLatest4(self, publisher1, publisher2, publisher3).map {
|
||||||
|
transform($0, $1, $2, $3)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - CombineLatest publishers
|
||||||
|
|
||||||
|
extension Publishers {
|
||||||
|
|
||||||
|
/// A publisher that receives and combines the latest elements from two
|
||||||
|
/// publishers.
|
||||||
|
public struct CombineLatest<A: Publisher, B: Publisher>
|
||||||
|
: Publisher
|
||||||
|
where A.Failure == B.Failure
|
||||||
|
{
|
||||||
|
public typealias Output = (A.Output, B.Output)
|
||||||
|
|
||||||
|
public typealias Failure = A.Failure
|
||||||
|
|
||||||
|
public let a: A
|
||||||
|
|
||||||
|
public let b: B
|
||||||
|
|
||||||
|
public init(
|
||||||
|
_ a: A,
|
||||||
|
_ b: B
|
||||||
|
) {
|
||||||
|
self.a = a
|
||||||
|
self.b = b
|
||||||
|
}
|
||||||
|
|
||||||
|
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||||
|
where Downstream.Failure == Failure,
|
||||||
|
Downstream.Input == Output
|
||||||
|
{
|
||||||
|
typealias Inner = CombineLatest2Inner<A.Output,
|
||||||
|
B.Output,
|
||||||
|
Failure,
|
||||||
|
Downstream>
|
||||||
|
let inner = Inner(downstream: subscriber, upstreamCount: 2)
|
||||||
|
a.subscribe(Inner.Side(index: 0, combiner: inner))
|
||||||
|
b.subscribe(Inner.Side(index: 1, combiner: inner))
|
||||||
|
subscriber.receive(subscription: inner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A publisher that receives and combines the latest elements from three
|
||||||
|
/// publishers.
|
||||||
|
public struct CombineLatest3<A: Publisher, B: Publisher, C: Publisher>
|
||||||
|
: Publisher
|
||||||
|
where A.Failure == B.Failure,
|
||||||
|
B.Failure == C.Failure
|
||||||
|
{
|
||||||
|
public typealias Output = (A.Output, B.Output, C.Output)
|
||||||
|
|
||||||
|
public typealias Failure = A.Failure
|
||||||
|
|
||||||
|
public let a: A
|
||||||
|
|
||||||
|
public let b: B
|
||||||
|
|
||||||
|
public let c: C
|
||||||
|
|
||||||
|
public init(
|
||||||
|
_ a: A,
|
||||||
|
_ b: B,
|
||||||
|
_ c: C
|
||||||
|
) {
|
||||||
|
self.a = a
|
||||||
|
self.b = b
|
||||||
|
self.c = c
|
||||||
|
}
|
||||||
|
|
||||||
|
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||||
|
where Downstream.Failure == Failure,
|
||||||
|
Downstream.Input == Output
|
||||||
|
{
|
||||||
|
typealias Inner = CombineLatest3Inner<A.Output,
|
||||||
|
B.Output,
|
||||||
|
C.Output,
|
||||||
|
Failure,
|
||||||
|
Downstream>
|
||||||
|
let inner = Inner(downstream: subscriber, upstreamCount: 3)
|
||||||
|
a.subscribe(Inner.Side(index: 0, combiner: inner))
|
||||||
|
b.subscribe(Inner.Side(index: 1, combiner: inner))
|
||||||
|
c.subscribe(Inner.Side(index: 2, combiner: inner))
|
||||||
|
subscriber.receive(subscription: inner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A publisher that receives and combines the latest elements from four
|
||||||
|
/// publishers.
|
||||||
|
public struct CombineLatest4<A: Publisher, B: Publisher, C: Publisher, D: Publisher>
|
||||||
|
: Publisher
|
||||||
|
where A.Failure == B.Failure,
|
||||||
|
B.Failure == C.Failure,
|
||||||
|
C.Failure == D.Failure
|
||||||
|
{
|
||||||
|
public typealias Output = (A.Output, B.Output, C.Output, D.Output)
|
||||||
|
|
||||||
|
public typealias Failure = A.Failure
|
||||||
|
|
||||||
|
public let a: A
|
||||||
|
|
||||||
|
public let b: B
|
||||||
|
|
||||||
|
public let c: C
|
||||||
|
|
||||||
|
public let d: D
|
||||||
|
|
||||||
|
public init(
|
||||||
|
_ a: A,
|
||||||
|
_ b: B,
|
||||||
|
_ c: C,
|
||||||
|
_ d: D
|
||||||
|
) {
|
||||||
|
self.a = a
|
||||||
|
self.b = b
|
||||||
|
self.c = c
|
||||||
|
self.d = d
|
||||||
|
}
|
||||||
|
|
||||||
|
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||||
|
where Downstream.Failure == Failure,
|
||||||
|
Downstream.Input == Output
|
||||||
|
{
|
||||||
|
typealias Inner = CombineLatest4Inner<A.Output,
|
||||||
|
B.Output,
|
||||||
|
C.Output,
|
||||||
|
D.Output,
|
||||||
|
Failure,
|
||||||
|
Downstream>
|
||||||
|
let inner = Inner(downstream: subscriber, upstreamCount: 4)
|
||||||
|
a.subscribe(Inner.Side(index: 0, combiner: inner))
|
||||||
|
b.subscribe(Inner.Side(index: 1, combiner: inner))
|
||||||
|
c.subscribe(Inner.Side(index: 2, combiner: inner))
|
||||||
|
d.subscribe(Inner.Side(index: 3, combiner: inner))
|
||||||
|
subscriber.receive(subscription: inner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Equatable conformances
|
||||||
|
|
||||||
|
extension Publishers.CombineLatest: Equatable
|
||||||
|
where
|
||||||
|
A: Equatable,
|
||||||
|
B: Equatable {}
|
||||||
|
|
||||||
|
extension Publishers.CombineLatest3: Equatable
|
||||||
|
where
|
||||||
|
A: Equatable,
|
||||||
|
B: Equatable,
|
||||||
|
C: Equatable {}
|
||||||
|
|
||||||
|
extension Publishers.CombineLatest4: Equatable
|
||||||
|
where
|
||||||
|
A: Equatable,
|
||||||
|
B: Equatable,
|
||||||
|
C: Equatable,
|
||||||
|
D: Equatable {}
|
||||||
|
|
||||||
|
// MARK: - Inners
|
||||||
|
|
||||||
|
private final class CombineLatest2Inner<Input0,
|
||||||
|
Input1,
|
||||||
|
Failure,
|
||||||
|
Downstream: Subscriber>
|
||||||
|
: AbstractCombineLatest<(Input0, Input1), Failure, Downstream>
|
||||||
|
where Downstream.Input == (Input0, Input1),
|
||||||
|
Downstream.Failure == Failure
|
||||||
|
{
|
||||||
|
override func convert(values: [Any?]) -> (Input0, Input1) {
|
||||||
|
return (values[0] as! Input0,
|
||||||
|
values[1] as! Input1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class CombineLatest3Inner<Input0,
|
||||||
|
Input1,
|
||||||
|
Input2,
|
||||||
|
Failure,
|
||||||
|
Downstream: Subscriber>
|
||||||
|
: AbstractCombineLatest<(Input0, Input1, Input2), Failure, Downstream>
|
||||||
|
where Downstream.Input == (Input0, Input1, Input2),
|
||||||
|
Downstream.Failure == Failure
|
||||||
|
{
|
||||||
|
override func convert(values: [Any?]) -> (Input0, Input1, Input2) {
|
||||||
|
return (values[0] as! Input0,
|
||||||
|
values[1] as! Input1,
|
||||||
|
values[2] as! Input2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class CombineLatest4Inner<Input0,
|
||||||
|
Input1,
|
||||||
|
Input2,
|
||||||
|
Input3,
|
||||||
|
Failure,
|
||||||
|
Downstream: Subscriber>
|
||||||
|
: AbstractCombineLatest<(Input0, Input1, Input2, Input3), Failure, Downstream>
|
||||||
|
where Downstream.Input == (Input0, Input1, Input2, Input3),
|
||||||
|
Downstream.Failure == Failure
|
||||||
|
{
|
||||||
|
override func convert(values: [Any?]) -> (Input0, Input1, Input2, Input3) {
|
||||||
|
return (values[0] as! Input0,
|
||||||
|
values[1] as! Input1,
|
||||||
|
values[2] as! Input2,
|
||||||
|
values[3] as! Input3)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,268 @@
|
|||||||
|
${template_header}
|
||||||
|
//
|
||||||
|
// Publishers.CombineLatest.swift.gyb
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// Created by Sergej Jaskiewicz on 10.12.2019.
|
||||||
|
//
|
||||||
|
%{
|
||||||
|
from gyb_opencombine_support import (
|
||||||
|
suffix_variadic,
|
||||||
|
list_with_suffix_variadic,
|
||||||
|
indent
|
||||||
|
)
|
||||||
|
|
||||||
|
import string
|
||||||
|
|
||||||
|
instantiations = [(2, 'two', 'A second'),
|
||||||
|
(3, 'three', 'A third'),
|
||||||
|
(4, 'four', 'A fourth')]
|
||||||
|
|
||||||
|
def make_publisher_name(arity):
|
||||||
|
return suffix_variadic('CombineLatest', arity, arity - 1)
|
||||||
|
|
||||||
|
def make_upstream_types(arity, start=0):
|
||||||
|
return [str(c) for c in string.ascii_uppercase[start:(start + arity)]]
|
||||||
|
|
||||||
|
def make_upstream_generic_constraints(upstream_types, first_is_self=False):
|
||||||
|
|
||||||
|
format_string = '{0}Failure == {1}.Failure'
|
||||||
|
|
||||||
|
def format(i):
|
||||||
|
return format_string.format(upstream_types[i] + '.',
|
||||||
|
upstream_types[i + 1])
|
||||||
|
|
||||||
|
result = [format(i) for i in range(len(upstream_types) - 1)]
|
||||||
|
|
||||||
|
if first_is_self:
|
||||||
|
result.insert(0, format_string.format('', upstream_types[0]))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def declare_combine_latest_method(arity, transform):
|
||||||
|
arg_count = arity - 1
|
||||||
|
declaration_format = """\
|
||||||
|
public func combineLatest<{}>(
|
||||||
|
{}
|
||||||
|
) -> {}
|
||||||
|
where {}\
|
||||||
|
"""
|
||||||
|
upstream_types = make_upstream_types(arg_count, 15)
|
||||||
|
method_generic_params = \
|
||||||
|
[upstream_type + ': Publisher' for upstream_type in upstream_types]
|
||||||
|
if transform:
|
||||||
|
method_generic_params.append('Result')
|
||||||
|
cs_method_generic_params = ', '.join(method_generic_params)
|
||||||
|
method_args = ['_ other: P'] if arg_count == 1 \
|
||||||
|
else ['_ publisher{}: {}'.format(i + 1, upstream_types[i]) \
|
||||||
|
for i in range(arg_count)]
|
||||||
|
if transform:
|
||||||
|
output_types = ['Output'] + ['{}.Output'.format(upstream_type) \
|
||||||
|
for upstream_type in upstream_types]
|
||||||
|
cs_output_types = ', '.join(output_types)
|
||||||
|
method_args \
|
||||||
|
.append('_ transform: @escaping ({}) -> Result'.format(cs_output_types))
|
||||||
|
cs_method_args = ',\n '.join(method_args)
|
||||||
|
|
||||||
|
publisher_generic_params = ['Self'] + upstream_types
|
||||||
|
|
||||||
|
cs_publisher_generic_params = ', '.join(publisher_generic_params)
|
||||||
|
|
||||||
|
publisher_name = 'Publishers.{}<{}>'.format(make_publisher_name(arity),
|
||||||
|
cs_publisher_generic_params)
|
||||||
|
|
||||||
|
if transform:
|
||||||
|
publisher_name = 'Publishers.Map<{}, Result>'.format(publisher_name)
|
||||||
|
|
||||||
|
generic_constraints = make_upstream_generic_constraints(upstream_types,
|
||||||
|
first_is_self=True)
|
||||||
|
|
||||||
|
cs_generic_constraints = ',\n '.join(generic_constraints)
|
||||||
|
|
||||||
|
declaration = declaration_format.format(cs_method_generic_params,
|
||||||
|
cs_method_args,
|
||||||
|
publisher_name,
|
||||||
|
cs_generic_constraints)
|
||||||
|
|
||||||
|
return indent(declaration, 4)
|
||||||
|
}%
|
||||||
|
|
||||||
|
// swiftlint:disable generic_type_name
|
||||||
|
// swiftlint:disable large_tuple
|
||||||
|
|
||||||
|
// MARK: - CombineLatest methods on Publisher
|
||||||
|
|
||||||
|
extension Publisher {
|
||||||
|
|
||||||
|
% for arity, _, _ in instantiations:
|
||||||
|
%
|
||||||
|
% argument_names = ['other'] \
|
||||||
|
% if arity == 2 else ['publisher{}'.format(i) for i in range(1, arity)]
|
||||||
|
% doc_cardinal = 'an additional publisher' if arity == 2 \
|
||||||
|
% else '{} additional publishers'.format(instantiations[arity - 3][1])
|
||||||
|
/// Subscribes to ${doc_cardinal} and publishes a tuple upon
|
||||||
|
/// receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
% for i in range(arity - 1):
|
||||||
|
% param_doc = 'Another' if arity == 2 else instantiations[i][2]
|
||||||
|
/// - ${argument_names[i]}: ${param_doc} publisher to combine with this one.
|
||||||
|
% end
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
${declare_combine_latest_method(arity, transform=False)}
|
||||||
|
{
|
||||||
|
return .init(self, ${', '.join(argument_names)})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribes to ${doc_cardinal} and invokes a closure
|
||||||
|
/// upon receiving output from either publisher.
|
||||||
|
///
|
||||||
|
/// The combined publisher passes through any requests to *all* upstream publishers.
|
||||||
|
/// However, it still obeys the demand-fulfilling rule of only sending the request
|
||||||
|
/// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream
|
||||||
|
/// publishers. It implements this by using a buffer size of 1 for each upstream, and
|
||||||
|
/// holds the most recent value in each buffer.
|
||||||
|
/// All upstream publishers need to finish for this publisher to finsh. If an upstream
|
||||||
|
/// publisher never publishes a value, this publisher never finishes.
|
||||||
|
/// If any of the combined publishers terminates with a failure, this publisher also
|
||||||
|
/// fails.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
% for i in range(arity - 1):
|
||||||
|
% param_doc = 'Another' if arity == 2 else instantiations[i][2]
|
||||||
|
/// - ${argument_names[i]}: ${param_doc} publisher to combine with this one.
|
||||||
|
% end
|
||||||
|
/// - transform: A closure that receives the most recent value from each publisher
|
||||||
|
/// and returns a new value to publish.
|
||||||
|
/// - Returns: A publisher that receives and combines elements from this and another
|
||||||
|
/// publisher.
|
||||||
|
${declare_combine_latest_method(arity, transform=True)}
|
||||||
|
{
|
||||||
|
% publisher_name = make_publisher_name(arity)
|
||||||
|
return Publishers.${publisher_name}(self, ${', '.join(argument_names)}).map {
|
||||||
|
transform(${', '.join(['${}'.format(i) for i in range(arity)])})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
% end
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - CombineLatest publishers
|
||||||
|
|
||||||
|
extension Publishers {
|
||||||
|
% for arity, cardinal, _ in instantiations:
|
||||||
|
% publisher_name = make_publisher_name(arity)
|
||||||
|
% upstream_types = make_upstream_types(arity)
|
||||||
|
%
|
||||||
|
% upstream_generic_params = \
|
||||||
|
% [upstream_type + ': Publisher' for upstream_type in upstream_types]
|
||||||
|
%
|
||||||
|
% cs_upstream_generic_params = ', '.join(upstream_generic_params)
|
||||||
|
%
|
||||||
|
% output_types = [upstream_type + '.Output' for upstream_type in upstream_types]
|
||||||
|
%
|
||||||
|
% cs_output_types = ', '.join(output_types)
|
||||||
|
%
|
||||||
|
% upstream_generic_constraints = \
|
||||||
|
% make_upstream_generic_constraints(upstream_types)
|
||||||
|
%
|
||||||
|
% cs_upstream_generic_constraints = \
|
||||||
|
% ',\n '.join(upstream_generic_constraints)
|
||||||
|
%
|
||||||
|
% init_args = ['_ {}: {}'.format(upstream_type.lower(), upstream_type) \
|
||||||
|
% for upstream_type in upstream_types]
|
||||||
|
% cs_init_args = ',\n '.join(init_args)
|
||||||
|
%
|
||||||
|
% self_fields = [upstream_type.lower() for upstream_type in upstream_types]
|
||||||
|
|
||||||
|
/// A publisher that receives and combines the latest elements from ${cardinal}
|
||||||
|
/// publishers.
|
||||||
|
public struct ${publisher_name}<${cs_upstream_generic_params}>
|
||||||
|
: Publisher
|
||||||
|
where ${cs_upstream_generic_constraints}
|
||||||
|
{
|
||||||
|
public typealias Output = (${cs_output_types})
|
||||||
|
|
||||||
|
public typealias Failure = ${upstream_types[0]}.Failure
|
||||||
|
% for upstream_type in upstream_types:
|
||||||
|
|
||||||
|
public let ${upstream_type.lower()}: ${upstream_type}
|
||||||
|
% end
|
||||||
|
|
||||||
|
public init(
|
||||||
|
${cs_init_args}
|
||||||
|
) {
|
||||||
|
% for self_field in self_fields:
|
||||||
|
self.${self_field} = ${self_field}
|
||||||
|
% end
|
||||||
|
}
|
||||||
|
|
||||||
|
public func receive<Downstream: Subscriber>(subscriber: Downstream)
|
||||||
|
where Downstream.Failure == Failure,
|
||||||
|
Downstream.Input == Output
|
||||||
|
{
|
||||||
|
% cs_indented_output_types = (',\n' + (50 * ' ')).join(output_types)
|
||||||
|
typealias Inner = CombineLatest${arity}Inner<${cs_indented_output_types},
|
||||||
|
Failure,
|
||||||
|
Downstream>
|
||||||
|
let inner = Inner(downstream: subscriber, upstreamCount: ${arity})
|
||||||
|
% for i in range(arity):
|
||||||
|
${self_fields[i]}.subscribe(Inner.Side(index: ${i}, combiner: inner))
|
||||||
|
% end
|
||||||
|
subscriber.receive(subscription: inner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
% end
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Equatable conformances
|
||||||
|
% for arity, _, _ in instantiations:
|
||||||
|
%
|
||||||
|
% publisher_name = make_publisher_name(arity)
|
||||||
|
%
|
||||||
|
% upstream_types = make_upstream_types(arity)
|
||||||
|
%
|
||||||
|
% constraints = [upstream_type + ': Equatable' for upstream_type in upstream_types]
|
||||||
|
% cs_constraints = ',\n'.join(constraints)
|
||||||
|
% cs_constraints = indent(cs_constraints, 8)
|
||||||
|
%
|
||||||
|
|
||||||
|
extension Publishers.${publisher_name}: Equatable
|
||||||
|
where
|
||||||
|
${cs_constraints} {}
|
||||||
|
% end
|
||||||
|
|
||||||
|
// MARK: - Inners
|
||||||
|
% for arity, _, _ in instantiations:
|
||||||
|
%
|
||||||
|
% publisher_name = make_publisher_name(arity)
|
||||||
|
%
|
||||||
|
% upstream_types = make_upstream_types(arity)
|
||||||
|
%
|
||||||
|
% input_types = ['Input{}'.format(i) for i in range(arity)]
|
||||||
|
%
|
||||||
|
% converters = ['values[{}] as! {}'.format(i, input_types[i]) for i in range(arity)]
|
||||||
|
% output_type = '({})'.format(', '.join(input_types))
|
||||||
|
|
||||||
|
private final class CombineLatest${arity}Inner<${(',\n' + (40 * ' ')).join(input_types)},
|
||||||
|
Failure,
|
||||||
|
Downstream: Subscriber>
|
||||||
|
: AbstractCombineLatest<${output_type}, Failure, Downstream>
|
||||||
|
where Downstream.Input == ${output_type},
|
||||||
|
Downstream.Failure == Failure
|
||||||
|
{
|
||||||
|
override func convert(values: [Any?]) -> (${', '.join(input_types)}) {
|
||||||
|
return (${',\n '.join(converters)})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
% end
|
||||||
@@ -3,3 +3,7 @@ def suffix_variadic(name, index, arity):
|
|||||||
|
|
||||||
def list_with_suffix_variadic(name, arity):
|
def list_with_suffix_variadic(name, arity):
|
||||||
return [suffix_variadic(name, i, arity) for i in range(arity)]
|
return [suffix_variadic(name, i, arity) for i in range(arity)]
|
||||||
|
|
||||||
|
def indent(input, space_count):
|
||||||
|
padding = space_count * ' '
|
||||||
|
return ''.join(padding + line for line in input.splitlines(True))
|
||||||
|
|||||||
Reference in New Issue
Block a user