diff --git a/Sources/NIOPosix/LinuxCPUSet.swift b/Sources/NIOPosix/LinuxCPUSet.swift index 6e4af51c42e..4ca63931dd3 100644 --- a/Sources/NIOPosix/LinuxCPUSet.swift +++ b/Sources/NIOPosix/LinuxCPUSet.swift @@ -52,7 +52,7 @@ extension NIOThread { CNIOLinux_CPU_ZERO(&cpuset) let res = self.withUnsafeThreadHandle { p in - CNIOLinux_pthread_getaffinity_np(p, MemoryLayout.size(ofValue: cpuset), &cpuset) + CNIOLinux_pthread_getaffinity_np(p.handle, MemoryLayout.size(ofValue: cpuset), &cpuset) } precondition(res == 0, "pthread_getaffinity_np failed: \(res)") @@ -75,7 +75,7 @@ extension NIOThread { CNIOLinux_CPU_SET(CInt(cpuID), &cpuset) } let res = self.withUnsafeThreadHandle { p in - CNIOLinux_pthread_setaffinity_np(p, MemoryLayout.size(ofValue: cpuset), &cpuset) + CNIOLinux_pthread_setaffinity_np(p.handle, MemoryLayout.size(ofValue: cpuset), &cpuset) } precondition(res == 0, "pthread_setaffinity_np failed: \(res)") } diff --git a/Sources/NIOPosix/NIOThreadPool.swift b/Sources/NIOPosix/NIOThreadPool.swift index 75dfd08749e..c3196b67720 100644 --- a/Sources/NIOPosix/NIOThreadPool.swift +++ b/Sources/NIOPosix/NIOThreadPool.swift @@ -22,7 +22,7 @@ import Dispatch #endif /// Errors that may be thrown when executing work on a `NIOThreadPool` -public enum NIOThreadPoolError { +public enum NIOThreadPoolError: Sendable { /// The `NIOThreadPool` was not active. public struct ThreadPoolInactive: Error { @@ -73,7 +73,7 @@ public final class NIOThreadPool { } @usableFromInline - internal enum State { + internal enum State: Sendable { /// The `NIOThreadPool` is already stopped. case stopped /// The `NIOThreadPool` is shutting down, the array has one boolean entry for each thread indicating if it has shut down already. @@ -87,7 +87,7 @@ public final class NIOThreadPool { /// Whether threads in the pool have work. @usableFromInline - internal enum _WorkState: Hashable { + internal enum _WorkState: Hashable, Sendable { case hasWork case hasNoWork } @@ -138,7 +138,7 @@ public final class NIOThreadPool { self._shutdownGracefully(queue: queue, callback) } - private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void) { guard self.canBeStopped else { queue.async { callback(NIOThreadPoolError.UnsupportedOperation()) @@ -420,11 +420,17 @@ extension NIOThreadPool { /// - body: The closure which performs some blocking work to be done on the thread pool. /// - Returns: The `EventLoopFuture` of `promise` fulfilled with the result (or error) of the passed closure. @preconcurrency - public func runIfActive(eventLoop: EventLoop, _ body: @escaping @Sendable () throws -> T) -> EventLoopFuture { + public func runIfActive( + eventLoop: EventLoop, + _ body: @escaping @Sendable () throws -> T + ) -> EventLoopFuture { self._runIfActive(eventLoop: eventLoop, body) } - private func _runIfActive(eventLoop: EventLoop, _ body: @escaping () throws -> T) -> EventLoopFuture { + private func _runIfActive( + eventLoop: EventLoop, + _ body: @escaping @Sendable () throws -> T + ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: T.self) self.submit { shouldRun in guard case shouldRun = NIOThreadPool.WorkItemState.active else { @@ -498,20 +504,21 @@ extension NIOThreadPool { } private func _syncShutdownGracefully() throws { - let errorStorageLock = NIOLock() - var errorStorage: Swift.Error? = nil - let continuation = DispatchWorkItem {} + let errorStorageLock = NIOLockedValueBox(nil) + let continuation = ConditionLock(value: 0) self.shutdownGracefully { error in if let error = error { - errorStorageLock.withLock { - errorStorage = error + errorStorageLock.withLockedValue { + $0 = error } } - continuation.perform() + continuation.lock(whenValue: 0) + continuation.unlock(withValue: 1) } - continuation.wait() - try errorStorageLock.withLock { - if let error = errorStorage { + continuation.lock(whenValue: 1) + continuation.unlock() + try errorStorageLock.withLockedValue { + if let error = $0 { throw error } } diff --git a/Sources/NIOPosix/Thread.swift b/Sources/NIOPosix/Thread.swift index 565b775ef55..ab5afb97933 100644 --- a/Sources/NIOPosix/Thread.swift +++ b/Sources/NIOPosix/Thread.swift @@ -21,7 +21,7 @@ enum LowLevelThreadOperations { } protocol ThreadOps { - associatedtype ThreadHandle + associatedtype ThreadHandle: Sendable associatedtype ThreadSpecificKey associatedtype ThreadSpecificKeyDestructor @@ -41,7 +41,7 @@ protocol ThreadOps { /// /// All methods exposed are thread-safe. @usableFromInline -final class NIOThread { +final class NIOThread: Sendable { internal typealias ThreadBoxValue = (body: (NIOThread) -> Void, name: String?) internal typealias ThreadBox = Box diff --git a/Sources/NIOPosix/ThreadPosix.swift b/Sources/NIOPosix/ThreadPosix.swift index f876c42c8aa..9da87751690 100644 --- a/Sources/NIOPosix/ThreadPosix.swift +++ b/Sources/NIOPosix/ThreadPosix.swift @@ -74,8 +74,27 @@ private func sysPthread_create( typealias ThreadOpsSystem = ThreadOpsPosix +struct PthreadWrapper: @unchecked Sendable { + var handle: pthread_t +} + +extension Optional where Wrapped == PthreadWrapper { + mutating func withHandlePointer< + ReturnValue + >( + _ body: (UnsafeMutablePointer) throws -> ReturnValue + ) rethrows -> ReturnValue { + var handle = self?.handle + defer { + self = handle.map { PthreadWrapper(handle: $0) } + } + + return try body(&handle) + } +} + enum ThreadOpsPosix: ThreadOps { - typealias ThreadHandle = pthread_t + typealias ThreadHandle = PthreadWrapper typealias ThreadSpecificKey = pthread_key_t #if canImport(Darwin) typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer) -> Void @@ -89,7 +108,7 @@ enum ThreadOpsPosix: ThreadOps { // anyway. var chars: [CChar] = Array(repeating: 0, count: 64) return chars.withUnsafeMutableBufferPointer { ptr in - guard sys_pthread_getname_np(thread, ptr.baseAddress!, ptr.count) == 0 else { + guard sys_pthread_getname_np(thread.handle, ptr.baseAddress!, ptr.count) == 0 else { return nil } @@ -105,60 +124,61 @@ enum ThreadOpsPosix: ThreadOps { detachThread: Bool ) { let argv0 = Unmanaged.passRetained(args).toOpaque() - let res = sysPthread_create( - handle: &handle, - destructor: { - // Cast to UnsafeMutableRawPointer? and force unwrap to make the - // same code work on macOS and Linux. - let boxed = Unmanaged - .fromOpaque(($0 as UnsafeMutableRawPointer?)!) - .takeRetainedValue() - let (body, name) = (boxed.value.body, boxed.value.name) - let hThread: ThreadOpsSystem.ThreadHandle = pthread_self() - - if let name = name { - let maximumThreadNameLength: Int - #if os(Linux) || os(Android) - maximumThreadNameLength = 15 - #else - maximumThreadNameLength = .max - #endif - name.prefix(maximumThreadNameLength).withCString { namePtr in - // this is non-critical so we ignore the result here, we've seen - // EPERM in containers. - _ = sys_pthread_setname_np(hThread, namePtr) + let res = handle.withHandlePointer { handlePtr in + sysPthread_create( + handle: handlePtr, + destructor: { + // Cast to UnsafeMutableRawPointer? and force unwrap to make the + // same code work on macOS and Linux. + let boxed = Unmanaged + .fromOpaque(($0 as UnsafeMutableRawPointer?)!) + .takeRetainedValue() + let (body, name) = (boxed.value.body, boxed.value.name) + let hThread: ThreadOpsSystem.ThreadHandle = PthreadWrapper(handle: pthread_self()) + + if let name = name { + let maximumThreadNameLength: Int + #if os(Linux) || os(Android) + maximumThreadNameLength = 15 + #else + maximumThreadNameLength = .max + #endif + name.prefix(maximumThreadNameLength).withCString { namePtr in + // this is non-critical so we ignore the result here, we've seen + // EPERM in containers. + _ = sys_pthread_setname_np(hThread.handle, namePtr) + } } - } - body(NIOThread(handle: hThread, desiredName: name)) + body(NIOThread(handle: hThread, desiredName: name)) - #if os(Android) - return UnsafeMutableRawPointer(bitPattern: 0xdeadbee)! - #else - return nil - #endif - }, - args: argv0 - ) + #if os(Android) + return UnsafeMutableRawPointer(bitPattern: 0xdeadbee)! + #else + return nil + #endif + }, + args: argv0 + ) + } precondition(res == 0, "Unable to create thread: \(res)") if detachThread { - let detachError = pthread_detach(handle!) + let detachError = pthread_detach(handle!.handle) precondition(detachError == 0, "pthread_detach failed with error \(detachError)") } - } static func isCurrentThread(_ thread: ThreadOpsSystem.ThreadHandle) -> Bool { - pthread_equal(thread, pthread_self()) != 0 + pthread_equal(thread.handle, pthread_self()) != 0 } static var currentThread: ThreadOpsSystem.ThreadHandle { - pthread_self() + PthreadWrapper(handle: pthread_self()) } static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) { - let err = pthread_join(thread, nil) + let err = pthread_join(thread.handle, nil) assert(err == 0, "pthread_join failed with \(err)") } @@ -184,7 +204,7 @@ enum ThreadOpsPosix: ThreadOps { } static func compareThreads(_ lhs: ThreadOpsSystem.ThreadHandle, _ rhs: ThreadOpsSystem.ThreadHandle) -> Bool { - pthread_equal(lhs, rhs) != 0 + pthread_equal(lhs.handle, rhs.handle) != 0 } }