diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index f56f56c..7ef1bf2 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -2,165 +2,6 @@ // Please remove the corresponding piece from this file if you implement something, // and complement this file as features are added in Apple's Combine -extension Publishers { - - /// A publisher that receives and combines the latest elements from two publishers. - public struct CombineLatest : Publisher where A : Publisher, B : Publisher, A.Failure == B.Failure { - - /// The kind of values published by this publisher. - public typealias Output = (A.Output, B.Output) - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public init(_ a: A, _ b: B) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output) - } - - /// A publisher that receives and combines the latest elements from three publishers. - public struct CombineLatest3 : Publisher where A : Publisher, B : Publisher, C : Publisher, A.Failure == B.Failure, B.Failure == C.Failure { - - /// The kind of values published by this publisher. - public typealias Output = (A.Output, B.Output, C.Output) - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public init(_ a: A, _ b: B, _ c: C) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output) - } - - /// A publisher that receives and combines the latest elements from four publishers. - public struct CombineLatest4 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure { - - /// The kind of values published by this publisher. - public typealias Output = (A.Output, B.Output, C.Output, D.Output) - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public init(_ a: A, _ b: B, _ c: C, _ d: D) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output) - } -} - -extension Publisher { - - /// Subscribes to an additional publisher and publishes a tuple upon receiving output from either publisher. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - other: Another publisher to combine with this one. - /// - Returns: A publisher that receives and combines elements from this and another publisher. - public func combineLatest

