@@ -4,7 +4,7 @@ import CNodeAPISupport
44
55extension NodeContext {
66 // if we're on a node thread, run `action` on it
7- static func runOnActor< T> ( _ action: @NodeActor ( ) throws -> T ) rethrows -> T ? {
7+ static func runOnActor< T> ( _ action: @NodeActor @ Sendable ( ) throws -> T ) rethrows -> T ? {
88 guard NodeContext . hasCurrent else { return nil }
99 return try NodeActor . unsafeAssumeIsolated ( action)
1010 }
@@ -27,34 +27,54 @@ extension UnownedJob {
2727
2828@available ( macOS 10 . 15 , iOS 13 . 0 , watchOS 6 . 0 , tvOS 13 . 0 , * )
2929private final class NodeExecutor : SerialExecutor {
30- func enqueue( _ job: UnownedJob ) {
31- // We want to access `job`'s task-local storage. To do so,
32- // this temporarily swaps ResumeTask for our own function.
33- // Then, swift_job_run is called, which sets the active task to
34- // the receiver and invokes its ResumeTask. We then execute the
35- // given closure, allowing us to grab task-local values. Finally,
36- // we "suspend" the task and return ResumeTask to its old value.
37- //
38- // on Darwin we can instead replace the "current task" thread-local
39- // (key 103) temporarily, but that isn't portable.
40- //
41- // This is sort of like inserting a "work(); await Task.yield()" block
42- // at the top of the task, since when a Task awaits it similarly changes
43- // the Resume function and suspends. Note that we can assume that this
44- // is a Task and not a basic Job, because Executor.enqueue is only
45- // called from swift_task_enqueue.
46- let target = job. asCurrent { NodeActor . target }
47-
48- guard let q = target? . queue else {
49- nodeFatalError ( " There is no target NodeAsyncQueue associated with this Task " )
30+ private let schedulerQueue = DispatchQueue ( label: " NodeExecutorScheduler " )
31+
32+ fileprivate init ( ) {
33+ if #unavailable( macOS 15 . 0 , iOS 18 . 0 , watchOS 11 . 0 , tvOS 18 . 0 , visionOS 2 . 0 ) {
34+ // checkExecutor isn't respected prior to these OS versions, so
35+ // we end up with a lot of false alarms. Disable unexpected executor
36+ // checking to suppress this.
37+ setenv ( " SWIFT_UNEXPECTED_EXECUTOR_LOG_LEVEL " , " 0 " , 1 )
5038 }
39+ }
5140
52- let ref = asUnownedSerialExecutor ( )
41+ func enqueue( _ job: UnownedJob ) {
42+ schedulerQueue. async {
43+ // We want to access `job`'s task-local storage. To do so,
44+ // this temporarily swaps ResumeTask for our own function.
45+ // Then, swift_job_run is called, which sets the active task to
46+ // the receiver and invokes its ResumeTask. We then execute the
47+ // given closure, allowing us to grab task-local values. Finally,
48+ // we "suspend" the task and return ResumeTask to its old value.
49+ //
50+ // on Darwin we can instead replace the "current task" thread-local
51+ // (key 103) temporarily, but that isn't portable.
52+ //
53+ // This is sort of like inserting a "work(); await Task.yield()" block
54+ // at the top of the task, since when a Task awaits it similarly changes
55+ // the Resume function and suspends. Note that we can assume that this
56+ // is a Task and not a basic Job, because Executor.enqueue is only
57+ // called from swift_task_enqueue.
58+ //
59+ // Regarding `schedulerQueue.async`:
60+ // Pre Swift 6.0 we didn't need a scheduler queue as enqueue would always
61+ // run on the global queue. However, Swift 6 introduces optimizations in
62+ // Task dispatch that allow tasks to be enqueued more efficiently, including
63+ // that Task.init avoids a hop when possible. This, however, interferes with
64+ // our `job.asCurrent` code because `asCurrent` relies on there being no already-
65+ // running task (swift_job_run doesn't play well with nesting, it's possible but
66+ // requires more private APIs, cf [swift_task_startOnMainActor][1]). The simplest
67+ // solution is to hop onto our own queue for scheduling.
68+ //
69+ // [1]: https://github.com/apple/swift/blob/876c056153554f93b89dfd134794a05426ee789a/stdlib/public/Concurrency/Task.cpp#L1739
70+ let target = job. asCurrent { NodeActor . target }
71+
72+ guard let q = target? . queue else {
73+ nodeFatalError ( " There is no target NodeAsyncQueue associated with this Task " )
74+ }
75+
76+ let ref = self . asUnownedSerialExecutor ( )
5377
54- if q. instanceID == NodeContext . runOnActor ( { try ? Node . instanceID ( ) } ) {
55- // if we're already on the right thread, skip a hop
56- job. runSynchronously ( on: ref)
57- } else {
5878 do {
5979 try q. run { job. runSynchronously ( on: ref) }
6080 } catch {
@@ -66,6 +86,10 @@ private final class NodeExecutor: SerialExecutor {
6686 func asUnownedSerialExecutor( ) -> UnownedSerialExecutor {
6787 . init( ordinary: self )
6888 }
89+
90+ func checkIsolated( ) {
91+ // TODO: crash if we're not on a Node thread
92+ }
6993}
7094
7195// This isn't *actually* a single global actor. Rather, its associated
@@ -81,9 +105,9 @@ private final class NodeExecutor: SerialExecutor {
81105
82106 @TaskLocal static var target : NodeAsyncQueue . Handle ?
83107
84- private nonisolated let _unownedExecutor = NodeExecutor ( )
108+ private nonisolated let executor = NodeExecutor ( )
85109 public nonisolated var unownedExecutor : UnownedSerialExecutor {
86- _unownedExecutor . asUnownedSerialExecutor ( )
110+ executor . asUnownedSerialExecutor ( )
87111 }
88112
89113 public static func run< T: Sendable > ( resultType: T . Type = T . self, body: @NodeActor @Sendable ( ) throws -> T ) async rethrows -> T {
@@ -92,14 +116,14 @@ private final class NodeExecutor: SerialExecutor {
92116}
93117
94118extension NodeActor {
95- public static func unsafeAssumeIsolated< T> ( _ action: @NodeActor ( ) throws -> T ) rethrows -> T {
119+ public static func unsafeAssumeIsolated< T> ( _ action: @NodeActor @ Sendable ( ) throws -> T ) rethrows -> T {
96120 try withoutActuallyEscaping ( action) {
97121 try unsafeBitCast( $0, to: ( ( ) throws -> T) . self ) ( )
98122 }
99123 }
100124
101125 public static func assumeIsolated< T> (
102- _ action: @NodeActor ( ) throws -> T ,
126+ _ action: @NodeActor @ Sendable ( ) throws -> T ,
103127 file: StaticString = #fileID,
104128 line: UInt = #line
105129 ) rethrows -> T {
0 commit comments