Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import Dispatch
#endif

/// Errors that may be thrown when executing work on a `NIOThreadPool`
public enum NIOThreadPoolError {
public enum NIOThreadPoolError: Sendable {

/// The `NIOThreadPool` was not active.
public struct ThreadPoolInactive: Error {
Expand Down Expand Up @@ -73,7 +73,7 @@ public final class NIOThreadPool {
}

@usableFromInline
internal enum State {
internal enum State: Sendable {
/// The `NIOThreadPool` is already stopped.
case stopped
/// The `NIOThreadPool` is shutting down, the array has one boolean entry for each thread indicating if it has shut down already.
Expand All @@ -87,7 +87,7 @@ public final class NIOThreadPool {

/// Whether threads in the pool have work.
@usableFromInline
internal enum _WorkState: Hashable {
internal enum _WorkState: Hashable, Sendable {
case hasWork
case hasNoWork
}
Expand Down Expand Up @@ -138,7 +138,7 @@ public final class NIOThreadPool {
self._shutdownGracefully(queue: queue, callback)
}

private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void) {
guard self.canBeStopped else {
queue.async {
callback(NIOThreadPoolError.UnsupportedOperation())
Expand Down Expand Up @@ -420,11 +420,17 @@ extension NIOThreadPool {
/// - body: The closure which performs some blocking work to be done on the thread pool.
/// - Returns: The `EventLoopFuture` of `promise` fulfilled with the result (or error) of the passed closure.
@preconcurrency
public func runIfActive<T>(eventLoop: EventLoop, _ body: @escaping @Sendable () throws -> T) -> EventLoopFuture<T> {
public func runIfActive<T: Sendable>(
eventLoop: EventLoop,
_ body: @escaping @Sendable () throws -> T
) -> EventLoopFuture<T> {
self._runIfActive(eventLoop: eventLoop, body)
}

private func _runIfActive<T>(eventLoop: EventLoop, _ body: @escaping () throws -> T) -> EventLoopFuture<T> {
private func _runIfActive<T: Sendable>(
eventLoop: EventLoop,
_ body: @escaping @Sendable () throws -> T
) -> EventLoopFuture<T> {
let promise = eventLoop.makePromise(of: T.self)
self.submit { shouldRun in
guard case shouldRun = NIOThreadPool.WorkItemState.active else {
Expand Down Expand Up @@ -498,20 +504,21 @@ extension NIOThreadPool {
}

private func _syncShutdownGracefully() throws {
let errorStorageLock = NIOLock()
var errorStorage: Swift.Error? = nil
let continuation = DispatchWorkItem {}
let errorStorageLock = NIOLockedValueBox<Swift.Error?>(nil)
let continuation = ConditionLock(value: 0)
self.shutdownGracefully { error in
if let error = error {
errorStorageLock.withLock {
errorStorage = error
errorStorageLock.withLockedValue {
$0 = error
}
}
continuation.perform()
continuation.lock(whenValue: 0)
continuation.unlock(withValue: 1)
}
continuation.wait()
try errorStorageLock.withLock {
if let error = errorStorage {
continuation.lock(whenValue: 1)
continuation.unlock()
try errorStorageLock.withLockedValue {
if let error = $0 {
throw error
}
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOPosix/Thread.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum LowLevelThreadOperations {
}

protocol ThreadOps {
associatedtype ThreadHandle
associatedtype ThreadHandle: Sendable
associatedtype ThreadSpecificKey
associatedtype ThreadSpecificKeyDestructor

Expand All @@ -41,7 +41,7 @@ protocol ThreadOps {
///
/// All methods exposed are thread-safe.
@usableFromInline
final class NIOThread {
final class NIOThread: Sendable {
internal typealias ThreadBoxValue = (body: (NIOThread) -> Void, name: String?)
internal typealias ThreadBox = Box<ThreadBoxValue>

Expand Down
100 changes: 60 additions & 40 deletions Sources/NIOPosix/ThreadPosix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,27 @@ private func sysPthread_create(

typealias ThreadOpsSystem = ThreadOpsPosix

struct PthreadWrapper: @unchecked Sendable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can't we use our existing UnsafeTransfer for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but in this case I wanted a special-case wrapper that we can see, and aim to remove when it's no longer needed.

var handle: pthread_t
}

extension Optional where Wrapped == PthreadWrapper {
mutating func withHandlePointer<
ReturnValue
>(
_ body: (UnsafeMutablePointer<pthread_t?>) throws -> ReturnValue
) rethrows -> ReturnValue {
var handle = self?.handle
defer {
self = handle.map { PthreadWrapper(handle: $0) }
}

return try body(&handle)
}
}

enum ThreadOpsPosix: ThreadOps {
typealias ThreadHandle = pthread_t
typealias ThreadHandle = PthreadWrapper
typealias ThreadSpecificKey = pthread_key_t
#if canImport(Darwin)
typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer) -> Void
Expand All @@ -89,7 +108,7 @@ enum ThreadOpsPosix: ThreadOps {
// anyway.
var chars: [CChar] = Array(repeating: 0, count: 64)
return chars.withUnsafeMutableBufferPointer { ptr in
guard sys_pthread_getname_np(thread, ptr.baseAddress!, ptr.count) == 0 else {
guard sys_pthread_getname_np(thread.handle, ptr.baseAddress!, ptr.count) == 0 else {
return nil
}

Expand All @@ -105,60 +124,61 @@ enum ThreadOpsPosix: ThreadOps {
detachThread: Bool
) {
let argv0 = Unmanaged.passRetained(args).toOpaque()
let res = sysPthread_create(
handle: &handle,
destructor: {
// Cast to UnsafeMutableRawPointer? and force unwrap to make the
// same code work on macOS and Linux.
let boxed = Unmanaged<NIOThread.ThreadBox>
.fromOpaque(($0 as UnsafeMutableRawPointer?)!)
.takeRetainedValue()
let (body, name) = (boxed.value.body, boxed.value.name)
let hThread: ThreadOpsSystem.ThreadHandle = pthread_self()

if let name = name {
let maximumThreadNameLength: Int
#if os(Linux) || os(Android)
maximumThreadNameLength = 15
#else
maximumThreadNameLength = .max
#endif
name.prefix(maximumThreadNameLength).withCString { namePtr in
// this is non-critical so we ignore the result here, we've seen
// EPERM in containers.
_ = sys_pthread_setname_np(hThread, namePtr)
let res = handle.withHandlePointer { handlePtr in
sysPthread_create(
handle: handlePtr,
destructor: {
// Cast to UnsafeMutableRawPointer? and force unwrap to make the
// same code work on macOS and Linux.
let boxed = Unmanaged<NIOThread.ThreadBox>
.fromOpaque(($0 as UnsafeMutableRawPointer?)!)
.takeRetainedValue()
let (body, name) = (boxed.value.body, boxed.value.name)
let hThread: ThreadOpsSystem.ThreadHandle = PthreadWrapper(handle: pthread_self())

if let name = name {
let maximumThreadNameLength: Int
#if os(Linux) || os(Android)
maximumThreadNameLength = 15
#else
maximumThreadNameLength = .max
#endif
name.prefix(maximumThreadNameLength).withCString { namePtr in
// this is non-critical so we ignore the result here, we've seen
// EPERM in containers.
_ = sys_pthread_setname_np(hThread.handle, namePtr)
}
}
}

body(NIOThread(handle: hThread, desiredName: name))
body(NIOThread(handle: hThread, desiredName: name))

#if os(Android)
return UnsafeMutableRawPointer(bitPattern: 0xdeadbee)!
#else
return nil
#endif
},
args: argv0
)
#if os(Android)
return UnsafeMutableRawPointer(bitPattern: 0xdeadbee)!
#else
return nil
#endif
},
args: argv0
)
}
precondition(res == 0, "Unable to create thread: \(res)")

if detachThread {
let detachError = pthread_detach(handle!)
let detachError = pthread_detach(handle!.handle)
precondition(detachError == 0, "pthread_detach failed with error \(detachError)")
}

}

static func isCurrentThread(_ thread: ThreadOpsSystem.ThreadHandle) -> Bool {
pthread_equal(thread, pthread_self()) != 0
pthread_equal(thread.handle, pthread_self()) != 0
}

static var currentThread: ThreadOpsSystem.ThreadHandle {
pthread_self()
PthreadWrapper(handle: pthread_self())
}

static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) {
let err = pthread_join(thread, nil)
let err = pthread_join(thread.handle, nil)
assert(err == 0, "pthread_join failed with \(err)")
}

Expand All @@ -184,7 +204,7 @@ enum ThreadOpsPosix: ThreadOps {
}

static func compareThreads(_ lhs: ThreadOpsSystem.ThreadHandle, _ rhs: ThreadOpsSystem.ThreadHandle) -> Bool {
pthread_equal(lhs, rhs) != 0
pthread_equal(lhs.handle, rhs.handle) != 0
}
}

Expand Down
Loading