Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ let package = Package(
"NIOCore",
"NIOEmbedded",
"NIOFoundationCompat",
"NIOTestUtils",
swiftAtomics,
],
swiftSettings: strictConcurrencySettings
Expand Down
166 changes: 166 additions & 0 deletions Sources/NIOTestUtils/ManualTaskExecutor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=6)

import DequeModule
import Synchronization

/// Provide a `ManualTaskExecutor` for the duration of the given `body`.
///
/// The executor can be used for setting the executor preference of tasks and fully control
/// when execution of the tasks is performed.
///
/// Example usage:
/// ```swift
/// await withDiscardingTaskGroup { group in
/// await withManualTaskExecutor { taskExecutor in
/// group.addTask(executorPreference: taskExecutor) {
/// print("Running")
/// }
/// taskExecutor.runUntilQueueIsEmpty() // Run the task synchronously
/// }
/// }
/// ```
///
/// - warning: Do not escape the task executor from the closure for later use and make sure that
/// all tasks running on the executor are completely finished before `body` returns.
/// It is highly recommended to use structured concurrency with this task executor.
///
/// - Parameters:
/// - body: The closure that will accept the task executor.
///
/// - Throws: When `body` throws.
///
/// - Returns: The value returned by `body`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@inlinable
package func withManualTaskExecutor<T, Failure>(
body: (ManualTaskExecutor) async throws(Failure) -> T
) async throws(Failure) -> T {
let taskExecutor = ManualTaskExecutor()
defer { taskExecutor.shutdown() }
return try await body(taskExecutor)
}

/// Provide two `ManualTaskExecutor`s for the duration of the given `body`.
///
/// The executors can be used for setting the executor preference of tasks and fully control
/// when execution of the tasks is performed.
///
/// Example usage:
/// ```swift
/// await withDiscardingTaskGroup { group in
/// await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
/// group.addTask(executorPreference: taskExecutor1) {
/// print("Running 1")
/// }
/// group.addTask(executorPreference: taskExecutor2) {
/// print("Running 2")
/// }
/// taskExecutor2.runUntilQueueIsEmpty() // Run second task synchronously
/// taskExecutor1.runUntilQueueIsEmpty() // Run first task synchronously
/// }
/// }
/// ```
///
/// - warning: Do not escape the task executors from the closure for later use and make sure that
/// all tasks running on the executors are completely finished before `body` returns.
/// It is highly recommended to use structured concurrency with these task executors.
///
/// - Parameters:
/// - body: The closure that will accept the task executors.
///
/// - Throws: When `body` throws.
///
/// - Returns: The value returned by `body`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@inlinable
package func withManualTaskExecutor<T, Failure>(
body: (ManualTaskExecutor, ManualTaskExecutor) async throws(Failure) -> T
) async throws(Failure) -> T {
let taskExecutor1 = ManualTaskExecutor()
defer { taskExecutor1.shutdown() }

let taskExecutor2 = ManualTaskExecutor()
defer { taskExecutor2.shutdown() }

return try await body(taskExecutor1, taskExecutor2)
}

/// Manual task executor.
///
/// A `TaskExecutor` that does not use any threadpool or similar mechanism to run the jobs.
/// Jobs are manually run by calling the `runUntilQueueIsEmpty` method.
///
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@usableFromInline
package final class ManualTaskExecutor: TaskExecutor {
struct Storage {
var isShutdown = false
var jobs = Deque<UnownedJob>()
}

private let storage = Mutex<Storage>(.init())

@usableFromInline
init() {}

/// Run jobs until queue is empty.
///
/// Synchronously runs all enqueued jobs, including any jobs that are enqueued while running.
/// When this function returns, it means that each task running on this executor is either:
/// - suspended
/// - moved (temporarily) to a different executor
/// - finished
///
/// If not all tasks are finished, this function must be called again.
package func runUntilQueueIsEmpty() {
while let job = self.storage.withLock({ $0.jobs.popFirst() }) {
job.runSynchronously(on: self.asUnownedTaskExecutor())
}
}

/// Enqueue a job.
///
/// Called by the concurrency runtime.
///
/// - Parameter job: The job to enqueue.
@usableFromInline
package func enqueue(_ job: UnownedJob) {
self.storage.withLock { storage in
if storage.isShutdown {
fatalError("A job is enqueued after manual executor shutdown")
}
storage.jobs.append(job)
}
}

/// Shutdown.
///
/// Since the manual task executor is not running anything in the background, this is purely to catch
/// any issues due to incorrect usage of the executor. The shutdown verifies that the queue is empty
/// and makes sure that no new jobs can be enqueued.
@usableFromInline
func shutdown() {
self.storage.withLock { storage in
if !storage.jobs.isEmpty {
fatalError("Shutdown of manual executor with jobs in queue")
}
storage.isShutdown = true
}
}
}

