diff --git a/Sources/OpenCombine/Publishers/Publishers.MapError.swift b/Sources/OpenCombine/Publishers/Publishers.MapError.swift index 5eb9d45..a1364a1 100644 --- a/Sources/OpenCombine/Publishers/Publishers.MapError.swift +++ b/Sources/OpenCombine/Publishers/Publishers.MapError.swift @@ -36,7 +36,7 @@ extension Publishers { where Failure == Downstream.Failure, Upstream.Output == Downstream.Input { - upstream.subscribe(Inner(downstream: subscriber,transform: transform)) + upstream.subscribe(Inner(downstream: subscriber, transform: transform)) } } } diff --git a/Sources/OpenCombine/Publishers/Publishers.Print.swift b/Sources/OpenCombine/Publishers/Publishers.Print.swift index c79e256..9082533 100644 --- a/Sources/OpenCombine/Publishers/Publishers.Print.swift +++ b/Sources/OpenCombine/Publishers/Publishers.Print.swift @@ -74,92 +74,82 @@ extension Publishers.Print { typealias Input = Downstream.Input typealias Failure = Downstream.Failure - private var _downstream: Downstream - private let _prefix: String - private var _stream: TextOutputStream - private var _upstreamSubscription: Subscription? - private let _printerLock = Lock(recursive: false) + /// A concrete type wrapper around an abstract stream. + private struct PrintTarget: TextOutputStream { + var stream: TextOutputStream + mutating func write(_ string: String) { + stream.write(string) + } + } + + private var downstream: Downstream + private let prefix: String + private var stream: PrintTarget? + private var subscription: Subscription? + private let lock = Lock(recursive: false) init(downstream: Downstream, prefix: String, stream: TextOutputStream?) { - _downstream = downstream - _prefix = prefix - _stream = stream ?? StdoutStream() + self.downstream = downstream + self.prefix = prefix.isEmpty ? "" : "\(prefix): " + self.stream = stream.map(PrintTarget.init) } func receive(subscription: Subscription) { - _log("receive subscription", value: subscription) - _upstreamSubscription = subscription - _downstream.receive(subscription: self) + log("\(prefix)receive subscription: (\(subscription))") + lock.do { + self.subscription = subscription + } + downstream.receive(subscription: self) } func receive(_ input: Input) -> Subscribers.Demand { - _log("receive value", value: input) - let demand = _downstream.receive(input) - _logDemand(demand, synchronous: true) + log("\(prefix)receive value: (\(input))") + let demand = downstream.receive(input) + + if let max = demand.max { + log("\(prefix)request max: (\(max)) (synchronous)") + } else { + log("\(prefix)request unlimited (synchronous)") + } + return demand } func receive(completion: Subscribers.Completion) { switch completion { case .finished: - _log("receive finished") + log("\(prefix)receive finished") case .failure(let error): - _log("receive error", value: error) + log("\(prefix)receive error: (\(error))") } - _downstream.receive(completion: completion) + downstream.receive(completion: completion) } func request(_ demand: Subscribers.Demand) { - _logDemand(demand, synchronous: false) - _upstreamSubscription?.request(demand) + if let max = demand.max { + log("\(prefix)request max: (\(max))") + } else { + log("\(prefix)request unlimited") + } + subscription?.request(demand) } func cancel() { - _log("receive cancel") - _upstreamSubscription?.cancel() - _upstreamSubscription = nil + log("\(prefix)receive cancel") + subscription?.cancel() + subscription = nil } var description: String { return "Print" } var customMirror: Mirror { return Mirror(self, children: EmptyCollection()) } - private func _log(_ description: String, - value: Any? = nil, - additionalInfo: String = "") { - _printerLock.do { - if !_prefix.isEmpty { - _stream.write(_prefix) - _stream.write(": ") - } - _stream.write(description) - if let value = value { - _stream.write(": (") - _stream.write(String(describing: value)) - _stream.write(")") - } - if !additionalInfo.isEmpty { - _stream.write(" (") - _stream.write(additionalInfo) - _stream.write(")") - } - _stream.write("\n") - } - } - - private func _logDemand(_ demand: Subscribers.Demand, synchronous: Bool) { - let synchronouslyStr = synchronous ? "synchronous" : "" - if let max = demand.max { - _log("request max", value: max, additionalInfo: synchronouslyStr) + private func log(_ text: String) { + if var stream = stream { + Swift.print(text, to: &stream) } else { - _log("request unlimited", additionalInfo: synchronouslyStr) + Swift.print("", text) } } } } - -private struct StdoutStream: TextOutputStream { - mutating func write(_ string: String) { - print(string, terminator: "") - } -} diff --git a/Tests/OpenCombineTests/PublisherTests/PrintTests.swift b/Tests/OpenCombineTests/PublisherTests/PrintTests.swift index 3d899bd..c34af2b 100644 --- a/Tests/OpenCombineTests/PublisherTests/PrintTests.swift +++ b/Tests/OpenCombineTests/PublisherTests/PrintTests.swift @@ -18,7 +18,7 @@ final class PrintTests: XCTestCase { func testPrintWithoutPrefix() { - let stream = StringStream() + let stream = HistoryStream() let subscription = CustomSubscription( onRequest: { _ in stream.write("callback request demand\n") }, onCancel: { stream.write("callback cancel subscription\n") } @@ -68,44 +68,81 @@ final class PrintTests: XCTestCase { .requested(.max(30)), .cancelled]) - let expectedOutput = """ - receive subscription: (CustomSubscription) - callback subscription - request unlimited - callback request demand - request max: (30) - callback request demand - receive cancel - callback cancel subscription - receive value: (1) - callback value - request max: (100) - request max: (2) (synchronous) - request max: (42) - receive value: (2) - callback value - request max: (100) - request max: (2) (synchronous) - receive finished - callback completion - request max: (12) - receive error: (failure) - callback completion - request max: (12) - receive value: (10) - callback value - request max: (100) - request unlimited (synchronous) - receive cancel - - """ + let expectedOutput = [ + "", + "receive subscription: (CustomSubscription)", + "\n", + "callback subscription\n", + "", + "request unlimited", + "\n", + "callback request demand\n", + "", + "request max: (30)", + "\n", + "callback request demand\n", + "", + "receive cancel", + "\n", + "callback cancel subscription\n", + "", + "receive value: (1)", + "\n", + "callback value\n", + "", + "request max: (100)", + "\n", + "", + "request max: (2) (synchronous)", + "\n", + "", + "request max: (42)", + "\n", + "", + "receive value: (2)", + "\n", + "callback value\n", + "", + "request max: (100)", + "\n", + "", + "request max: (2) (synchronous)", + "\n", + "", + "receive finished", + "\n", + "callback completion\n", + "", + "request max: (12)", + "\n", + "", + "receive error: (failure)", + "\n", + "callback completion\n", + "", + "request max: (12)", + "\n", + "", + "receive value: (10)", + "\n", + "callback value\n", + "", + "request max: (100)", + "\n", + "", + "request unlimited (synchronous)", + "\n", + "", + "receive cancel", + "\n" + ] XCTAssertEqual(stream.output.value, expectedOutput) } func testPrintWithPrefix() { - let stream = StringStream() + let stream = HistoryStream() let subscription = CustomSubscription( onRequest: { _ in stream.write("callback request demand\n") }, onCancel: { stream.write("callback cancel subscription\n") } @@ -155,44 +192,81 @@ final class PrintTests: XCTestCase { .requested(.max(30)), .cancelled]) - let expectedOutput = """ - 👉: receive subscription: (CustomSubscription) - callback subscription - 👉: request unlimited - callback request demand - 👉: request max: (30) - callback request demand - 👉: receive cancel - callback cancel subscription - 👉: receive value: (1) - callback value - 👉: request max: (100) - 👉: request max: (2) (synchronous) - 👉: request max: (42) - 👉: receive value: (2) - callback value - 👉: request max: (100) - 👉: request max: (2) (synchronous) - 👉: receive finished - callback completion - 👉: request max: (12) - 👉: receive error: (failure) - callback completion - 👉: request max: (12) - 👉: receive value: (10) - callback value - 👉: request max: (100) - 👉: request unlimited (synchronous) - 👉: receive cancel - - """ + let expectedOutput = [ + "", + "👉: receive subscription: (CustomSubscription)", + "\n", + "callback subscription\n", + "", + "👉: request unlimited", + "\n", + "callback request demand\n", + "", + "👉: request max: (30)", + "\n", + "callback request demand\n", + "", + "👉: receive cancel", + "\n", + "callback cancel subscription\n", + "", + "👉: receive value: (1)", + "\n", + "callback value\n", + "", + "👉: request max: (100)", + "\n", + "", + "👉: request max: (2) (synchronous)", + "\n", + "", + "👉: request max: (42)", + "\n", + "", + "👉: receive value: (2)", + "\n", + "callback value\n", + "", + "👉: request max: (100)", + "\n", + "", + "👉: request max: (2) (synchronous)", + "\n", + "", + "👉: receive finished", + "\n", + "callback completion\n", + "", + "👉: request max: (12)", + "\n", + "", + "👉: receive error: (failure)", + "\n", + "callback completion\n", + "", + "👉: request max: (12)", + "\n", + "", + "👉: receive value: (10)", + "\n", + "callback value\n", + "", + "👉: request max: (100)", + "\n", + "", + "👉: request unlimited (synchronous)", + "\n", + "", + "👉: receive cancel", + "\n" + ] XCTAssertEqual(stream.output.value, expectedOutput) } func testSynchronization() { - let stream = StringStream() + let stream = HistoryStream() let publisher = CustomPublisherBase(subscription: nil) let printer = publisher.print(to: stream) @@ -208,11 +282,11 @@ final class PrintTests: XCTestCase { } } -private final class StringStream: TextOutputStream { +private final class HistoryStream: TextOutputStream { - var output = Atomic("") + let output = Atomic([String]()) func write(_ string: String) { - output.do { $0.write(string) } + output.do { $0.append(string) } } }