From defff117838c772c5380730f253f4388d3129358 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Mon, 3 Feb 2025 10:57:10 +0000 Subject: [PATCH] Make _NIOFileSystem strict concurrency compatible Motivation: We're continuing out Strict Concurrency journey, making sure users of NIO can write data-race-free code. Modifications: - Added some missing Sendable annotations in NIOAsyncSequenceProducer - Made BufferedStream unconditionally Sendable, and required its Element type to also be Sendable. The prior constraint wasn't actually correct. We always behaved as though the element types were Sendable, by passing them into continuations. This cleans things up. - Made AnyAsyncSequence Sendable, which it needs to be. - Made BufferedOrAnyStream Sendable, which it needs to be. - Made DirectoryEntries explicitly Sendable, which it was. - Made DirectoryEntries.Batched explicitly Sendable. Result: Better concurrency-safety --- Package.swift | 2 +- .../AsyncSequences/NIOAsyncSequenceProducer.swift | 4 ++-- Sources/NIOFileSystem/DirectoryEntries.swift | 10 ++++++---- .../NIOFileSystem/Internal/BufferedOrAnyStream.swift | 10 +++++----- Sources/NIOFileSystem/Internal/BufferedStream.swift | 12 ++++++------ 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/Package.swift b/Package.swift index 74d00fc68ad..298dd10b6f4 100644 --- a/Package.swift +++ b/Package.swift @@ -246,7 +246,7 @@ let package = Package( path: "Sources/NIOFileSystem", exclude: includePrivacyManifest ? [] : ["PrivacyInfo.xcprivacy"], resources: includePrivacyManifest ? [.copy("PrivacyInfo.xcprivacy")] : [], - swiftSettings: [ + swiftSettings: strictConcurrencySettings + [ .define("ENABLE_MOCKING", .when(configuration: .debug)) ] ), diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift index e6484d84642..acc3aff5e78 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift @@ -105,7 +105,7 @@ public struct NIOAsyncSequenceProducer< /// to yield new elements to the sequence. /// 2. The ``sequence`` which is the actual `AsyncSequence` and /// should be passed to the consumer. - public struct NewSequence { + public struct NewSequence: Sendable { /// The source of the ``NIOAsyncSequenceProducer`` used to yield and finish. public let source: Source /// The actual sequence which should be passed to the consumer. @@ -268,7 +268,7 @@ extension NIOAsyncSequenceProducer { } /// The result of a call to ``NIOAsyncSequenceProducer/Source/yield(_:)``. - public enum YieldResult: Hashable { + public enum YieldResult: Hashable, Sendable { /// Indicates that the caller should produce more elements for now. The delegate's ``NIOAsyncSequenceProducerDelegate/produceMore()`` /// will **NOT** get called, since the demand was already signalled through this ``NIOAsyncSequenceProducer/Source/YieldResult``. case produceMore diff --git a/Sources/NIOFileSystem/DirectoryEntries.swift b/Sources/NIOFileSystem/DirectoryEntries.swift index 3830d94abd7..1c0fa9d0b71 100644 --- a/Sources/NIOFileSystem/DirectoryEntries.swift +++ b/Sources/NIOFileSystem/DirectoryEntries.swift @@ -21,7 +21,7 @@ import SystemPackage /// An `AsyncSequence` of entries in a directory. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -public struct DirectoryEntries: AsyncSequence { +public struct DirectoryEntries: AsyncSequence, Sendable { public typealias AsyncIterator = DirectoryIterator public typealias Element = DirectoryEntry @@ -35,7 +35,8 @@ public struct DirectoryEntries: AsyncSequence { /// Creates a ``DirectoryEntries`` sequence by wrapping an `AsyncSequence` of _batches_ of /// directory entries. - public init(wrapping sequence: S) where S.Element == Batched.Element { + @preconcurrency + public init(wrapping sequence: S) where S.Element == Batched.Element { self.batchedSequence = Batched(wrapping: sequence) } @@ -85,7 +86,7 @@ extension DirectoryEntries { /// The ``Batched`` sequence uses `Array` as its element type rather /// than `DirectoryEntry`. This can enable better performance by reducing the number of /// executor hops at the cost of ease-of-use. - public struct Batched: AsyncSequence { + public struct Batched: AsyncSequence, Sendable { public typealias AsyncIterator = BatchedIterator public typealias Element = [DirectoryEntry] @@ -93,7 +94,8 @@ extension DirectoryEntries { /// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence` /// of directory entry batches. - public init(wrapping sequence: S) where S.Element == Element { + @preconcurrency + public init(wrapping sequence: S) where S.Element == Element { self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence) } diff --git a/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift b/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift index d28b756b96e..50e097b931e 100644 --- a/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift +++ b/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift @@ -16,7 +16,7 @@ import NIOCore /// Wraps a ``NIOThrowingAsyncSequenceProducer`` or ``AnyAsyncSequence``. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -internal enum BufferedOrAnyStream { +internal enum BufferedOrAnyStream: Sendable { typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer< Element, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate > @@ -28,7 +28,7 @@ internal enum BufferedOrAnyStream(wrapping stream: S) where S.Element == Element { + internal init(wrapping stream: S) where S.Element == Element { self = .anyAsyncSequence(AnyAsyncSequence(wrapping: stream)) } @@ -69,10 +69,10 @@ internal enum BufferedOrAnyStream: AsyncSequence { - private let _makeAsyncIterator: () -> AsyncIterator +internal struct AnyAsyncSequence: AsyncSequence, Sendable { + private let _makeAsyncIterator: @Sendable () -> AsyncIterator - internal init(wrapping sequence: S) where S.Element == Element { + internal init(wrapping sequence: S) where S.Element == Element { self._makeAsyncIterator = { AsyncIterator(wrapping: sequence.makeAsyncIterator()) } diff --git a/Sources/NIOFileSystem/Internal/BufferedStream.swift b/Sources/NIOFileSystem/Internal/BufferedStream.swift index d450b00b581..0195fd0efc5 100644 --- a/Sources/NIOFileSystem/Internal/BufferedStream.swift +++ b/Sources/NIOFileSystem/Internal/BufferedStream.swift @@ -118,7 +118,7 @@ import NIOConcurrencyHelpers /// } /// @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -internal struct BufferedStream { +internal struct BufferedStream { final class _Backing: Sendable { let storage: _BackPressuredStorage @@ -199,7 +199,7 @@ extension BufferedStream: AsyncSequence { } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension BufferedStream: Sendable where Element: Sendable {} +extension BufferedStream: Sendable {} internal struct _ManagedCriticalState: @unchecked Sendable { let lock: NIOLockedValueBox @@ -303,7 +303,7 @@ extension BufferedStream { /// - Parameter sequence: The elements to write to the asynchronous stream. /// - Returns: The result that indicates if more elements should be produced at this time. internal func write(contentsOf sequence: S) throws -> WriteResult - where Element == S.Element, S: Sequence { + where Element == S.Element, S: Sequence, Element: Sendable { try self._backing.storage.write(contentsOf: sequence) } @@ -363,7 +363,7 @@ extension BufferedStream { internal func write( contentsOf sequence: S, onProduceMore: @escaping @Sendable (Result) -> Void - ) where Element == S.Element, S: Sequence { + ) where Element == S.Element, S: Sequence, Element: Sendable { do { let writeResult = try self.write(contentsOf: sequence) @@ -407,7 +407,7 @@ extension BufferedStream { /// - Parameters: /// - sequence: The elements to write to the asynchronous stream. internal func write(contentsOf sequence: S) async throws - where Element == S.Element, S: Sequence { + where Element == S.Element, S: Sequence, Element: Sendable { let writeResult = try { try self.write(contentsOf: sequence) }() switch writeResult { @@ -458,7 +458,7 @@ extension BufferedStream { /// - Parameters: /// - sequence: The elements to write to the asynchronous stream. internal func write(contentsOf sequence: S) async throws - where Element == S.Element, S: AsyncSequence { + where Element == S.Element, S: AsyncSequence, Element: Sendable { for try await element in sequence { try await self.write(contentsOf: CollectionOfOne(element)) }