Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ let package = Package(
"NIOCore",
"NIOEmbedded",
"NIOFoundationCompat",
"NIOTestUtils",
swiftAtomics,
],
swiftSettings: strictConcurrencySettings
Expand Down
164 changes: 164 additions & 0 deletions Sources/NIOTestUtils/ManualTaskExecutor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=6)

import DequeModule
import Synchronization

/// Provide a `ManualTaskExecutor` for the duration of the given `body`.
///
/// The executor can be used for setting the executor preference of tasks and fully control
/// when execution of the tasks is performed.
///
/// Example usage:
/// ```swift
/// await withDiscardingTaskGroup { group in
/// await withManualTaskExecutor { taskExecutor in
/// group.addTask(executorPreference: taskExecutor) {
/// print("Running")
/// }
/// taskExecutor.runUntilQueueIsEmpty() // Run the task synchronously
/// }
/// }
/// ```
///
/// - warning: Do not escape the task executor from the closure for later use and make sure that
/// all tasks running on the executor are completely finished before `body` returns.
/// It is highly recommended to use structured concurrency with this task executor.
///
/// - Parameters:
/// - body: The closure that will accept the task executor.
///
/// - Throws: When `body` throws.
///
/// - Returns: The value returned by `body`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@inlinable
public func withManualTaskExecutor<T, Failure>(
body: (ManualTaskExecutor) async throws(Failure) -> T
) async throws(Failure) -> T {
let taskExecutor = ManualTaskExecutor()
defer { taskExecutor.shutdown() }
return try await body(taskExecutor)
}

/// Provide two `ManualTaskExecutor`s for the duration of the given `body`.
///
/// The executors can be used for setting the executor preference of tasks and fully control
/// when execution of the tasks is performed.
///
/// Example usage:
/// ```swift
/// await withDiscardingTaskGroup { group in
/// await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
/// group.addTask(executorPreference: taskExecutor1) {
/// print("Running 1")
/// }
/// group.addTask(executorPreference: taskExecutor2) {
/// print("Running 2")
/// }
/// taskExecutor2.runUntilQueueIsEmpty() // Run second task synchronously
/// taskExecutor1.runUntilQueueIsEmpty() // Run first task synchronously
/// }
/// }
/// ```
///
/// - warning: Do not escape the task executors from the closure for later use and make sure that
/// all tasks running on the executors are completely finished before `body` returns.
/// It is highly recommended to use structured concurrency with these task executors.
///
/// - Parameters:
/// - body: The closure that will accept the task executors.
///
/// - Throws: When `body` throws.
///
/// - Returns: The value returned by `body`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@inlinable
public func withManualTaskExecutor<T, Failure>(
body: (ManualTaskExecutor, ManualTaskExecutor) async throws(Failure) -> T
) async throws(Failure) -> T {
let taskExecutor1 = ManualTaskExecutor()
defer { taskExecutor1.shutdown() }

let taskExecutor2 = ManualTaskExecutor()
defer { taskExecutor2.shutdown() }

return try await body(taskExecutor1, taskExecutor2)
}

