Fix requested changes

This commit is contained in:
Joe Spadafora
2019-06-30 21:06:50 -04:00
parent d86201af2a
commit 71fefcbe34
4 changed files with 38 additions and 24 deletions
@@ -90,12 +90,7 @@ private final class _Decode<Upstream: Publisher,
}
func receive(completion: Subscribers.Completion<Failure>) {
switch completion {
case .finished:
downstream.receive(completion: .finished)
case .failure(let error):
downstream.receive(completion: .failure(error))
}
downstream.receive(completion: completion.eraseError())
}
func request(_ demand: Subscribers.Demand) {
@@ -8,9 +8,9 @@
extension Publishers {
public struct Encode<Upstream, Coder>: Publisher
where Upstream : Publisher,
Coder : TopLevelEncoder,
Upstream.Output : Encodable
where Upstream: Publisher,
Coder: TopLevelEncoder,
Upstream.Output: Encodable
{
/// The kind of errors this publisher might publish.
@@ -37,13 +37,14 @@ extension Publishers {
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<Receiver: Subscriber>(subscriber: Receiver)
where Failure == Receiver.Failure, Output == Receiver.Input {
let encodeSubscriber = _Encode<Upstream, Receiver, Coder>(
downstream: subscriber,
encoder: encoder
)
upstream.receive(subscriber: encodeSubscriber)
public func receive<SubscriberType: Subscriber>(subscriber: SubscriberType)
where Failure == SubscriberType.Failure, Output == SubscriberType.Input
{
let encodeSubscriber = _Encode<Upstream, SubscriberType, Coder>(
downstream: subscriber,
encoder: encoder
)
upstream.receive(subscriber: encodeSubscriber)
}
}
}
@@ -108,7 +109,7 @@ extension Publisher {
public func encode<Coder>(
encoder: Coder
) -> Publishers.Encode<Self, Coder>
where Coder : TopLevelEncoder
where Coder: TopLevelEncoder
{
return Publishers.Encode(upstream: self, encoder: encoder)
}
@@ -52,10 +52,10 @@ final class DecodeTests: XCTestCase {
subject.send(failData)
// Then
guard case .completion(.failure) = subscriber.history[1] else {
XCTFail("Decode failure not found")
return
}
let decodeContext = DecodingError.Context(codingPath: [], debugDescription: "")
let decodeError = DecodingError.dataCorrupted(decodeContext)
XCTAssertEqual(subscriber.history, [.subscription(Subscriptions.empty),
.completion(.failure(decodeError))])
}
func testDemand() {
@@ -86,7 +86,7 @@ final class DecodeTests: XCTestCase {
)
decode.subscribe(tracking)
XCTAssert(downstreamSubscription != nil) // Removes unused variable warning
XCTAssertNotNil(downstreamSubscription) // Removes unused variable warning
XCTAssertEqual(subscription.history, [.requested(.unlimited)])
}
}
@@ -17,7 +17,8 @@ import OpenCombine
final class EncodeTests: XCTestCase {
static let allTests = [
("testEncodeWorks", testEncodeWorks),
("testDemand", testDemand)
("testDemand", testDemand),
("testEncodeSuccessHistory", testEncodeSuccessHistory)
]
private let jsonEncoder = JSONEncoder()
@@ -44,6 +45,23 @@ final class EncodeTests: XCTestCase {
XCTAssert(decoded == testValue)
}
func testEncodeSuccessHistory() throws {
// Given
let testValue = ["test": "TestDecodable"]
let subject = PassthroughSubject<[String: String], Error>()
let publisher = subject.encode(encoder: jsonEncoder)
let subscriber = TrackingSubscriberBase<Data, Error>()
// When
publisher.subscribe(subscriber)
subject.send(testValue)
// Then
let testData = try jsonEncoder.encode(testValue)
XCTAssertEqual(subscriber.history, [.subscription(Subscriptions.empty),
.value(testData)])
}
func testDemand() {
// `CustomSubscription` tracks all the requests and cancellations
// in its `history` property
@@ -71,7 +89,7 @@ final class EncodeTests: XCTestCase {
)
encode.subscribe(tracking)
XCTAssert(downstreamSubscription != nil) // Removes unused variable warning
XCTAssertNotNil(downstreamSubscription) // Removes unused variable warning
XCTAssertEqual(subscription.history, [.requested(.unlimited)])
}
}