Introduce take() helper method

This commit is contained in:
Sergej Jaskiewicz
2021-08-28 16:59:31 +03:00
committed by Sergej Jaskiewicz
parent 02d1494ce9
commit f823f7b18c
23 changed files with 103 additions and 109 deletions
+5 -10
View File
@@ -108,8 +108,7 @@ public final class CurrentValueSubject<Output, Failure: Error>: Subject {
}
active = false
self.completion = completion
let downstreams = self.downstreams
self.downstreams.removeAll()
let downstreams = self.downstreams.take()
lock.unlock()
downstreams.forEach { conduit in
conduit.finish(completion: completion)
@@ -181,13 +180,11 @@ extension CurrentValueSubject {
override func finish(completion: Subscribers.Completion<Failure>) {
lock.lock()
guard let downstream = self.downstream else {
guard let downstream = self.downstream.take() else {
lock.unlock()
return
}
self.downstream = nil
let parent = self.parent
self.parent = nil
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
downstreamLock.lock()
@@ -227,13 +224,11 @@ extension CurrentValueSubject {
override func cancel() {
lock.lock()
if self.downstream == nil {
if downstream.take() == nil {
lock.unlock()
return
}
self.downstream = nil
let parent = self.parent
self.parent = nil
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
}
+3 -6
View File
@@ -43,8 +43,7 @@ public final class Future<Output, Failure: Error>: Publisher {
return
}
self.result = result
let downstreams = self.downstreams
self.downstreams.removeAll()
let downstreams = self.downstreams.take()
lock.unlock()
switch result {
case .success(let output):
@@ -182,13 +181,11 @@ extension Future {
override func cancel() {
lock.lock()
if self.downstream == nil {
if downstream.take() == nil {
lock.unlock()
return
}
self.downstream = nil
let parent = self.parent
self.parent = nil
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
}
@@ -11,6 +11,12 @@ internal enum ConduitList<Output, Failure: Error> {
case many(Set<ConduitBase<Output, Failure>>)
}
extension ConduitList: HasDefaultValue {
init() {
self = .empty
}
}
extension ConduitList {
internal mutating func insert(_ conduit: ConduitBase<Output, Failure>) {
switch self {
@@ -50,8 +56,4 @@ extension ConduitList {
self = .many(set)
}
}
internal mutating func removeAll() {
self = .empty
}
}
@@ -187,8 +187,7 @@ extension PublishedSubject {
return
}
self.downstream = nil
let parent = self.parent
self.parent = nil
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
}
+30
View File
@@ -0,0 +1,30 @@
//
// Utils.swift
//
//
// Created by Sergej Jaskiewicz on 28.08.2021.
//
internal protocol HasDefaultValue {
init()
}
extension HasDefaultValue {
@inline(__always)
internal mutating func take() -> Self {
let taken = self
self = .init()
return taken
}
}
extension Array: HasDefaultValue {}
extension Dictionary: HasDefaultValue {}
extension Optional: HasDefaultValue {
init() {
self = nil
}
}
+5 -10
View File
@@ -85,8 +85,7 @@ public final class PassthroughSubject<Output, Failure: Error>: Subject {
}
active = false
self.completion = completion
let downstreams = self.downstreams
self.downstreams.removeAll()
let downstreams = self.downstreams.take()
lock.unlock()
downstreams.forEach { conduit in
conduit.finish(completion: completion)
@@ -168,13 +167,11 @@ extension PassthroughSubject {
override func finish(completion: Subscribers.Completion<Failure>) {
lock.lock()
guard let downstream = self.downstream else {
guard let downstream = self.downstream.take() else {
lock.unlock()
return
}
self.downstream = nil
let parent = self.parent
self.parent = nil
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
downstreamLock.lock()
@@ -197,13 +194,11 @@ extension PassthroughSubject {
override func cancel() {
lock.lock()
if self.downstream == nil {
if downstream.take() == nil {
lock.unlock()
return
}
self.downstream = nil
let parent = self.parent
self.parent = nil
let parent = self.parent.take()
lock.unlock()
parent?.disassociate(self)
}
@@ -222,8 +222,7 @@ extension Publishers.Encode {
} catch {
lock.lock()
finished = true
let subscription = self.subscription
self.subscription = nil
let subscription = self.subscription.take()
lock.unlock()
subscription?.cancel()
downstream.receive(completion: .failure(error))
@@ -252,11 +251,10 @@ extension Publishers.Encode {
func cancel() {
lock.lock()
guard !finished, let subscription = self.subscription else {
guard !finished, let subscription = self.subscription.take() else {
lock.unlock()
return
}
self.subscription = nil
finished = true
lock.unlock()
subscription.cancel()
@@ -336,8 +334,7 @@ extension Publishers.Decode {
} catch {
lock.lock()
finished = true
let subscription = self.subscription
self.subscription = nil
let subscription = self.subscription.take()
lock.unlock()
subscription?.cancel()
downstream.receive(completion: .failure(error))
@@ -366,11 +363,10 @@ extension Publishers.Decode {
func cancel() {
lock.lock()
guard !finished, let subscription = self.subscription else {
guard !finished, let subscription = self.subscription.take() else {
lock.unlock()
return
}
self.subscription = nil
finished = true
lock.unlock()
subscription.cancel()
+1 -2
View File
@@ -293,8 +293,7 @@ extension Just {
func request(_ demand: Subscribers.Demand) {
demand.assertNonZero()
guard let downstream = self.downstream else { return }
self.downstream = nil
guard let downstream = self.downstream.take() else { return }
_ = downstream.receive(value)
downstream.receive(completion: .finished)
}
@@ -122,8 +122,7 @@ extension Optional.OCombine {
func request(_ demand: Subscribers.Demand) {
demand.assertNonZero()
guard let downstream = self.downstream else { return }
self.downstream = nil
guard let downstream = self.downstream.take() else { return }
_ = downstream.receive(output)
downstream.receive(completion: .finished)
}
@@ -331,9 +331,7 @@ extension Publishers.Buffer {
private func lockedPop(_ demand: Subscribers.Demand) -> [Input] {
assert(demand > 0)
guard let max = demand.max else {
let poppedValues = self.values
self.values = []
return poppedValues
return values.take()
}
let poppedValues = Array(values.prefix(max))
@@ -128,8 +128,7 @@ extension Publishers.CollectByCount {
lock.unlock()
return .none
}
let output = self.buffer
self.buffer = []
let output = self.buffer.take()
lock.unlock()
return downstream.receive(output) * count
}
@@ -143,8 +142,7 @@ extension Publishers.CollectByCount {
if buffer.isEmpty {
lock.unlock()
} else {
let buffer = self.buffer
self.buffer = []
let buffer = self.buffer.take()
lock.unlock()
_ = downstream.receive(buffer)
}
@@ -168,10 +166,9 @@ extension Publishers.CollectByCount {
func cancel() {
lock.lock()
if let subscription = self.subscription {
if let subscription = self.subscription.take() {
buffer = []
finished = true
self.subscription = nil
lock.unlock()
subscription.cancel()
} else {
@@ -212,8 +212,7 @@ extension Publishers.Debounce {
let generation = currentGeneration
currentValue = input
let due = scheduler.now.advanced(by: dueTime)
let previousCancellers = self.currentCancellers
currentCancellers.removeAll()
let previousCancellers = self.currentCancellers.take()
currentCancellers[generation] = .pending
lock.unlock()
let newCanceller = scheduler.schedule(after: due,
@@ -238,8 +237,7 @@ extension Publishers.Debounce {
return
}
state = .terminal
let previousCancellers = currentCancellers
currentCancellers.removeAll()
let previousCancellers = currentCancellers.take()
lock.unlock()
for canceller in previousCancellers.values {
canceller.cancel()
@@ -268,8 +266,7 @@ extension Publishers.Debounce {
return
}
state = .terminal
let previousCancellers = currentCancellers
currentCancellers.removeAll()
let previousCancellers = currentCancellers.take()
lock.unlock()
for canceller in previousCancellers.values {
canceller.cancel()
@@ -306,11 +303,10 @@ extension Publishers.Debounce {
return
}
guard let canceller = currentCancellers[generation] else {
guard let canceller = currentCancellers[generation].take() else {
lock.unlock()
return
}
currentCancellers[generation] = nil
let hasAnyDemand = downstreamDemand != .none
if hasAnyDemand {
@@ -139,8 +139,7 @@ extension Publishers.Drop {
func cancel() {
lock.lock()
let subscription = self.subscription
self.subscription = nil
let subscription = self.subscription.take()
lock.unlock()
subscription?.cancel()
}
@@ -204,8 +204,7 @@ extension Publishers.DropUntilOutput {
}
otherFinished = true
if let upstreamSubscription = self.upstreamSubscription {
self.upstreamSubscription = nil
if let upstreamSubscription = self.upstreamSubscription.take() {
lock.unlock()
upstreamSubscription.cancel()
} else {
@@ -229,10 +228,8 @@ extension Publishers.DropUntilOutput {
func cancel() {
lock.lock()
let upstreamSubscription = self.upstreamSubscription
let otherSubscription = self.otherSubscription
self.upstreamSubscription = nil
self.otherSubscription = nil
let upstreamSubscription = self.upstreamSubscription.take()
let otherSubscription = self.otherSubscription.take()
cancelled = true
lock.unlock()
@@ -221,8 +221,7 @@ extension Publishers.${instantiation} {
} catch {
lock.lock()
finished = true
let subscription = self.subscription
self.subscription = nil
let subscription = self.subscription.take()
lock.unlock()
subscription?.cancel()
downstream.receive(completion: .failure(error))
@@ -251,11 +250,10 @@ extension Publishers.${instantiation} {
func cancel() {
lock.lock()
guard !finished, let subscription = self.subscription else {
guard !finished, let subscription = self.subscription.take() else {
lock.unlock()
return
}
self.subscription = nil
finished = true
lock.unlock()
subscription.cancel()
@@ -304,8 +304,7 @@ extension Publishers.FlatMap {
}
if demand == .unlimited {
downstreamDemand = .unlimited
let buffer = self.buffer
self.buffer = []
let buffer = self.buffer.take()
let subscriptions = self.subscriptions
lock.unlock()
downstreamLock.lock()
@@ -361,10 +360,8 @@ extension Publishers.FlatMap {
return
}
cancelledOrCompleted = true
let subscriptions = self.subscriptions
self.subscriptions = [:]
let outerSubscription = self.outerSubscription
self.outerSubscription = nil
let subscriptions = self.subscriptions.take()
let outerSubscription = self.outerSubscription.take()
lock.unlock()
for (_, subscription) in subscriptions {
subscription.cancel()
@@ -450,8 +447,7 @@ extension Publishers.FlatMap {
return
}
cancelledOrCompleted = true
let subscriptions = self.subscriptions
self.subscriptions = [:]
let subscriptions = self.subscriptions.take()
lock.unlock()
for (i, subscription) in subscriptions where i != index {
subscription.cancel()
@@ -230,8 +230,7 @@ extension Publishers.SwitchToLatest {
return .none
}
if let currentInnerSubscription = self.currentInnerSubscription {
self.currentInnerSubscription = nil
if let currentInnerSubscription = self.currentInnerSubscription.take() {
lock.unlock()
currentInnerSubscription.cancel()
lock.lock()
@@ -272,8 +271,7 @@ extension Publishers.SwitchToLatest {
lock.unlock()
}
case .failure:
let currentInnerSubscription = self.currentInnerSubscription
self.currentInnerSubscription = nil
let currentInnerSubscription = self.currentInnerSubscription.take()
sentCompletion = true
lock.unlock()
currentInnerSubscription?.cancel()
@@ -298,10 +296,8 @@ extension Publishers.SwitchToLatest {
func cancel() {
lock.lock()
cancelled = true
let currentInnerSubscription = self.currentInnerSubscription
self.currentInnerSubscription = nil
let outerSubscription = self.outerSubscription
self.outerSubscription = nil
let currentInnerSubscription = self.currentInnerSubscription.take()
let outerSubscription = self.outerSubscription.take()
lock.unlock()
currentInnerSubscription?.cancel()
@@ -386,8 +382,7 @@ extension Publishers.SwitchToLatest {
return
}
cancelled = true
let outerSubscription = self.outerSubscription
self.outerSubscription = nil
let outerSubscription = self.outerSubscription.take()
sentCompletion = true
lock.unlock()
outerSubscription?.cancel()
@@ -292,11 +292,8 @@ extension Publishers.Throttle {
lastEmissionTime = scheduler.now
}
let pendingInput = self.pendingInput
let pendingCompletion = self.pendingCompletion
self.pendingInput = nil
self.pendingCompletion = nil
let pendingInput = self.pendingInput.take()
let pendingCompletion = self.pendingCompletion.take()
if pendingCompletion != nil {
state = .terminal
@@ -136,8 +136,7 @@ extension Result.OCombine {
func request(_ demand: Subscribers.Demand) {
demand.assertNonZero()
guard let downstream = self.downstream else { return }
self.downstream = nil
guard let downstream = self.downstream.take() else { return }
_ = downstream.receive(output)
downstream.receive(completion: .finished)
}
@@ -0,0 +1,14 @@
//
// Utils.swift
//
//
// Created by Sergej Jaskiewicz on 28.08.2021.
//
extension Optional {
internal mutating func take() -> Optional {
let taken = self
self = nil
return taken
}
}
@@ -200,13 +200,13 @@ extension Notification {
func cancel() {
lock.lock()
guard let center = self.center, let observation = self.observation else {
guard let center = self.center.take(),
let observation = self.observation.take()
else {
lock.unlock()
return
}
self.center = nil
self.object = nil
self.observation = nil
lock.unlock()
center.removeObserver(observation)
}
@@ -234,19 +234,16 @@ extension OperationQueue {
}
override func main() {
guard let action = self.action else { return }
self.action = nil
guard let action = self.action.take() else { return }
action()
guard let queue = self.queue,
let context = self.context
guard let queue = self.queue.take(),
let context = self.context.take()
else {
self.queue = nil
self.context = nil
return
}
self.queue = nil
self.context = nil
context.lock.lock()
if context.operation == nil {
@@ -202,11 +202,10 @@ extension Foundation.Timer {
func cancel() {
lock.lock()
if downstream == nil {
if downstream.take() == nil {
lock.unlock()
return
}
downstream = nil
lock.unlock()
parent?.disconnect(combineIdentifier)
}