diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index 12fb215..4ea7000 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -893,54 +893,6 @@ extension Publisher { public func assertNoFailure(_ prefix: String = "", file: StaticString = #file, line: UInt = #line) -> Publishers.AssertNoFailure } -extension Publishers { - - /// A publisher that ignores elements from the upstream publisher until it receives an element from second publisher. - public struct DropUntilOutput : Publisher where Upstream : Publisher, Other : Publisher, Upstream.Failure == Other.Failure { - - /// The kind of values published by this publisher. - public typealias Output = Upstream.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = Upstream.Failure - - /// The publisher that this publisher receives elements from. - public let upstream: Upstream - - /// A publisher to monitor for its first emitted element. - public let other: Other - - /// Creates a publisher that ignores elements from the upstream publisher until it receives an element from another publisher. - /// - /// - Parameters: - /// - upstream: A publisher to drop elements from while waiting for another publisher to emit elements. - /// - other: A publisher to monitor for its first emitted element. - public init(upstream: Upstream, other: Other) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, Upstream.Output == S.Input, Other.Failure == S.Failure - } -} - -extension Publisher { - - /// Ignores elements from the upstream publisher until it receives an element from a second publisher. - /// - /// This publisher requests a single value from the upstream publisher, and it ignores (drops) all elements from that publisher until the upstream publisher produces a value. After the `other` publisher produces an element, this publisher cancels its subscription to the `other` publisher, and allows events from the `upstream` publisher to pass through. - /// After this publisher receives a subscription from the upstream publisher, it passes through backpressure requests from downstream to the upstream publisher. If the upstream publisher acts on those requests before the other publisher produces an item, this publisher drops the elements it receives from the upstream publisher. - /// - /// - Parameter publisher: A publisher to monitor for its first emitted element. - /// - Returns: A publisher that drops elements from the upstream publisher until the `other` publisher produces a value. - public func drop

(untilOutputFrom publisher: P) -> Publishers.DropUntilOutput where P : Publisher, Self.Failure == P.Failure -} - extension Publishers { /// A publisher that publishes elements only after a specified time interval elapses between events. @@ -1564,30 +1516,6 @@ extension Publishers.Retry : Equatable where Upstream : Equatable { public static func == (lhs: Publishers.Retry, rhs: Publishers.Retry) -> Bool } -extension Publishers.ReplaceEmpty : Equatable where Upstream : Equatable, Upstream.Output : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A replace empty publisher to compare for equality. - /// - rhs: Another replace empty publisher to compare for equality. - /// - Returns: `true` if the two publishers have equal upstream publishers and output elements, `false` otherwise. - public static func == (lhs: Publishers.ReplaceEmpty, rhs: Publishers.ReplaceEmpty) -> Bool -} - -extension Publishers.DropUntilOutput : Equatable where Upstream : Equatable, Other : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.DropUntilOutput, rhs: Publishers.DropUntilOutput) -> Bool -} - extension Publishers.Zip : Equatable where A : Equatable, B : Equatable { /// Returns a Boolean value that indicates whether two publishers are equivalent. diff --git a/Sources/OpenCombine/Publishers/Publishers.DropUntilOutput.swift b/Sources/OpenCombine/Publishers/Publishers.DropUntilOutput.swift new file mode 100644 index 0000000..db9b44b --- /dev/null +++ b/Sources/OpenCombine/Publishers/Publishers.DropUntilOutput.swift @@ -0,0 +1,267 @@ +// +// Publishers.DropUntilOutput.swift +// +// +// Created by Sergej Jaskiewicz on 24.12.2019. +// + +extension Publisher { + + /// Ignores elements from the upstream publisher until it receives an element from + /// a second publisher. + /// + /// This publisher requests a single value from the upstream publisher, and it ignores + /// (drops) all elements from that publisher until the upstream publisher produces + /// a value. After the `other` publisher produces an element, this publisher cancels + /// its subscription to the `other` publisher, and allows events from the `upstream` + /// publisher to pass through. + /// After this publisher receives a subscription from the upstream publisher, it + /// passes through backpressure requests from downstream to the upstream publisher. + /// If the upstream publisher acts on those requests before the other publisher + /// produces an item, this publisher drops the elements it receives from the upstream + /// publisher. + /// + /// - Parameter publisher: A publisher to monitor for its first emitted element. + /// - Returns: A publisher that drops elements from the upstream publisher until the + /// `other` publisher produces a value. + public func drop( + untilOutputFrom publisher: Other + ) -> Publishers.DropUntilOutput where Failure == Other.Failure { + return .init(upstream: self, other: publisher) + } +} + +extension Publishers { + + /// A publisher that ignores elements from the upstream publisher until it receives + /// an element from second publisher. + public struct DropUntilOutput: Publisher + where Upstream.Failure == Other.Failure + { + public typealias Output = Upstream.Output + + public typealias Failure = Upstream.Failure + + /// The publisher that this publisher receives elements from. + public let upstream: Upstream + + /// A publisher to monitor for its first emitted element. + public let other: Other + + /// Creates a publisher that ignores elements from the upstream publisher until + /// it receives an element from another publisher. + /// + /// - Parameters: + /// - upstream: A publisher to drop elements from while waiting for another + /// publisher to emit elements. + /// - other: A publisher to monitor for its first emitted element. + public init(upstream: Upstream, other: Other) { + self.upstream = upstream + self.other = other + } + + public func receive(subscriber: Downstream) + where Upstream.Output == Downstream.Input, + Other.Failure == Downstream.Failure + { + let inner = Inner(downstream: subscriber) + other.subscribe(Inner.OtherSubscriber(inner: inner)) + upstream.subscribe(inner) + subscriber.receive(subscription: inner) + } + } +} + +extension Publishers.DropUntilOutput: Equatable + where Upstream: Equatable, Other: Equatable {} + +extension Publishers.DropUntilOutput { + fileprivate final class Inner + : Subscriber, + Subscription, + CustomStringConvertible, + CustomReflectable, + CustomPlaygroundDisplayConvertible + where Downstream.Input == Upstream.Output, Downstream.Failure == Upstream.Failure + { + typealias Input = Upstream.Output + + typealias Failure = Upstream.Failure + + private let downstream: Downstream + + private var triggered = false + + private let lock = UnfairLock.allocate() + + private let downstreamLock = UnfairRecursiveLock.allocate() + + private var upstreamSubscription: Subscription? + + private var pendingDemand = Subscribers.Demand.none + + private var otherSubscription: Subscription? + + private var otherFinished = false + + private var cancelled = false + + init(downstream: Downstream) { + self.downstream = downstream + } + + deinit { + lock.deallocate() + downstreamLock.deallocate() + } + + func receive(subscription: Subscription) { + lock.lock() + guard upstreamSubscription == nil && !cancelled else { + lock.unlock() + subscription.cancel() + return + } + upstreamSubscription = subscription + if pendingDemand > 0 { + lock.unlock() + subscription.request(pendingDemand) + } else { + lock.unlock() + } + } + + func receive(_ input: Input) -> Subscribers.Demand { + lock.lock() + if !triggered || cancelled { + pendingDemand -= 1 + lock.unlock() + return .none + } + lock.unlock() + downstreamLock.lock() + let newDemand = downstream.receive(input) + downstreamLock.unlock() + return newDemand + } + + func receive(completion: Subscribers.Completion) { + lock.lock() + if cancelled { + lock.unlock() + return + } + cancelled = true + lock.unlock() + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } + + private func receiveOther(subscription: Subscription) { + // Combine doesn't lock here + guard otherSubscription == nil else { + subscription.cancel() + return + } + otherSubscription = subscription + subscription.request(.max(1)) + } + + private func receiveOther(_ input: Other.Output) -> Subscribers.Demand { + lock.lock() + triggered = true + otherSubscription = nil + lock.unlock() + return .none + } + + private func receiveOther(completion: Subscribers.Completion) { + lock.lock() + if triggered { + otherSubscription = nil + lock.unlock() + return + } + + otherFinished = true + if let upstreamSubscription = self.upstreamSubscription { + self.upstreamSubscription = nil + lock.unlock() + upstreamSubscription.cancel() + } else { + lock.unlock() + } + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } + + func request(_ demand: Subscribers.Demand) { + lock.lock() + pendingDemand += demand + if let subscription = upstreamSubscription { + lock.unlock() + subscription.request(demand) + } else { + lock.unlock() + } + } + + func cancel() { + lock.lock() + let upstreamSubscription = self.upstreamSubscription + let otherSubscription = self.otherSubscription + self.upstreamSubscription = nil + self.otherSubscription = nil + cancelled = true + lock.unlock() + + upstreamSubscription?.cancel() + otherSubscription?.cancel() + } + + var description: String { return "DropUntilOutput" } + + var customMirror: Mirror { + return Mirror(self, children: EmptyCollection()) + } + + var playgroundDescription: Any { return description } + } +} + +extension Publishers.DropUntilOutput.Inner { + fileprivate struct OtherSubscriber + : Subscriber, + CustomStringConvertible, + CustomReflectable, + CustomPlaygroundDisplayConvertible + { + let inner: Publishers.DropUntilOutput.Inner + + var combineIdentifier: CombineIdentifier { + return inner.combineIdentifier + } + + func receive(subscription: Subscription) { + inner.receiveOther(subscription: subscription) + } + + func receive(_ input: Other.Output) -> Subscribers.Demand { + return inner.receiveOther(input) + } + + func receive(completion: Subscribers.Completion) { + inner.receiveOther(completion: completion) + } + + var description: String { return "DropUntilOutput" } + + var customMirror: Mirror { + return Mirror(self, children: EmptyCollection()) + } + + var playgroundDescription: Any { return description } + } +} diff --git a/Sources/OpenCombine/Publishers/Publishers.ReplaceEmpty.swift b/Sources/OpenCombine/Publishers/Publishers.ReplaceEmpty.swift index ca635c8..5c749ad 100644 --- a/Sources/OpenCombine/Publishers/Publishers.ReplaceEmpty.swift +++ b/Sources/OpenCombine/Publishers/Publishers.ReplaceEmpty.swift @@ -5,10 +5,6 @@ // Created by Joe Spadafora on 12/10/19. // -#if canImport(COpenCombineHelpers) -import COpenCombineHelpers -#endif - extension Publisher { /// Replaces an empty stream with the provided element. @@ -29,12 +25,8 @@ extension Publishers { /// A publisher that replaces an empty stream with a provided element. public struct ReplaceEmpty: Publisher { - /// The kind of values published by this publisher. public typealias Output = Upstream.Output - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. public typealias Failure = Upstream.Failure /// The element to deliver when the upstream publisher finishes @@ -49,13 +41,6 @@ extension Publishers { self.output = output } - /// This function is called to attach the specified `Subscriber` - /// to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. public func receive(subscriber: Downstream) where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input @@ -66,6 +51,9 @@ extension Publishers { } } +extension Publishers.ReplaceEmpty: Equatable + where Upstream: Equatable, Upstream.Output: Equatable {} + extension Publishers.ReplaceEmpty { private final class Inner diff --git a/Tests/OpenCombineTests/PublisherTests/DropUntilOutputTests.swift b/Tests/OpenCombineTests/PublisherTests/DropUntilOutputTests.swift new file mode 100644 index 0000000..bfbfe5d --- /dev/null +++ b/Tests/OpenCombineTests/PublisherTests/DropUntilOutputTests.swift @@ -0,0 +1,359 @@ +// +// DropUntilOutputTests.swift +// +// +// Created by Sergej Jaskiewicz on 24.12.2019. +// + +import XCTest + +#if OPENCOMBINE_COMPATIBILITY_TEST +import Combine +#else +import OpenCombine +#endif + +@available(macOS 10.15, iOS 13.0, *) +final class DropUntilOutputTests: XCTestCase { + + func testOtherCompletesBeforeTriggering() { + let otherSubscription = CustomSubscription() + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(100), + receiveValueDemand: .max(5), + createSut: { $0.drop(untilOutputFrom: otherPublisher) } + ) + + XCTAssertEqual(helper.publisher.send(1), .none) + XCTAssertEqual(helper.publisher.send(2), .none) + + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput")]) + XCTAssertEqual(helper.subscription.history, [.requested(.max(100))]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + + otherPublisher.send(completion: .finished) + otherPublisher.send(completion: .finished) + + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput"), + .completion(.finished), + .completion(.finished)]) + XCTAssertEqual(helper.subscription.history, [.requested(.max(100)), .cancelled]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + + XCTAssertEqual(helper.publisher.send(3), .none) + XCTAssertEqual(otherPublisher.send(1000), .none) + XCTAssertEqual(helper.publisher.send(4), .max(5)) + + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput"), + .completion(.finished), + .completion(.finished), + .value(4)]) + XCTAssertEqual(helper.subscription.history, [.requested(.max(100)), .cancelled]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + } + + func testOtherFailsAfterTriggering() { + let otherSubscription = CustomSubscription() + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(2), + receiveValueDemand: .max(5), + createSut: { $0.drop(untilOutputFrom: otherPublisher) } + ) + + XCTAssertEqual(helper.publisher.send(1), .none) + XCTAssertEqual(otherPublisher.send(1000), .none) + XCTAssertEqual(helper.publisher.send(2), .max(5)) + + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput"), + .value(2)]) + XCTAssertEqual(helper.subscription.history, [.requested(.max(2))]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + + otherPublisher.send(completion: .failure(.oops)) + otherPublisher.send(completion: .finished) + + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput"), + .value(2)]) + XCTAssertEqual(helper.subscription.history, [.requested(.max(2))]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + } + + func testDemand() throws { + let subscription = CustomSubscription() + let otherSubscription = CustomSubscription() + let publisher = CustomPublisher(subscription: subscription) + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let dropUntilOutput = publisher.drop(untilOutputFrom: otherPublisher) + var downstreamSubscription: Subscription? + let tracking = TrackingSubscriber( + receiveSubscription: { subscription in + downstreamSubscription = subscription + }, + receiveValue: { .max($0) } + ) + dropUntilOutput.subscribe(tracking) + + XCTAssertEqual(subscription.history, []) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + XCTAssertEqual(tracking.history, [.subscription("DropUntilOutput")]) + + try XCTUnwrap(downstreamSubscription).request(.max(4)) + + XCTAssertEqual(subscription.history, [.requested(.max(4))]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + XCTAssertEqual(tracking.history, [.subscription("DropUntilOutput")]) + + XCTAssertEqual(publisher.send(1), .none) + XCTAssertEqual(publisher.send(2), .none) + XCTAssertEqual(otherPublisher.send(1000), .none) + XCTAssertEqual(publisher.send(3), .max(3)) + XCTAssertEqual(publisher.send(4), .max(4)) + XCTAssertEqual(publisher.send(5), .max(5)) + XCTAssertEqual(publisher.send(6), .max(6)) + XCTAssertEqual(publisher.send(7), .max(7)) + + XCTAssertEqual(subscription.history, [.requested(.max(4))]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + XCTAssertEqual(tracking.history, [.subscription("DropUntilOutput"), + .value(3), + .value(4), + .value(5), + .value(6), + .value(7)]) + } + + func testCancelAlreadyCancelled() throws { + let otherSubscription = CustomSubscription() + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(2), + receiveValueDemand: .max(5), + createSut: { $0.drop(untilOutputFrom: otherPublisher) } + ) + + helper.subscription.onCancel = { + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + } + + otherSubscription.onCancel = { + XCTAssertEqual(helper.subscription.history, [.requested(.max(2)), .cancelled]) + } + + try XCTUnwrap(helper.downstreamSubscription).cancel() + try XCTUnwrap(helper.downstreamSubscription).cancel() + try XCTUnwrap(helper.downstreamSubscription).request(.max(10)) + XCTAssertEqual(helper.publisher.send(1000), .none) + helper.publisher.send(completion: .finished) + + let subscription2 = CustomSubscription() + helper.publisher.send(subscription: subscription2) + + XCTAssertEqual(helper.subscription.history, [.requested(.max(2)), .cancelled]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1)), .cancelled]) + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput")]) + XCTAssertEqual(subscription2.history, [.cancelled]) + } + + func testSubscribesToOtherFirst() { + let subscription = CustomSubscription() + let otherSubscription = CustomSubscription() + let publisher = CustomPublisher(subscription: subscription) + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let dropUntilOutput = publisher.drop(untilOutputFrom: otherPublisher) + let tracking = TrackingSubscriber( + receiveSubscription: { _ in + XCTAssertNotNil(publisher.subscriber) + XCTAssertNotNil(otherPublisher.subscriber) + XCTAssertEqual(subscription.history, []) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + } + ) + + otherPublisher.willSubscribe = { _ in + XCTAssertNil(publisher.subscriber) + } + + publisher.willSubscribe = { _ in + XCTAssertNotNil(otherPublisher.subscriber) + } + + dropUntilOutput.subscribe(tracking) + tracking.cancel() + } + + func testSubscribersHaveTheSameCombineIdentifier() { + let subscription = CustomSubscription() + let otherSubscription = CustomSubscription() + let publisher = CustomPublisher(subscription: subscription) + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let dropUntilOutput = publisher.drop(untilOutputFrom: otherPublisher) + let tracking = TrackingSubscriber() + dropUntilOutput.subscribe(tracking) + + XCTAssert(publisher.erasedSubscriber is CustomCombineIdentifierConvertible) + XCTAssert(otherPublisher.erasedSubscriber is CustomCombineIdentifierConvertible) + XCTAssertEqual( + (publisher.erasedSubscriber as? CustomCombineIdentifierConvertible)? + .combineIdentifier, + (otherPublisher.erasedSubscriber as? CustomCombineIdentifierConvertible)? + .combineIdentifier + ) + } + + func testLateSubscription() throws { + + // This publisher doesn't send a subscription when it receives a subscriber + let publisher = CustomPublisher(subscription: nil) + let dropUntilOutput = publisher.drop(untilOutputFrom: Empty()) + let tracking = TrackingSubscriber( + receiveSubscription: { + $0.request(.max(10)) + $0.request(.max(4)) + $0.request(.none) + } + ) + + dropUntilOutput.subscribe(tracking) + + XCTAssertEqual(tracking.history, [.completion(.finished), + .subscription("DropUntilOutput")]) + + let subscription = CustomSubscription() + try XCTUnwrap(publisher.subscriber).receive(subscription: subscription) + + XCTAssertEqual(subscription.history, [.requested(.max(14))]) + XCTAssertEqual(tracking.history, [.completion(.finished), + .subscription("DropUntilOutput")]) + } + + func testReusableOtherSubscriber() throws { + let otherSubscription = CustomSubscription() + let otherPublisher = CustomPublisher(subscription: otherSubscription) + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(2), + receiveValueDemand: .max(5), + createSut: { $0.drop(untilOutputFrom: otherPublisher) } + ) + + let subscription2 = CustomSubscription() + try XCTUnwrap(otherPublisher.subscriber).receive(subscription: subscription2) + + XCTAssertEqual(subscription2.history, [.cancelled]) + XCTAssertEqual(otherPublisher.send(1000), .none) + + let subscription3 = CustomSubscription() + try XCTUnwrap(otherPublisher.subscriber).receive(subscription: subscription3) + + XCTAssertEqual(subscription3.history, [.requested(.max(1))]) + + try XCTUnwrap(helper.downstreamSubscription).cancel() + + XCTAssertEqual(subscription3.history, [.requested(.max(1)), + .cancelled]) + XCTAssertEqual(otherSubscription.history, [.requested(.max(1))]) + XCTAssertEqual(helper.subscription.history, [.requested(.max(2)), .cancelled]) + XCTAssertEqual(helper.tracking.history, [.subscription("DropUntilOutput")]) + } + + func testCrashesWhenReceivesInputAfterCancel() { + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: nil, + receiveValueDemand: .none, + createSut: { $0.drop(untilOutputFrom: Empty()) } + ) + + assertCrashes { + _ = helper.publisher.send(0) + } + } + + func testDropUntilOutputReceiveValueBeforeSubscription() { + testReceiveValueBeforeSubscription( + value: 42, + expected: .crash, + { $0.drop(untilOutputFrom: Empty()) } + ) + } + + func testDropUntilOutputOtherReceiveValueBeforeSubscription() { + testReceiveValueBeforeSubscription( + value: 42, + expected: .history([.completion(.finished), .subscription("DropUntilOutput")], + demand: .none), + { Empty().drop(untilOutputFrom: $0) } + ) + } + + func testDropUntilOutputReceiveCompletionBeforeSubscription() { + testReceiveCompletionBeforeSubscription( + inputType: Int.self, + expected: .history([.completion(.finished), + .subscription("DropUntilOutput"), + .completion(.finished)]), + { $0.drop(untilOutputFrom: Empty()) } + ) + } + + func testDropUntilOutputOtherReceiveCompletionBeforeSubscription() { + testReceiveCompletionBeforeSubscription( + inputType: Int.self, + expected: .history([.completion(.finished), + .subscription("DropUntilOutput"), + .completion(.finished)]), + { Empty().drop(untilOutputFrom: $0) } + ) + } + + func testDropUntilOutputRequestBeforeSubscription() { + testRequestBeforeSubscription(inputType: Int.self, + shouldCrash: false, + { $0.drop(untilOutputFrom: Empty()) }) + } + + func testDropUntilOutputCancelBeforeSubscription() { + testCancelBeforeSubscription(inputType: Int.self, + shouldCrash: false, + { $0.drop(untilOutputFrom: Empty()) }) + } + + func testDropUntilOutputReceiveSubscriptionTwice() throws { + try testReceiveSubscriptionTwice { + $0.drop(untilOutputFrom: Empty()) + } + } + + func testDropUntilOutputLifecycle() throws { + try testLifecycle(sendValue: 31, + cancellingSubscriptionReleasesSubscriber: false, + { $0.drop(untilOutputFrom: Empty()) }) + } + + func testDropUntilOutputOtherLifecycle() throws { + try testLifecycle(sendValue: 31, + cancellingSubscriptionReleasesSubscriber: false, + { Empty().drop(untilOutputFrom: $0) }) + } + + func testDropUntilOutputReflection() throws { + try testReflection(parentInput: Int.self, + parentFailure: TestingError.self, + description: "DropUntilOutput", + customMirror: childrenIsEmpty, + playgroundDescription: "DropUntilOutput", + { $0.drop(untilOutputFrom: Empty()) }) + + try testReflection(parentInput: Int.self, + parentFailure: TestingError.self, + description: "DropUntilOutput", + customMirror: childrenIsEmpty, + playgroundDescription: "DropUntilOutput", + { Empty().drop(untilOutputFrom: $0) }) + } +}