Simplify Filter and TryFilter using FilterProducer

This commit is contained in:
Sergej Jaskiewicz
2019-10-24 00:12:09 +03:00
committed by Sergej Jaskiewicz
parent cba3a69e74
commit 27da28f378
4 changed files with 572 additions and 341 deletions
@@ -97,8 +97,7 @@ extension Publishers {
where Upstream.Failure == Downstream.Failure,
Upstream.Output == Downstream.Input
{
let filter = Inner(downstream: subscriber, isIncluded: catching(isIncluded))
upstream.receive(subscriber: filter)
upstream.subscribe(Inner(downstream: subscriber, filter: isIncluded))
}
}
@@ -137,97 +136,53 @@ extension Publishers {
where Upstream.Output == Downstream.Input,
Downstream.Failure == Failure
{
let filter = Inner(downstream: subscriber, isIncluded: catching(isIncluded))
upstream.receive(subscriber: filter)
upstream.subscribe(Inner(downstream: subscriber, filter: isIncluded))
}
}
}
private class _Filter<Upstream: Publisher, Downstream: Subscriber>
: OperatorSubscription<Downstream>,
Subscription
where Upstream.Output == Downstream.Input
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
typealias Predicate = (Input) -> Result<Bool, Downstream.Failure>
private var _isIncluded: Predicate?
var isFinished: Bool {
return _isIncluded == nil
}
init(downstream: Downstream, isIncluded: @escaping Predicate) {
_isIncluded = isIncluded
super.init(downstream: downstream)
}
func receive(subscription: Subscription) {
upstreamSubscription = subscription
downstream.receive(subscription: self)
}
func receive(_ input: Input) -> Subscribers.Demand {
guard let isIncluded = _isIncluded else { return .none }
if upstreamSubscription == nil {
fatalError("Received input before subscription")
}
switch isIncluded(input) {
case .success(let isIncluded):
return isIncluded ? downstream.receive(input) : .max(1)
case .failure(let error):
downstream.receive(completion: .failure(error))
cancel()
return .none
}
}
func request(_ demand: Subscribers.Demand) {
guard !isFinished else { return }
upstreamSubscription?.request(demand)
}
override func cancel() {
_isIncluded = nil
upstreamSubscription?.cancel()
upstreamSubscription = nil
}
}
extension Publishers.Filter {
private final class Inner<Downstream: Subscriber>
: _Filter<Upstream, Downstream>,
Subscriber,
CustomStringConvertible
where Upstream.Output == Downstream.Input,
Upstream.Failure == Downstream.Failure {
: FilterProducer<Downstream,
Upstream.Output,
Upstream.Output,
Upstream.Failure,
(Upstream.Output) -> Bool>
where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure
{
// NOTE: This class has been audited for thread safety
var description: String { return "Filter" }
func receive(completion: Subscribers.Completion<Failure>) {
guard !isFinished else { return }
downstream.receive(completion: completion)
override func receive(
newValue: Upstream.Output
) -> PartialCompletion<Upstream.Output?, Downstream.Failure> {
return filter(newValue) ? .continue(newValue) : .continue(nil)
}
override var description: String { return "Filter" }
}
}
extension Publishers.TryFilter {
private final class Inner<Downstream: Subscriber>
: _Filter<Upstream, Downstream>,
Subscriber,
CustomStringConvertible
where Upstream.Output == Downstream.Input, Downstream.Failure == Error {
: FilterProducer<Downstream,
Upstream.Output,
Upstream.Output,
Upstream.Failure,
(Upstream.Output) throws -> Bool>
where Downstream.Input == Upstream.Output, Downstream.Failure == Error
{
// NOTE: This class has been audited for thread safety
var description: String { return "TryFilter" }
func receive(completion: Subscribers.Completion<Failure>) {
guard !isFinished else { return }
downstream.receive(completion: completion.eraseError())
override func receive(
newValue: Upstream.Output
) -> PartialCompletion<Upstream.Output?, Error> {
do {
return try filter(newValue) ? .continue(newValue) : .continue(nil)
} catch {
return .failure(error)
}
}
override var description: String { return "TryFilter" }
}
}
@@ -65,7 +65,14 @@ class OperatorTestHelper<SourceValue,
},
receiveValue: { _ in receiveValueDemand }
)
tracking.onSubscribe = { self.downstreamSubscription = $0 }
tracking.onSubscribe = { [weak self] in
self?.downstreamSubscription = $0
}
sut.subscribe(tracking)
}
deinit {
downstreamSubscription?.cancel()
tracking.cancel()
}
}
@@ -41,6 +41,7 @@ typealias TrackingSubscriber = TrackingSubscriberBase<Int, TestingError>
@available(macOS 10.15, iOS 13.0, *)
final class TrackingSubscriberBase<Value, Failure: Error>
: Subscriber,
Cancellable,
CustomStringConvertible
{
@@ -172,6 +173,13 @@ final class TrackingSubscriberBase<Value, Failure: Error>
line: line)
}
func cancel() {
for subscription in subscriptions {
subscription.cancel()
}
history = []
}
deinit {
onDeinit?()
_onDeinit?()
@@ -16,341 +16,602 @@ import OpenCombine
@available(macOS 10.15, iOS 13.0, *)
final class FilterTests: XCTestCase {
func testFilterRemovesElements() {
// Given
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(2),
receiveValueDemand: .none) {
$0.filter { $0.isMultiple(of: 2) }
}
// MARK: - Filter
// When
for i in 1...5 {
XCTAssertEqual(helper.publisher.send(i),
helper.sut.isIncluded(i) ? .none : .max(1))
}
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("Filter"),
.value(2),
.value(4)])
}
func testTryFilterWorks() {
// Given
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(2),
receiveValueDemand: .none) {
$0.tryFilter {
try $0.isMultiple(of: 2) && nonthrowingReturn($0)
}
}
// When
for i in 1...5 {
XCTAssertEqual(helper.publisher.send(i),
try helper.sut.isIncluded(i) ? .none : .max(1))
}
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"),
.value(2),
.value(4)])
}
func testTryFilterCompletesWithErrorWhenThrown() {
// Given
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none) {
$0.tryFilter {
try failOnFive(value: $0)
}
}
// When
for i in 1...5 {
_ = helper.publisher.send(i)
}
helper.publisher.send(completion: .finished)
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"),
.value(1),
.value(2),
.value(3),
.value(4),
.completion(.failure(TestingError.oops))
])
}
func testCanCompleteWithFinished() {
// Given
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none) {
$0.filter { _ in true }
}
// When
XCTAssertEqual(helper.publisher.send(1), .none)
helper.publisher.send(completion: .finished)
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("Filter"),
.value(1),
.completion(.finished)])
}
func testFilterCanCompleteWithError() {
// Given
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none) {
$0.filter { _ in true }
}
// When
XCTAssertEqual(helper.publisher.send(1), .none)
helper.publisher.send(completion: .failure(.oops))
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("Filter"),
.value(1),
.completion(.failure(.oops))])
}
func testTryFilterCanCompleteWithError() {
// Given
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: {
$0.tryFilter { _ in true }
}
func testFilterBasicBehavior() {
var counter = 0
FilterTests.testBasicBehavior(
input: [(1, expectedDemand: .max(1)),
(2, expectedDemand: .max(4)),
(3, expectedDemand: .max(1)),
(4, expectedDemand: .max(4)),
(5, expectedDemand: .max(1)),
(6, expectedDemand: .max(4)),
(7, expectedDemand: .max(1)),
(8, expectedDemand: .max(4)),
(9, expectedDemand: .max(1))],
expectedSubscription: "Filter",
expectedOutput: [2, 4, 6, 8],
{ $0.filter { counter += 1; return $0.isMultiple(of: 2) } }
)
// When
XCTAssertEqual(helper.publisher.send(1), .none)
helper.publisher.send(completion: .failure(.oops))
// Then
XCTAssertEqual(helper.tracking.history,
[.subscription("TryFilter"),
.value(1),
.completion(.failure(TestingError.oops))])
XCTAssertEqual(counter, 9)
}
func testFilterSubscriptionDemand() {
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .max(3),
receiveValueDemand: .none,
createSut: {
$0.filter { $0.isMultiple(of: 2) }
}
)
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(0))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(0))
XCTAssertEqual(helper.publisher.send(5), .max(1))
XCTAssertEqual(helper.publisher.send(6), .max(0))
XCTAssertEqual(helper.publisher.send(7), .max(1))
XCTAssertEqual(helper.publisher.send(8), .max(0))
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
func testFilterUpstreamFinishesImmediately() {
FilterTests.testUpstreamFinishesImmediately(expectedSubscription: "Filter",
{ $0.filter(shouldNotBeCalled()) })
}
func testTryFilterSubscriptionDemand() {
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(3),
receiveValueDemand: .none) {
$0.tryFilter { $0.isMultiple(of: 2) }
func testFilterUpstreamFinishesWithError() {
FilterTests.testUpstreamFinishesWithError(expectedSubscription: "Filter",
{ $0.filter(shouldNotBeCalled()) })
}
func testFilterDemand() {
FilterTests.testDemand { publisher, filter in
publisher.filter { filter($0) != nil }
}
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(0))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(0))
XCTAssertEqual(helper.publisher.send(5), .max(1))
XCTAssertEqual(helper.publisher.send(6), .max(0))
XCTAssertEqual(helper.publisher.send(7), .max(1))
XCTAssertEqual(helper.publisher.send(8), .max(0))
}
func testFilterCancel() throws {
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: { $0.filter { $0.isMultiple(of: 2) } })
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.publisher.send(2), .none)
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.publisher.send(4), .none)
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled])
XCTAssertEqual(helper.tracking.history, [.subscription("Filter")])
func testFilterNoDemand() {
FilterTests.testNoDemand { $0.filter(shouldNotBeCalled()) }
}
func testTryFilterCancel() throws {
let helper = OperatorTestHelper(
publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: {
$0.tryFilter { try failOnFive(value: $0) && $0.isMultiple(of: 2) }
}
func testFilterCancelAlreadyCancelled() throws {
try FilterTests.testCancelAlreadyCancelled(
expectedSubscription: "Filter",
{ $0.filter(shouldNotBeCalled()) }
)
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.publisher.send(2), .none)
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.publisher.send(4), .none)
XCTAssertEqual(helper.publisher.send(5), .none)
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled])
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter")])
}
func testCancelAlreadyCancelled() throws {
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: { $0.filter { $0.isMultiple(of: 2) } })
try XCTUnwrap(helper.downstreamSubscription).cancel()
try XCTUnwrap(helper.downstreamSubscription).request(.unlimited)
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited), .cancelled])
}
func testFilterReceiveValueBeforeSubscription() {
testReceiveValueBeforeSubscription(value: 0,
shouldCrash: true,
expected: .crash,
{ $0.filter(shouldNotBeCalled()) })
}
func testFilterReceiveCompletionBeforeSubscription() {
testReceiveCompletionBeforeSubscription(
inputType: Int.self,
expected: .crash,
{ $0.filter(shouldNotBeCalled()) }
)
}
func testFilterRequestBeforeSubscription() {
testRequestBeforeSubscription(inputType: Int.self,
shouldCrash: true,
{ $0.filter(shouldNotBeCalled()) })
}
func testFilterCancelBeforeSubscription() {
testCancelBeforeSubscription(inputType: Int.self,
shouldCrash: false,
{ $0.filter(shouldNotBeCalled()) })
}
func testFilterReceiveSubscriptionTwice() throws {
try testReceiveSubscriptionTwice { $0.filter(shouldNotBeCalled()) }
}
func testFilterLifecycle() throws {
try testLifecycle(sendValue: 31,
cancellingSubscriptionReleasesSubscriber: false,
{ $0.filter { _ in true } })
}
func testFilterReflection() throws {
try testReflection(parentInput: Int.self,
parentFailure: Error.self,
description: "Filter",
customMirror: expectedChildren(
("downstream", .contains("TrackingSubscriberBase"))
),
playgroundDescription: "Filter",
{ $0.filter(shouldNotBeCalled()) })
}
// MARK: - TryFilter
func testTryFilterBasicBehavior() {
var counter = 0
FilterTests.testBasicBehavior(
input: [(1, expectedDemand: .max(1)),
(2, expectedDemand: .max(4)),
(3, expectedDemand: .max(1)),
(4, expectedDemand: .max(4)),
(5, expectedDemand: .max(1)),
(6, expectedDemand: .max(4)),
(7, expectedDemand: .max(1)),
(8, expectedDemand: .max(4)),
(9, expectedDemand: .max(1))],
expectedSubscription: "TryFilter",
expectedOutput: [2, 4, 6, 8],
{ $0.tryFilter { counter += 1; return $0.isMultiple(of: 2) } }
)
XCTAssertEqual(counter, 9)
}
func testTryFilterCompletesWithErrorWhenThrown() {
var counter = 0
func predicate(_ value: Int) throws -> Bool {
counter += 1
if value == 5 {
throw TestingError.oops
}
return value.isMultiple(of: 2)
}
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(3),
receiveValueDemand: .max(42),
createSut: { $0.tryFilter(predicate) })
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(42))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(42))
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
XCTAssertEqual(helper.publisher.send(5), .none)
XCTAssertEqual(helper.subscription.history, [.requested(.max(3)), .cancelled])
XCTAssertEqual(helper.publisher.send(6), .none)
helper.publisher.send(completion: .finished)
helper.publisher.send(completion: .failure(.oops))
helper.downstreamSubscription?.request(.max(1000))
helper.downstreamSubscription?.cancel()
XCTAssertEqual(helper.tracking.history,
[.subscription("TryFilter"),
.value(2),
.value(4),
.completion(.failure(TestingError.oops))])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3)), .cancelled])
}
func testTryFilterUpstreamFinishesImmediately() {
FilterTests.testUpstreamFinishesImmediately(expectedSubscription: "TryFilter",
{ $0.tryFilter(shouldNotBeCalled()) })
}
func testTryFilterUpstreamFinishesWithError() {
FilterTests.testUpstreamFinishesWithError(expectedSubscription: "TryFilter",
{ $0.tryFilter(shouldNotBeCalled()) })
}
func testTryFilterDemand() {
FilterTests.testDemand { publisher, filter in
publisher.tryFilter { filter($0) != nil }
}
}
func testTryFilterNoDemand() {
FilterTests.testNoDemand { $0.tryFilter(shouldNotBeCalled()) }
}
func testTryFilterCancelAlreadyCancelled() throws {
try FilterTests.testCancelAlreadyCancelled(
expectedSubscription: "TryFilter",
{ $0.tryFilter(shouldNotBeCalled()) }
)
}
func testTryFilterReceiveValueBeforeSubscription() {
testReceiveValueBeforeSubscription(value: 0,
shouldCrash: true,
expected: .crash,
{ $0.tryFilter(shouldNotBeCalled()) })
}
func testTryFilterReceiveCompletionBeforeSubscription() {
testReceiveCompletionBeforeSubscription(
inputType: Int.self,
expected: .crash,
{ $0.tryFilter(shouldNotBeCalled()) }
)
}
func testTryFilterRequestBeforeSubscription() {
testRequestBeforeSubscription(inputType: Int.self,
shouldCrash: true,
{ $0.tryFilter(shouldNotBeCalled()) })
}
func testTryFilterCancelBeforeSubscription() {
testCancelBeforeSubscription(inputType: Int.self,
shouldCrash: false,
{ $0.tryFilter(shouldNotBeCalled()) })
}
func testTryFilterReceiveSubscriptionTwice() throws {
try testReceiveSubscriptionTwice { $0.tryFilter(shouldNotBeCalled()) }
}
func testTryFilterLifecycle() throws {
try testLifecycle(sendValue: 31,
cancellingSubscriptionReleasesSubscriber: false,
{ $0.tryFilter { _ in true } })
}
func testTryFilterReflection() throws {
try testReflection(parentInput: Int.self,
parentFailure: Error.self,
description: "TryFilter",
customMirror: expectedChildren(
("downstream", .contains("TrackingSubscriberBase"))
),
playgroundDescription: "TryFilter",
{ $0.tryFilter(shouldNotBeCalled()) })
}
// MARK: - Operator specializations
func testFilterOperatorSpecializationForFilter() {
// Given
var counter = 0
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(1),
receiveValueDemand: .none) {
initialDemand: .max(10),
receiveValueDemand: .max(16)) {
$0.filter {
$0.isMultiple(of: 3)
counter += 1
return $0.isMultiple(of: 2)
}.filter {
$0.isMultiple(of: 5)
counter += 1
return $0.isMultiple(of: 3)
}
}
// When
for i in 1...20 {
XCTAssertEqual(helper.publisher.send(i),
helper.sut.isIncluded(i) ? .none : .max(1))
}
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(1))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(1))
XCTAssertEqual(helper.publisher.send(5), .max(1))
XCTAssertEqual(helper.publisher.send(6), .max(16))
XCTAssertEqual(helper.publisher.send(7), .max(1))
XCTAssertEqual(helper.publisher.send(8), .max(1))
XCTAssertEqual(helper.publisher.send(9), .max(1))
XCTAssertEqual(helper.publisher.send(10), .max(1))
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("Filter"), .value(15)])
XCTAssertEqual(helper.tracking.history, [.subscription("Filter"), .value(6)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(10))])
XCTAssertEqual(counter, 15)
XCTAssert(helper.sut.isIncluded(12))
XCTAssertEqual(counter, 17)
}
func testTryFilterOperatorSpecializationForFilter() {
// Given
var counter = 0
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(1),
receiveValueDemand: .none) {
initialDemand: .max(10),
receiveValueDemand: .max(16)) {
$0.filter {
$0.isMultiple(of: 3)
counter += 1
return $0.isMultiple(of: 2)
}.tryFilter {
$0.isMultiple(of: 5)
counter += 1
return $0.isMultiple(of: 3)
}
}
// When
for i in 1...20 {
XCTAssertEqual(helper.publisher.send(i),
try helper.sut.isIncluded(i) ? .none : .max(1))
}
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(1))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(1))
XCTAssertEqual(helper.publisher.send(5), .max(1))
XCTAssertEqual(helper.publisher.send(6), .max(16))
XCTAssertEqual(helper.publisher.send(7), .max(1))
XCTAssertEqual(helper.publisher.send(8), .max(1))
XCTAssertEqual(helper.publisher.send(9), .max(1))
XCTAssertEqual(helper.publisher.send(10), .max(1))
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"), .value(15)])
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"), .value(6)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(10))])
XCTAssertEqual(counter, 15)
XCTAssert(try helper.sut.isIncluded(12))
XCTAssertEqual(counter, 17)
}
func testFilterOperatorSpecializationForTryFilter() {
// Given
var counter = 0
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(1),
receiveValueDemand: .none) {
initialDemand: .max(10),
receiveValueDemand: .max(16)) {
$0.tryFilter {
$0.isMultiple(of: 3)
counter += 1
return $0.isMultiple(of: 2)
}.filter {
$0.isMultiple(of: 5)
counter += 1
return $0.isMultiple(of: 3)
}
}
// When
for i in 1...20 {
XCTAssertEqual(helper.publisher.send(i),
try helper.sut.isIncluded(i) ? .none : .max(1))
}
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(1))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(1))
XCTAssertEqual(helper.publisher.send(5), .max(1))
XCTAssertEqual(helper.publisher.send(6), .max(16))
XCTAssertEqual(helper.publisher.send(7), .max(1))
XCTAssertEqual(helper.publisher.send(8), .max(1))
XCTAssertEqual(helper.publisher.send(9), .max(1))
XCTAssertEqual(helper.publisher.send(10), .max(1))
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"), .value(15)])
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"), .value(6)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(10))])
XCTAssertEqual(counter, 15)
XCTAssert(try helper.sut.isIncluded(12))
XCTAssertEqual(counter, 17)
}
func testTryFilterOperatorSpecializationForTryFilter() {
// Given
var counter = 0
let helper = OperatorTestHelper(publisherType: CustomPublisher.self,
initialDemand: .max(3),
receiveValueDemand: .none) {
initialDemand: .max(10),
receiveValueDemand: .max(16)) {
$0.tryFilter {
$0.isMultiple(of: 3)
counter += 1
return $0.isMultiple(of: 2)
}.tryFilter {
$0.isMultiple(of: 5)
counter += 1
return $0.isMultiple(of: 3)
}
}
// When
for i in 1...20 {
XCTAssertEqual(helper.publisher.send(i),
try helper.sut.isIncluded(i) ? .none : .max(1))
XCTAssertEqual(helper.publisher.send(1), .max(1))
XCTAssertEqual(helper.publisher.send(2), .max(1))
XCTAssertEqual(helper.publisher.send(3), .max(1))
XCTAssertEqual(helper.publisher.send(4), .max(1))
XCTAssertEqual(helper.publisher.send(5), .max(1))
XCTAssertEqual(helper.publisher.send(6), .max(16))
XCTAssertEqual(helper.publisher.send(7), .max(1))
XCTAssertEqual(helper.publisher.send(8), .max(1))
XCTAssertEqual(helper.publisher.send(9), .max(1))
XCTAssertEqual(helper.publisher.send(10), .max(1))
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"), .value(6)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(10))])
XCTAssertEqual(counter, 15)
XCTAssert(try helper.sut.isIncluded(12))
XCTAssertEqual(counter, 17)
}
// MARK: - Generic tests
static func testBasicBehavior<Operator: Publisher, Input>(
input: [(Input, expectedDemand: Subscribers.Demand)],
expectedSubscription: StringSubscription,
expectedOutput: [Operator.Output],
_ makeOperator: (CustomPublisherBase<Input, TestingError>) -> Operator
) where Operator.Output: Equatable {
let helper = OperatorTestHelper(
publisherType: CustomPublisherBase<Input, TestingError>.self,
initialDemand: .max(2),
receiveValueDemand: .max(4),
createSut: makeOperator)
XCTAssertEqual(helper.subscription.history, [.requested(.max(2))])
for (value, expectedDemand) in input {
XCTAssertEqual(helper.publisher.send(value), expectedDemand)
}
// Then
XCTAssertEqual(helper.tracking.history, [.subscription("TryFilter"),
.value(15)])
helper.publisher.send(completion: .finished)
helper.publisher.send(completion: .failure(.oops))
typealias Event = TrackingSubscriberBase<Operator.Output, Operator.Failure>.Event
var expectedHistory = [Event.subscription(expectedSubscription)]
expectedHistory.append(contentsOf: expectedOutput.lazy.map(Event.value))
expectedHistory.append(.completion(.finished))
XCTAssertEqual(helper.tracking.history, expectedHistory)
}
static func testUpstreamFinishesWithError<Operator: Publisher>(
expectedSubscription: StringSubscription,
_ makeOperator: (CustomPublisherBase<Int, Error>) -> Operator
) where Operator.Output: Equatable, Operator.Failure == Error {
let helper = OperatorTestHelper(
publisherType: CustomPublisherBase<Int, Error>.self,
initialDemand: .max(3),
receiveValueDemand: .max(10),
createSut: makeOperator
)
XCTAssertEqual(helper.tracking.history, [.subscription(expectedSubscription)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
helper.publisher.send(completion: .failure(TestingError.oops))
XCTAssertEqual(helper.tracking.history,
[.subscription(expectedSubscription),
.completion(.failure(TestingError.oops))])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
helper.publisher.send(completion: .failure(TestingError.oops))
XCTAssertEqual(helper.tracking.history,
[.subscription(expectedSubscription),
.completion(.failure(TestingError.oops))])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
XCTAssertEqual(helper.publisher.send(73), .none)
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.tracking.history,
[.subscription(expectedSubscription),
.completion(.failure(TestingError.oops))])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
}
static func testUpstreamFinishesImmediately<Operator: Publisher>(
expectedSubscription: StringSubscription,
_ makeOperator: (CustomPublisherBase<Int, Error>) -> Operator
) where Operator.Output: Equatable, Operator.Failure == Error {
let helper = OperatorTestHelper(
publisherType: CustomPublisherBase<Int, Error>.self,
initialDemand: .max(3),
receiveValueDemand: .max(10),
createSut: makeOperator
)
XCTAssertEqual(helper.tracking.history, [.subscription(expectedSubscription)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.tracking.history, [.subscription(expectedSubscription),
.completion(.finished)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
helper.publisher.send(completion: .failure(TestingError.oops))
XCTAssertEqual(helper.tracking.history, [.subscription(expectedSubscription),
.completion(.finished)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
XCTAssertEqual(helper.publisher.send(73), .none)
XCTAssertEqual(helper.tracking.history, [.subscription(expectedSubscription),
.completion(.finished)])
XCTAssertEqual(helper.subscription.history, [.requested(.max(3))])
}
static func testDemand<Operator: Publisher>(
_ makeOperator: (CustomPublisherBase<String, Error>,
_ filter: @escaping (String) -> Int?) -> Operator
)
where Operator.Output: Equatable, Operator.Failure == Error
{
let subscription = CustomSubscription()
let publisher =
CustomPublisherBase<String, Error>(subscription: subscription)
let operatorPublisher = makeOperator(publisher, Int.init)
var downstreamSubscription: Subscription?
var demandOnReceiveValue = Subscribers.Demand.max(3)
let tracking = TrackingSubscriberBase<Operator.Output, Error>(
receiveSubscription: {
$0.request(.max(5))
downstreamSubscription = $0
},
receiveValue: { _ in demandOnReceiveValue }
)
operatorPublisher.subscribe(tracking)
XCTAssertNotNil(downstreamSubscription)
XCTAssertEqual(subscription.history, [.requested(.max(5))])
// unsatisfied demand = 5
XCTAssertEqual(publisher.send("a"), .max(1))
XCTAssertEqual(subscription.history, [.requested(.max(5))])
// unsatisfied demand = 5
XCTAssertEqual(publisher.send("1"), .max(3))
XCTAssertEqual(subscription.history, [.requested(.max(5))])
// unsatisfied demand = 5 - 1 + 3 = 7
demandOnReceiveValue = .max(2)
XCTAssertEqual(publisher.send("2"), demandOnReceiveValue)
XCTAssertEqual(subscription.history, [.requested(.max(5))])
// unsatisfied demand = 7 - 1 + 2 = 8
demandOnReceiveValue = .max(1)
XCTAssertEqual(publisher.send("3"), demandOnReceiveValue)
XCTAssertEqual(subscription.history, [.requested(.max(5))])
// unsatisfied demand = 8 - 1 + 1 = 8
XCTAssertEqual(publisher.send("b"), .max(1))
XCTAssertEqual(subscription.history, [.requested(.max(5))])
// unsatisfied demand = 8
downstreamSubscription?.request(.max(15))
downstreamSubscription?.request(.max(5))
XCTAssertEqual(subscription.history, [.requested(.max(5)),
.requested(.max(15)),
.requested(.max(5))])
// unsatisfied demand = 8 + 15 + 5 = 28
demandOnReceiveValue = .none
XCTAssertEqual(publisher.send("4"), demandOnReceiveValue)
XCTAssertEqual(subscription.history, [.requested(.max(5)),
.requested(.max(15)),
.requested(.max(5))])
// unsatisfied demand = 28 - 1 + 0 = 27
downstreamSubscription?.request(.max(121))
XCTAssertEqual(subscription.history, [.requested(.max(5)),
.requested(.max(15)),
.requested(.max(5)),
.requested(.max(121))])
// unsatisfied demand = 27 + 121 = 148
XCTAssertEqual(publisher.send("c"), .max(1))
XCTAssertEqual(subscription.history, [.requested(.max(5)),
.requested(.max(15)),
.requested(.max(5)),
.requested(.max(121))])
// unsatisfied demand = 148
downstreamSubscription?.cancel()
downstreamSubscription?.cancel()
XCTAssertEqual(subscription.history, [.requested(.max(5)),
.requested(.max(15)),
.requested(.max(5)),
.requested(.max(121)),
.cancelled])
downstreamSubscription?.request(.max(3))
XCTAssertEqual(subscription.history, [.requested(.max(5)),
.requested(.max(15)),
.requested(.max(5)),
.requested(.max(121)),
.cancelled])
demandOnReceiveValue = .max(80)
XCTAssertEqual(publisher.send("8"), .none)
}
static func testNoDemand<Operator: Publisher>(
_ makeOperator: (CustomPublisherBase<Int, Error>) -> Operator
)
where Operator.Output: Equatable, Operator.Failure == Error
{
let subscription = CustomSubscription()
let publisher = CustomPublisherBase<Int, Error>(subscription: subscription)
let operatorPublisher = makeOperator(publisher)
let tracking = TrackingSubscriberBase<Operator.Output, Error>()
operatorPublisher.subscribe(tracking)
XCTAssertTrue(subscription.history.isEmpty)
}
static func testCancelAlreadyCancelled<Operator: Publisher>(
expectedSubscription: StringSubscription,
_ makeOperator: (CustomPublisherBase<Int, Error>) -> Operator
) throws
where Operator.Output: Equatable, Operator.Failure == Error
{
let helper = OperatorTestHelper(
publisherType: CustomPublisherBase<Int, Error>.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: makeOperator
)
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.publisher.send(42), .none)
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited),
.cancelled])
try XCTUnwrap(helper.downstreamSubscription).request(.unlimited)
try XCTUnwrap(helper.downstreamSubscription).cancel()
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited),
.cancelled])
helper.publisher.send(completion: .failure(TestingError.oops))
helper.publisher.send(completion: .finished)
XCTAssertEqual(helper.subscription.history, [.requested(.unlimited),
.cancelled])
XCTAssertEqual(helper.tracking.history, [.subscription(expectedSubscription)])
}
}
private func nonthrowingReturn(_ value: Int) throws -> Bool {
return true
}
private func failOnFive(value: Int) throws -> Bool {
if value == 5 {
throw TestingError.oops
}
return true
}