Fix semantics of Publishers.Print

This commit is contained in:
Sergej Jaskiewicz
2019-09-16 15:31:29 +03:00
committed by Sergej Jaskiewicz
parent 8a39f35d3f
commit ec037dbb3d
3 changed files with 189 additions and 125 deletions
@@ -36,7 +36,7 @@ extension Publishers {
where Failure == Downstream.Failure,
Upstream.Output == Downstream.Input
{
upstream.subscribe(Inner(downstream: subscriber,transform: transform))
upstream.subscribe(Inner(downstream: subscriber, transform: transform))
}
}
}
@@ -74,92 +74,82 @@ extension Publishers.Print {
typealias Input = Downstream.Input
typealias Failure = Downstream.Failure
private var _downstream: Downstream
private let _prefix: String
private var _stream: TextOutputStream
private var _upstreamSubscription: Subscription?
private let _printerLock = Lock(recursive: false)
/// A concrete type wrapper around an abstract stream.
private struct PrintTarget: TextOutputStream {
var stream: TextOutputStream
mutating func write(_ string: String) {
stream.write(string)
}
}
private var downstream: Downstream
private let prefix: String
private var stream: PrintTarget?
private var subscription: Subscription?
private let lock = Lock(recursive: false)
init(downstream: Downstream, prefix: String, stream: TextOutputStream?) {
_downstream = downstream
_prefix = prefix
_stream = stream ?? StdoutStream()
self.downstream = downstream
self.prefix = prefix.isEmpty ? "" : "\(prefix): "
self.stream = stream.map(PrintTarget.init)
}
func receive(subscription: Subscription) {
_log("receive subscription", value: subscription)
_upstreamSubscription = subscription
_downstream.receive(subscription: self)
log("\(prefix)receive subscription: (\(subscription))")
lock.do {
self.subscription = subscription
}
downstream.receive(subscription: self)
}
func receive(_ input: Input) -> Subscribers.Demand {
_log("receive value", value: input)
let demand = _downstream.receive(input)
_logDemand(demand, synchronous: true)
log("\(prefix)receive value: (\(input))")
let demand = downstream.receive(input)
if let max = demand.max {
log("\(prefix)request max: (\(max)) (synchronous)")
} else {
log("\(prefix)request unlimited (synchronous)")
}
return demand
}
func receive(completion: Subscribers.Completion<Failure>) {
switch completion {
case .finished:
_log("receive finished")
log("\(prefix)receive finished")
case .failure(let error):
_log("receive error", value: error)
log("\(prefix)receive error: (\(error))")
}
_downstream.receive(completion: completion)
downstream.receive(completion: completion)
}
func request(_ demand: Subscribers.Demand) {
_logDemand(demand, synchronous: false)
_upstreamSubscription?.request(demand)
if let max = demand.max {
log("\(prefix)request max: (\(max))")
} else {
log("\(prefix)request unlimited")
}
subscription?.request(demand)
}
func cancel() {
_log("receive cancel")
_upstreamSubscription?.cancel()
_upstreamSubscription = nil
log("\(prefix)receive cancel")
subscription?.cancel()
subscription = nil
}
var description: String { return "Print" }
var customMirror: Mirror { return Mirror(self, children: EmptyCollection()) }
private func _log(_ description: String,
value: Any? = nil,
additionalInfo: String = "") {
_printerLock.do {
if !_prefix.isEmpty {
_stream.write(_prefix)
_stream.write(": ")
}
_stream.write(description)
if let value = value {
_stream.write(": (")
_stream.write(String(describing: value))
_stream.write(")")
}
if !additionalInfo.isEmpty {
_stream.write(" (")
_stream.write(additionalInfo)
_stream.write(")")
}
_stream.write("\n")
}
}
private func _logDemand(_ demand: Subscribers.Demand, synchronous: Bool) {
let synchronouslyStr = synchronous ? "synchronous" : ""
if let max = demand.max {
_log("request max", value: max, additionalInfo: synchronouslyStr)
private func log(_ text: String) {
if var stream = stream {
Swift.print(text, to: &stream)
} else {
_log("request unlimited", additionalInfo: synchronouslyStr)
Swift.print("", text)
}
}
}
}
private struct StdoutStream: TextOutputStream {
mutating func write(_ string: String) {
print(string, terminator: "")
}
}
@@ -18,7 +18,7 @@ final class PrintTests: XCTestCase {
func testPrintWithoutPrefix() {
let stream = StringStream()
let stream = HistoryStream()
let subscription = CustomSubscription(
onRequest: { _ in stream.write("callback request demand\n") },
onCancel: { stream.write("callback cancel subscription\n") }
@@ -68,44 +68,81 @@ final class PrintTests: XCTestCase {
.requested(.max(30)),
.cancelled])
let expectedOutput = """
receive subscription: (CustomSubscription)
callback subscription
request unlimited
callback request demand
request max: (30)
callback request demand
receive cancel
callback cancel subscription
receive value: (1)
callback value
request max: (100)
request max: (2) (synchronous)
request max: (42)
receive value: (2)
callback value
request max: (100)
request max: (2) (synchronous)
receive finished
callback completion
request max: (12)
receive error: (failure)
callback completion
request max: (12)
receive value: (10)
callback value
request max: (100)
request unlimited (synchronous)
receive cancel
"""
let expectedOutput = [
"",
"receive subscription: (CustomSubscription)",
"\n",
"callback subscription\n",
"",
"request unlimited",
"\n",
"callback request demand\n",
"",
"request max: (30)",
"\n",
"callback request demand\n",
"",
"receive cancel",
"\n",
"callback cancel subscription\n",
"",
"receive value: (1)",
"\n",
"callback value\n",
"",
"request max: (100)",
"\n",
"",
"request max: (2) (synchronous)",
"\n",
"",
"request max: (42)",
"\n",
"",
"receive value: (2)",
"\n",
"callback value\n",
"",
"request max: (100)",
"\n",
"",
"request max: (2) (synchronous)",
"\n",
"",
"receive finished",
"\n",
"callback completion\n",
"",
"request max: (12)",
"\n",
"",
"receive error: (failure)",
"\n",
"callback completion\n",
"",
"request max: (12)",
"\n",
"",
"receive value: (10)",
"\n",
"callback value\n",
"",
"request max: (100)",
"\n",
"",
"request unlimited (synchronous)",
"\n",
"",
"receive cancel",
"\n"
]
XCTAssertEqual(stream.output.value, expectedOutput)
}
func testPrintWithPrefix() {
let stream = StringStream()
let stream = HistoryStream()
let subscription = CustomSubscription(
onRequest: { _ in stream.write("callback request demand\n") },
onCancel: { stream.write("callback cancel subscription\n") }
@@ -155,44 +192,81 @@ final class PrintTests: XCTestCase {
.requested(.max(30)),
.cancelled])
let expectedOutput = """
👉: receive subscription: (CustomSubscription)
callback subscription
👉: request unlimited
callback request demand
👉: request max: (30)
callback request demand
👉: receive cancel
callback cancel subscription
👉: receive value: (1)
callback value
👉: request max: (100)
👉: request max: (2) (synchronous)
👉: request max: (42)
👉: receive value: (2)
callback value
👉: request max: (100)
👉: request max: (2) (synchronous)
👉: receive finished
callback completion
👉: request max: (12)
👉: receive error: (failure)
callback completion
👉: request max: (12)
👉: receive value: (10)
callback value
👉: request max: (100)
👉: request unlimited (synchronous)
👉: receive cancel
"""
let expectedOutput = [
"",
"👉: receive subscription: (CustomSubscription)",
"\n",
"callback subscription\n",
"",
"👉: request unlimited",
"\n",
"callback request demand\n",
"",
"👉: request max: (30)",
"\n",
"callback request demand\n",
"",
"👉: receive cancel",
"\n",
"callback cancel subscription\n",
"",
"👉: receive value: (1)",
"\n",
"callback value\n",
"",
"👉: request max: (100)",
"\n",
"",
"👉: request max: (2) (synchronous)",
"\n",
"",
"👉: request max: (42)",
"\n",
"",
"👉: receive value: (2)",
"\n",
"callback value\n",
"",
"👉: request max: (100)",
"\n",
"",
"👉: request max: (2) (synchronous)",
"\n",
"",
"👉: receive finished",
"\n",
"callback completion\n",
"",
"👉: request max: (12)",
"\n",
"",
"👉: receive error: (failure)",
"\n",
"callback completion\n",
"",
"👉: request max: (12)",
"\n",
"",
"👉: receive value: (10)",
"\n",
"callback value\n",
"",
"👉: request max: (100)",
"\n",
"",
"👉: request unlimited (synchronous)",
"\n",
"",
"👉: receive cancel",
"\n"
]
XCTAssertEqual(stream.output.value, expectedOutput)
}
func testSynchronization() {
let stream = StringStream()
let stream = HistoryStream()
let publisher = CustomPublisherBase<Int, Never>(subscription: nil)
let printer = publisher.print(to: stream)
@@ -208,11 +282,11 @@ final class PrintTests: XCTestCase {
}
}
private final class StringStream: TextOutputStream {
private final class HistoryStream: TextOutputStream {
var output = Atomic("")
let output = Atomic([String]())
func write(_ string: String) {
output.do { $0.write(string) }
output.do { $0.append(string) }
}
}