/// Manual task executor.
///
/// A `TaskExecutor` that does not use any threadpool or similar mechanism to run the jobs.
/// Jobs are manually run by calling the `runUntilQueueIsEmpty` method.
///
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public final class ManualTaskExecutor: TaskExecutor {
struct Storage {
var isShutdown = false
var jobs = Deque<UnownedJob>()
}

private let storage = Mutex<Storage>(.init())

@usableFromInline
init() {}

/// Run jobs until queue is empty.
///
/// Synchronously runs all enqueued jobs, including any jobs that are enqueued while running.
/// When this function returns, it means that each task running on this executor is either:
/// - suspended
/// - moved (temporarily) to a different executor
/// - finished
///
/// If not all tasks are finished, this function must be called again.
public func runUntilQueueIsEmpty() {
while let job = self.storage.withLock({ $0.jobs.popFirst() }) {
job.runSynchronously(on: self.asUnownedTaskExecutor())
}
}

/// Enqueue a job.
///
/// Called by the concurrency runtime.
///
/// - Parameter job: The job to enqueue.
public func enqueue(_ job: UnownedJob) {
self.storage.withLock { storage in
if storage.isShutdown {
fatalError("A job is enqueued after manual executor shutdown")
}
storage.jobs.append(job)
}
}

/// Shutdown.
///
/// Since the manual task executor is not running anything in the background, this is purely to catch
/// any issues due to incorrect usage of the executor. The shutdown verifies that the queue is empty
/// and makes sure that no new jobs can be enqueued.
@usableFromInline
func shutdown() {
self.storage.withLock { storage in
if !storage.jobs.isEmpty {
fatalError("Shutdown of manual executor with jobs in queue")
}
storage.isShutdown = true
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we really need this complicated shutdown handling. Since task executors can only be set on a scope we shouldn't have to do any manual clean up or reject jobs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can catch some incorrect usage, like for example:

await withManualTaskExecutor { taskExecutor in
    Task(executorPreference: taskExecutor) {
        try await Task.sleep(for: .seconds(3))
    }
    taskExecutor.runUntilQueueIsEmpty()  // Runs until the sleep starts, but the task is not finished yet
}

It will definitely not catch all problems, but I was thinking there could be value in catching some.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is not a public thing anymore I'm fine with leaving it. @Lukasa @glbrntt you want to take a look before we merge it?

}

#endif // compiler(>=6)
86 changes: 46 additions & 40 deletions Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import DequeModule
import NIOConcurrencyHelpers
import NIOTestUtils
import XCTest

@testable import NIOCore
Expand Down Expand Up @@ -606,48 +607,53 @@ final class NIOAsyncWriterTests: XCTestCase {
self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1)
}

func testSuspendingBufferedYield_whenWriterFinished() async throws {
self.sink.setWritability(to: false)

let bothSuspended = expectation(description: "suspended on both yields")
let suspendedAgain = ConditionLock(value: false)
self.delegate.didSuspendHandler = {
if self.delegate.didSuspendCallCount == 2 {
bothSuspended.fulfill()
} else if self.delegate.didSuspendCallCount > 2 {
suspendedAgain.lock()
suspendedAgain.unlock(withValue: true)
}
}

self.delegate.didYieldHandler = { _ in
if self.delegate.didYieldCallCount == 1 {
// Delay this yield until the other yield is suspended again.
suspendedAgain.lock(whenValue: true)
suspendedAgain.unlock()
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testWriterFinish_AndSuspendBufferedYield() async throws {
#if compiler(>=6)
try await withThrowingTaskGroup(of: Void.self) { group in
try await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
self.sink.setWritability(to: false)

self.delegate.didYieldHandler = { _ in
if self.delegate.didYieldCallCount == 1 {
// This is the yield of the first task. Run the second task until it suspends again
self.assert(suspendCallCount: 2, yieldCallCount: 1, terminateCallCount: 0)
taskExecutor2.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0)
}
}

group.addTask(executorPreference: taskExecutor1) { [writer] in
try await writer!.yield("message1")
}
group.addTask(executorPreference: taskExecutor2) { [writer] in
try await writer!.yield("message2")
}

// Run tasks until they are both suspended
taskExecutor1.runUntilQueueIsEmpty()
taskExecutor2.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0)

self.writer.finish()

// We have to become writable again to unbuffer the yields
self.sink.setWritability(to: true)

// Run the first task, which will complete its yield
// During this yield, didYieldHandler will run the second task, which will suspend again
taskExecutor1.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0)

// Run the second task to complete its yield
taskExecutor2.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1)

await XCTAssertNoThrow(try await group.next())
await XCTAssertNoThrow(try await group.next())
}
}

let task1 = Task { [writer] in
try await writer!.yield("message1")
}
let task2 = Task { [writer] in
try await writer!.yield("message2")
}

await fulfillment(of: [bothSuspended], timeout: 1)
self.writer.finish()

self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0)

// We have to become writable again to unbuffer the yields
// The first call to didYield will pause, so that the other yield will be suspended again.
self.sink.setWritability(to: true)

await XCTAssertNoThrow(try await task1.value)
await XCTAssertNoThrow(try await task2.value)

self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1)
#endif // compiler(>=6)
}

func testWriterFinish_whenFinished() {
Expand Down
69 changes: 69 additions & 0 deletions Tests/NIOTestUtilsTests/ManualTaskExecutorTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=6)

import NIOTestUtils
import Synchronization
import XCTest

class ManualTaskExecutorTest: XCTestCase {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testManualTaskExecutor() async {
await withDiscardingTaskGroup { group in
await withManualTaskExecutor { taskExecutor in
let taskDidRun = Mutex(false)

group.addTask(executorPreference: taskExecutor) {
taskDidRun.withLock { $0 = true }
}

// Run task
XCTAssertFalse(taskDidRun.withLock { $0 })
taskExecutor.runUntilQueueIsEmpty()
XCTAssertTrue(taskDidRun.withLock { $0 })
}
}
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testTwoManualTaskExecutors() async {
await withDiscardingTaskGroup { group in
await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
let task1DidRun = Mutex(false)
let task2DidRun = Mutex(false)

group.addTask(executorPreference: taskExecutor1) {
task1DidRun.withLock { $0 = true }
}

group.addTask(executorPreference: taskExecutor2) {
task2DidRun.withLock { $0 = true }
}

// Run task 1
XCTAssertFalse(task1DidRun.withLock { $0 })
taskExecutor1.runUntilQueueIsEmpty()
XCTAssertTrue(task1DidRun.withLock { $0 })

// Run task 2
XCTAssertFalse(task2DidRun.withLock { $0 })
taskExecutor2.runUntilQueueIsEmpty()
XCTAssertTrue(task2DidRun.withLock { $0 })
}
}
}
}

#endif // compiler(>=6)