Implement Publishers.Debounce (#133)

This commit is contained in:
Sergej Jaskiewicz
2020-06-28 19:50:45 +03:00
committed by GitHub
parent 2b64b7981d
commit 1cfb4a2eae
5 changed files with 951 additions and 53 deletions
-50
View File
@@ -760,56 +760,6 @@ extension Publisher {
public func throttle<S>(for interval: S.SchedulerTimeType.Stride, scheduler: S, latest: Bool) -> Publishers.Throttle<Self, S> where S : Scheduler
}
extension Publishers {
/// A publisher that publishes elements only after a specified time interval elapses between events.
public struct Debounce<Upstream, Context> : Publisher where Upstream : Publisher, Context : Scheduler {
/// The kind of values published by this publisher.
public typealias Output = Upstream.Output
/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
public typealias Failure = Upstream.Failure
/// The publisher from which this publisher receives elements.
public let upstream: Upstream
/// The amount of time the publisher should wait before publishing an element.
public let dueTime: Context.SchedulerTimeType.Stride
/// The scheduler on which this publisher delivers elements.
public let scheduler: Context
/// Scheduler options that customize this publishers delivery of elements.
public let options: Context.SchedulerOptions?
public init(upstream: Upstream, dueTime: Context.SchedulerTimeType.Stride, scheduler: Context, options: Context.SchedulerOptions?)
/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
///
/// - SeeAlso: `subscribe(_:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input
}
}
extension Publisher {
/// Publishes elements only after a specified time interval elapses between events.
///
/// Use this operator when you want to wait for a pause in the delivery of events from the upstream publisher. For example, call `debounce` on the publisher from a text field to only receive elements when the user pauses or stops typing. When they start typing again, the `debounce` holds event delivery until the next pause.
/// - Parameters:
/// - dueTime: The time the publisher should wait before publishing an element.
/// - scheduler: The scheduler on which this publisher delivers elements
/// - options: Scheduler options that customize this publishers delivery of elements.
/// - Returns: A publisher that publishes events only after a specified time elapses.
public func debounce<S>(for dueTime: S.SchedulerTimeType.Stride, scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.Debounce<Self, S> where S : Scheduler
}
extension Publishers {
/// A publisher created by applying the zip function to two upstream publishers.
@@ -10,3 +10,14 @@ internal enum SubscriptionStatus {
case subscribed(Subscription)
case terminal
}
extension SubscriptionStatus {
internal var isAwaigingSubscription: Bool {
switch self {
case .awaitingSubscription:
return true
default:
return false
}
}
}
@@ -0,0 +1,264 @@
//
// Publishers.Debounce.swift
//
//
// Created by Sergej Jaskiewicz on 17.12.2019.
//
extension Publisher {
/// Publishes elements only after a specified time interval elapses between events.
///
/// Use this operator when you want to wait for a pause in the delivery of events from
/// the upstream publisher. For example, call `debounce` on the publisher from a text
/// field to only receive elements when the user pauses or stops typing. When they
/// start typing again, the `debounce` holds event delivery until the next pause.
///
/// - Parameters:
/// - dueTime: The time the publisher should wait before publishing an element.
/// - scheduler: The scheduler on which this publisher delivers elements
/// - options: Scheduler options that customize this publishers delivery
/// of elements.
/// - Returns: A publisher that publishes events only after a specified time elapses.
public func debounce<Context: Scheduler>(
for dueTime: Context.SchedulerTimeType.Stride,
scheduler: Context,
options: Context.SchedulerOptions? = nil
) -> Publishers.Debounce<Self, Context> {
return .init(upstream: self,
dueTime: dueTime,
scheduler: scheduler,
options: options)
}
}
extension Publishers {
/// A publisher that publishes elements only after a specified time interval elapses
/// between events.
public struct Debounce<Upstream: Publisher, Context: Scheduler>: Publisher {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
/// The publisher from which this publisher receives elements.
public let upstream: Upstream
/// The amount of time the publisher should wait before publishing an element.
public let dueTime: Context.SchedulerTimeType.Stride
/// The scheduler on which this publisher delivers elements.
public let scheduler: Context
/// Scheduler options that customize this publishers delivery of elements.
public let options: Context.SchedulerOptions?
public init(upstream: Upstream,
dueTime: Context.SchedulerTimeType.Stride,
scheduler: Context,
options: Context.SchedulerOptions?) {
self.upstream = upstream
self.dueTime = dueTime
self.scheduler = scheduler
self.options = options
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Failure == Failure, Downstream.Input == Output
{
let inner = Inner(downstream: subscriber,
dueTime: dueTime,
scheduler: scheduler,
options: options)
upstream.subscribe(inner)
}
}
}
extension Publishers.Debounce {
private final class Inner<Downstream: Subscriber>
: Subscriber,
Subscription,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Upstream.Output == Downstream.Input,
Upstream.Failure == Downstream.Failure
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private typealias Generation = UInt64
private let lock = UnfairLock.allocate()
private let downstreamLock = UnfairRecursiveLock.allocate()
private let downstream: Downstream
private let dueTime: Context.SchedulerTimeType.Stride
private let scheduler: Context
private let options: Context.SchedulerOptions?
private var state = SubscriptionStatus.awaitingSubscription
private var currentCanceller: Cancellable?
private var currentValue: Output?
private var currentGeneration: Generation = 0
private var downstreamDemand = Subscribers.Demand.none
init(downstream: Downstream,
dueTime: Context.SchedulerTimeType.Stride,
scheduler: Context,
options: Context.SchedulerOptions?) {
self.downstream = downstream
self.dueTime = dueTime
self.scheduler = scheduler
self.options = options
}
deinit {
lock.deallocate()
downstreamLock.deallocate()
}
func receive(subscription: Subscription) {
lock.lock()
guard case .awaitingSubscription = state else {
lock.unlock()
subscription.cancel()
return
}
state = .subscribed(subscription)
lock.unlock()
downstreamLock.lock()
downstream.receive(subscription: self)
downstreamLock.unlock()
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
precondition(!state.isAwaigingSubscription)
guard case .subscribed = state else {
lock.unlock()
return .none
}
currentGeneration += 1
let generation = currentGeneration
currentValue = input
let due = scheduler.now.advanced(by: dueTime)
lock.unlock()
let newCanceller = scheduler.schedule(after: due,
interval: dueTime,
tolerance: scheduler.minimumTolerance,
options: options) { [weak self] in
self?.due(generation: generation)
}
lock.lock()
let canceller = currentCanceller
currentCanceller = newCanceller
lock.unlock()
canceller?.cancel()
return .none
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
lock.lock()
precondition(!state.isAwaigingSubscription)
guard case .subscribed = state else {
lock.unlock()
return
}
state = .terminal
let canceller = currentCanceller
lock.unlock()
canceller?.cancel()
scheduler.schedule {
self.downstreamLock.lock()
self.downstream.receive(completion: completion)
self.downstreamLock.unlock()
}
}
func request(_ demand: Subscribers.Demand) {
lock.lock()
precondition(!state.isAwaigingSubscription)
guard case .subscribed = state else {
lock.unlock()
return
}
downstreamDemand += demand
lock.unlock()
}
func cancel() {
lock.lock()
guard case .subscribed(let subscription) = state else {
lock.unlock()
return
}
state = .terminal
lock.unlock()
subscription.cancel()
}
var description: String { return "Debounce" }
var customMirror: Mirror {
let children: [Mirror.Child] = [
("downstream", downstream),
("downstreamDemand", downstreamDemand),
("currentValue", currentValue as Any)
]
return Mirror(self, children: children)
}
var playgroundDescription: Any { return description }
private func due(generation: Generation) {
lock.lock()
guard case .subscribed = state else {
lock.unlock()
return
}
// If this condition holds, it means that no values were received
// in this time frame => we should propagate the current value downstream.
guard generation == currentGeneration, let value = currentValue else {
let canceller = currentCanceller
lock.unlock()
canceller?.cancel()
return
}
let hasAnyDemand = downstreamDemand > 0
if hasAnyDemand {
downstreamDemand -= 1
}
let canceller = currentCanceller!
lock.unlock()
canceller.cancel()
guard hasAnyDemand else { return }
downstreamLock.lock()
let newDemand = downstream.receive(value)
downstreamLock.unlock()
if newDemand == .none { return }
lock.lock()
downstreamDemand += newDemand
lock.unlock()
}
}
}
@@ -11,6 +11,14 @@ import Combine
import OpenCombine
#endif
@available(macOS 10.15, iOS 13.0, *)
protocol CancellableTokenProtocol: Cancellable {
init(_ scheduler: VirtualTimeScheduler)
var isCancelled: Bool { get }
}
@available(macOS 10.15, iOS 13.0, *)
final class VirtualTimeScheduler: Scheduler {
@@ -171,15 +179,42 @@ final class VirtualTimeScheduler: Scheduler {
}
}
private final class CancellableToken: Cancellable {
final class CancellableToken: CancellableTokenProtocol {
weak var scheduler: VirtualTimeScheduler?
private(set) var isCancelled = false
init(_ scheduler: VirtualTimeScheduler) {
self.scheduler = scheduler
}
deinit {
scheduler?.cancellableTokenDeinitCount += 1
}
func cancel() {
isCancelled = true
}
}
final class NoopCancellableToken: CancellableTokenProtocol {
weak var scheduler: VirtualTimeScheduler?
init(_ scheduler: VirtualTimeScheduler) {
self.scheduler = scheduler
}
deinit {
scheduler?.cancellableTokenDeinitCount += 1
}
var isCancelled: Bool { return false }
func cancel() {}
}
enum Event: Equatable, CustomStringConvertible {
case now
case minimumTolerance
@@ -239,6 +274,14 @@ final class VirtualTimeScheduler: Scheduler {
private var workQueue = FairPriorityQueue<SchedulerTimeType, () -> Void>()
private let cancellableTokenType: CancellableTokenProtocol.Type
fileprivate(set) var cancellableTokenDeinitCount = 0
init(cancellableTokenType: CancellableTokenProtocol.Type = CancellableToken.self) {
self.cancellableTokenType = cancellableTokenType
}
var scheduledDates: [SchedulerTimeType] {
return workQueue.map { $0.0 }
}
@@ -275,7 +318,7 @@ final class VirtualTimeScheduler: Scheduler {
interval: interval,
tolerance: tolerance,
options: options))
let cancellableToken = CancellableToken()
let cancellableToken = cancellableTokenType.init(self)
repeatedlyExecute(after: date,
interval: interval,
cancellableToken: cancellableToken,
@@ -285,7 +328,7 @@ final class VirtualTimeScheduler: Scheduler {
private func repeatedlyExecute(after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
cancellableToken: CancellableToken,
cancellableToken: CancellableTokenProtocol,
action: @escaping () -> Void) {
let enqueuedAction: () -> Void = { [unowned self] in
if cancellableToken.isCancelled { return }
@@ -0,0 +1,630 @@
//
// DebounceTests.swift
//
//
// Created by Sergej Jaskiewicz on 28.06.2020.
//
import XCTest
#if OPENCOMBINE_COMPATIBILITY_TEST
import Combine
#else
import OpenCombine
#endif
@available(macOS 10.15, iOS 13.0, *)
final class DebounceTests: XCTestCase {
func testBasicBehavior() {
let scheduler = VirtualTimeScheduler()
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .max(2),
receiveValueDemand: .max(1),
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: scheduler,
options: .nontrivialOptions)
}
)
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history, [])
XCTAssertEqual(helper.publisher.send(1), .none)
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
scheduler.rewind(to: .nanoseconds(4))
XCTAssertEqual(helper.publisher.send(2), .none)
scheduler.rewind(to: .nanoseconds(9))
XCTAssertEqual(helper.publisher.send(3), .none)
scheduler.rewind(to: .nanoseconds(200))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.value(3)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(17),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(22),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
helper.publisher.send(completion: .finished)
helper.publisher.send(completion: .failure(.oops)) // ignored
XCTAssertEqual(helper.publisher.send(-1), .none) // ignored
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.value(3)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(17),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(22),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.schedule(options: nil)])
scheduler.rewind(to: .nanoseconds(300))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.value(3),
.completion(.finished)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(17),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(22),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.schedule(options: nil)])
XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 2)
}
func testFinishBeforeDue() {
let scheduler = VirtualTimeScheduler()
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .max(1),
receiveValueDemand: .none,
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: scheduler,
options: .nontrivialOptions)
}
)
XCTAssertEqual(helper.publisher.send(1), .none)
scheduler.rewind(to: .nanoseconds(4))
helper.publisher.send(completion: .failure(.oops))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.schedule(options: nil)])
scheduler.rewind(to: .nanoseconds(100))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.completion(.failure(.oops))])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.schedule(options: nil)])
XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0)
}
func testFailBeforeDue() {
let scheduler = VirtualTimeScheduler()
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .max(1),
receiveValueDemand: .none,
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: scheduler,
options: .nontrivialOptions)
}
)
XCTAssertEqual(helper.publisher.send(1), .none)
scheduler.rewind(to: .nanoseconds(4))
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.schedule(options: nil)])
scheduler.rewind(to: .nanoseconds(100))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.completion(.finished)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.schedule(options: nil)])
XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0)
}
func testCancelBeforeDue() throws {
let scheduler = VirtualTimeScheduler()
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .max(1),
receiveValueDemand: .none,
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: scheduler,
options: .nontrivialOptions)
}
)
XCTAssertEqual(helper.publisher.send(1), .none)
scheduler.rewind(to: .nanoseconds(4))
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
scheduler.rewind(to: .nanoseconds(100))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0)
}
func testDemand() throws {
let scheduler = VirtualTimeScheduler()
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: nil,
receiveValueDemand: .none,
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: scheduler,
options: .nontrivialOptions)
}
)
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history, [])
XCTAssertEqual(helper.publisher.send(1), .none)
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
scheduler.rewind(to: .nanoseconds(100))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
try XCTUnwrap(helper.downstreamSubscription).request(.max(3))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
scheduler.rewind(to: .nanoseconds(200))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce")])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
XCTAssertEqual(helper.publisher.send(2), .none)
scheduler.rewind(to: .nanoseconds(250))
XCTAssertEqual(helper.publisher.send(3), .none)
scheduler.rewind(to: .nanoseconds(300))
XCTAssertEqual(helper.publisher.send(4), .none)
scheduler.rewind(to: .nanoseconds(350))
XCTAssertEqual(helper.publisher.send(5), .none)
scheduler.rewind(to: .nanoseconds(400))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.value(2),
.value(3),
.value(4)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(213),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(263),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(313),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(363),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
try XCTUnwrap(helper.downstreamSubscription).cancel()
try XCTUnwrap(helper.downstreamSubscription).request(.max(1))
XCTAssertEqual(helper.publisher.send(6), .none)
scheduler.rewind(to: .nanoseconds(450))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.value(2),
.value(3),
.value(4)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(213),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(263),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(313),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(363),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 4)
}
func testBadScheduler() {
// What if the scheduler returns a cancellable that does nothing at all?
let scheduler = VirtualTimeScheduler(
cancellableTokenType: VirtualTimeScheduler.NoopCancellableToken.self
)
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: scheduler,
options: .nontrivialOptions)
}
)
XCTAssertEqual(helper.publisher.send(1), .none)
scheduler.rewind(to: .nanoseconds(2))
XCTAssertEqual(helper.publisher.send(2), .none)
scheduler.rewind(to: .nanoseconds(4))
XCTAssertEqual(helper.publisher.send(3), .none)
scheduler.rewind(to: .nanoseconds(6))
XCTAssertEqual(helper.publisher.send(42), .none)
scheduler.rewind(to: .nanoseconds(50))
XCTAssertEqual(helper.tracking.history, [.subscription("Debounce"),
.value(42),
.value(42),
.value(42)])
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
XCTAssertEqual(scheduler.history,
[.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(13),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(15),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(17),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions),
.now,
.minimumTolerance,
.scheduleAfterDateWithInterval(.nanoseconds(19),
interval: .nanoseconds(13),
tolerance: .nanoseconds(7),
options: .nontrivialOptions)])
XCTAssertEqual(scheduler.cancellableTokenDeinitCount, 0)
}
func testSetupTimerWeakCapture() {
let scheduler = VirtualTimeScheduler()
var subscriptionDestroyed = false
do {
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: {
$0.debounce(for: .nanoseconds(13), scheduler: scheduler)
}
)
helper.tracking.onDeinit = { subscriptionDestroyed = true }
XCTAssertEqual(helper.publisher.send(1), .none)
}
XCTAssertTrue(subscriptionDestroyed)
}
func testCrashesWithImmediateScheduler() {
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .max(2),
receiveValueDemand: .max(1),
createSut: {
$0.debounce(for: .nanoseconds(13),
scheduler: ImmediateScheduler.shared)
}
)
assertCrashes {
_ = helper.publisher.send(1)
}
}
func testTimeoutReceiveValueBeforeSubscription() {
let scheduler = VirtualTimeScheduler()
testReceiveValueBeforeSubscription(
value: 42,
expected: .crash,
{ $0.debounce(for: .nanoseconds(13), scheduler: scheduler) }
)
}
func testTimeoutReceiveCompletionBeforeSubscription() {
let scheduler = VirtualTimeScheduler()
testReceiveCompletionBeforeSubscription(
inputType: Int.self,
expected: .crash,
{ $0.debounce(for: .nanoseconds(13), scheduler: scheduler) }
)
}
func testTimeoutRequestBeforeSubscription() {
let scheduler = VirtualTimeScheduler()
testRequestBeforeSubscription(
inputType: Int.self,
shouldCrash: true,
{ $0.debounce(for: .nanoseconds(13), scheduler: scheduler) }
)
}
func testTimeoutReceiveSubscriptionTwice() throws {
let scheduler = VirtualTimeScheduler()
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: nil,
receiveValueDemand: .none,
createSut: { $0.debounce(for: .nanoseconds(13), scheduler: scheduler) }
)
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited)])
let secondSubscription = CustomSubscription()
try XCTUnwrap(helper.publisher.subscriber)
.receive(subscription: secondSubscription)
XCTAssertEqual(secondSubscription.history, [.cancelled])
try XCTUnwrap(helper.publisher.subscriber)
.receive(subscription: helper.subscription)
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited),
.cancelled])
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited),
.cancelled,
.cancelled])
}
func testTimeoutCancelBeforeSubscription() {
let scheduler = VirtualTimeScheduler()
testCancelBeforeSubscription(
inputType: Int.self,
shouldCrash: false,
{ $0.timeout(.nanoseconds(13), scheduler: scheduler) }
)
}
func testDebounceReflection() throws {
let scheduler = VirtualTimeScheduler()
let customMirror = hasUpdatedReflection
? expectedChildren(
("downstream", .contains("TrackingSubscriberBase")),
("downstreamDemand", "max(0)"),
("currentValue", "nil")
)
: expectedChildren(
("upstream", .contains("CustomConnectablePublisherBase")),
("downstream", .contains("TrackingSubscriberBase")),
("upstreamSubscription", .anything),
("downstreamDemand", "max(0)"),
("currentValue", "nil")
)
try testReflection(
parentInput: Int.self,
parentFailure: Error.self,
description: "Debounce",
customMirror: customMirror,
playgroundDescription: "Debounce",
{ $0.debounce(for: .nanoseconds(13), scheduler: scheduler) }
)
}
}
// FIXME: Remove this as soon as we switch our CI to the latest OS releases
private var hasUpdatedReflection: Bool {
#if OPENCOMBINE_COMPATIBILITY_TEST
if #available(macOS 10.16, iOS 14.0, *) {
return true
} else {
return false
}
#else
return true
#endif
}