From ef7ee95ea9c0008a1cdfec60b27249d040777d33 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 20 Nov 2024 21:04:52 +0000 Subject: [PATCH 1/3] Revert "Complete NIOCore strict concurrency (#2959)" This reverts commit 37b31b95c2741f7f7220852d503adacc3e995e92. --- Package.swift | 7 +-- Sources/NIOCore/BSDSocketAPI.swift | 2 +- .../AsyncChannel/AsyncChannelTests.swift | 6 +-- .../NIOAsyncSequenceTests.swift | 4 +- .../AsyncSequences/NIOAsyncWriterTests.swift | 13 +++--- .../NIOThrowingAsyncSequenceTests.swift | 13 +++--- Tests/NIOCoreTests/ByteBufferTest.swift | 43 ++++++++----------- .../ChannelOptionStorageTest.swift | 13 +----- .../DispatchQueue+WithFutureTest.swift | 13 +++--- .../NIOCloseOnErrorHandlerTest.swift | 2 +- Tests/NIOCoreTests/XCTest+Extensions.swift | 2 +- 11 files changed, 47 insertions(+), 71 deletions(-) diff --git a/Package.swift b/Package.swift index 4bbbe48e722..3b380614a18 100644 --- a/Package.swift +++ b/Package.swift @@ -86,8 +86,7 @@ let package = Package( "_NIODataStructures", swiftCollections, swiftAtomics, - ], - swiftSettings: strictConcurrencySettings + ] ), .target( name: "_NIODataStructures", @@ -415,13 +414,11 @@ let package = Package( .testTarget( name: "NIOCoreTests", dependencies: [ - "NIOConcurrencyHelpers", "NIOCore", "NIOEmbedded", "NIOFoundationCompat", swiftAtomics, - ], - swiftSettings: strictConcurrencySettings + ] ), .testTarget( name: "NIOEmbeddedTests", diff --git a/Sources/NIOCore/BSDSocketAPI.swift b/Sources/NIOCore/BSDSocketAPI.swift index cb75c9e5d4a..3899c8ec969 100644 --- a/Sources/NIOCore/BSDSocketAPI.swift +++ b/Sources/NIOCore/BSDSocketAPI.swift @@ -121,7 +121,7 @@ let SO_TIMESTAMP = CNIOLinux_SO_TIMESTAMP let SO_RCVTIMEO = CNIOLinux_SO_RCVTIMEO #endif -public enum NIOBSDSocket: Sendable { +public enum NIOBSDSocket { #if os(Windows) public typealias Handle = SOCKET #else diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 3d21c2c7a97..c1473722a45 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -253,8 +253,7 @@ final class AsyncChannelTests: XCTestCase { let strongSentinel: Sentinel? = Sentinel() sentinel = strongSentinel! try await XCTAsyncAssertNotNil( - await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).map { - _ -> Bool in + await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).map { _ in true }.get() ) @@ -429,8 +428,9 @@ private final class CloseRecorder: ChannelOutboundHandler, @unchecked Sendable { } } -private final class CloseSuppressor: ChannelOutboundHandler, RemovableChannelHandler, Sendable { +private final class CloseSuppressor: ChannelOutboundHandler, RemovableChannelHandler { typealias OutboundIn = Any + typealias outbound = Any func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { // We drop the close here. diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift index 2def03562a9..7a7583f59ee 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift @@ -519,7 +519,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let value = await iterator.next() resumed.fulfill() - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) return value } @@ -562,7 +562,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task: Task = Task { - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) let iterator = sequence.makeAsyncIterator() return await iterator.next() } diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 31c680b8bf3..95760bd4118 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -69,8 +69,7 @@ final class NIOAsyncWriterTests: XCTestCase { override func setUp() { super.setUp() - let delegate = MockAsyncWriterDelegate() - self.delegate = delegate + self.delegate = .init() let newWriter = NIOAsyncWriter.makeWriter( elementType: String.self, isWritable: true, @@ -79,7 +78,7 @@ final class NIOAsyncWriterTests: XCTestCase { ) self.writer = newWriter.writer self.sink = newWriter.sink - self.sink._storage._setDidSuspend { delegate.didSuspend() } + self.sink._storage._setDidSuspend { self.delegate.didSuspend() } } override func tearDown() { @@ -412,7 +411,7 @@ final class NIOAsyncWriterTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task = Task { [writer] in - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) try await writer!.yield("message2") } @@ -471,7 +470,7 @@ final class NIOAsyncWriterTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task = Task { [writer] in - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) try await writer!.yield("message1") } @@ -492,7 +491,7 @@ final class NIOAsyncWriterTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task = Task { [writer] in - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) try await writer!.yield("message2") } @@ -546,7 +545,7 @@ final class NIOAsyncWriterTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task = Task { [writer] in - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) try await writer!.yield("message1") } diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift index 888361a82eb..5d4f8d14432 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift @@ -611,7 +611,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let iterator = sequence.makeAsyncIterator() let element = try await iterator.next() resumed.fulfill() - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) return element } @@ -655,7 +655,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task: Task = Task { - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) let iterator = sequence.makeAsyncIterator() return try await iterator.next() } @@ -686,7 +686,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let cancelled = expectation(description: "task cancelled") let task: Task = Task { - await XCTWaiter().fulfillment(of: [cancelled], timeout: 1) + await fulfillment(of: [cancelled], timeout: 1) let iterator = sequence.makeAsyncIterator() return try await iterator.next() } @@ -879,13 +879,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { func testIteratorThrows_whenCancelled() async { _ = self.source.yield(contentsOf: Array(1...100)) - guard let sequence = self.sequence else { - return XCTFail("Expected to have an AsyncSequence") - } - await withThrowingTaskGroup(of: Void.self) { group in group.addTask { var itemsYieldedCounter = 0 + guard let sequence = self.sequence else { + return XCTFail("Expected to have an AsyncSequence") + } do { for try await next in sequence { diff --git a/Tests/NIOCoreTests/ByteBufferTest.swift b/Tests/NIOCoreTests/ByteBufferTest.swift index 3826b7e1c4b..052fb9ee29a 100644 --- a/Tests/NIOCoreTests/ByteBufferTest.swift +++ b/Tests/NIOCoreTests/ByteBufferTest.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import Atomics import NIOFoundationCompat import XCTest import _NIOBase64 @@ -2441,8 +2440,8 @@ class ByteBufferTest: XCTestCase { } func testReserveCapacityLargerUniquelyReferencedCallsRealloc() throws { - testReserveCapacityLarger_reallocCount.store(0, ordering: .sequentiallyConsistent) - testReserveCapacityLarger_mallocCount.store(0, ordering: .sequentiallyConsistent) + testReserveCapacityLarger_reallocCount = 0 + testReserveCapacityLarger_mallocCount = 0 let alloc = ByteBufferAllocator( hookedMalloc: testReserveCapacityLarger_mallocHook, @@ -2454,17 +2453,17 @@ class ByteBufferTest: XCTestCase { let oldCapacity = buf.capacity - XCTAssertEqual(testReserveCapacityLarger_mallocCount.load(ordering: .sequentiallyConsistent), 1) - XCTAssertEqual(testReserveCapacityLarger_reallocCount.load(ordering: .sequentiallyConsistent), 0) + XCTAssertEqual(testReserveCapacityLarger_mallocCount, 1) + XCTAssertEqual(testReserveCapacityLarger_reallocCount, 0) buf.reserveCapacity(32) - XCTAssertEqual(testReserveCapacityLarger_mallocCount.load(ordering: .sequentiallyConsistent), 1) - XCTAssertEqual(testReserveCapacityLarger_reallocCount.load(ordering: .sequentiallyConsistent), 1) + XCTAssertEqual(testReserveCapacityLarger_mallocCount, 1) + XCTAssertEqual(testReserveCapacityLarger_reallocCount, 1) XCTAssertNotEqual(buf.capacity, oldCapacity) } func testReserveCapacityLargerMultipleReferenceCallsMalloc() throws { - testReserveCapacityLarger_reallocCount.store(0, ordering: .sequentiallyConsistent) - testReserveCapacityLarger_mallocCount.store(0, ordering: .sequentiallyConsistent) + testReserveCapacityLarger_reallocCount = 0 + testReserveCapacityLarger_mallocCount = 0 let alloc = ByteBufferAllocator( hookedMalloc: testReserveCapacityLarger_mallocHook, @@ -2481,11 +2480,11 @@ class ByteBufferTest: XCTestCase { UInt(bitPattern: $0.baseAddress!) } - XCTAssertEqual(testReserveCapacityLarger_mallocCount.load(ordering: .sequentiallyConsistent), 1) - XCTAssertEqual(testReserveCapacityLarger_reallocCount.load(ordering: .sequentiallyConsistent), 0) + XCTAssertEqual(testReserveCapacityLarger_mallocCount, 1) + XCTAssertEqual(testReserveCapacityLarger_reallocCount, 0) buf.reserveCapacity(32) - XCTAssertEqual(testReserveCapacityLarger_mallocCount.load(ordering: .sequentiallyConsistent), 2) - XCTAssertEqual(testReserveCapacityLarger_reallocCount.load(ordering: .sequentiallyConsistent), 0) + XCTAssertEqual(testReserveCapacityLarger_mallocCount, 2) + XCTAssertEqual(testReserveCapacityLarger_reallocCount, 0) let newPtrVal = buf.withVeryUnsafeBytes { UInt(bitPattern: $0.baseAddress!) @@ -3355,15 +3354,7 @@ private enum AllocationExpectationState: Int { case freeDone } -private let _testAllocationOfReallyBigByteBuffer_state = ManagedAtomic(AllocationExpectationState.begin.rawValue) -private var testAllocationOfReallyBigByteBuffer_state: AllocationExpectationState { - get { - .init(rawValue: _testAllocationOfReallyBigByteBuffer_state.load(ordering: .acquiring))! - } - set { - _testAllocationOfReallyBigByteBuffer_state.store(newValue.rawValue, ordering: .releasing) - } -} +private var testAllocationOfReallyBigByteBuffer_state = AllocationExpectationState.begin private func testAllocationOfReallyBigByteBuffer_freeHook(_ ptr: UnsafeMutableRawPointer?) { precondition(AllocationExpectationState.reallocDone == testAllocationOfReallyBigByteBuffer_state) testAllocationOfReallyBigByteBuffer_state = .freeDone @@ -3396,14 +3387,14 @@ private func testAllocationOfReallyBigByteBuffer_memcpyHook( // not actually doing any copies } -private let testReserveCapacityLarger_reallocCount = ManagedAtomic(0) -private let testReserveCapacityLarger_mallocCount = ManagedAtomic(0) +private var testReserveCapacityLarger_reallocCount = 0 +private var testReserveCapacityLarger_mallocCount = 0 private func testReserveCapacityLarger_freeHook(_ ptr: UnsafeMutableRawPointer) { free(ptr) } private func testReserveCapacityLarger_mallocHook(_ size: Int) -> UnsafeMutableRawPointer? { - testReserveCapacityLarger_mallocCount.wrappingIncrement(ordering: .sequentiallyConsistent) + testReserveCapacityLarger_mallocCount += 1 return malloc(size) } @@ -3411,7 +3402,7 @@ private func testReserveCapacityLarger_reallocHook( _ ptr: UnsafeMutableRawPointer?, _ count: Int ) -> UnsafeMutableRawPointer? { - testReserveCapacityLarger_reallocCount.wrappingIncrement(ordering: .sequentiallyConsistent) + testReserveCapacityLarger_reallocCount += 1 return realloc(ptr, count) } diff --git a/Tests/NIOCoreTests/ChannelOptionStorageTest.swift b/Tests/NIOCoreTests/ChannelOptionStorageTest.swift index 399df6b9ca8..be568e696c1 100644 --- a/Tests/NIOCoreTests/ChannelOptionStorageTest.swift +++ b/Tests/NIOCoreTests/ChannelOptionStorageTest.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import NIOConcurrencyHelpers import NIOCore import NIOEmbedded import XCTest @@ -111,16 +110,8 @@ class ChannelOptionStorageTest: XCTestCase { } } -final class OptionsCollectingChannel: Channel { - private let _allOptions = NIOLockedValueBox<[(any Sendable, any Sendable)]>([]) - var allOptions: [(any Sendable, any Sendable)] { - get { - self._allOptions.withLockedValue { $0 } - } - set { - self._allOptions.withLockedValue { $0 = newValue } - } - } +class OptionsCollectingChannel: Channel { + var allOptions: [(Any, Any)] = [] var allocator: ByteBufferAllocator { fatalError() } diff --git a/Tests/NIOCoreTests/DispatchQueue+WithFutureTest.swift b/Tests/NIOCoreTests/DispatchQueue+WithFutureTest.swift index 0809c115f40..e70a30f6ca4 100644 --- a/Tests/NIOCoreTests/DispatchQueue+WithFutureTest.swift +++ b/Tests/NIOCoreTests/DispatchQueue+WithFutureTest.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import Atomics import Dispatch import NIOCore import NIOEmbedded @@ -31,7 +30,7 @@ class DispatchQueueWithFutureTest: XCTestCase { } let eventLoop = group.next() let sem = DispatchSemaphore(value: 0) - let nonBlockingRan = ManagedAtomic(false) + var nonBlockingRan = false let futureResult: EventLoopFuture = DispatchQueue.global().asyncWithFuture(eventLoop: eventLoop) { () -> String in sem.wait() // Block in callback @@ -39,12 +38,12 @@ class DispatchQueueWithFutureTest: XCTestCase { } futureResult.whenSuccess { value in XCTAssertEqual(value, "hello") - XCTAssertTrue(nonBlockingRan.load(ordering: .sequentiallyConsistent)) + XCTAssertTrue(nonBlockingRan) } let p2 = eventLoop.makePromise(of: Bool.self) p2.futureResult.whenSuccess { _ in - nonBlockingRan.store(true, ordering: .sequentiallyConsistent) + nonBlockingRan = true } p2.succeed(true) @@ -58,7 +57,7 @@ class DispatchQueueWithFutureTest: XCTestCase { } let eventLoop = group.next() let sem = DispatchSemaphore(value: 0) - let nonBlockingRan = ManagedAtomic(false) + var nonBlockingRan = false let futureResult: EventLoopFuture = DispatchQueue.global().asyncWithFuture(eventLoop: eventLoop) { () -> String in sem.wait() // Block in callback @@ -66,12 +65,12 @@ class DispatchQueueWithFutureTest: XCTestCase { } futureResult.whenFailure { err in XCTAssertEqual(err as! DispatchQueueTestError, DispatchQueueTestError.example) - XCTAssertTrue(nonBlockingRan.load(ordering: .sequentiallyConsistent)) + XCTAssertTrue(nonBlockingRan) } let p2 = eventLoop.makePromise(of: Bool.self) p2.futureResult.whenSuccess { _ in - nonBlockingRan.store(true, ordering: .sequentiallyConsistent) + nonBlockingRan = true } p2.succeed(true) diff --git a/Tests/NIOCoreTests/NIOCloseOnErrorHandlerTest.swift b/Tests/NIOCoreTests/NIOCloseOnErrorHandlerTest.swift index 4ed8a89ff70..16445957b2f 100644 --- a/Tests/NIOCoreTests/NIOCloseOnErrorHandlerTest.swift +++ b/Tests/NIOCoreTests/NIOCloseOnErrorHandlerTest.swift @@ -16,7 +16,7 @@ import NIOCore import NIOEmbedded import XCTest -final class DummyFailingHandler1: ChannelInboundHandler, Sendable { +final class DummyFailingHandler1: ChannelInboundHandler { typealias InboundIn = NIOAny struct DummyError1: Error {} diff --git a/Tests/NIOCoreTests/XCTest+Extensions.swift b/Tests/NIOCoreTests/XCTest+Extensions.swift index 0ae75cdb497..1717542d9ea 100644 --- a/Tests/NIOCoreTests/XCTest+Extensions.swift +++ b/Tests/NIOCoreTests/XCTest+Extensions.swift @@ -57,7 +57,7 @@ func assertNoThrowWithValue( func withTemporaryFile(content: String? = nil, _ body: (NIOCore.NIOFileHandle, String) throws -> T) throws -> T { let temporaryFilePath = "\(temporaryDirectory)/nio_\(UUID())" - _ = FileManager.default.createFile(atPath: temporaryFilePath, contents: content?.data(using: .utf8)) + FileManager.default.createFile(atPath: temporaryFilePath, contents: content?.data(using: .utf8)) defer { XCTAssertNoThrow(try FileManager.default.removeItem(atPath: temporaryFilePath)) } From 7d9246781843103e51a4243d39fd31c7dd55aa1f Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 20 Nov 2024 21:07:21 +0000 Subject: [PATCH 2/3] Revert "Clean up Sendability for ChannelInvoker (#2955)" This reverts commit de116b7eff35ce87e1e881f6086e32324ed91736. --- Sources/NIOCore/AsyncAwaitSupport.swift | 18 +-- Sources/NIOCore/Channel.swift | 77 ++++-------- Sources/NIOCore/ChannelInvoker.swift | 34 +---- Sources/NIOCore/ChannelPipeline.swift | 117 +++--------------- .../NIOCore/Docs.docc/swift-concurrency.md | 29 ----- Sources/NIOCore/NIOAny.swift | 3 - Sources/NIOEmbedded/AsyncTestingChannel.swift | 4 +- Sources/NIOEmbedded/Embedded.swift | 40 +----- Sources/NIOPerformanceTester/main.swift | 19 +-- Sources/NIOPosix/SocketChannel.swift | 2 +- Sources/NIOTestUtils/NIOHTTP1TestServer.swift | 50 +------- .../AsyncChannel/AsyncChannelTests.swift | 16 +-- .../AsyncTestingChannelTests.swift | 9 +- .../EmbeddedChannelTest.swift | 4 +- .../NIOHTTP1Tests/HTTPServerClientTest.swift | 68 ++-------- .../HTTPServerPipelineHandlerTest.swift | 96 +++++++------- .../HTTPServerProtocolErrorHandlerTest.swift | 2 +- .../HTTPServerUpgradeTests.swift | 50 ++++---- Tests/NIOPosixTests/BootstrapTest.swift | 2 +- Tests/NIOPosixTests/ChannelPipelineTest.swift | 2 +- Tests/NIOPosixTests/ChannelTests.swift | 28 ++--- Tests/NIOPosixTests/CodecTest.swift | 16 +-- .../NIOPosixTests/DatagramChannelTests.swift | 79 ++++++------ .../NIOPosixTests/EchoServerClientTest.swift | 16 +-- Tests/NIOPosixTests/EventLoopTest.swift | 2 +- Tests/NIOPosixTests/FileRegionTest.swift | 103 +++++---------- .../NIOPosixTests/IdleStateHandlerTest.swift | 4 +- .../UniversalBootstrapSupportTest.swift | 2 +- .../NIOHTTP1TestServerTest.swift | 58 +-------- 29 files changed, 271 insertions(+), 679 deletions(-) diff --git a/Sources/NIOCore/AsyncAwaitSupport.swift b/Sources/NIOCore/AsyncAwaitSupport.swift index 5152728c0d1..066d78cedae 100644 --- a/Sources/NIOCore/AsyncAwaitSupport.swift +++ b/Sources/NIOCore/AsyncAwaitSupport.swift @@ -86,9 +86,8 @@ extension Channel { /// - data: the data to write @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @inlinable - @preconcurrency - public func writeAndFlush(_ data: T) async throws { - try await self.writeAndFlush(data).get() + public func writeAndFlush(_ any: T) async throws { + try await self.writeAndFlush(any).get() } /// Set `option` to `value` on this `Channel`. @@ -148,11 +147,6 @@ extension ChannelOutboundInvoker { /// - file: The file this function was called in, for debugging purposes. /// - line: The line this function was called on, for debugging purposes. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @available( - *, - deprecated, - message: "NIOAny is not Sendable: avoid wrapping the value in NIOAny to silence this warning." - ) public func writeAndFlush(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) async throws { try await self.writeAndFlush(data, file: file, line: line).get() } @@ -173,14 +167,8 @@ extension ChannelOutboundInvoker { /// - Parameters: /// - event: the event itself. /// - file: The file this function was called in, for debugging purposes. - /// - line: The line this function was called on, for debugging purposes. - @preconcurrency @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - public func triggerUserOutboundEvent( - _ event: Any & Sendable, - file: StaticString = #fileID, - line: UInt = #line - ) async throws { + public func triggerUserOutboundEvent(_ event: Any, file: StaticString = #fileID, line: UInt = #line) async throws { try await self.triggerUserOutboundEvent(event, file: file, line: line).get() } } diff --git a/Sources/NIOCore/Channel.swift b/Sources/NIOCore/Channel.swift index 8d0af1fec54..5e54413912a 100644 --- a/Sources/NIOCore/Channel.swift +++ b/Sources/NIOCore/Channel.swift @@ -145,30 +145,6 @@ public protocol Channel: AnyObject, ChannelOutboundInvoker, _NIOPreconcurrencySe /// The default implementation returns `nil`, and `Channel` implementations must opt in to /// support this behavior. var syncOptions: NIOSynchronousChannelOptions? { get } - - /// Write data into the `Channel`, automatically wrapping with `NIOAny`. - /// - /// - seealso: `ChannelOutboundInvoker.write`. - @preconcurrency - func write(_ any: T) -> EventLoopFuture - - /// Write data into the `Channel`, automatically wrapping with `NIOAny`. - /// - /// - seealso: `ChannelOutboundInvoker.write`. - @preconcurrency - func write(_ any: T, promise: EventLoopPromise?) - - /// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`. - /// - /// - seealso: `ChannelOutboundInvoker.writeAndFlush`. - @preconcurrency - func writeAndFlush(_ any: T) -> EventLoopFuture - - /// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`. - /// - /// - seealso: `ChannelOutboundInvoker.writeAndFlush`. - @preconcurrency - func writeAndFlush(_ any: T, promise: EventLoopPromise?) } extension Channel { @@ -201,36 +177,18 @@ extension Channel { pipeline.connect(to: address, promise: promise) } - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func write(_ data: NIOAny, promise: EventLoopPromise?) { pipeline.write(data, promise: promise) } - public func write(_ data: T, promise: EventLoopPromise?) { - pipeline.write(data, promise: promise) - } - public func flush() { pipeline.flush() } - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise?) { pipeline.writeAndFlush(data, promise: promise) } - public func writeAndFlush(_ data: T, promise: EventLoopPromise?) { - pipeline.writeAndFlush(data, promise: promise) - } - public func read() { pipeline.read() } @@ -247,8 +205,7 @@ extension Channel { promise?.fail(ChannelError._operationUnsupported) } - @preconcurrency - public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise?) { + public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise?) { pipeline.triggerUserOutboundEvent(event, promise: promise) } } @@ -256,24 +213,32 @@ extension Channel { /// Provides special extension to make writing data to the `Channel` easier by removing the need to wrap data in `NIOAny` manually. extension Channel { - /// Write data into the `Channel`. + /// Write data into the `Channel`, automatically wrapping with `NIOAny`. /// /// - seealso: `ChannelOutboundInvoker.write`. - @preconcurrency - public func write(_ any: T) -> EventLoopFuture { - let promise = self.eventLoop.makePromise(of: Void.self) - self.write(any, promise: promise) - return promise.futureResult + public func write(_ any: T) -> EventLoopFuture { + self.write(NIOAny(any)) } - /// Write and flush data into the `Channel`. + /// Write data into the `Channel`, automatically wrapping with `NIOAny`. + /// + /// - seealso: `ChannelOutboundInvoker.write`. + public func write(_ any: T, promise: EventLoopPromise?) { + self.write(NIOAny(any), promise: promise) + } + + /// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`. + /// + /// - seealso: `ChannelOutboundInvoker.writeAndFlush`. + public func writeAndFlush(_ any: T) -> EventLoopFuture { + self.writeAndFlush(NIOAny(any)) + } + + /// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`. /// /// - seealso: `ChannelOutboundInvoker.writeAndFlush`. - @preconcurrency - public func writeAndFlush(_ any: T) -> EventLoopFuture { - let promise = self.eventLoop.makePromise(of: Void.self) - self.writeAndFlush(any, promise: promise) - return promise.futureResult + public func writeAndFlush(_ any: T, promise: EventLoopPromise?) { + self.writeAndFlush(NIOAny(any), promise: promise) } } diff --git a/Sources/NIOCore/ChannelInvoker.swift b/Sources/NIOCore/ChannelInvoker.swift index 8d831cac34c..66fe6153334 100644 --- a/Sources/NIOCore/ChannelInvoker.swift +++ b/Sources/NIOCore/ChannelInvoker.swift @@ -45,11 +45,6 @@ public protocol ChannelOutboundInvoker { /// - data: the data to write /// - promise: the `EventLoopPromise` that will be notified once the operation completes, /// or `nil` if not interested in the outcome of the operation. - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) func write(_ data: NIOAny, promise: EventLoopPromise?) /// Flush data that was previously written via `write` to the remote peer. @@ -61,11 +56,6 @@ public protocol ChannelOutboundInvoker { /// - data: the data to write /// - promise: the `EventLoopPromise` that will be notified once the `write` operation completes, /// or `nil` if not interested in the outcome of the operation. - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise?) /// Signal that we want to read from the `Channel` once there is data ready. @@ -89,8 +79,7 @@ public protocol ChannelOutboundInvoker { /// - event: The event itself. /// - promise: the `EventLoopPromise` that will be notified once the operation completes, /// or `nil` if not interested in the outcome of the operation. - @preconcurrency - func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise?) + func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise?) /// The `EventLoop` which is used by this `ChannelOutboundInvoker` for execution. var eventLoop: EventLoop { get } @@ -155,11 +144,6 @@ extension ChannelOutboundInvoker { /// - file: The file this function was called in, for debugging purposes. /// - line: The line this function was called on, for debugging purposes. /// - Returns: the future which will be notified once the operation completes. - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func write(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture { let promise = makePromise(file: file, line: line) write(data, promise: promise) @@ -173,11 +157,6 @@ extension ChannelOutboundInvoker { /// - file: The file this function was called in, for debugging purposes. /// - line: The line this function was called on, for debugging purposes. /// - Returns: the future which will be notified once the `write` operation completes. - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func writeAndFlush(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture { let promise = makePromise(file: file, line: line) @@ -206,9 +185,8 @@ extension ChannelOutboundInvoker { /// - file: The file this function was called in, for debugging purposes. /// - line: The line this function was called on, for debugging purposes. /// - Returns: the future which will be notified once the operation completes. - @preconcurrency public func triggerUserOutboundEvent( - _ event: Any & Sendable, + _ event: Any, file: StaticString = #fileID, line: UInt = #line ) -> EventLoopFuture { @@ -247,11 +225,6 @@ public protocol ChannelInboundInvoker { /// /// - Parameters: /// - data: the data that was read and is ready to be processed. - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) func fireChannelRead(_ data: NIOAny) /// Called once there is no more data to read immediately on a `Channel`. Any new data received will be handled later. @@ -280,8 +253,7 @@ public protocol ChannelInboundInvoker { /// /// - Parameters: /// - event: the event itself. - @preconcurrency - func fireUserInboundEventTriggered(_ event: Any & Sendable) + func fireUserInboundEventTriggered(_ event: Any) } /// A protocol that signals that outbound and inbound events are triggered by this invoker. diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index 3db4a88abd7..dd752192875 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -705,30 +705,12 @@ public final class ChannelPipeline: ChannelInvoker { } } - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func fireChannelRead(_ data: NIOAny) { if eventLoop.inEventLoop { - _fireChannelRead0(data) + fireChannelRead0(data) } else { - // This is unsafe, but necessary. - let unsafeTransfer = UnsafeTransfer(data) eventLoop.execute { - self._fireChannelRead0(unsafeTransfer.wrappedValue) - } - } - } - - @inlinable - public func fireChannelRead(_ data: T) { - if eventLoop.inEventLoop { - _fireChannelRead0(NIOAny(data)) - } else { - eventLoop.execute { - self._fireChannelRead0(NIOAny(data)) + self.fireChannelRead0(data) } } } @@ -753,8 +735,7 @@ public final class ChannelPipeline: ChannelInvoker { } } - @preconcurrency - public func fireUserInboundEventTriggered(_ event: Any & Sendable) { + public func fireUserInboundEventTriggered(_ event: Any) { if eventLoop.inEventLoop { fireUserInboundEventTriggered0(event) } else { @@ -804,58 +785,22 @@ public final class ChannelPipeline: ChannelInvoker { } } - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func write(_ data: NIOAny, promise: EventLoopPromise?) { if eventLoop.inEventLoop { - _write0(data, promise: promise) - } else { - // This is unsafe, but unavoidable. - let unsafeTransfer = UnsafeTransfer(data) - eventLoop.execute { - self._write0(unsafeTransfer.wrappedValue, promise: promise) - } - } - } - - @inlinable - public func write(_ data: T, promise: EventLoopPromise?) { - if eventLoop.inEventLoop { - _write0(NIOAny(data), promise: promise) + write0(data, promise: promise) } else { eventLoop.execute { - self._write0(NIOAny(data), promise: promise) + self.write0(data, promise: promise) } } } - @available( - *, - deprecated, - message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning." - ) public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise?) { if eventLoop.inEventLoop { - _writeAndFlush0(data, promise: promise) - } else { - // This is unsafe, but unavoidable. - let unsafeTransfer = UnsafeTransfer(data) - eventLoop.execute { - self._writeAndFlush0(unsafeTransfer.wrappedValue, promise: promise) - } - } - } - - @inlinable - public func writeAndFlush(_ data: T, promise: EventLoopPromise?) { - if eventLoop.inEventLoop { - _writeAndFlush0(NIOAny(data), promise: promise) + writeAndFlush0(data, promise: promise) } else { eventLoop.execute { - self._writeAndFlush0(NIOAny(data), promise: promise) + self.writeAndFlush0(data, promise: promise) } } } @@ -890,8 +835,7 @@ public final class ChannelPipeline: ChannelInvoker { } } - @preconcurrency - public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise?) { + public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise?) { if eventLoop.inEventLoop { triggerUserOutboundEvent0(event, promise: promise) } else { @@ -931,7 +875,7 @@ public final class ChannelPipeline: ChannelInvoker { } } - @usableFromInline func _write0(_ data: NIOAny, promise: EventLoopPromise?) { + private func write0(_ data: NIOAny, promise: EventLoopPromise?) { if let firstOutboundCtx = firstOutboundCtx { firstOutboundCtx.invokeWrite(data, promise: promise) } else { @@ -939,7 +883,7 @@ public final class ChannelPipeline: ChannelInvoker { } } - @usableFromInline func _writeAndFlush0(_ data: NIOAny, promise: EventLoopPromise?) { + private func writeAndFlush0(_ data: NIOAny, promise: EventLoopPromise?) { if let firstOutboundCtx = firstOutboundCtx { firstOutboundCtx.invokeWriteAndFlush(data, promise: promise) } else { @@ -1003,7 +947,7 @@ public final class ChannelPipeline: ChannelInvoker { } } - @usableFromInline func _fireChannelRead0(_ data: NIOAny) { + private func fireChannelRead0(_ data: NIOAny) { if let firstInboundCtx = firstInboundCtx { firstInboundCtx.invokeChannelRead(data) } @@ -1366,7 +1310,7 @@ extension ChannelPipeline { /// This method should typically only be called by `Channel` implementations directly. public func fireChannelRead(_ data: NIOAny) { self.eventLoop.assertInEventLoop() - self._pipeline._fireChannelRead0(data) + self._pipeline.fireChannelRead0(data) } /// Fires `channelReadComplete` from the head to the tail. @@ -1430,17 +1374,7 @@ extension ChannelPipeline { /// This method should typically only be called by `Channel` implementations directly. public func write(_ data: NIOAny, promise: EventLoopPromise?) { self.eventLoop.assertInEventLoop() - self._pipeline._write0(data, promise: promise) - } - - /// Fires `write` from the tail to the head. - /// - /// This method should typically only be called by `Channel` implementations directly. - public func write(_ data: NIOAny) -> EventLoopFuture { - self.eventLoop.assertInEventLoop() - let promise = self.eventLoop.makePromise(of: Void.self) - self._pipeline._write0(data, promise: promise) - return promise.futureResult + self._pipeline.write0(data, promise: promise) } /// Fires `writeAndFlush` from the tail to the head. @@ -1448,17 +1382,7 @@ extension ChannelPipeline { /// This method should typically only be called by `Channel` implementations directly. public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise?) { self.eventLoop.assertInEventLoop() - self._pipeline._writeAndFlush0(data, promise: promise) - } - - /// Fires `writeAndFlush` from the tail to the head. - /// - /// This method should typically only be called by `Channel` implementations directly. - public func writeAndFlush(_ data: NIOAny) -> EventLoopFuture { - self.eventLoop.assertInEventLoop() - let promise = self.eventLoop.makePromise(of: Void.self) - self._pipeline._writeAndFlush0(data, promise: promise) - return promise.futureResult + self._pipeline.writeAndFlush0(data, promise: promise) } /// Fires `bind` from the tail to the head. @@ -1757,15 +1681,6 @@ public final class ChannelHandlerContext: ChannelInvoker { } /// Send a user event to the next inbound `ChannelHandler`. - /// - /// This method exists for compatiblity with ``ChannelInboundInvoker``. - @available(*, deprecated) - @_disfavoredOverload - public func fireUserInboundEventTriggered(_ event: Any & Sendable) { - self.next?.invokeUserInboundEventTriggered(event) - } - - /// Send a user event to the next inbound `ChannelHandler` from on the event loop. public func fireUserInboundEventTriggered(_ event: Any) { self.next?.invokeUserInboundEventTriggered(event) } @@ -1889,10 +1804,6 @@ public final class ChannelHandlerContext: ChannelInvoker { /// - event: The user event to send. /// - promise: The promise fulfilled when the user event has been sent or failed if it couldn't be sent. public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise?) { - self._triggerUserOutboundEvent(event, promise: promise) - } - - private func _triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise?) { if let outboundNext = self.prev { outboundNext.invokeTriggerUserOutboundEvent(event, promise: promise) } else { diff --git a/Sources/NIOCore/Docs.docc/swift-concurrency.md b/Sources/NIOCore/Docs.docc/swift-concurrency.md index 027e374bdf7..7198e182603 100644 --- a/Sources/NIOCore/Docs.docc/swift-concurrency.md +++ b/Sources/NIOCore/Docs.docc/swift-concurrency.md @@ -297,35 +297,6 @@ case .notUpgraded: } ``` -### NIOAny - -In NIO 2.77.0, a number of methods that took `NIOAny` as a parameter started -emitting deprecation warnings. These deprecation warnings are a substitute for the -concurrency warnings that you might otherwise see. - -The problem with these methods (most of which were defined on ``ChannelInvoker``) -is that they frequently would send a `NIOAny` across an event loop boundary. -Most commonly users will encounter this when calling methods on ``Channel`` types -(which conform to ``ChannelInvoker``), though they may encounter it on -``ChannelPipeline`` as well. - -The problem these methods have is that they can be safely called both on and off -of the ``EventLoop`` to which a ``Channel`` is bound. That means that they must be -capable of sending the value across an isolation domain, into the ``EventLoop``. -That requires the parameter to be `Sendable` (or to be `sending`). - -`NIOAny` cannot be made to be `Sendable`, so these methods are now deprecated. -They have been replaced with equivalent methods that take a generic type that -must be `Sendable`, and they take charge of wrapping the type in `NIOAny`. If -you encounter such a warning, this is the most common change. - -In cases where a non-`Sendable` value must actually be sent into the pipeline, there -are a few methods that can still be used. These methods are available on -``ChannelPipeline/SynchronousOperations``, which can be accessed via -``ChannelPipeline/syncOperations``. The ``ChannelPipeline/SynchronousOperations`` type -can only be accessed from on the `EventLoop`, and so no sending of a value -across isolation domains will occur here. - ### General guidance #### Where should your code live? diff --git a/Sources/NIOCore/NIOAny.swift b/Sources/NIOCore/NIOAny.swift index e342c568699..ffbcc37b59a 100644 --- a/Sources/NIOCore/NIOAny.swift +++ b/Sources/NIOCore/NIOAny.swift @@ -267,9 +267,6 @@ public struct NIOAny { } } -@available(*, unavailable) -extension NIOAny._NIOAny: Sendable {} - @available(*, unavailable) extension NIOAny: Sendable {} diff --git a/Sources/NIOEmbedded/AsyncTestingChannel.swift b/Sources/NIOEmbedded/AsyncTestingChannel.swift index 75889e5c37f..184bee7932f 100644 --- a/Sources/NIOEmbedded/AsyncTestingChannel.swift +++ b/Sources/NIOEmbedded/AsyncTestingChannel.swift @@ -477,7 +477,7 @@ public final class NIOAsyncTestingChannel: Channel { @inlinable @discardableResult public func writeInbound(_ data: T) async throws -> BufferState { try await self.testingEventLoop.executeInContext { - self.pipeline.fireChannelRead(data) + self.pipeline.fireChannelRead(NIOAny(data)) self.pipeline.fireChannelReadComplete() try self._throwIfErrorCaught() return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(self.channelcore.inboundBuffer) @@ -496,7 +496,7 @@ public final class NIOAsyncTestingChannel: Channel { // all the way. @inlinable @discardableResult public func writeOutbound(_ data: T) async throws -> BufferState { - try await self.writeAndFlush(data) + try await self.writeAndFlush(NIOAny(data)) return try await self.testingEventLoop.executeInContext { self.channelcore.outboundBuffer.isEmpty ? .empty : .full(self.channelcore.outboundBuffer) diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index c509a8ffa35..801e428b7cc 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -856,7 +856,7 @@ public final class EmbeddedChannel: Channel { @inlinable @discardableResult public func writeInbound(_ data: T) throws -> BufferState { self.embeddedEventLoop.checkCorrectThread() - self.pipeline.fireChannelRead(data) + self.pipeline.fireChannelRead(NIOAny(data)) self.pipeline.fireChannelReadComplete() try self.throwIfErrorCaught() return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer)) @@ -875,7 +875,7 @@ public final class EmbeddedChannel: Channel { @inlinable @discardableResult public func writeOutbound(_ data: T) throws -> BufferState { self.embeddedEventLoop.checkCorrectThread() - try self.writeAndFlush(data).wait() + try self.writeAndFlush(NIOAny(data)).wait() return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.outboundBuffer)) } @@ -1012,42 +1012,6 @@ public final class EmbeddedChannel: Channel { } self.pipeline.connect(to: address, promise: promise) } - - /// An overload of `Channel.write` that does not require a Sendable type, as ``EmbeddedEventLoop`` - /// is bound to a single thread. - @inlinable - public func write(_ data: T, promise: EventLoopPromise?) { - self.embeddedEventLoop.checkCorrectThread() - self.pipeline.syncOperations.write(NIOAny(data), promise: promise) - } - - /// An overload of `Channel.write` that does not require a Sendable type, as ``EmbeddedEventLoop`` - /// is bound to a single thread. - @inlinable - public func write(_ data: T) -> EventLoopFuture { - self.embeddedEventLoop.checkCorrectThread() - let promise = self.eventLoop.makePromise(of: Void.self) - self.pipeline.syncOperations.write(NIOAny(data), promise: promise) - return promise.futureResult - } - - /// An overload of `Channel.writeAndFlush` that does not require a Sendable type, as ``EmbeddedEventLoop`` - /// is bound to a single thread. - @inlinable - public func writeAndFlush(_ data: T, promise: EventLoopPromise?) { - self.embeddedEventLoop.checkCorrectThread() - self.pipeline.syncOperations.writeAndFlush(NIOAny(data), promise: promise) - } - - /// An overload of `Channel.writeAndFlush` that does not require a Sendable type, as ``EmbeddedEventLoop`` - /// is bound to a single thread. - @inlinable - public func writeAndFlush(_ data: T) -> EventLoopFuture { - self.embeddedEventLoop.checkCorrectThread() - let promise = self.eventLoop.makePromise(of: Void.self) - self.pipeline.syncOperations.writeAndFlush(NIOAny(data), promise: promise) - return promise.futureResult - } } extension EmbeddedChannel { diff --git a/Sources/NIOPerformanceTester/main.swift b/Sources/NIOPerformanceTester/main.swift index 13fc75222e0..644246073bd 100644 --- a/Sources/NIOPerformanceTester/main.swift +++ b/Sources/NIOPerformanceTester/main.swift @@ -664,12 +664,8 @@ measureAndPrint(desc: "http1_1k_reqs_1_conn") { .connect(to: serverChannel.localAddress!) .wait() - try! clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil)), promise: promise) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try! clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() return try! repeatedRequestsHandler.wait() } @@ -691,15 +687,8 @@ measureAndPrint(desc: "http1_1k_reqs_100_conns") { .connect(to: serverChannel.localAddress!) .wait() - try! clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try! clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() reqs.append(try! repeatedRequestsHandler.wait()) } return reqs.reduce(0, +) / numConns diff --git a/Sources/NIOPosix/SocketChannel.swift b/Sources/NIOPosix/SocketChannel.swift index e1a9a473c06..860ca81e250 100644 --- a/Sources/NIOPosix/SocketChannel.swift +++ b/Sources/NIOPosix/SocketChannel.swift @@ -825,7 +825,7 @@ final class DatagramChannel: BaseSocketChannel { var messageIterator = results.makeIterator() while self.isActive, let message = messageIterator.next() { - pipeline.fireChannelRead(message) + pipeline.fireChannelRead(NIOAny(message)) } readResult = .some diff --git a/Sources/NIOTestUtils/NIOHTTP1TestServer.swift b/Sources/NIOTestUtils/NIOHTTP1TestServer.swift index d3dbb15f6d3..acac2090399 100644 --- a/Sources/NIOTestUtils/NIOHTTP1TestServer.swift +++ b/Sources/NIOTestUtils/NIOHTTP1TestServer.swift @@ -17,50 +17,6 @@ import NIOCore import NIOHTTP1 import NIOPosix -typealias SendableHTTPServerResponsePart = HTTPPart - -extension HTTPServerResponsePart { - init(_ target: SendableHTTPServerResponsePart) { - switch target { - case .head(let head): - self = .head(head) - case .body(let body): - self = .body(.byteBuffer(body)) - case .end(let end): - self = .end(end) - } - } -} - -extension SendableHTTPServerResponsePart { - init(_ target: HTTPServerResponsePart) throws { - switch target { - case .head(let head): - self = .head(head) - case .body(.byteBuffer(let body)): - self = .body(body) - case .body(.fileRegion): - throw NIOHTTP1TestServerError( - reason: "FileRegion is not Sendable and cannot be passed across concurrency domains" - ) - case .end(let end): - self = .end(end) - } - } -} - -/// A helper handler to transform a Sendable response into a -/// non-Sendable one, to manage warnings. -private final class TransformerHandler: ChannelOutboundHandler { - typealias OutboundIn = SendableHTTPServerResponsePart - typealias OutboundOut = HTTPServerResponsePart - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let response = self.unwrapOutboundIn(data) - context.write(self.wrapOutboundOut(.init(response)), promise: promise) - } -} - private final class BlockingQueue { private let condition = ConditionLock(value: false) private var buffer = CircularBuffer>() @@ -269,8 +225,6 @@ public final class NIOHTTP1TestServer { } }.flatMap { channel.pipeline.addHandler(WebServerHandler(webServer: self)) - }.flatMap { - channel.pipeline.addHandler(TransformerHandler()) }.whenSuccess { _ = channel.setOption(.autoRead, value: true) } @@ -351,11 +305,9 @@ extension NIOHTTP1TestServer { public func writeOutbound(_ data: HTTPServerResponsePart) throws { self.eventLoop.assertNotInEventLoop() - - let transformed = try SendableHTTPServerResponsePart(data) try self.eventLoop.flatSubmit { () -> EventLoopFuture in if let channel = self.currentClientChannel { - return channel.writeAndFlush(transformed) + return channel.writeAndFlush(data) } else { return self.eventLoop.makeFailedFuture(ChannelError.ioOnClosedChannel) } diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index c1473722a45..182b2907067 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -292,9 +292,9 @@ final class AsyncChannelTests: XCTestCase { // Push 3 elements into the buffer. Reads continue to work. try await channel.testingEventLoop.executeInContext { - channel.pipeline.fireChannelRead(()) - channel.pipeline.fireChannelRead(()) - channel.pipeline.fireChannelRead(()) + channel.pipeline.fireChannelRead(NIOAny(())) + channel.pipeline.fireChannelRead(NIOAny(())) + channel.pipeline.fireChannelRead(NIOAny(())) channel.pipeline.fireChannelReadComplete() channel.pipeline.read() @@ -305,7 +305,7 @@ final class AsyncChannelTests: XCTestCase { // Add one more element into the buffer. This should flip our backpressure mode, and the reads should now be delayed. try await channel.testingEventLoop.executeInContext { - channel.pipeline.fireChannelRead(()) + channel.pipeline.fireChannelRead(NIOAny(())) channel.pipeline.fireChannelReadComplete() channel.pipeline.read() @@ -316,7 +316,7 @@ final class AsyncChannelTests: XCTestCase { // More elements don't help. try await channel.testingEventLoop.executeInContext { - channel.pipeline.fireChannelRead(()) + channel.pipeline.fireChannelRead(NIOAny(())) channel.pipeline.fireChannelReadComplete() channel.pipeline.read() @@ -345,7 +345,7 @@ final class AsyncChannelTests: XCTestCase { channel.pipeline.read() channel.pipeline.read() - channel.pipeline.fireChannelRead(()) + channel.pipeline.fireChannelRead(NIOAny(())) channel.pipeline.fireChannelReadComplete() channel.pipeline.read() @@ -357,8 +357,8 @@ final class AsyncChannelTests: XCTestCase { // The next reads arriving pushes us past the limit again. // This time we won't read. try await channel.testingEventLoop.executeInContext { - channel.pipeline.fireChannelRead(()) - channel.pipeline.fireChannelRead(()) + channel.pipeline.fireChannelRead(NIOAny(())) + channel.pipeline.fireChannelRead(NIOAny(())) channel.pipeline.fireChannelReadComplete() } XCTAssertEqual(readCounter.readCount, 13) diff --git a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift index 1619f95638e..0dea56abc51 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift @@ -394,10 +394,17 @@ class AsyncTestingChannelTests: XCTestCase { let channel = NIOAsyncTestingChannel() let buffer = ByteBufferAllocator().buffer(capacity: 5) let socketAddress = try SocketAddress(unixDomainSocketPath: "path") - + let handle = NIOFileHandle(descriptor: 1) + let fileRegion = FileRegion(fileHandle: handle, readerIndex: 1, endIndex: 2) + defer { + // fake descriptor, so shouldn't be closed. + XCTAssertNoThrow(try handle.takeDescriptorOwnership()) + } try await channel.writeAndFlush(1) try await channel.writeAndFlush("1") try await channel.writeAndFlush(buffer) + try await channel.writeAndFlush(IOData.byteBuffer(buffer)) + try await channel.writeAndFlush(IOData.fileRegion(fileRegion)) try await channel.writeAndFlush(AddressedEnvelope(remoteAddress: socketAddress, data: buffer)) } diff --git a/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift b/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift index ce5e1500775..c08ecb5c437 100644 --- a/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift +++ b/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift @@ -396,8 +396,8 @@ class EmbeddedChannelTest: XCTestCase { try channel.writeAndFlush(1).wait() try channel.writeAndFlush("1").wait() try channel.writeAndFlush(buffer).wait() - try channel.writeOutbound(IOData.byteBuffer(buffer)) - try channel.writeOutbound(IOData.fileRegion(fileRegion)) + try channel.writeAndFlush(IOData.byteBuffer(buffer)).wait() + try channel.writeAndFlush(IOData.fileRegion(fileRegion)).wait() try channel.writeAndFlush(AddressedEnvelope(remoteAddress: socketAddress, data: buffer)).wait() } diff --git a/Tests/NIOHTTP1Tests/HTTPServerClientTest.swift b/Tests/NIOHTTP1Tests/HTTPServerClientTest.swift index a9f56d4adbb..59ff595102c 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerClientTest.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerClientTest.swift @@ -403,15 +403,8 @@ class HTTPServerClientTest: XCTestCase { var head = HTTPRequestHead(version: httpVersion, method: .GET, uri: uri) head.headers.add(name: "Host", value: "apple.com") - try clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() accumulation.syncWaitForCompletion() } @@ -471,15 +464,8 @@ class HTTPServerClientTest: XCTestCase { var head = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/count-to-ten") head.headers.add(name: "Host", value: "apple.com") - try clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() accumulation.syncWaitForCompletion() } @@ -537,15 +523,8 @@ class HTTPServerClientTest: XCTestCase { var head = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/zero-length-body-part") head.headers.add(name: "Host", value: "apple.com") - try clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() accumulation.syncWaitForCompletion() } @@ -602,15 +581,8 @@ class HTTPServerClientTest: XCTestCase { var head = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/trailers") head.headers.add(name: "Host", value: "apple.com") - try clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() accumulation.syncWaitForCompletion() } @@ -668,7 +640,7 @@ class HTTPServerClientTest: XCTestCase { var buffer = clientChannel.allocator.buffer(capacity: numBytes) buffer.writeStaticString("GET /massive-response HTTP/1.1\r\nHost: nio.net\r\n\r\n") - try clientChannel.writeAndFlush(buffer).wait() + try clientChannel.writeAndFlush(NIOAny(buffer)).wait() accumulation.syncWaitForCompletion() } @@ -715,15 +687,8 @@ class HTTPServerClientTest: XCTestCase { var head = HTTPRequestHead(version: .http1_1, method: .HEAD, uri: "/head") head.headers.add(name: "Host", value: "apple.com") - try clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() accumulation.syncWaitForCompletion() } @@ -769,15 +734,8 @@ class HTTPServerClientTest: XCTestCase { var head = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/204") head.headers.add(name: "Host", value: "apple.com") - try clientChannel.eventLoop.flatSubmit { - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - clientChannel.pipeline.syncOperations.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) - clientChannel.pipeline.syncOperations.writeAndFlush( - NIOAny(HTTPClientRequestPart.end(nil)), - promise: promise - ) - return promise.futureResult - }.wait() + clientChannel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil) + try clientChannel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait() accumulation.syncWaitForCompletion() } diff --git a/Tests/NIOHTTP1Tests/HTTPServerPipelineHandlerTest.swift b/Tests/NIOHTTP1Tests/HTTPServerPipelineHandlerTest.swift index 15144e8f2bb..7029e2f73e0 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerPipelineHandlerTest.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerPipelineHandlerTest.swift @@ -173,8 +173,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Unblock by sending a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // Two requests should have made it through now. XCTAssertEqual( @@ -188,8 +188,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Now send the last response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // Now all three. XCTAssertEqual( @@ -221,8 +221,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 1) // Send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // This should have automatically triggered a call to read(), but only one. XCTAssertEqual(self.readCounter.readCount, 2) @@ -246,8 +246,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 1) // Send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // This should have not triggered a call to read. XCTAssertEqual(self.readCounter.readCount, 1) @@ -258,8 +258,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 1) // Now send in the last response, and see the read go through. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertEqual(self.readCounter.readCount, 2) } @@ -273,8 +273,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 1) // Now the server sends a response immediately. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // We're still moving forward and can read. XCTAssertEqual(self.readCounter.readCount, 1) @@ -310,8 +310,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Unblock by sending a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // Two requests should have made it through now. XCTAssertEqual( @@ -325,8 +325,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Now send the last response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // Now the half-closure should be delivered. XCTAssertEqual( @@ -362,8 +362,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Unblock by sending a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // The second request head, followed by the half-close, should have made it through. XCTAssertEqual( @@ -396,15 +396,15 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 1) // Send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // This should have not triggered a call to read. XCTAssertEqual(self.readCounter.readCount, 1) // Now send in the last response. This should also not issue a read. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertEqual(self.readCounter.readCount, 1) } @@ -425,8 +425,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Unblock by sending a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // Two requests should have made it through now. Still no half-closure. XCTAssertEqual( @@ -558,7 +558,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(.req2HeadExpected, handler.state) // finish 1st request, that will send through the 2nd one which will then write the 'req_boom' request - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertEqual(.done, handler.state) } @@ -585,8 +585,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Now send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // No further events should have happened. XCTAssertEqual( @@ -616,8 +616,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertTrue(self.channel.isActive) // Now send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // still missing the request .end XCTAssertTrue(self.channel.isActive) @@ -656,8 +656,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertTrue(self.channel.isActive) // Now send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertFalse(self.channel.isActive) @@ -689,14 +689,14 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Now send the response .head. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) XCTAssertTrue(self.channel.isActive) self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent()) XCTAssertTrue(self.channel.isActive) // Now send the response .end. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertFalse(self.channel.isActive) XCTAssertEqual( @@ -721,7 +721,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Now send the response .head. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) XCTAssertTrue(self.channel.isActive) self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent()) @@ -740,7 +740,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertTrue(self.channel.isActive) // Response .end. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertFalse(self.channel.isActive) XCTAssertEqual( @@ -767,14 +767,14 @@ class HTTPServerPipelineHandlerTest: XCTestCase { ) // Now send the response .head. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) XCTAssertTrue(self.channel.isActive) self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent()) XCTAssertTrue(self.channel.isActive) // Response .end. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertTrue(self.channel.isActive) // Request .end. @@ -825,8 +825,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertTrue(self.channel.isActive) // Now send a response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) XCTAssertFalse(self.channel.isActive) @@ -999,7 +999,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 1) // Send a partial response, which should not trigger a read. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) XCTAssertEqual(self.readCounter.readCount, 1) // Remove the handler. @@ -1136,14 +1136,14 @@ class HTTPServerPipelineHandlerTest: XCTestCase { continueResponse!.status = .continue // Now the server sends a continue response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(continueResponse!))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(continueResponse!)).wait()) // The client response completes. XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil))) // Now the server sends the final response. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) } func testServerCanRespondProcessingMultipleTimes() throws { @@ -1160,7 +1160,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { processResponse.status = .processing // Now the server sends multiple processing responses. - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(processResponse))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(processResponse)).wait()) // We are processing... Reading not allowed XCTAssertEqual(self.readCounter.readCount, 0) @@ -1168,7 +1168,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 0) // Continue processing... - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(processResponse))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(processResponse)).wait()) // We are processing... Reading not allowed XCTAssertEqual(self.readCounter.readCount, 0) @@ -1176,7 +1176,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 0) // Continue processing... - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(processResponse))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(processResponse)).wait()) // We are processing... Reading not allowed XCTAssertEqual(self.readCounter.readCount, 0) @@ -1184,8 +1184,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase { XCTAssertEqual(self.readCounter.readCount, 0) // Now send the actual response! - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.head(self.responseHead))) - XCTAssertNoThrow(try channel.writeOutbound(HTTPServerResponsePart.end(nil))) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait()) + XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait()) // This should have triggered a read XCTAssertEqual(self.readCounter.readCount, 1) diff --git a/Tests/NIOHTTP1Tests/HTTPServerProtocolErrorHandlerTest.swift b/Tests/NIOHTTP1Tests/HTTPServerProtocolErrorHandlerTest.swift index 3d06bd1580a..93fcaf05fea 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerProtocolErrorHandlerTest.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerProtocolErrorHandlerTest.swift @@ -93,7 +93,7 @@ class HTTPServerProtocolErrorHandlerTest: XCTestCase { headers: .init([("Content-Length", "0")]) ) ) - XCTAssertNoThrow(try channel.writeOutbound(res)) + XCTAssertNoThrow(try channel.writeAndFlush(res).wait()) // now we have started a response but it's not complete yet, let's inject a parser error channel.pipeline.fireErrorCaught(HTTPParserError.invalidEOFState) var allOutbound = try channel.readAllOutboundBuffers() diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 3a69f806e35..f68edaa6445 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -473,7 +473,7 @@ private class ReentrantReadOnChannelReadCompleteHandler: ChannelInboundHandler { let data = context.channel.allocator.buffer(string: "re-entrant read from channelReadComplete!") // Please never do this. - context.channel.pipeline.fireChannelRead(data) + context.channel.pipeline.fireChannelRead(NIOAny(data)) } context.fireChannelReadComplete() } @@ -519,7 +519,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { } let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // At this time the channel pipeline should not contain our handler: it should have removed itself. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -540,7 +540,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request fires a subsequent upgrade in immediately. It should also be ignored. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\n\r\nOPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nConnection: upgrade\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // At this time the channel pipeline should not contain our handler: it should have removed itself. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -614,7 +614,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -641,7 +641,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { } let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nConnection: upgrade\r\nUpgrade: myproto\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // At this time the channel pipeline should not contain our handler: it should have removed itself. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -662,7 +662,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is missing a 'Kafkaesque' connection header. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nConnection: upgrade\r\nUpgrade: myproto\r\nKafkaesque: true\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // At this time the channel pipeline should not contain our handler: it should have removed itself. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -681,7 +681,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { } let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nConnection: upgrade\r\nUpgrade: something-else\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // At this time the channel pipeline should not contain our handler: it should have removed itself. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -728,7 +728,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto, exploder\r\nKafkaesque: yup\r\nConnection: upgrade, kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -776,7 +776,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade,kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -843,7 +843,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: noproto,myproto\r\nKafkaesque: yup\r\nConnection: upgrade, kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -926,7 +926,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nConnection: upgrade\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Ok, we don't think this upgrade should have succeeded yet, but neither should it have failed. We want to // dispatch onto the server event loop and check that the channel still contains the upgrade handler. @@ -969,11 +969,11 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade, but is immediately followed by non-HTTP data. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nConnection: upgrade\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Ok, send the application data in. let appData = "supersecretawesome data definitely not http\r\nawesome\r\ndata\ryeah" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: appData)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: appData))).wait()) // Now we need to wait a little bit before we move forward. This needs to give time for the // I/O to settle. 100ms should be plenty to handle that I/O. @@ -1277,7 +1277,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try connectedServer.pipeline.waitForUpgraderToBeRemoved()) @@ -1415,11 +1415,11 @@ class HTTPServerUpgradeTestCase: XCTestCase { var request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" request += "A" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) XCTAssertNoThrow(try firstByteDonePromise.futureResult.wait() as Void) - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: "B")).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: "B"))).wait()) XCTAssertNoThrow(try secondByteDonePromise.futureResult.wait() as Void) @@ -1609,7 +1609,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -1849,7 +1849,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: notmyproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -1908,7 +1908,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -1965,7 +1965,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto, exploder\r\nKafkaesque: yup\r\nConnection: upgrade, kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -2023,7 +2023,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: noproto,myproto\r\nKafkaesque: yup\r\nConnection: upgrade, kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -2172,11 +2172,11 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { var request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" request += "A" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) XCTAssertNoThrow(try firstByteDonePromise.futureResult.wait() as Void) - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: "B")).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: "B"))).wait()) XCTAssertNoThrow(try secondByteDonePromise.futureResult.wait() as Void) @@ -2248,7 +2248,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) @@ -2296,7 +2296,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // This request is safe to upgrade. let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade,kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) // Let the machinery do its thing. XCTAssertNoThrow(try completePromise.futureResult.wait()) diff --git a/Tests/NIOPosixTests/BootstrapTest.swift b/Tests/NIOPosixTests/BootstrapTest.swift index cd75b45a36a..a6120ca94f9 100644 --- a/Tests/NIOPosixTests/BootstrapTest.swift +++ b/Tests/NIOPosixTests/BootstrapTest.swift @@ -270,7 +270,7 @@ class BootstrapTest: XCTestCase { var buffer = clientChannel.allocator.buffer(capacity: 1) buffer.writeString("a") - try clientChannel.writeAndFlush(buffer).wait() + try clientChannel.writeAndFlush(NIOAny(buffer)).wait() let serverAcceptedChannel = try serverAcceptedChannelPromise.futureResult.wait() diff --git a/Tests/NIOPosixTests/ChannelPipelineTest.swift b/Tests/NIOPosixTests/ChannelPipelineTest.swift index e8133289810..14b5697df9b 100644 --- a/Tests/NIOPosixTests/ChannelPipelineTest.swift +++ b/Tests/NIOPosixTests/ChannelPipelineTest.swift @@ -216,7 +216,7 @@ class ChannelPipelineTest: XCTestCase { } ).wait() - XCTAssertNoThrow(try channel.writeAndFlush("msg").wait() as Void) + XCTAssertNoThrow(try channel.writeAndFlush(NIOAny("msg")).wait() as Void) if let data = try channel.readOutbound(as: ByteBuffer.self) { XCTAssertEqual(buf, data) } else { diff --git a/Tests/NIOPosixTests/ChannelTests.swift b/Tests/NIOPosixTests/ChannelTests.swift index a1df4bee111..aa15db3b417 100644 --- a/Tests/NIOPosixTests/ChannelTests.swift +++ b/Tests/NIOPosixTests/ChannelTests.swift @@ -100,7 +100,7 @@ public final class ChannelTests: XCTestCase { var buffer = clientChannel.allocator.buffer(capacity: 1) buffer.writeString("a") - try clientChannel.writeAndFlush(buffer).wait() + try clientChannel.writeAndFlush(NIOAny(buffer)).wait() let serverAcceptedChannel = try serverAcceptedChannelPromise.futureResult.wait() @@ -153,11 +153,11 @@ public final class ChannelTests: XCTestCase { for _ in 0.. Channel in XCTAssertTrue(pipeline.channel is DeadChannel) return pipeline.channel - }.wait().writeAndFlush(()).wait() + }.wait().writeAndFlush(NIOAny(())).wait() ) { error in XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError) } @@ -2580,7 +2580,7 @@ public final class ChannelTests: XCTestCase { .childChannelInitializer { channel in var buffer = channel.allocator.buffer(capacity: 4) buffer.writeString("foo") - channel.writeAndFlush(buffer, promise: nil) + channel.writeAndFlush(NIOAny(buffer), promise: nil) return channel.eventLoop.makeSucceededVoidFuture() } .bind(host: "127.0.0.1", port: 0) @@ -2953,7 +2953,7 @@ public final class ChannelTests: XCTestCase { func channelActive(context: ChannelHandlerContext) { var buffer = context.channel.allocator.buffer(capacity: 1) buffer.writeStaticString("X") - context.channel.writeAndFlush(buffer).map { context.channel }.cascade( + context.channel.writeAndFlush(Self.wrapOutboundOut(buffer)).map { context.channel }.cascade( to: self.channelAvailablePromise ) } @@ -3435,7 +3435,7 @@ public final class ChannelTests: XCTestCase { let buffer = client.allocator.buffer(string: "abcd") let writeCount = 3 - let promises = (0..= 0 && bufferedAmount <= buffer.readableBytes * writeCount) - promises.append(client.write(buffer)) + promises.append(client.write(NIOAny(buffer))) bufferedAmount = try client.getOption(.bufferedWritableBytes).wait() XCTAssertTrue( bufferedAmount >= buffer.readableBytes && bufferedAmount <= buffer.readableBytes * (writeCount + 1) diff --git a/Tests/NIOPosixTests/CodecTest.swift b/Tests/NIOPosixTests/CodecTest.swift index 483da014357..95f5057ddd5 100644 --- a/Tests/NIOPosixTests/CodecTest.swift +++ b/Tests/NIOPosixTests/CodecTest.swift @@ -143,16 +143,16 @@ public final class ByteToMessageDecoderTest: XCTestCase { let writerIndex = buffer.writerIndex buffer.moveWriterIndex(to: writerIndex - 1) - channel.pipeline.fireChannelRead(buffer) + channel.pipeline.fireChannelRead(NIOAny(buffer)) XCTAssertNoThrow(XCTAssertNil(try channel.readInbound())) buffer.moveWriterIndex(to: writerIndex) - channel.pipeline.fireChannelRead(buffer.getSlice(at: writerIndex - 1, length: 1)!) + channel.pipeline.fireChannelRead(NIOAny(buffer.getSlice(at: writerIndex - 1, length: 1)!)) var buffer2 = channel.allocator.buffer(capacity: 32) buffer2.writeInteger(Int32(2)) buffer2.writeInteger(Int32(3)) - channel.pipeline.fireChannelRead(buffer2) + channel.pipeline.fireChannelRead(NIOAny(buffer2)) XCTAssertNoThrow(try channel.finish()) @@ -173,7 +173,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { var buffer = channel.allocator.buffer(capacity: 32) buffer.writeInteger(Int32(1)) - channel.pipeline.fireChannelRead(buffer) + channel.pipeline.fireChannelRead(NIOAny(buffer)) XCTAssertNoThrow(XCTAssertEqual(Int32(1), try channel.readInbound())) XCTAssertFalse(inactivePromiser.channelInactivePromise.futureResult.isFulfilled) @@ -203,7 +203,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { inputBuffer.writeStaticString("whatwhat") for _ in 0..<10 { - channel.pipeline.fireChannelRead(inputBuffer) + channel.pipeline.fireChannelRead(NIOAny(inputBuffer)) } // We get one extra malloc the first time around the loop, when we have aliased the buffer. From then on it's @@ -385,13 +385,13 @@ public final class ByteToMessageDecoderTest: XCTestCase { self.hasReentranced = true reentrantWriteBuffer.clear() reentrantWriteBuffer.writeStaticString("3") - context.channel.pipeline.syncOperations.fireChannelRead(Self.wrapInboundOut(reentrantWriteBuffer)) + context.channel.pipeline.fireChannelRead(Self.wrapInboundOut(reentrantWriteBuffer)) } context.fireChannelRead(Self.wrapInboundOut(buffer.readSlice(length: 1)!)) if self.numberOfDecodeCalls == 2 { reentrantWriteBuffer.clear() reentrantWriteBuffer.writeStaticString("4") - context.channel.pipeline.syncOperations.fireChannelRead(Self.wrapInboundOut(reentrantWriteBuffer)) + context.channel.pipeline.fireChannelRead(Self.wrapInboundOut(reentrantWriteBuffer)) } return .continue } @@ -1901,7 +1901,7 @@ public final class MessageToByteEncoderTest: XCTestCase { line: line ) - XCTAssertNoThrow(try channel.writeAndFlush(Int32(5)).wait(), file: (file), line: line) + XCTAssertNoThrow(try channel.writeAndFlush(NIOAny(Int32(5))).wait(), file: (file), line: line) if var buffer = try channel.readOutbound(as: ByteBuffer.self) { XCTAssertEqual(Int32(5), buffer.readInteger()) diff --git a/Tests/NIOPosixTests/DatagramChannelTests.swift b/Tests/NIOPosixTests/DatagramChannelTests.swift index 601f5faa26a..933fc3279d8 100644 --- a/Tests/NIOPosixTests/DatagramChannelTests.swift +++ b/Tests/NIOPosixTests/DatagramChannelTests.swift @@ -185,7 +185,7 @@ class DatagramChannelTests: XCTestCase { var buffer = self.firstChannel.allocator.buffer(capacity: 256) buffer.writeStaticString("hello, world!") let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait()) + XCTAssertNoThrow(try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait()) let reads = try self.secondChannel.waitForDatagrams(count: 1) XCTAssertEqual(reads.count, 1) @@ -197,7 +197,7 @@ class DatagramChannelTests: XCTestCase { func testEmptyDatagram() throws { let buffer = self.firstChannel.allocator.buffer(capacity: 0) let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait()) + XCTAssertNoThrow(try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait()) let reads = try self.secondChannel.waitForDatagrams(count: 1) XCTAssertEqual(reads.count, 1) @@ -212,7 +212,7 @@ class DatagramChannelTests: XCTestCase { let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) var writeFutures: [EventLoopFuture] = [] for _ in 0..<5 { - writeFutures.append(self.firstChannel.write(writeData)) + writeFutures.append(self.firstChannel.write(NIOAny(writeData))) } self.firstChannel.flush() XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(writeFutures, on: self.firstChannel.eventLoop).wait()) @@ -242,7 +242,7 @@ class DatagramChannelTests: XCTestCase { // We submit to the loop here to make sure that we synchronously process the writes and checks // on writability. let writable: Bool = try self.firstChannel.eventLoop.submit { - self.firstChannel.write(writeData, promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) return self.firstChannel.isWritable }.wait() XCTAssertTrue(writable) @@ -251,7 +251,7 @@ class DatagramChannelTests: XCTestCase { let lastWritePromise = self.firstChannel.eventLoop.makePromise(of: Void.self) // The last write will push us over the edge. var writable: Bool = try self.firstChannel.eventLoop.submit { - self.firstChannel.write(writeData, promise: lastWritePromise) + self.firstChannel.write(NIOAny(writeData), promise: lastWritePromise) return self.firstChannel.isWritable }.wait() XCTAssertFalse(writable) @@ -266,7 +266,7 @@ class DatagramChannelTests: XCTestCase { var buffer = self.firstChannel.allocator.buffer(capacity: 256) buffer.writeStaticString("hello, world!") let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - let promises = (0..<5).map { _ in self.firstChannel.write(writeData) } + let promises = (0..<5).map { _ in self.firstChannel.write(NIOAny(writeData)) } // Now close the channel. When that completes, all the futures should be complete too. let fulfilled = try self.firstChannel.close().map { @@ -291,7 +291,7 @@ class DatagramChannelTests: XCTestCase { var buffer = self.firstChannel.allocator.buffer(capacity: 1) buffer.writeString("a") let envelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - self.firstChannel.write(envelope, promise: myPromise) + self.firstChannel.write(NIOAny(envelope), promise: myPromise) overall = EventLoopFuture.andAllSucceed([overall, myPromise.futureResult], on: self.firstChannel.eventLoop) } self.firstChannel.flush() @@ -317,7 +317,7 @@ class DatagramChannelTests: XCTestCase { let lotsOfData = Int(Int32.max) var written: Int64 = 0 while written <= lotsOfData { - self.firstChannel.write(envelope, promise: myPromise) + self.firstChannel.write(NIOAny(envelope), promise: myPromise) overall = EventLoopFuture.andAllSucceed( [overall, myPromise.futureResult], on: self.firstChannel.eventLoop @@ -338,7 +338,7 @@ class DatagramChannelTests: XCTestCase { buffer.writeRepeatingByte(4, count: bufferSize) let envelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - let writeFut = self.firstChannel.write(envelope) + let writeFut = self.firstChannel.write(NIOAny(envelope)) self.firstChannel.flush() XCTAssertThrowsError(try writeFut.wait()) { error in @@ -360,9 +360,9 @@ class DatagramChannelTests: XCTestCase { let secondEnvelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) // Now, three writes. We're sandwiching the big write between two small ones. - let firstWrite = self.firstChannel.write(firstEnvelope) - let secondWrite = self.firstChannel.write(secondEnvelope) - let thirdWrite = self.firstChannel.writeAndFlush(firstEnvelope) + let firstWrite = self.firstChannel.write(NIOAny(firstEnvelope)) + let secondWrite = self.firstChannel.write(NIOAny(secondEnvelope)) + let thirdWrite = self.firstChannel.writeAndFlush(NIOAny(firstEnvelope)) // The first and third writes should be fine. XCTAssertNoThrow(try firstWrite.wait()) @@ -388,9 +388,9 @@ class DatagramChannelTests: XCTestCase { let secondEnvelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) // Now, three writes. We're sandwiching the big write between two small ones. - let firstWrite = self.firstChannel.write(firstEnvelope) - let secondWrite = self.firstChannel.write(secondEnvelope) - let thirdWrite = self.firstChannel.writeAndFlush(firstEnvelope) + let firstWrite = self.firstChannel.write(NIOAny(firstEnvelope)) + let secondWrite = self.firstChannel.write(NIOAny(secondEnvelope)) + let thirdWrite = self.firstChannel.writeAndFlush(NIOAny(firstEnvelope)) // The first and third writes should be fine. XCTAssertNoThrow(try firstWrite.wait()) @@ -582,8 +582,8 @@ class DatagramChannelTests: XCTestCase { data: buffer.getSlice(at: buffer.readerIndex, length: 5)! ) let secondWrite = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - self.firstChannel.write(firstWrite, promise: nil) - self.firstChannel.write(secondWrite, promise: nil) + self.firstChannel.write(NIOAny(firstWrite), promise: nil) + self.firstChannel.write(NIOAny(secondWrite), promise: nil) self.firstChannel.flush() let reads = try self.secondChannel.waitForDatagrams(count: 2) @@ -641,9 +641,9 @@ class DatagramChannelTests: XCTestCase { let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) // We write this in three times. - self.firstChannel.write(writeData, promise: nil) - self.firstChannel.write(writeData, promise: nil) - self.firstChannel.write(writeData, promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) self.firstChannel.flush() let reads = try self.secondChannel.waitForDatagrams(count: 3) @@ -673,9 +673,9 @@ class DatagramChannelTests: XCTestCase { let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) // We write this in three times. - self.firstChannel.write(writeData, promise: nil) - self.firstChannel.write(writeData, promise: nil) - self.firstChannel.write(writeData, promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) self.firstChannel.flush() let reads = try self.secondChannel.waitForDatagrams(count: 3) @@ -707,9 +707,9 @@ class DatagramChannelTests: XCTestCase { // Ok, now we're good. Let's queue up a bunch of datagrams. We've configured to receive 10 at a time, so we'll send 30. for _ in 0..<29 { - self.firstChannel.write(writeData, promise: nil) + self.firstChannel.write(NIOAny(writeData), promise: nil) } - XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait()) + XCTAssertNoThrow(try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait()) // Now we read. Rather than issue many read() calls, we'll turn autoread back on. XCTAssertNoThrow(try self.secondChannel.setOption(.autoRead, value: true).wait()) @@ -1176,14 +1176,17 @@ class DatagramChannelTests: XCTestCase { line: UInt = #line ) throws { // Wrap data in AddressedEnvelope if required. - let writeResult: EventLoopFuture + let writePayload: NIOAny if shouldWrapInAddressedEnvelope { let envelope = AddressedEnvelope(remoteAddress: destinationChannel.localAddress!, data: data) - writeResult = sourceChannel.writeAndFlush(envelope) + writePayload = NIOAny(envelope) } else { - writeResult = sourceChannel.writeAndFlush(data) + writePayload = NIOAny(data) } + // Write and flush. + let writeResult = sourceChannel.writeAndFlush(writePayload) + // Check the expected result. switch expectedResult { case .success: @@ -1453,7 +1456,7 @@ class DatagramChannelTests: XCTestCase { // Write the single large buffer. let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait()) + XCTAssertNoThrow(try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait()) // The receiver will receive separate segments. let receivedBuffers = try self.secondChannel.waitForDatagrams(count: segments) @@ -1498,8 +1501,8 @@ class DatagramChannelTests: XCTestCase { // Write the single large buffer. let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - let write1 = self.firstChannel.write(writeData) - let write2 = self.firstChannel.write(writeData) + let write1 = self.firstChannel.write(NIOAny(writeData)) + let write2 = self.firstChannel.write(NIOAny(writeData)) self.firstChannel.flush() XCTAssertNoThrow(try write1.wait()) XCTAssertNoThrow(try write2.wait()) @@ -1535,7 +1538,7 @@ class DatagramChannelTests: XCTestCase { func send(byteCount: Int) throws { let buffer = self.firstChannel.allocator.buffer(repeating: 1, count: byteCount) let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - try self.firstChannel.writeAndFlush(writeData).wait() + try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait() } do { @@ -1567,7 +1570,7 @@ class DatagramChannelTests: XCTestCase { let buffer = self.firstChannel.allocator.buffer(repeating: 1, count: segmentSize * udpMaxSegments + 1) let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) // The kernel limits messages to a maximum of UDP_MAX_SEGMENTS segments; any more should result in an error. - XCTAssertThrowsError(try self.firstChannel.writeAndFlush(writeData).wait()) { + XCTAssertThrowsError(try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait()) { XCTAssert($0 is IOError) } } @@ -1637,7 +1640,7 @@ class DatagramChannelTests: XCTestCase { // Write to the channel with GRO enabled. do { let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) - let promises = (0..] = [] for i in 0..] = [] - for _ in 0..<10 { - futures.append(clientChannel.pipeline.syncOperations.write(NIOAny(fr))) - } - futures.append(clientChannel.pipeline.syncOperations.writeAndFlush(NIOAny(fr))) - - let bound = NIOLoopBound(handle, eventLoop: clientChannel.eventLoop) - return .andAllSucceed(futures, on: clientChannel.eventLoop).flatMapErrorThrowing { error in - try? bound.value.close() - throw error - }.flatMapThrowing { - try bound.value.close() - } - }.wait() + var futures: [EventLoopFuture] = [] + for _ in 0..<10 { + futures.append(clientChannel.write(NIOAny(fr))) + } + try clientChannel.writeAndFlush(NIOAny(fr)).wait() + for future in futures { + try future.wait() + } } } @@ -177,46 +159,25 @@ class FileRegionTest: XCTestCase { } try withTemporaryFile { fd, filePath in + let fh1 = try NIOFileHandle(path: filePath) + let fh2 = try NIOFileHandle(path: filePath) + let fr1 = FileRegion(fileHandle: fh1, readerIndex: 0, endIndex: bytes.count) + let fr2 = FileRegion(fileHandle: fh2, readerIndex: 0, endIndex: bytes.count) + defer { + XCTAssertNoThrow(try fh1.close()) + XCTAssertNoThrow(try fh2.close()) + } try content.write(toFile: filePath, atomically: false, encoding: .ascii) - - let future = clientChannel.eventLoop.submit { - let fh1 = try NIOFileHandle(path: filePath) - let fh2 = try NIOFileHandle(path: filePath) - return (fh1, fh2) - }.flatMap { (fh1, fh2) in - let fr1 = FileRegion(fileHandle: fh1, readerIndex: 0, endIndex: bytes.count) - let fr2 = FileRegion(fileHandle: fh2, readerIndex: 0, endIndex: bytes.count) - - let loopBoundFr2 = NIOLoopBound(fr2, eventLoop: clientChannel.eventLoop) - let loopBoundHandles = NIOLoopBound((fh1, fh2), eventLoop: clientChannel.eventLoop) - - return clientChannel.pipeline.syncOperations.writeAndFlush(NIOAny(fr1)).flatMap { - () -> EventLoopFuture in - let frFuture = clientChannel.pipeline.syncOperations.write(NIOAny(loopBoundFr2.value)) + XCTAssertThrowsError( + try clientChannel.writeAndFlush(NIOAny(fr1)).flatMap { () -> EventLoopFuture in + let frFuture = clientChannel.write(NIOAny(fr2)) var buffer = clientChannel.allocator.buffer(capacity: bytes.count) buffer.writeBytes(bytes) - let bbFuture = clientChannel.pipeline.syncOperations.write(NIOAny(buffer)) + let bbFuture = clientChannel.write(NIOAny(buffer)) clientChannel.close(promise: nil) clientChannel.flush() return frFuture.flatMap { bbFuture } - }.flatMapErrorThrowing { error in - let (fh1, fh2) = loopBoundHandles.value - try? fh1.close() - try? fh2.close() - throw error - }.flatMapThrowing { - let (fh1, fh2) = loopBoundHandles.value - do { - try fh1.close() - } catch { - try? fh2.close() - throw error - } - try fh2.close() - } - } - XCTAssertThrowsError( - try future.wait() + }.wait() ) { error in XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError) } diff --git a/Tests/NIOPosixTests/IdleStateHandlerTest.swift b/Tests/NIOPosixTests/IdleStateHandlerTest.swift index 87ae3d8cd77..2f9b153356d 100644 --- a/Tests/NIOPosixTests/IdleStateHandlerTest.swift +++ b/Tests/NIOPosixTests/IdleStateHandlerTest.swift @@ -104,7 +104,7 @@ class IdleStateHandlerTest: XCTestCase { if !writeToChannel { var buffer = clientChannel.allocator.buffer(capacity: 4) buffer.writeStaticString("test") - XCTAssertNoThrow(try clientChannel.writeAndFlush(buffer).wait()) + XCTAssertNoThrow(try clientChannel.writeAndFlush(NIOAny(buffer)).wait()) } XCTAssertNoThrow(try clientChannel.closeFuture.wait()) } @@ -178,7 +178,7 @@ class IdleStateHandlerTest: XCTestCase { channel.pipeline.fireChannelRegistered() channel.pipeline.fireChannelActive() - channel.pipeline.fireChannelRead("") + channel.pipeline.fireChannelRead(NIOAny("")) channel.pipeline.fireChannelReadComplete() channel.pipeline.fireErrorCaught(ChannelError.alreadyClosed) channel.pipeline.fireUserInboundEventTriggered("") diff --git a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift index 8b671fce63f..9b5c970d2f6 100644 --- a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift +++ b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift @@ -128,7 +128,7 @@ class UniversalBootstrapSupportTest: XCTestCase { // let's check that the order is right XCTAssertNoThrow( try client.eventLoop.submit { - client.pipeline.fireChannelRead(buffer) + client.pipeline.fireChannelRead(NIOAny(buffer)) client.pipeline.fireUserInboundEventTriggered(buffer) }.wait() ) diff --git a/Tests/NIOTestUtilsTests/NIOHTTP1TestServerTest.swift b/Tests/NIOTestUtilsTests/NIOHTTP1TestServerTest.swift index 65b12fbfe6f..a31f222e5f1 100644 --- a/Tests/NIOTestUtilsTests/NIOHTTP1TestServerTest.swift +++ b/Tests/NIOTestUtilsTests/NIOHTTP1TestServerTest.swift @@ -18,50 +18,6 @@ import NIOPosix import NIOTestUtils import XCTest -typealias SendableRequestPart = HTTPPart - -extension HTTPClientRequestPart { - init(_ target: SendableRequestPart) { - switch target { - case .head(let head): - self = .head(head) - case .body(let body): - self = .body(.byteBuffer(body)) - case .end(let end): - self = .end(end) - } - } -} - -extension SendableRequestPart { - init(_ target: HTTPClientRequestPart) throws { - switch target { - case .head(let head): - self = .head(head) - case .body(.byteBuffer(let body)): - self = .body(body) - case .body(.fileRegion): - throw NIOHTTP1TestServerError( - reason: "FileRegion is not Sendable and cannot be passed across concurrency domains" - ) - case .end(let end): - self = .end(end) - } - } -} - -/// A helper handler to transform a Sendable request into a -/// non-Sendable one, to manage warnings. -private final class TransformerHandler: ChannelOutboundHandler { - typealias OutboundIn = SendableRequestPart - typealias OutboundOut = HTTPClientRequestPart - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let response = self.unwrapOutboundIn(data) - context.write(self.wrapOutboundOut(.init(response)), promise: nil) - } -} - class NIOHTTP1TestServerTest: XCTestCase { private var group: EventLoopGroup! private let allocator = ByteBufferAllocator() @@ -86,8 +42,6 @@ class NIOHTTP1TestServerTest: XCTestCase { channel.pipeline.addHandler(AggregateBodyHandler()) }.flatMap { channel.pipeline.addHandler(TestHTTPHandler(responsePromise: responsePromise)) - }.flatMap { - channel.pipeline.addHandler(TransformerHandler()) } } return bootstrap.connect(host: "127.0.0.1", port: serverPort) @@ -106,9 +60,9 @@ class NIOHTTP1TestServerTest: XCTestCase { headers: headers ) - channel.write(SendableRequestPart.head(requestHead), promise: nil) - channel.write(SendableRequestPart.body(requestBuffer), promise: nil) - channel.writeAndFlush(SendableRequestPart.end(nil), promise: nil) + channel.write(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: nil) + channel.write(NIOAny(HTTPClientRequestPart.body(.byteBuffer(requestBuffer))), promise: nil) + channel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil)), promise: nil) } private func sendRequestTo(_ url: URL, body: String) throws -> EventLoopFuture { @@ -427,7 +381,7 @@ class NIOHTTP1TestServerTest: XCTestCase { var headers = HTTPHeaders() headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/uri", headers: headers) - channel.writeAndFlush(SendableRequestPart.head(requestHead), promise: nil) + channel.writeAndFlush(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: nil) XCTAssertNoThrow( try testServer.receiveHeadAndVerify { head in XCTAssertEqual(head.uri, "/uri") @@ -438,7 +392,7 @@ class NIOHTTP1TestServerTest: XCTestCase { for _ in 0..<10 { channel.writeAndFlush( - SendableRequestPart.body(ByteBuffer(string: "ping")), + NIOAny(HTTPClientRequestPart.body(.byteBuffer(ByteBuffer(string: "ping")))), promise: nil ) XCTAssertNoThrow( @@ -449,7 +403,7 @@ class NIOHTTP1TestServerTest: XCTestCase { XCTAssertNoThrow(try testServer.writeOutbound(.body(.byteBuffer(ByteBuffer(string: "pong"))))) } - channel.writeAndFlush(SendableRequestPart.end(nil), promise: nil) + channel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil)), promise: nil) XCTAssertNoThrow( try testServer.receiveEndAndVerify { trailers in XCTAssertNil(trailers) From 1e8e99a4433d096c21d58f3ac4ecf8337423ca94 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 20 Nov 2024 21:08:34 +0000 Subject: [PATCH 3/3] Revert "Handle Sendability of RemovableChannelHandler (#2953)" This reverts commit 411c2c553c177801aecb4f1a393e076aea62e3a1. --- Sources/NIOCore/AsyncAwaitSupport.swift | 5 - Sources/NIOCore/ChannelPipeline.swift | 161 ++++-------------- .../NIOHTTP1/HTTPServerUpgradeHandler.swift | 4 +- .../NIOHTTPClientUpgradeHandler.swift | 4 +- ...pplicationProtocolNegotiationHandler.swift | 10 +- ...pplicationProtocolNegotiationHandler.swift | 6 +- Sources/NIOTLS/SNIHandler.swift | 6 +- .../AsyncChannel/AsyncChannelTests.swift | 2 +- .../HTTPServerUpgradeTests.swift | 2 +- .../AcceptBackoffHandlerTest.swift | 4 +- Tests/NIOPosixTests/ChannelPipelineTest.swift | 10 +- Tests/NIOPosixTests/CodecTest.swift | 10 +- Tests/NIOPosixTests/MulticastTest.swift | 2 +- 13 files changed, 59 insertions(+), 167 deletions(-) diff --git a/Sources/NIOCore/AsyncAwaitSupport.swift b/Sources/NIOCore/AsyncAwaitSupport.swift index 066d78cedae..66c8b7dc9ae 100644 --- a/Sources/NIOCore/AsyncAwaitSupport.swift +++ b/Sources/NIOCore/AsyncAwaitSupport.swift @@ -196,11 +196,6 @@ extension ChannelPipeline { } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - @available( - *, - deprecated, - message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe." - ) public func removeHandler(context: ChannelHandlerContext) async throws { try await self.removeHandler(context: context).get() } diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index dd752192875..d844729612e 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -167,9 +167,8 @@ public final class ChannelPipeline: ChannelInvoker { /// - handler: the `ChannelHandler` to add /// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`. /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was added. - @preconcurrency public func addHandler( - _ handler: ChannelHandler & Sendable, + _ handler: ChannelHandler, name: String? = nil, position: ChannelPipeline.Position = .last ) -> EventLoopFuture { @@ -350,8 +349,7 @@ public final class ChannelPipeline: ChannelInvoker { /// - Parameters: /// - handler: the `ChannelHandler` to remove. /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. - @preconcurrency - public func removeHandler(_ handler: RemovableChannelHandler & Sendable) -> EventLoopFuture { + public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: Void.self) self.removeHandler(handler, promise: promise) return promise.futureResult @@ -373,11 +371,6 @@ public final class ChannelPipeline: ChannelInvoker { /// - Parameters: /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. - @available( - *, - deprecated, - message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe." - ) public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: Void.self) self.removeHandler(context: context, promise: promise) @@ -389,11 +382,14 @@ public final class ChannelPipeline: ChannelInvoker { /// - Parameters: /// - handler: the `ChannelHandler` to remove. /// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed. - @preconcurrency - public func removeHandler(_ handler: RemovableChannelHandler & Sendable, promise: EventLoopPromise?) { - @Sendable + public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise?) { func removeHandler0() { - self.syncOperations.removeHandler(handler, promise: promise) + switch self.contextSync(handler: handler) { + case .success(let context): + self.removeHandler(context: context, promise: promise) + case .failure(let error): + promise?.fail(error) + } } if self.eventLoop.inEventLoop { @@ -411,9 +407,13 @@ public final class ChannelPipeline: ChannelInvoker { /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. /// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed. public func removeHandler(name: String, promise: EventLoopPromise?) { - @Sendable func removeHandler0() { - self.syncOperations.removeHandler(name: name, promise: promise) + switch self.contextSync(name: name) { + case .success(let context): + self.removeHandler(context: context, promise: promise) + case .failure(let error): + promise?.fail(error) + } } if self.eventLoop.inEventLoop { @@ -430,22 +430,13 @@ public final class ChannelPipeline: ChannelInvoker { /// - Parameters: /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. /// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed. - @available( - *, - deprecated, - message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe." - ) public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise?) { - let sendableView = context.sendableView - - guard sendableView.channelHandlerIsRemovable else { + guard context.handler is RemovableChannelHandler else { promise?.fail(ChannelError._unremovableHandler) return } - - @Sendable func removeHandler0() { - sendableView.wrappedValue.startUserTriggeredRemoval(promise: promise) + context.startUserTriggeredRemoval(promise: promise) } if self.eventLoop.inEventLoop { @@ -462,13 +453,7 @@ public final class ChannelPipeline: ChannelInvoker { /// - Parameters: /// - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned /// - Returns: the `EventLoopFuture` which will be notified once the the operation completes. - @available( - *, - deprecated, - message: "This method is not strict concurrency safe. Prefer .syncOperations.context(handler:)" - ) - @preconcurrency - public func context(handler: ChannelHandler & Sendable) -> EventLoopFuture { + public func context(handler: ChannelHandler) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { @@ -1020,9 +1005,8 @@ extension ChannelPipeline { /// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`. /// /// - Returns: A future that will be completed when all of the supplied `ChannelHandler`s were added. - @preconcurrency public func addHandlers( - _ handlers: [ChannelHandler & Sendable], + _ handlers: [ChannelHandler], position: ChannelPipeline.Position = .last ) -> EventLoopFuture { let future: EventLoopFuture @@ -1046,9 +1030,8 @@ extension ChannelPipeline { /// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`. /// /// - Returns: A future that will be completed when all of the supplied `ChannelHandler`s were added. - @preconcurrency public func addHandlers( - _ handlers: (ChannelHandler & Sendable)..., + _ handlers: ChannelHandler..., position: ChannelPipeline.Position = .last ) -> EventLoopFuture { self.addHandlers(handlers, position: position) @@ -1166,51 +1149,18 @@ extension ChannelPipeline { /// - Parameters: /// - handler: the `ChannelHandler` to remove. /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. + @preconcurrency public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: Void.self) - self.removeHandler(handler, promise: promise) - return promise.futureResult - } - - /// Remove a ``ChannelHandler`` from the ``ChannelPipeline``. - /// - /// - Parameters: - /// - handler: the ``ChannelHandler`` to remove. - /// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed. - public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise?) { switch self._pipeline.contextSync(handler: handler) { case .success(let context): - self.removeHandler(context: context, promise: promise) + self._pipeline.removeHandler(context: context, promise: promise) case .failure(let error): - promise?.fail(error) + promise.fail(error) } - } - - /// Remove a `ChannelHandler` from the `ChannelPipeline`. - /// - /// - Parameters: - /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. - /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. - public func removeHandler(name: String) -> EventLoopFuture { - let promise = self.eventLoop.makePromise(of: Void.self) - self.removeHandler(name: name, promise: promise) return promise.futureResult } - /// Remove a ``ChannelHandler`` from the ``ChannelPipeline``. - /// - /// - Parameters: - /// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before. - /// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed. - public func removeHandler(name: String, promise: EventLoopPromise?) { - switch self._pipeline.contextSync(name: name) { - case .success(let context): - self.removeHandler(context: context, promise: promise) - case .failure(let error): - promise?.fail(error) - } - } - /// Remove a `ChannelHandler` from the `ChannelPipeline`. /// /// - Parameters: @@ -1218,23 +1168,10 @@ extension ChannelPipeline { /// - Returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed. public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture { let promise = self.eventLoop.makePromise(of: Void.self) - self.removeHandler(context: context, promise: promise) + self._pipeline.removeHandler(context: context, promise: promise) return promise.futureResult } - /// Remove a `ChannelHandler` from the `ChannelPipeline`. - /// - /// - Parameters: - /// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed. - /// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed. - public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise?) { - if context.handler is RemovableChannelHandler { - context.startUserTriggeredRemoval(promise: promise) - } else { - promise?.fail(ChannelError.unremovableHandler) - } - } - /// Returns the `ChannelHandlerContext` for the given handler instance if it is in /// the `ChannelPipeline`, if it exists. /// @@ -1430,8 +1367,7 @@ extension ChannelPipeline.SynchronousOperations: Sendable {} extension ChannelPipeline { /// A `Position` within the `ChannelPipeline` used to insert handlers into the `ChannelPipeline`. - @preconcurrency - public enum Position: Sendable { + public enum Position { /// The first `ChannelHandler` -- the front of the `ChannelPipeline`. case first @@ -1439,15 +1375,18 @@ extension ChannelPipeline { case last /// Before the given `ChannelHandler`. - case before(ChannelHandler & Sendable) + case before(ChannelHandler) /// After the given `ChannelHandler`. - case after(ChannelHandler & Sendable) + case after(ChannelHandler) } } +@available(*, unavailable) +extension ChannelPipeline.Position: Sendable {} + /// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation. -final class HeadChannelHandler: _ChannelOutboundHandler, Sendable { +final class HeadChannelHandler: _ChannelOutboundHandler { static let name = "head" static let sharedInstance = HeadChannelHandler() @@ -1503,7 +1442,7 @@ extension CloseMode { } /// Special `ChannelInboundHandler` which will consume all inbound events. -final class TailChannelHandler: _ChannelInboundHandler, Sendable { +final class TailChannelHandler: _ChannelInboundHandler { static let name = "tail" static let sharedInstance = TailChannelHandler() @@ -2081,42 +2020,6 @@ extension ChannelHandlerContext { } } -extension ChannelHandlerContext { - var sendableView: SendableView { - SendableView(wrapping: self) - } - - /// A wrapper over ``ChannelHandlerContext`` that allows access to the thread-safe API - /// surface on the type. - /// - /// Very little of ``ChannelHandlerContext`` is thread-safe, but in a rare few places - /// there are things we can access. This type makes those available. - struct SendableView: @unchecked Sendable { - private let context: ChannelHandlerContext - - fileprivate init(wrapping context: ChannelHandlerContext) { - self.context = context - } - - /// Whether the ``ChannelHandler`` associated with this context conforms to - /// ``RemovableChannelHandler``. - var channelHandlerIsRemovable: Bool { - // `context.handler` is not mutable, and set at construction, so this access is - // acceptable. The protocol conformance check is also safe. - self.context.handler is RemovableChannelHandler - } - - /// Grabs the underlying ``ChannelHandlerContext``. May only be called on the - /// event loop. - var wrappedValue: ChannelHandlerContext { - // The event loop lookup here is also thread-safe, so we can grab the value out - // and use it. - self.context.eventLoop.preconditionInEventLoop() - return self.context - } - } -} - extension ChannelPipeline: CustomDebugStringConvertible { public var debugDescription: String { // This method forms output in the following format: diff --git a/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift index cadaff37c32..af9a278237c 100644 --- a/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift @@ -290,7 +290,7 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha ) self.upgradeState = .upgradeComplete // When we remove ourselves we'll be delivering any buffered data. - context.pipeline.syncOperations.removeHandler(context: context, promise: nil) + context.pipeline.removeHandler(context: context, promise: nil) case .failure(let error): // Remain in the '.upgrading' state. @@ -357,7 +357,7 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha context.fireChannelReadComplete() // Ok, we've delivered all the parts. We can now remove ourselves, which should happen synchronously. - context.pipeline.syncOperations.removeHandler(context: context, promise: nil) + context.pipeline.removeHandler(context: context, promise: nil) } /// Builds the initial mandatory HTTP headers for HTTP upgrade responses. diff --git a/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift b/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift index dbc403f69fb..b092512f16e 100644 --- a/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOHTTPClientUpgradeHandler.swift @@ -356,7 +356,7 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC self.upgradeState = .upgradeComplete } .whenComplete { _ in - context.pipeline.syncOperations.removeHandler(context: context, promise: nil) + context.pipeline.removeHandler(context: context, promise: nil) } } } @@ -397,7 +397,7 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC context.fireChannelRead(Self.wrapInboundOut(data)) // We've delivered the data. We can now remove ourselves, which should happen synchronously. - context.pipeline.syncOperations.removeHandler(context: context, promise: nil) + context.pipeline.removeHandler(context: context, promise: nil) } } diff --git a/Sources/NIOTLS/ApplicationProtocolNegotiationHandler.swift b/Sources/NIOTLS/ApplicationProtocolNegotiationHandler.swift index 3d6859dec9b..40c799ddfa8 100644 --- a/Sources/NIOTLS/ApplicationProtocolNegotiationHandler.swift +++ b/Sources/NIOTLS/ApplicationProtocolNegotiationHandler.swift @@ -128,12 +128,10 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler, } private func userFutureCompleted(context: ChannelHandlerContext, result: Result) { - context.eventLoop.assertInEventLoop() - switch self.stateMachine.userFutureCompleted(with: result) { case .fireErrorCaughtAndRemoveHandler(let error): context.fireErrorCaught(error) - context.pipeline.syncOperations.removeHandler(self, promise: nil) + context.pipeline.removeHandler(self, promise: nil) case .fireErrorCaughtAndStartUnbuffering(let error): context.fireErrorCaught(error) @@ -143,7 +141,7 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler, self.unbuffer(context: context) case .removeHandler: - context.pipeline.syncOperations.removeHandler(self, promise: nil) + context.pipeline.removeHandler(self, promise: nil) case .none: break @@ -151,8 +149,6 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler, } private func unbuffer(context: ChannelHandlerContext) { - context.eventLoop.assertInEventLoop() - while true { switch self.stateMachine.unbuffer() { case .fireChannelRead(let data): @@ -160,7 +156,7 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler, case .fireChannelReadCompleteAndRemoveHandler: context.fireChannelReadComplete() - context.pipeline.syncOperations.removeHandler(self, promise: nil) + context.pipeline.removeHandler(self, promise: nil) return } } diff --git a/Sources/NIOTLS/NIOTypedApplicationProtocolNegotiationHandler.swift b/Sources/NIOTLS/NIOTypedApplicationProtocolNegotiationHandler.swift index 4749be12ecd..c4a29841432 100644 --- a/Sources/NIOTLS/NIOTypedApplicationProtocolNegotiationHandler.swift +++ b/Sources/NIOTLS/NIOTypedApplicationProtocolNegotiationHandler.swift @@ -138,7 +138,7 @@ public final class NIOTypedApplicationProtocolNegotiationHandler.self).flatMap { context in - channel.pipeline.syncOperations.removeHandler(context: context) + channel.pipeline.removeHandler(context: context) }.whenFailure { error in XCTFail("unexpected error: \(error)") } @@ -834,7 +834,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState { if let slice = buffer.readSlice(length: 16) { context.fireChannelRead(Self.wrapInboundOut(slice)) - context.pipeline.syncOperations.removeHandler(context: context).whenFailure { error in + context.pipeline.removeHandler(context: context).whenFailure { error in XCTFail("unexpected error: \(error)") } return .continue @@ -943,7 +943,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { ) ) ) - context.pipeline.syncOperations.removeHandler(context: context).whenFailure { error in + context.pipeline.removeHandler(context: context).whenFailure { error in XCTFail("unexpected error: \(error)") } return .continue @@ -1101,7 +1101,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { buffer.writeString("x") XCTAssertNoThrow(try channel.writeInbound(buffer)) let removalFuture = channel.pipeline.context(handlerType: ByteToMessageHandler.self).flatMap { - channel.pipeline.syncOperations.removeHandler(context: $0) + channel.pipeline.removeHandler(context: $0) } channel.embeddedEventLoop.run() XCTAssertNoThrow(try removalFuture.wait()) @@ -1135,7 +1135,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { let channel = EmbeddedChannel(handler: ByteToMessageHandler(decoder)) XCTAssertNoThrow(try channel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()) let removalFuture = channel.pipeline.context(handlerType: ByteToMessageHandler.self).flatMap { - channel.pipeline.syncOperations.removeHandler(context: $0) + channel.pipeline.removeHandler(context: $0) } channel.embeddedEventLoop.run() XCTAssertNoThrow(try removalFuture.wait()) diff --git a/Tests/NIOPosixTests/MulticastTest.swift b/Tests/NIOPosixTests/MulticastTest.swift index c9ae19d2914..22a3e4d0a80 100644 --- a/Tests/NIOPosixTests/MulticastTest.swift +++ b/Tests/NIOPosixTests/MulticastTest.swift @@ -27,7 +27,7 @@ final class PromiseOnReadHandler: ChannelInboundHandler { func channelRead(context: ChannelHandlerContext, data: NIOAny) { self.promise.succeed(Self.unwrapInboundIn(data)) - context.pipeline.syncOperations.removeHandler(context: context, promise: nil) + _ = context.pipeline.removeHandler(context: context) } }