(_ other: P) -> Publishers.CombineLatest where P : Publisher, Self.Failure == P.Failure - - /// Subscribes to an additional publisher and invokes a closure upon receiving output from either publisher. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - other: Another publisher to combine with this one. - /// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish. - /// - Returns: A publisher that receives and combines elements from this and another publisher. - public func combineLatest(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map, T> where P : Publisher, Self.Failure == P.Failure - - /// Subscribes to two additional publishers and publishes a tuple upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - Returns: A publisher that receives and combines elements from this publisher and two other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q) -> Publishers.CombineLatest3 where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure - - /// Subscribes to two additional publishers and invokes a closure upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish. - /// - Returns: A publisher that receives and combines elements from this publisher and two other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map, T> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure - - /// Subscribes to three additional publishers and publishes a tuple upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - publisher3: A fourth publisher to combine with this one. - /// - Returns: A publisher that receives and combines elements from this publisher and three other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.CombineLatest4 where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure - - /// Subscribes to three additional publishers and invokes a closure upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - publisher3: A fourth publisher to combine with this one. - /// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish. - /// - Returns: A publisher that receives and combines elements from this publisher and three other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure -} - extension Publishers { /// A strategy for collecting received elements. @@ -797,43 +638,6 @@ extension Publisher { public func zip(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure } -extension Publishers.CombineLatest : Equatable where A : Equatable, B : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A combineLatest publisher to compare for equality. - /// - rhs: Another combineLatest publisher to compare for equality. - /// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal, `false` otherwise. - public static func == (lhs: Publishers.CombineLatest, rhs: Publishers.CombineLatest) -> Bool -} - -extension Publishers.CombineLatest3 : Equatable where A : Equatable, B : Equatable, C : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.CombineLatest3, rhs: Publishers.CombineLatest3) -> Bool -} - -extension Publishers.CombineLatest4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.CombineLatest4, rhs: Publishers.CombineLatest4) -> Bool -} - extension Publishers.Merge : Equatable where A : Equatable, B : Equatable { /// Returns a Boolean value that indicates whether two publishers are equivalent. diff --git a/Sources/OpenCombine/Helpers/AbstractCombineLatest.swift b/Sources/OpenCombine/Helpers/AbstractCombineLatest.swift new file mode 100644 index 0000000..36c4b76 --- /dev/null +++ b/Sources/OpenCombine/Helpers/AbstractCombineLatest.swift @@ -0,0 +1,205 @@ +// +// AbstractCombineLatest.swift +// +// +// Created by Sergej Jaskiewicz on 10.12.2019. +// + +internal class AbstractCombineLatest + where Downstream.Input == Output, Downstream.Failure == Failure +{ + private let downstream: Downstream + + // TODO: The size of these arrays always stays the same. + // Maybe we can leverage ManagedBuffer/ManagedBufferPointer here + // to avoid additional allocations. + private var buffers: [Any?] // 0x78 + private var subscriptions: [Subscription?] // 0x80 + + private var demand = Subscribers.Demand.none // 0x88 + + private var recursion = false // 0x90 + + private var finished = false // 0x98 + + private var errored = false // 0xA0 + + private var cancelled = false // 0xA8 + + private let upstreamCount: Int // 0xB0 + + private var finishCount = 0 // 0xB8 + + private let lock = UnfairLock.allocate() // 0xC0 + + private let downstreamLock = UnfairRecursiveLock.allocate() // 0xC8 + + internal init(downstream: Downstream, upstreamCount: Int) { + self.downstream = downstream + self.buffers = Array(repeating: nil, count: upstreamCount) + self.subscriptions = Array(repeating: nil, count: upstreamCount) + self.upstreamCount = upstreamCount + } + + deinit { + lock.deallocate() + downstreamLock.deallocate() + } + + // TODO: There should be more type-safe (and faster) way. + // E. g. what if we store `buffers` in subclasses? + internal func convert(values: [Any?]) -> Output { + abstractMethod() + } + + fileprivate final func receive(subscription: Subscription, index: Int) { + lock.lock() + guard !cancelled && subscriptions[index] == nil else { + lock.unlock() + subscription.cancel() + return + } + subscriptions[index] = subscription + lock.unlock() + } + + fileprivate final func receive(_ input: Any, index: Int) -> Subscribers.Demand { + lock.lock() + if cancelled || finished { + lock.unlock() + return .none + } + buffers[index] = input + guard !recursion && demand > 0 && buffers.allSatisfy({ $0 != nil }) else { + lock.unlock() + return .none + } + demand -= 1 + recursion = true + lock.unlock() + downstreamLock.lock() + let newDemand = downstream.receive(convert(values: buffers)) + downstreamLock.unlock() + lock.lock() + recursion = false + demand += newDemand + lock.unlock() + return .none + } + + fileprivate final func receive(completion: Subscribers.Completion, + index: Int) { + switch completion { + case .finished: + lock.lock() + if finished { + lock.unlock() + return + } + finishCount += 1 + subscriptions[index] = nil + if finishCount == upstreamCount { + finished = true + buffers = Array(repeating: nil, count: upstreamCount) + lock.unlock() + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } else { + lock.unlock() + } + case .failure: + lock.lock() + finished = true + errored = true + let subscriptions = self.subscriptions + self.subscriptions = Array(repeating: nil, count: upstreamCount) + buffers = Array(repeating: nil, count: upstreamCount) + lock.unlock() + for (i, subscription) in subscriptions.enumerated() where i != index { + subscription?.cancel() + } + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } + } +} + +extension AbstractCombineLatest: Subscription { + internal func request(_ demand: Subscribers.Demand) { + demand.assertNonZero() // TODO: Test this + lock.lock() + guard !cancelled && !finished else { + lock.unlock() + return + } + self.demand += demand + lock.unlock() + for subscription in subscriptions { + subscription?.request(demand) + } + } + + internal func cancel() { + lock.lock() + cancelled = true + let subscriptions = self.subscriptions + self.subscriptions = Array(repeating: nil, count: upstreamCount) + buffers = Array(repeating: nil, count: upstreamCount) + lock.unlock() + for subscription in subscriptions { + subscription?.cancel() + } + } +} + +extension AbstractCombineLatest: CustomStringConvertible { + internal var description: String { return "CombineLatest" } +} + +extension AbstractCombineLatest: CustomReflectable { + internal var customMirror: Mirror { + lock.lock() + defer { lock.unlock() } + let children: [Mirror.Child] = [ + ("downstream", downstream), + ("upstreamSubscriptions", subscriptions), + ("demand", demand), + ("buffers", buffers) + ] + return Mirror(self, children: children) + } +} + +extension AbstractCombineLatest: CustomPlaygroundDisplayConvertible { + internal final var playgroundDescription: Any { return description } +} + +extension AbstractCombineLatest { + internal struct Side: Subscriber, CustomStringConvertible { + private let index: Int + private let combiner: AbstractCombineLatest + + internal let combineIdentifier = CombineIdentifier() + + internal init(index: Int, combiner: AbstractCombineLatest) { + self.index = index + self.combiner = combiner + } + + internal func receive(subscription: Subscription) { + combiner.receive(subscription: subscription, index: index) + } + + internal func receive(_ input: Input) -> Subscribers.Demand { + return combiner.receive(input, index: index) + } + + internal func receive(completion: Subscribers.Completion) { + combiner.receive(completion: completion, index: index) + } + + internal var description: String { return "CombineLatest" } + } +} diff --git a/Sources/OpenCombine/Publishers/GENERATED-Publishers.CombineLatest.swift b/Sources/OpenCombine/Publishers/GENERATED-Publishers.CombineLatest.swift new file mode 100644 index 0000000..2775bf4 --- /dev/null +++ b/Sources/OpenCombine/Publishers/GENERATED-Publishers.CombineLatest.swift @@ -0,0 +1,408 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ┃ +// ┃ Auto-generated from GYB template. DO NOT EDIT! ┃ +// ┃ ┃ +// ┃ ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ +// +// Publishers.CombineLatest.swift.gyb +// +// +// Created by Sergej Jaskiewicz on 10.12.2019. +// + +// swiftlint:disable generic_type_name +// swiftlint:disable large_tuple + +// MARK: - CombineLatest methods on Publisher + +extension Publisher { + + /// Subscribes to an additional publisher and publishes a tuple upon + /// receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: + /// - other: Another publisher to combine with this one. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. + public func combineLatest( + _ other: P + ) -> Publishers.CombineLatest + where Failure == P.Failure + { + return .init(self, other) + } + + /// Subscribes to an additional publisher and invokes a closure + /// upon receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: + /// - other: Another publisher to combine with this one. + /// - transform: A closure that receives the most recent value from each publisher + /// and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. + public func combineLatest( + _ other: P, + _ transform: @escaping (Output, P.Output) -> Result + ) -> Publishers.Map, Result> + where Failure == P.Failure + { + return Publishers.CombineLatest(self, other).map { + transform($0, $1) + } + } + /// Subscribes to two additional publishers and publishes a tuple upon + /// receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with this one. + /// - publisher2: A third publisher to combine with this one. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. + public func combineLatest( + _ publisher1: P, + _ publisher2: Q + ) -> Publishers.CombineLatest3 + where Failure == P.Failure, + P.Failure == Q.Failure + { + return .init(self, publisher1, publisher2) + } + + /// Subscribes to two additional publishers and invokes a closure + /// upon receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with this one. + /// - publisher2: A third publisher to combine with this one. + /// - transform: A closure that receives the most recent value from each publisher + /// and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. + public func combineLatest( + _ publisher1: P, + _ publisher2: Q, + _ transform: @escaping (Output, P.Output, Q.Output) -> Result + ) -> Publishers.Map, Result> + where Failure == P.Failure, + P.Failure == Q.Failure + { + return Publishers.CombineLatest3(self, publisher1, publisher2).map { + transform($0, $1, $2) + } + } + /// Subscribes to three additional publishers and publishes a tuple upon + /// receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with this one. + /// - publisher2: A third publisher to combine with this one. + /// - publisher3: A fourth publisher to combine with this one. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. + public func combineLatest( + _ publisher1: P, + _ publisher2: Q, + _ publisher3: R + ) -> Publishers.CombineLatest4 + where Failure == P.Failure, + P.Failure == Q.Failure, + Q.Failure == R.Failure + { + return .init(self, publisher1, publisher2, publisher3) + } + + /// Subscribes to three additional publishers and invokes a closure + /// upon receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with this one. + /// - publisher2: A third publisher to combine with this one. + /// - publisher3: A fourth publisher to combine with this one. + /// - transform: A closure that receives the most recent value from each publisher + /// and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. + public func combineLatest( + _ publisher1: P, + _ publisher2: Q, + _ publisher3: R, + _ transform: @escaping (Output, P.Output, Q.Output, R.Output) -> Result + ) -> Publishers.Map, Result> + where Failure == P.Failure, + P.Failure == Q.Failure, + Q.Failure == R.Failure + { + return Publishers.CombineLatest4(self, publisher1, publisher2, publisher3).map { + transform($0, $1, $2, $3) + } + } +} + +// MARK: - CombineLatest publishers + +extension Publishers { + + /// A publisher that receives and combines the latest elements from two + /// publishers. + public struct CombineLatest + : Publisher + where A.Failure == B.Failure + { + public typealias Output = (A.Output, B.Output) + + public typealias Failure = A.Failure + + public let a: A + + public let b: B + + public init( + _ a: A, + _ b: B + ) { + self.a = a + self.b = b + } + + public func receive(subscriber: Downstream) + where Downstream.Failure == Failure, + Downstream.Input == Output + { + typealias Inner = CombineLatest2Inner + let inner = Inner(downstream: subscriber, upstreamCount: 2) + a.subscribe(Inner.Side(index: 0, combiner: inner)) + b.subscribe(Inner.Side(index: 1, combiner: inner)) + subscriber.receive(subscription: inner) + } + } + + /// A publisher that receives and combines the latest elements from three + /// publishers. + public struct CombineLatest3 + : Publisher + where A.Failure == B.Failure, + B.Failure == C.Failure + { + public typealias Output = (A.Output, B.Output, C.Output) + + public typealias Failure = A.Failure + + public let a: A + + public let b: B + + public let c: C + + public init( + _ a: A, + _ b: B, + _ c: C + ) { + self.a = a + self.b = b + self.c = c + } + + public func receive(subscriber: Downstream) + where Downstream.Failure == Failure, + Downstream.Input == Output + { + typealias Inner = CombineLatest3Inner + let inner = Inner(downstream: subscriber, upstreamCount: 3) + a.subscribe(Inner.Side(index: 0, combiner: inner)) + b.subscribe(Inner.Side(index: 1, combiner: inner)) + c.subscribe(Inner.Side(index: 2, combiner: inner)) + subscriber.receive(subscription: inner) + } + } + + /// A publisher that receives and combines the latest elements from four + /// publishers. + public struct CombineLatest4 + : Publisher + where A.Failure == B.Failure, + B.Failure == C.Failure, + C.Failure == D.Failure + { + public typealias Output = (A.Output, B.Output, C.Output, D.Output) + + public typealias Failure = A.Failure + + public let a: A + + public let b: B + + public let c: C + + public let d: D + + public init( + _ a: A, + _ b: B, + _ c: C, + _ d: D + ) { + self.a = a + self.b = b + self.c = c + self.d = d + } + + public func receive(subscriber: Downstream) + where Downstream.Failure == Failure, + Downstream.Input == Output + { + typealias Inner = CombineLatest4Inner + let inner = Inner(downstream: subscriber, upstreamCount: 4) + a.subscribe(Inner.Side(index: 0, combiner: inner)) + b.subscribe(Inner.Side(index: 1, combiner: inner)) + c.subscribe(Inner.Side(index: 2, combiner: inner)) + d.subscribe(Inner.Side(index: 3, combiner: inner)) + subscriber.receive(subscription: inner) + } + } +} + +// MARK: - Equatable conformances + +extension Publishers.CombineLatest: Equatable + where + A: Equatable, + B: Equatable {} + +extension Publishers.CombineLatest3: Equatable + where + A: Equatable, + B: Equatable, + C: Equatable {} + +extension Publishers.CombineLatest4: Equatable + where + A: Equatable, + B: Equatable, + C: Equatable, + D: Equatable {} + +// MARK: - Inners + +private final class CombineLatest2Inner + : AbstractCombineLatest<(Input0, Input1), Failure, Downstream> + where Downstream.Input == (Input0, Input1), + Downstream.Failure == Failure +{ + override func convert(values: [Any?]) -> (Input0, Input1) { + return (values[0] as! Input0, + values[1] as! Input1) + } +} + +private final class CombineLatest3Inner + : AbstractCombineLatest<(Input0, Input1, Input2), Failure, Downstream> + where Downstream.Input == (Input0, Input1, Input2), + Downstream.Failure == Failure +{ + override func convert(values: [Any?]) -> (Input0, Input1, Input2) { + return (values[0] as! Input0, + values[1] as! Input1, + values[2] as! Input2) + } +} + +private final class CombineLatest4Inner + : AbstractCombineLatest<(Input0, Input1, Input2, Input3), Failure, Downstream> + where Downstream.Input == (Input0, Input1, Input2, Input3), + Downstream.Failure == Failure +{ + override func convert(values: [Any?]) -> (Input0, Input1, Input2, Input3) { + return (values[0] as! Input0, + values[1] as! Input1, + values[2] as! Input2, + values[3] as! Input3) + } +} diff --git a/Sources/OpenCombine/Publishers/Publishers.CombineLatest.swift.gyb b/Sources/OpenCombine/Publishers/Publishers.CombineLatest.swift.gyb new file mode 100644 index 0000000..2345982 --- /dev/null +++ b/Sources/OpenCombine/Publishers/Publishers.CombineLatest.swift.gyb @@ -0,0 +1,268 @@ +${template_header} +// +// Publishers.CombineLatest.swift.gyb +// +// +// Created by Sergej Jaskiewicz on 10.12.2019. +// +%{ +from gyb_opencombine_support import ( + suffix_variadic, + list_with_suffix_variadic, + indent +) + +import string + +instantiations = [(2, 'two', 'A second'), + (3, 'three', 'A third'), + (4, 'four', 'A fourth')] + +def make_publisher_name(arity): + return suffix_variadic('CombineLatest', arity, arity - 1) + +def make_upstream_types(arity, start=0): + return [str(c) for c in string.ascii_uppercase[start:(start + arity)]] + +def make_upstream_generic_constraints(upstream_types, first_is_self=False): + + format_string = '{0}Failure == {1}.Failure' + + def format(i): + return format_string.format(upstream_types[i] + '.', + upstream_types[i + 1]) + + result = [format(i) for i in range(len(upstream_types) - 1)] + + if first_is_self: + result.insert(0, format_string.format('', upstream_types[0])) + + return result + +def declare_combine_latest_method(arity, transform): + arg_count = arity - 1 + declaration_format = """\ +public func combineLatest<{}>( + {} +) -> {} + where {}\ +""" + upstream_types = make_upstream_types(arg_count, 15) + method_generic_params = \ + [upstream_type + ': Publisher' for upstream_type in upstream_types] + if transform: + method_generic_params.append('Result') + cs_method_generic_params = ', '.join(method_generic_params) + method_args = ['_ other: P'] if arg_count == 1 \ + else ['_ publisher{}: {}'.format(i + 1, upstream_types[i]) \ + for i in range(arg_count)] + if transform: + output_types = ['Output'] + ['{}.Output'.format(upstream_type) \ + for upstream_type in upstream_types] + cs_output_types = ', '.join(output_types) + method_args \ + .append('_ transform: @escaping ({}) -> Result'.format(cs_output_types)) + cs_method_args = ',\n '.join(method_args) + + publisher_generic_params = ['Self'] + upstream_types + + cs_publisher_generic_params = ', '.join(publisher_generic_params) + + publisher_name = 'Publishers.{}<{}>'.format(make_publisher_name(arity), + cs_publisher_generic_params) + + if transform: + publisher_name = 'Publishers.Map<{}, Result>'.format(publisher_name) + + generic_constraints = make_upstream_generic_constraints(upstream_types, + first_is_self=True) + + cs_generic_constraints = ',\n '.join(generic_constraints) + + declaration = declaration_format.format(cs_method_generic_params, + cs_method_args, + publisher_name, + cs_generic_constraints) + + return indent(declaration, 4) +}% + +// swiftlint:disable generic_type_name +// swiftlint:disable large_tuple + +// MARK: - CombineLatest methods on Publisher + +extension Publisher { + +% for arity, _, _ in instantiations: +% +% argument_names = ['other'] \ +% if arity == 2 else ['publisher{}'.format(i) for i in range(1, arity)] +% doc_cardinal = 'an additional publisher' if arity == 2 \ +% else '{} additional publishers'.format(instantiations[arity - 3][1]) + /// Subscribes to ${doc_cardinal} and publishes a tuple upon + /// receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: +% for i in range(arity - 1): +% param_doc = 'Another' if arity == 2 else instantiations[i][2] + /// - ${argument_names[i]}: ${param_doc} publisher to combine with this one. +% end + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. +${declare_combine_latest_method(arity, transform=False)} + { + return .init(self, ${', '.join(argument_names)}) + } + + /// Subscribes to ${doc_cardinal} and invokes a closure + /// upon receiving output from either publisher. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. + /// However, it still obeys the demand-fulfilling rule of only sending the request + /// amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream + /// publishers. It implements this by using a buffer size of 1 for each upstream, and + /// holds the most recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finsh. If an upstream + /// publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also + /// fails. + /// + /// - Parameters: +% for i in range(arity - 1): +% param_doc = 'Another' if arity == 2 else instantiations[i][2] + /// - ${argument_names[i]}: ${param_doc} publisher to combine with this one. +% end + /// - transform: A closure that receives the most recent value from each publisher + /// and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this and another + /// publisher. +${declare_combine_latest_method(arity, transform=True)} + { +% publisher_name = make_publisher_name(arity) + return Publishers.${publisher_name}(self, ${', '.join(argument_names)}).map { + transform(${', '.join(['${}'.format(i) for i in range(arity)])}) + } + } +% end +} + +// MARK: - CombineLatest publishers + +extension Publishers { +% for arity, cardinal, _ in instantiations: +% publisher_name = make_publisher_name(arity) +% upstream_types = make_upstream_types(arity) +% +% upstream_generic_params = \ +% [upstream_type + ': Publisher' for upstream_type in upstream_types] +% +% cs_upstream_generic_params = ', '.join(upstream_generic_params) +% +% output_types = [upstream_type + '.Output' for upstream_type in upstream_types] +% +% cs_output_types = ', '.join(output_types) +% +% upstream_generic_constraints = \ +% make_upstream_generic_constraints(upstream_types) +% +% cs_upstream_generic_constraints = \ +% ',\n '.join(upstream_generic_constraints) +% +% init_args = ['_ {}: {}'.format(upstream_type.lower(), upstream_type) \ +% for upstream_type in upstream_types] +% cs_init_args = ',\n '.join(init_args) +% +% self_fields = [upstream_type.lower() for upstream_type in upstream_types] + + /// A publisher that receives and combines the latest elements from ${cardinal} + /// publishers. + public struct ${publisher_name}<${cs_upstream_generic_params}> + : Publisher + where ${cs_upstream_generic_constraints} + { + public typealias Output = (${cs_output_types}) + + public typealias Failure = ${upstream_types[0]}.Failure +% for upstream_type in upstream_types: + + public let ${upstream_type.lower()}: ${upstream_type} +% end + + public init( + ${cs_init_args} + ) { +% for self_field in self_fields: + self.${self_field} = ${self_field} +% end + } + + public func receive(subscriber: Downstream) + where Downstream.Failure == Failure, + Downstream.Input == Output + { +% cs_indented_output_types = (',\n' + (50 * ' ')).join(output_types) + typealias Inner = CombineLatest${arity}Inner<${cs_indented_output_types}, + Failure, + Downstream> + let inner = Inner(downstream: subscriber, upstreamCount: ${arity}) +% for i in range(arity): + ${self_fields[i]}.subscribe(Inner.Side(index: ${i}, combiner: inner)) +% end + subscriber.receive(subscription: inner) + } + } +% end +} + +// MARK: - Equatable conformances +% for arity, _, _ in instantiations: +% +% publisher_name = make_publisher_name(arity) +% +% upstream_types = make_upstream_types(arity) +% +% constraints = [upstream_type + ': Equatable' for upstream_type in upstream_types] +% cs_constraints = ',\n'.join(constraints) +% cs_constraints = indent(cs_constraints, 8) +% + +extension Publishers.${publisher_name}: Equatable + where +${cs_constraints} {} +% end + +// MARK: - Inners +% for arity, _, _ in instantiations: +% +% publisher_name = make_publisher_name(arity) +% +% upstream_types = make_upstream_types(arity) +% +% input_types = ['Input{}'.format(i) for i in range(arity)] +% +% converters = ['values[{}] as! {}'.format(i, input_types[i]) for i in range(arity)] +% output_type = '({})'.format(', '.join(input_types)) + +private final class CombineLatest${arity}Inner<${(',\n' + (40 * ' ')).join(input_types)}, + Failure, + Downstream: Subscriber> + : AbstractCombineLatest<${output_type}, Failure, Downstream> + where Downstream.Input == ${output_type}, + Downstream.Failure == Failure +{ + override func convert(values: [Any?]) -> (${', '.join(input_types)}) { + return (${',\n '.join(converters)}) + } +} +% end diff --git a/utils/gyb_opencombine_support.py b/utils/gyb_opencombine_support.py index 2c1f962..d04c461 100644 --- a/utils/gyb_opencombine_support.py +++ b/utils/gyb_opencombine_support.py @@ -3,3 +3,7 @@ def suffix_variadic(name, index, arity): def list_with_suffix_variadic(name, arity): return [suffix_variadic(name, i, arity) for i in range(arity)] + +def indent(input, space_count): + padding = space_count * ' ' + return ''.join(padding + line for line in input.splitlines(True))