#endif // compiler(>=6)
86 changes: 46 additions & 40 deletions Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import DequeModule
import NIOConcurrencyHelpers
import NIOTestUtils
import XCTest

@testable import NIOCore
Expand Down Expand Up @@ -606,49 +607,54 @@ final class NIOAsyncWriterTests: XCTestCase {
self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1)
}

func testSuspendingBufferedYield_whenWriterFinished() async throws {
self.sink.setWritability(to: false)

let bothSuspended = expectation(description: "suspended on both yields")
let suspendedAgain = ConditionLock(value: false)
self.delegate.didSuspendHandler = {
if self.delegate.didSuspendCallCount == 2 {
bothSuspended.fulfill()
} else if self.delegate.didSuspendCallCount > 2 {
suspendedAgain.lock()
suspendedAgain.unlock(withValue: true)
}
}

self.delegate.didYieldHandler = { _ in
if self.delegate.didYieldCallCount == 1 {
// Delay this yield until the other yield is suspended again.
suspendedAgain.lock(whenValue: true)
suspendedAgain.unlock()
#if compiler(>=6)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testWriterFinish_AndSuspendBufferedYield() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
try await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
self.sink.setWritability(to: false)

self.delegate.didYieldHandler = { _ in
if self.delegate.didYieldCallCount == 1 {
// This is the yield of the first task. Run the second task until it suspends again
self.assert(suspendCallCount: 2, yieldCallCount: 1, terminateCallCount: 0)
taskExecutor2.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0)
}
}

group.addTask(executorPreference: taskExecutor1) { [writer] in
try await writer!.yield("message1")
}
group.addTask(executorPreference: taskExecutor2) { [writer] in
try await writer!.yield("message2")
}

// Run tasks until they are both suspended
taskExecutor1.runUntilQueueIsEmpty()
taskExecutor2.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0)

self.writer.finish()

// We have to become writable again to unbuffer the yields
self.sink.setWritability(to: true)

// Run the first task, which will complete its yield
// During this yield, didYieldHandler will run the second task, which will suspend again
taskExecutor1.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0)

// Run the second task to complete its yield
taskExecutor2.runUntilQueueIsEmpty()
self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1)

await XCTAssertNoThrow(try await group.next())
await XCTAssertNoThrow(try await group.next())
}
}

let task1 = Task { [writer] in
try await writer!.yield("message1")
}
let task2 = Task { [writer] in
try await writer!.yield("message2")
}

await fulfillment(of: [bothSuspended], timeout: 1)
self.writer.finish()

self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0)

// We have to become writable again to unbuffer the yields
// The first call to didYield will pause, so that the other yield will be suspended again.
self.sink.setWritability(to: true)

await XCTAssertNoThrow(try await task1.value)
await XCTAssertNoThrow(try await task2.value)

self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1)
}
#endif // compiler(>=6)

func testWriterFinish_whenFinished() {
// This tests just checks that finishing again is a no-op
Expand Down
69 changes: 69 additions & 0 deletions Tests/NIOTestUtilsTests/ManualTaskExecutorTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=6)

import NIOTestUtils
import Synchronization
import XCTest

class ManualTaskExecutorTest: XCTestCase {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testManualTaskExecutor() async {
await withDiscardingTaskGroup { group in
await withManualTaskExecutor { taskExecutor in
let taskDidRun = Mutex(false)

group.addTask(executorPreference: taskExecutor) {
taskDidRun.withLock { $0 = true }
}

// Run task
XCTAssertFalse(taskDidRun.withLock { $0 })
taskExecutor.runUntilQueueIsEmpty()
XCTAssertTrue(taskDidRun.withLock { $0 })
}
}
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testTwoManualTaskExecutors() async {
await withDiscardingTaskGroup { group in
await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
let task1DidRun = Mutex(false)
let task2DidRun = Mutex(false)

group.addTask(executorPreference: taskExecutor1) {
task1DidRun.withLock { $0 = true }
}

group.addTask(executorPreference: taskExecutor2) {
task2DidRun.withLock { $0 = true }
}

// Run task 1
XCTAssertFalse(task1DidRun.withLock { $0 })
taskExecutor1.runUntilQueueIsEmpty()
XCTAssertTrue(task1DidRun.withLock { $0 })

// Run task 2
XCTAssertFalse(task2DidRun.withLock { $0 })
taskExecutor2.runUntilQueueIsEmpty()
XCTAssertTrue(task2DidRun.withLock { $0 })
}
}
}
}

#endif // compiler(>=6)
Loading