Audit Optional.Publisher for thread safety (nothing to do here)

This commit is contained in:
Sergej Jaskiewicz
2019-09-21 23:18:47 +03:00
committed by Sergej Jaskiewicz
parent e00a6f06fc
commit ecd4766129
2 changed files with 63 additions and 15 deletions
@@ -116,35 +116,43 @@ extension Result {
}
extension Result.OCombine {
private final class Inner<Downstream: Subscriber>: Subscription,
CustomStringConvertible,
CustomReflectable
private final class Inner<Downstream: Subscriber>
: Subscription,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Downstream.Input == Success, Downstream.Failure == Failure
{
private let _output: Downstream.Input
private var _downstream: Downstream?
// NOTE: this class has been audited for thread safety.
// Combine doesn't use any locking here.
init(value: Downstream.Input, downstream: Downstream) {
_output = value
_downstream = downstream
private var downstream: Downstream?
private let output: Success
init(value: Success, downstream: Downstream) {
self.output = value
self.downstream = downstream
}
func request(_ demand: Subscribers.Demand) {
if let downstream = _downstream, demand > 0 {
_ = downstream.receive(_output)
downstream.receive(completion: .finished)
_downstream = nil
}
demand.assertNonZero()
guard let downstream = self.downstream else { return }
self.downstream = nil
_ = downstream.receive(output)
downstream.receive(completion: .finished)
}
func cancel() {
_downstream = nil
downstream = nil
}
var description: String { return "Once" }
var customMirror: Mirror {
return Mirror(self, unlabeledChildren: CollectionOfOne(_output))
return Mirror(self, unlabeledChildren: CollectionOfOne(output))
}
var playgroundDescription: Any { return description }
}
}
@@ -100,6 +100,46 @@ final class ResultPublisherTests: XCTestCase {
.completion(.failure("failure"))])
}
func testRecursion() {
let success = makePublisher(42)
var subscription: Subscription?
let tracking = TrackingSubscriberBase<Int, TestingError>(
receiveSubscription: {
subscription = $0
$0.request(.unlimited)
},
receiveValue: { _ in
subscription?.request(.unlimited)
return .none
}
)
success.subscribe(tracking)
}
func testReflection() throws {
func testCustomMirror(_ mirror: Mirror) -> Bool {
return mirror.children.count == 1 &&
mirror.children.first!.label == nil &&
(mirror.children.first!.value as? Int) == 42
}
try testSubscriptionReflection(description: "Once",
customMirror: testCustomMirror,
playgroundDescription: "Once",
sut: Sut(42))
}
func testCrashesOnZeroDemand() {
assertCrashes {
let tracking =
TrackingSubscriberBase<Int, TestingError>(receiveSubscription: {
$0.request(.none)
})
makePublisher(42).subscribe(tracking)
}
}
func testLifecycle() {
var deinitCount = 0
do {