From 1cfb4a2eaecde435edab51ec1b78e5d4c8410069 Mon Sep 17 00:00:00 2001 From: Sergej Jaskiewicz Date: Sun, 28 Jun 2020 19:50:45 +0300 Subject: [PATCH] Implement Publishers.Debounce (#133) --- RemainingCombineInterface.swift | 50 -- .../Helpers/SubscriptionStatus.swift | 11 + .../Publishers/Publishers.Debounce.swift | 264 ++++++++ .../Helpers/VirtualTimeScheduler.swift | 49 +- .../PublisherTests/DebounceTests.swift | 630 ++++++++++++++++++ 5 files changed, 951 insertions(+), 53 deletions(-) create mode 100644 Sources/OpenCombine/Publishers/Publishers.Debounce.swift create mode 100644 Tests/OpenCombineTests/PublisherTests/DebounceTests.swift diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index 4817411..9c35382 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -760,56 +760,6 @@ extension Publisher { public func throttle(for interval: S.SchedulerTimeType.Stride, scheduler: S, latest: Bool) -> Publishers.Throttle where S : Scheduler } -extension Publishers { - - /// A publisher that publishes elements only after a specified time interval elapses between events. - public struct Debounce : Publisher where Upstream : Publisher, Context : Scheduler { - - /// The kind of values published by this publisher. - public typealias Output = Upstream.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = Upstream.Failure - - /// The publisher from which this publisher receives elements. - public let upstream: Upstream - - /// The amount of time the publisher should wait before publishing an element. - public let dueTime: Context.SchedulerTimeType.Stride - - /// The scheduler on which this publisher delivers elements. - public let scheduler: Context - - /// Scheduler options that customize this publisher’s delivery of elements. - public let options: Context.SchedulerOptions? - - public init(upstream: Upstream, dueTime: Context.SchedulerTimeType.Stride, scheduler: Context, options: Context.SchedulerOptions?) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input - } -} - -extension Publisher { - - /// Publishes elements only after a specified time interval elapses between events. - /// - /// Use this operator when you want to wait for a pause in the delivery of events from the upstream publisher. For example, call `debounce` on the publisher from a text field to only receive elements when the user pauses or stops typing. When they start typing again, the `debounce` holds event delivery until the next pause. - /// - Parameters: - /// - dueTime: The time the publisher should wait before publishing an element. - /// - scheduler: The scheduler on which this publisher delivers elements - /// - options: Scheduler options that customize this publisher’s delivery of elements. - /// - Returns: A publisher that publishes events only after a specified time elapses. - public func debounce(for dueTime: S.SchedulerTimeType.Stride, scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.Debounce where S : Scheduler -} - extension Publishers { /// A publisher created by applying the zip function to two upstream publishers. diff --git a/Sources/OpenCombine/Helpers/SubscriptionStatus.swift b/Sources/OpenCombine/Helpers/SubscriptionStatus.swift index 3224f35..b632c90 100644 --- a/Sources/OpenCombine/Helpers/SubscriptionStatus.swift +++ b/Sources/OpenCombine/Helpers/SubscriptionStatus.swift @@ -10,3 +10,14 @@ internal enum SubscriptionStatus { case subscribed(Subscription) case terminal } + +extension SubscriptionStatus { + internal var isAwaigingSubscription: Bool { + switch self { + case .awaitingSubscription: + return true + default: + return false + } + } +} diff --git a/Sources/OpenCombine/Publishers/Publishers.Debounce.swift b/Sources/OpenCombine/Publishers/Publishers.Debounce.swift new file mode 100644 index 0000000..7e542c5 --- /dev/null +++ b/Sources/OpenCombine/Publishers/Publishers.Debounce.swift @@ -0,0 +1,264 @@ +// +// Publishers.Debounce.swift +// +// +// Created by Sergej Jaskiewicz on 17.12.2019. +// + +extension Publisher { + + /// Publishes elements only after a specified time interval elapses between events. + /// + /// Use this operator when you want to wait for a pause in the delivery of events from + /// the upstream publisher. For example, call `debounce` on the publisher from a text + /// field to only receive elements when the user pauses or stops typing. When they + /// start typing again, the `debounce` holds event delivery until the next pause. + /// + /// - Parameters: + /// - dueTime: The time the publisher should wait before publishing an element. + /// - scheduler: The scheduler on which this publisher delivers elements + /// - options: Scheduler options that customize this publisher’s delivery + /// of elements. + /// - Returns: A publisher that publishes events only after a specified time elapses. + public func debounce( + for dueTime: Context.SchedulerTimeType.Stride, + scheduler: Context, + options: Context.SchedulerOptions? = nil + ) -> Publishers.Debounce { + return .init(upstream: self, + dueTime: dueTime, + scheduler: scheduler, + options: options) + } +} + +extension Publishers { + + /// A publisher that publishes elements only after a specified time interval elapses + /// between events. + public struct Debounce: Publisher { + + public typealias Output = Upstream.Output + + public typealias Failure = Upstream.Failure + + /// The publisher from which this publisher receives elements. + public let upstream: Upstream + + /// The amount of time the publisher should wait before publishing an element. + public let dueTime: Context.SchedulerTimeType.Stride + + /// The scheduler on which this publisher delivers elements. + public let scheduler: Context + + /// Scheduler options that customize this publisher’s delivery of elements. + public let options: Context.SchedulerOptions? + + public init(upstream: Upstream, + dueTime: Context.SchedulerTimeType.Stride, + scheduler: Context, + options: Context.SchedulerOptions?) { + self.upstream = upstream + self.dueTime = dueTime + self.scheduler = scheduler + self.options = options + } + + public func receive(subscriber: Downstream) + where Downstream.Failure == Failure, Downstream.Input == Output + { + let inner = Inner(downstream: subscriber, + dueTime: dueTime, + scheduler: scheduler, + options: options) + upstream.subscribe(inner) + } + } +} + +extension Publishers.Debounce { + private final class Inner + : Subscriber, + Subscription, + CustomStringConvertible, + CustomReflectable, + CustomPlaygroundDisplayConvertible + where Upstream.Output == Downstream.Input, + Upstream.Failure == Downstream.Failure + { + typealias Input = Upstream.Output + + typealias Failure = Upstream.Failure + + private typealias Generation = UInt64 + + private let lock = UnfairLock.allocate() + + private let downstreamLock = UnfairRecursiveLock.allocate() + + private let downstream: Downstream + + private let dueTime: Context.SchedulerTimeType.Stride + + private let scheduler: Context + + private let options: Context.SchedulerOptions? + + private var state = SubscriptionStatus.awaitingSubscription + + private var currentCanceller: Cancellable? + + private var currentValue: Output? + + private var currentGeneration: Generation = 0 + + private var downstreamDemand = Subscribers.Demand.none + + init(downstream: Downstream, + dueTime: Context.SchedulerTimeType.Stride, + scheduler: Context, + options: Context.SchedulerOptions?) { + self.downstream = downstream + self.dueTime = dueTime + self.scheduler = scheduler + self.options = options + } + + deinit { + lock.deallocate() + downstreamLock.deallocate() + } + + func receive(subscription: Subscription) { + lock.lock() + guard case .awaitingSubscription = state else { + lock.unlock() + subscription.cancel() + return + } + state = .subscribed(subscription) + lock.unlock() + downstreamLock.lock() + downstream.receive(subscription: self) + downstreamLock.unlock() + subscription.request(.unlimited) + } + + func receive(_ input: Input) -> Subscribers.Demand { + lock.lock() + precondition(!state.isAwaigingSubscription) + guard case .subscribed = state else { + lock.unlock() + return .none + } + currentGeneration += 1 + let generation = currentGeneration + currentValue = input + let due = scheduler.now.advanced(by: dueTime) + lock.unlock() + let newCanceller = scheduler.schedule(after: due, + interval: dueTime, + tolerance: scheduler.minimumTolerance, + options: options) { [weak self] in + self?.due(generation: generation) + } + lock.lock() + let canceller = currentCanceller + currentCanceller = newCanceller + lock.unlock() + canceller?.cancel() + return .none + } + + func receive(completion: Subscribers.Completion) { + lock.lock() + precondition(!state.isAwaigingSubscription) + guard case .subscribed = state else { + lock.unlock() + return + } + state = .terminal + let canceller = currentCanceller + lock.unlock() + canceller?.cancel() + scheduler.schedule { + self.downstreamLock.lock() + self.downstream.receive(completion: completion) + self.downstreamLock.unlock() + } + } + + func request(_ demand: Subscribers.Demand) { + lock.lock() + precondition(!state.isAwaigingSubscription) + guard case .subscribed = state else { + lock.unlock() + return + } + downstreamDemand += demand + lock.unlock() + } + + func cancel() { + lock.lock() + guard case .subscribed(let subscription) = state else { + lock.unlock() + return + } + state = .terminal + lock.unlock() + subscription.cancel() + } + + var description: String { return "Debounce" } + + var customMirror: Mirror { + let children: [Mirror.Child] = [ + ("downstream", downstream), + ("downstreamDemand", downstreamDemand), + ("currentValue", currentValue as Any) + ] + return Mirror(self, children: children) + } + + var playgroundDescription: Any { return description } + + private func due(generation: Generation) { + lock.lock() + guard case .subscribed = state else { + lock.unlock() + return + } + + // If this condition holds, it means that no values were received + // in this time frame => we should propagate the current value downstream. + guard generation == currentGeneration, let value = currentValue else { + let canceller = currentCanceller + lock.unlock() + canceller?.cancel() + return + } + + let hasAnyDemand = downstreamDemand > 0 + if hasAnyDemand { + downstreamDemand -= 1 + } + + let canceller = currentCanceller! + lock.unlock() + canceller.cancel() + + guard hasAnyDemand else { return } + + downstreamLock.lock() + let newDemand = downstream.receive(value) + downstreamLock.unlock() + + if newDemand == .none { return } + + lock.lock() + downstreamDemand += newDemand + lock.unlock() + } + } +} diff --git a/Tests/OpenCombineTests/Helpers/VirtualTimeScheduler.swift b/Tests/OpenCombineTests/Helpers/VirtualTimeScheduler.swift index ecd6a79..5b5924d 100644 --- a/Tests/OpenCombineTests/Helpers/VirtualTimeScheduler.swift +++ b/Tests/OpenCombineTests/Helpers/VirtualTimeScheduler.swift @@ -11,6 +11,14 @@ import Combine import OpenCombine #endif +@available(macOS 10.15, iOS 13.0, *) +protocol CancellableTokenProtocol: Cancellable { + + init(_ scheduler: VirtualTimeScheduler) + + var isCancelled: Bool { get } +} + @available(macOS 10.15, iOS 13.0, *) final class VirtualTimeScheduler: Scheduler { @@ -171,15 +179,42 @@ final class VirtualTimeScheduler: Scheduler { } } - private final class CancellableToken: Cancellable { + final class CancellableToken: CancellableTokenProtocol { + + weak var scheduler: VirtualTimeScheduler? private(set) var isCancelled = false + init(_ scheduler: VirtualTimeScheduler) { + self.scheduler = scheduler + } + + deinit { + scheduler?.cancellableTokenDeinitCount += 1 + } + func cancel() { isCancelled = true } } + final class NoopCancellableToken: CancellableTokenProtocol { + + weak var scheduler: VirtualTimeScheduler? + + init(_ scheduler: VirtualTimeScheduler) { + self.scheduler = scheduler + } + + deinit { + scheduler?.cancellableTokenDeinitCount += 1 + } + + var isCancelled: Bool { return false } + + func cancel() {} + } + enum Event: Equatable, CustomStringConvertible { case now case minimumTolerance @@ -239,6 +274,14 @@ final class VirtualTimeScheduler: Scheduler { private var workQueue = FairPriorityQueue Void>() + private let cancellableTokenType: CancellableTokenProtocol.Type + + fileprivate(set) var cancellableTokenDeinitCount = 0 + + init(cancellableTokenType: CancellableTokenProtocol.Type = CancellableToken.self) { + self.cancellableTokenType = cancellableTokenType + } + var scheduledDates: [SchedulerTimeType] { return workQueue.map { $0.0 } } @@ -275,7 +318,7 @@ final class VirtualTimeScheduler: Scheduler { interval: interval, tolerance: tolerance, options: options)) - let cancellableToken = CancellableToken() + let cancellableToken = cancellableTokenType.init(self) repeatedlyExecute(after: date, interval: interval, cancellableToken: cancellableToken, @@ -285,7 +328,7 @@ final class VirtualTimeScheduler: Scheduler { private func repeatedlyExecute(after date: SchedulerTimeType, interval: SchedulerTimeType.Stride, - cancellableToken: CancellableToken, + cancellableToken: CancellableTokenProtocol, action: @escaping () -> Void) { let enqueuedAction: () -> Void = { [unowned self] in if cancellableToken.isCancelled { return } diff --git a/Tests/OpenCombineTests/PublisherTests/DebounceTests.swift b/Tests/OpenCombineTests/PublisherTests/DebounceTests.swift new file mode 100644 index 0000000..01745d0 --- /dev/null +++ b/Tests/OpenCombineTests/PublisherTests/DebounceTests.swift @@ -0,0 +1,630 @@ +// +// DebounceTests.swift +// +// +// Created by Sergej Jaskiewicz on 28.06.2020. +// + +import XCTest + +#if OPENCOMBINE_COMPATIBILITY_TEST +import Combine +#else +import OpenCombine +#endif + +@available(macOS 10.15, iOS 13.0, *) +final class DebounceTests: XCTestCase { + + func testBasicBehavior() { + let scheduler = VirtualTimeScheduler() + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(2), + receiveValueDemand: .max(1), + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: scheduler, + options: .nontrivialOptions) + } + ) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, []) + + XCTAssertEqual(helper.publisher.send(1), .none) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + scheduler.rewind(to: .nanoseconds(4)) + XCTAssertEqual(helper.publisher.send(2), .none) + scheduler.rewind(to: .nanoseconds(9)) + XCTAssertEqual(helper.publisher.send(3), .none) + + scheduler.rewind(to: .nanoseconds(200)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .value(3)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(17), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(22), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + helper.publisher.send(completion: .finished) + helper.publisher.send(completion: .failure(.oops)) // ignored + XCTAssertEqual(helper.publisher.send(-1), .none) // ignored + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .value(3)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(17), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(22), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .schedule(options: nil)]) + + scheduler.rewind(to: .nanoseconds(300)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .value(3), + .completion(.finished)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(17), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(22), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .schedule(options: nil)]) + XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 2) + } + + func testFinishBeforeDue() { + let scheduler = VirtualTimeScheduler() + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(1), + receiveValueDemand: .none, + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: scheduler, + options: .nontrivialOptions) + } + ) + + XCTAssertEqual(helper.publisher.send(1), .none) + scheduler.rewind(to: .nanoseconds(4)) + helper.publisher.send(completion: .failure(.oops)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [ + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .schedule(options: nil)]) + + scheduler.rewind(to: .nanoseconds(100)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .completion(.failure(.oops))]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [ + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .schedule(options: nil)]) + XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0) + } + + func testFailBeforeDue() { + let scheduler = VirtualTimeScheduler() + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(1), + receiveValueDemand: .none, + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: scheduler, + options: .nontrivialOptions) + } + ) + + XCTAssertEqual(helper.publisher.send(1), .none) + scheduler.rewind(to: .nanoseconds(4)) + helper.publisher.send(completion: .finished) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [ + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .schedule(options: nil)]) + + scheduler.rewind(to: .nanoseconds(100)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .completion(.finished)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [ + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .schedule(options: nil)]) + XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0) + } + + func testCancelBeforeDue() throws { + let scheduler = VirtualTimeScheduler() + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(1), + receiveValueDemand: .none, + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: scheduler, + options: .nontrivialOptions) + } + ) + + XCTAssertEqual(helper.publisher.send(1), .none) + scheduler.rewind(to: .nanoseconds(4)) + try XCTUnwrap(helper.downstreamSubscription).cancel() + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + scheduler.rewind(to: .nanoseconds(100)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0) + } + + func testDemand() throws { + let scheduler = VirtualTimeScheduler() + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: nil, + receiveValueDemand: .none, + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: scheduler, + options: .nontrivialOptions) + } + ) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, []) + + XCTAssertEqual(helper.publisher.send(1), .none) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + scheduler.rewind(to: .nanoseconds(100)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + try XCTUnwrap(helper.downstreamSubscription).request(.max(3)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + scheduler.rewind(to: .nanoseconds(200)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + XCTAssertEqual(helper.publisher.send(2), .none) + scheduler.rewind(to: .nanoseconds(250)) + XCTAssertEqual(helper.publisher.send(3), .none) + scheduler.rewind(to: .nanoseconds(300)) + XCTAssertEqual(helper.publisher.send(4), .none) + scheduler.rewind(to: .nanoseconds(350)) + XCTAssertEqual(helper.publisher.send(5), .none) + scheduler.rewind(to: .nanoseconds(400)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .value(2), + .value(3), + .value(4)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(213), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(263), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(313), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(363), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + try XCTUnwrap(helper.downstreamSubscription).cancel() + try XCTUnwrap(helper.downstreamSubscription).request(.max(1)) + + XCTAssertEqual(helper.publisher.send(6), .none) + scheduler.rewind(to: .nanoseconds(450)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .value(2), + .value(3), + .value(4)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(213), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(263), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(313), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(363), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 4) + } + + func testBadScheduler() { + // What if the scheduler returns a cancellable that does nothing at all? + + let scheduler = VirtualTimeScheduler( + cancellableTokenType: VirtualTimeScheduler.NoopCancellableToken.self + ) + + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .unlimited, + receiveValueDemand: .none, + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: scheduler, + options: .nontrivialOptions) + } + ) + + XCTAssertEqual(helper.publisher.send(1), .none) + scheduler.rewind(to: .nanoseconds(2)) + XCTAssertEqual(helper.publisher.send(2), .none) + scheduler.rewind(to: .nanoseconds(4)) + XCTAssertEqual(helper.publisher.send(3), .none) + scheduler.rewind(to: .nanoseconds(6)) + + XCTAssertEqual(helper.publisher.send(42), .none) + scheduler.rewind(to: .nanoseconds(50)) + + XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"), + .value(42), + .value(42), + .value(42)]) + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(scheduler.history, + [.now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(13), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(15), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(17), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions), + .now, + .minimumTolerance, + .scheduleAfterDateWithInterval(.nanoseconds(19), + interval: .nanoseconds(13), + tolerance: .nanoseconds(7), + options: .nontrivialOptions)]) + + XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0) + } + + func testSetupTimerWeakCapture() { + let scheduler = VirtualTimeScheduler() + var subscriptionDestroyed = false + do { + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .unlimited, + receiveValueDemand: .none, + createSut: { + $0.debounce(for: .nanoseconds(13), scheduler: scheduler) + } + ) + + helper.tracking.onDeinit = { subscriptionDestroyed = true } + + XCTAssertEqual(helper.publisher.send(1), .none) + } + + XCTAssertTrue(subscriptionDestroyed) + } + + func testCrashesWithImmediateScheduler() { + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: .max(2), + receiveValueDemand: .max(1), + createSut: { + $0.debounce(for: .nanoseconds(13), + scheduler: ImmediateScheduler.shared) + } + ) + + assertCrashes { + _ = helper.publisher.send(1) + } + } + + func testTimeoutReceiveValueBeforeSubscription() { + let scheduler = VirtualTimeScheduler() + testReceiveValueBeforeSubscription( + value: 42, + expected: .crash, + { $0.debounce(for: .nanoseconds(13), scheduler: scheduler) } + ) + } + + func testTimeoutReceiveCompletionBeforeSubscription() { + let scheduler = VirtualTimeScheduler() + testReceiveCompletionBeforeSubscription( + inputType: Int.self, + expected: .crash, + { $0.debounce(for: .nanoseconds(13), scheduler: scheduler) } + ) + } + + func testTimeoutRequestBeforeSubscription() { + let scheduler = VirtualTimeScheduler() + testRequestBeforeSubscription( + inputType: Int.self, + shouldCrash: true, + { $0.debounce(for: .nanoseconds(13), scheduler: scheduler) } + ) + } + + func testTimeoutReceiveSubscriptionTwice() throws { + let scheduler = VirtualTimeScheduler() + + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: nil, + receiveValueDemand: .none, + createSut: { $0.debounce(for: .nanoseconds(13), scheduler: scheduler) } + ) + + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)]) + + let secondSubscription = CustomSubscription() + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: secondSubscription) + + XCTAssertEqual(secondSubscription.history, [.cancelled]) + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: helper.subscription) + + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), + .cancelled]) + + try XCTUnwrap(helper.downstreamSubscription).cancel() + + XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), + .cancelled, + .cancelled]) + } + + func testTimeoutCancelBeforeSubscription() { + let scheduler = VirtualTimeScheduler() + testCancelBeforeSubscription( + inputType: Int.self, + shouldCrash: false, + { $0.timeout(.nanoseconds(13), scheduler: scheduler) } + ) + } + + func testDebounceReflection() throws { + let scheduler = VirtualTimeScheduler() + + let customMirror = hasUpdatedReflection + ? expectedChildren( + ("downstream", .contains("TrackingSubscriberBase")), + ("downstreamDemand", "max(0)"), + ("currentValue", "nil") + ) + : expectedChildren( + ("upstream", .contains("CustomConnectablePublisherBase")), + ("downstream", .contains("TrackingSubscriberBase")), + ("upstreamSubscription", .anything), + ("downstreamDemand", "max(0)"), + ("currentValue", "nil") + ) + + try testReflection( + parentInput: Int.self, + parentFailure: Error.self, + description: "Debounce", + customMirror: customMirror, + playgroundDescription: "Debounce", + { $0.debounce(for: .nanoseconds(13), scheduler: scheduler) } + ) + } +} + +// FIXME: Remove this as soon as we switch our CI to the latest OS releases +private var hasUpdatedReflection: Bool { +#if OPENCOMBINE_COMPATIBILITY_TEST + if #available(macOS 10.16, iOS 14.0, *) { + return true + } else { + return false + } +#else + return true +#endif +}