Files
swift-nio/Sources/NIOFS/Internal/ParallelDirCopy.swift
George Barnett d963335ba0 Fail on early errors in parallel copy (#3435)
Motivation:

'copyItem' can be used to copy files/directories from a source to a
destination address and fails if the destination already exists. The
parallel version of it can wedge if a directory is being copied and the
directory can't be created at the destination path (for example, if a
file already exists at the destination).

The parallel copy works by feeding tasks (e.g. "copy this item from here
to there") into an async sequence and processing each task in a separate
child task within a task group. When copying directories a new directory
is created at the destination path and then each file within the source
directory is emitted as a separate item to process. Another item is sent
on the async sequence to indicate when the source directory is finished
with, which may result in finishing the async sequence.

However, if creating the destination directory fails then that event
isn't sent. This results in the calling code never terminating the async
sequence and causes 'copyItem' to wedge.

Modifications:

- Use non-idempotent directory creation (copy item should fail if the
destination already exists, this was a regression introduced in
7124f096).
- Check whether to continue when a dir can't be created but always emit
the end of dir event
- If terminating when the task group hasn't reached its width limit then
check child tasks for errors

Result:

Parallel copy doesn't wedge
2025-11-05 08:47:05 +00:00

172 lines
7.3 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension FileSystem {
    /// Iterative implementation of a recursive parallel copy of the directory from `sourcePath` to
    /// `destinationPath`.
    ///
    /// The parallelism is solely at the level of individual items (files, symbolic links and
    /// directories). A larger file is not 'split' into concurrent reads or writes.
    ///
    /// If a symbolic link is encountered, then only the link is copied. If supported by the file
    /// system, the copied items will preserve permissions and any extended attributes.
    ///
    /// Note: `maxConcurrentOperations` is used as a hard (conservative) limit on the number of open
    /// file descriptors at any point. Operations are assumed to consume 2 descriptors, so the
    /// maximum open descriptors are `maxConcurrentOperations * 2`
    ///
    /// - Parameters:
    ///   - sourcePath: The directory to copy. It is enqueued as the root item with type
    ///     `.directory`; the caller is expected to have validated it already.
    ///   - destinationPath: The path the root directory is copied to.
    ///   - maxConcurrentOperations: The maximum number of per-item copy tasks running in the task
    ///     group at once. Expected to be at least 1; with a value below 1 no task is ever started.
    ///   - shouldProceedAfterError: Forwarded unchanged to every per-item copy.
    ///     NOTE(review): presumably throwing from it aborts the copy; confirm against
    ///     `copySelfAndEnqueueChildren`.
    ///   - shouldCopyItem: Forwarded unchanged to every per-item copy.
    /// - Throws: `CancellationError` if the task is cancelled when the item queue ends, or the
    ///   first error rethrown from a child task.
    @usableFromInline
    func copyDirectoryParallel(
        from sourcePath: FilePath,
        to destinationPath: FilePath,
        maxConcurrentOperations: Int,
        shouldProceedAfterError:
            @escaping @Sendable (
                _ entry: DirectoryEntry,
                _ error: Error
            ) async throws -> Void,
        shouldCopyItem:
            @escaping @Sendable (
                _ source: DirectoryEntry,
                _ destination: FilePath
            ) async -> Bool
    ) async throws {
        // Implemented with NIOAsyncSequenceProducer rather than AsyncStream. It is approximately
        // the same speed in the best case, but has significantly less variance.
        // NIOAsyncSequenceProducer also enforces a multi-producer, single-consumer access pattern.
        let copyRequiredQueue = NIOAsyncSequenceProducer.makeSequence(
            elementType: DirCopyItem.self,
            backPressureStrategy: NoBackPressureStrategy(),
            finishOnDeinit: false,
            delegate: DirCopyDelegate()
        )

        // We ignore the result of yield in all cases, because we are not implementing back
        // pressure, and cancellation is dealt with separately.
        @Sendable func yield(_ contentsOf: [DirCopyItem]) {
            _ = copyRequiredQueue.source.yield(contentsOf: contentsOf)
        }

        // Kick-start the process by enqueuing the root entry. The calling function already
        // validated the root needed copying, so it is safe to force unwrap the value.
        _ = copyRequiredQueue.source.yield(
            .toCopy(from: .init(path: NIOFilePath(sourcePath), type: .directory)!, to: destinationPath)
        )

        // The processing of the very first item (the root) will increment this counter. Processing
        // will finish when the counter hits zero again.
        //
        // This does not need to be a ManagedAtomic or similar because:
        // - All state maintenance is done within the withThrowingTaskGroup closure
        // - All actual file system work is done by tasks created on the `taskGroup`
        var activeDirCount = 0

        // Despite there being no 'result' for each operation, we cannot use a discarding task group,
        // because we use the 'drain results' queue as a concurrency limiting side effect.
        try await withThrowingTaskGroup(of: Void.self) { taskGroup in
            // Process each item in the current task.
            //
            // Side Effects:
            // - Updates activeDirCount and finishes the stream if required.
            // - Might add a task to `taskGroup`.
            //
            // Returns true if it added a task, false otherwise.
            func onNextItem(_ item: DirCopyItem) -> Bool {
                switch item {
                case .endOfDir:
                    // A directory has been fully handled; once the last one is done nothing else
                    // can be enqueued, so the stream is finished.
                    activeDirCount -= 1
                    if activeDirCount == 0 {
                        copyRequiredQueue.source.finish()
                    }
                    return false

                case let .toCopy(from: from, to: to):
                    // Only directories are counted: each one is balanced by a later `.endOfDir`.
                    if from.type == .directory {
                        activeDirCount += 1
                    }
                    taskGroup.addTask {
                        try await copySelfAndEnqueueChildren(
                            from: from,
                            to: to,
                            yield: yield,
                            shouldProceedAfterError: shouldProceedAfterError,
                            shouldCopyItem: shouldCopyItem
                        )
                    }
                    return true
                }
            }

            let iter = copyRequiredQueue.sequence.makeAsyncIterator()

            // inProgress counts the number of tasks we have added to the task group.
            // Get up to the maximum concurrency first.
            // We haven't started monitoring for task completion, so inProgress is 'worst case'.
            //
            // The comparison must be strictly less-than: once `maxConcurrentOperations` tasks have
            // been added, starting one more would exceed the hard limit.
            var inProgress = 0
            while inProgress < maxConcurrentOperations {
                let item = await iter.next()
                if let item = item {
                    if onNextItem(item) {
                        inProgress += 1
                    }
                } else {
                    // Either we completed things before we hit the limit or we were cancelled. In
                    // the latter case we choose to propagate the cancellation clearly. This makes
                    // testing for it more reliable.
                    try Task.checkCancellation()

                    // If any child tasks failed throw up an error.
                    try await taskGroup.waitForAll()
                    return
                }
            }

            // One in (finish) -> one out (start another), but only for items that trigger a task.
            while let _ = try await taskGroup.next() {
                var keepConsuming = true
                while keepConsuming {
                    let item = await iter.next()
                    if let item = item {
                        // Items which did not start a task (end-of-directory markers) do not use
                        // up the freed slot, so keep pulling until a task is actually added.
                        keepConsuming = !onNextItem(item)
                    } else {
                        // We must check here, to accurately propagate the cancellation.
                        try Task.checkCancellation()
                        keepConsuming = false
                    }
                }
            }
        }
    }
}
/// A back-pressure strategy for a ``NIOAsyncSequenceProducer`` which never applies back pressure:
/// both callbacks unconditionally answer `true`, i.e. 'always ask for more'.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private struct NoBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategy {
    /// Called after elements were yielded; the buffer depth is deliberately ignored.
    mutating func didYield(bufferDepth: Int) -> Bool {
        return true
    }

    /// Called after an element was consumed; the buffer depth is deliberately ignored.
    mutating func didConsume(bufferDepth: Int) -> Bool {
        return true
    }
}
/// A ``NIOAsyncSequenceProducer`` delegate whose callbacks are all no-ops.
///
/// Back pressure is ignored: the inherent handle limiting in `copyDirectoryParallel` means it is
/// unnecessary.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private struct DirCopyDelegate: NIOAsyncSequenceProducerDelegate, Sendable {
    /// Intentionally does nothing.
    @inlinable
    func produceMore() {
    }

    /// Intentionally does nothing.
    @inlinable
    func didTerminate() {
    }
}