diff --git a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift index 7d365c36209..e09d239e706 100644 --- a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift +++ b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift @@ -118,12 +118,12 @@ private final class TLSUserEventHandler: ChannelInboundHandler, RemovableChannel let alpn = String(string.dropFirst(15)) context.writeAndFlush(.init(ByteBuffer(string: "alpn:\(alpn)")), promise: nil) context.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: alpn)) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) } else if string.hasPrefix("alpn:") { context.fireUserInboundEventTriggered( TLSUserEvent.handshakeCompleted(negotiatedProtocol: String(string.dropFirst(5))) ) - context.pipeline.removeHandler(self, promise: nil) + context.pipeline.syncOperations.removeHandler(self, promise: nil) } else { context.fireChannelRead(data) } @@ -702,7 +702,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try await withThrowingTaskGroup(of: EventLoopFuture.self) { group in group.addTask { // We have to use a fixed port here since we only get the channel once protocol negotiation is done - try await self.makeUDPServerChannelWithProtocolNegotiation( + try await Self.makeUDPServerChannelWithProtocolNegotiation( eventLoopGroup: eventLoopGroup, port: port ) @@ -713,7 +713,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { group.addTask { // We have to use a fixed port here since we only get the channel once protocol negotiation is done - try await self.makeUDPClientChannelWithProtocolNegotiation( + try await Self.makeUDPClientChannelWithProtocolNegotiation( eventLoopGroup: eventLoopGroup, port: port, proposedALPN: .string @@ -1063,7 +1063,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try await withThrowingTaskGroup(of: EventLoopFuture.self) { group in group.addTask { // We have to use a fixed port here since we only get the channel once protocol negotiation is done - try await self.makeRawSocketServerChannelWithProtocolNegotiation( + try await Self.makeRawSocketServerChannelWithProtocolNegotiation( eventLoopGroup: eventLoopGroup ) } @@ -1072,7 +1072,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try await Task.sleep(nanoseconds: 100_000_000) group.addTask { - try await self.makeRawSocketClientChannelWithProtocolNegotiation( + try await Self.makeRawSocketClientChannelWithProtocolNegotiation( eventLoopGroup: eventLoopGroup, proposedALPN: .string ) @@ -1239,7 +1239,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } - private func makeRawSocketServerChannelWithProtocolNegotiation( + private static func makeRawSocketServerChannelWithProtocolNegotiation( eventLoopGroup: EventLoopGroup ) async throws -> EventLoopFuture { try await NIORawSocketBootstrap(group: eventLoopGroup) @@ -1262,7 +1262,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } - private func makeRawSocketClientChannelWithProtocolNegotiation( + private static func makeRawSocketClientChannelWithProtocolNegotiation( eventLoopGroup: EventLoopGroup, proposedALPN: TLSUserEventHandler.ALPN ) async throws -> EventLoopFuture { @@ -1371,7 +1371,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } - private func makeUDPServerChannelWithProtocolNegotiation( + private static func makeUDPServerChannelWithProtocolNegotiation( eventLoopGroup: EventLoopGroup, port: Int, proposedALPN: TLSUserEventHandler.ALPN? = nil @@ -1407,7 +1407,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } - private func makeUDPClientChannelWithProtocolNegotiation( + private static func makeUDPClientChannelWithProtocolNegotiation( eventLoopGroup: EventLoopGroup, port: Int, proposedALPN: TLSUserEventHandler.ALPN diff --git a/Tests/NIOPosixTests/BlockingIOThreadPoolTest.swift b/Tests/NIOPosixTests/BlockingIOThreadPoolTest.swift index 38dabf7d084..86efbbe3cb0 100644 --- a/Tests/NIOPosixTests/BlockingIOThreadPoolTest.swift +++ b/Tests/NIOPosixTests/BlockingIOThreadPoolTest.swift @@ -127,7 +127,7 @@ class BlockingIOThreadPoolTest: XCTestCase { assert(weakThreadPool == nil, within: .seconds(1)) } - class SomeClass { + final class SomeClass: Sendable { init() {} func dummy() {} } diff --git a/Tests/NIOPosixTests/ChannelNotificationTest.swift b/Tests/NIOPosixTests/ChannelNotificationTest.swift index fe6fdf47297..ad7f1eaf6fc 100644 --- a/Tests/NIOPosixTests/ChannelNotificationTest.swift +++ b/Tests/NIOPosixTests/ChannelNotificationTest.swift @@ -408,13 +408,19 @@ class ChannelNotificationTest: XCTestCase { ServerBootstrap(group: group) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .serverChannelInitializer { channel in - channel.pipeline.addHandler(ServerSocketChannelLifecycleVerificationHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + ServerSocketChannelLifecycleVerificationHandler() + ) + } } .childChannelOption(.autoRead, value: false) .childChannelInitializer { channel in - channel.pipeline.addHandler( - AcceptedSocketChannelLifecycleVerificationHandler(acceptedChannelPromise) - ) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + AcceptedSocketChannelLifecycleVerificationHandler(acceptedChannelPromise) + ) + } } .bind(host: "127.0.0.1", port: 0).wait() ) @@ -422,7 +428,11 @@ class ChannelNotificationTest: XCTestCase { let clientChannel = try assertNoThrowWithValue( ClientBootstrap(group: group) .channelInitializer { channel in - channel.pipeline.addHandler(SocketChannelLifecycleVerificationHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + SocketChannelLifecycleVerificationHandler() + ) + } } .connect(to: serverChannel.localAddress!).wait() ) @@ -499,7 +509,9 @@ class ChannelNotificationTest: XCTestCase { .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .childChannelOption(.autoRead, value: true) .childChannelInitializer { channel in - channel.pipeline.addHandler(OrderVerificationHandler(promise)) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(OrderVerificationHandler(promise)) + } } .bind(host: "127.0.0.1", port: 0).wait() ) diff --git a/Tests/NIOPosixTests/CodecTest.swift b/Tests/NIOPosixTests/CodecTest.swift index 483da014357..bacab155dbc 100644 --- a/Tests/NIOPosixTests/CodecTest.swift +++ b/Tests/NIOPosixTests/CodecTest.swift @@ -12,20 +12,21 @@ // //===----------------------------------------------------------------------===// +import NIOConcurrencyHelpers import NIOEmbedded import XCTest @testable import NIOCore @testable import NIOPosix -private var testDecoderIsNotQuadratic_mallocs = 0 -private var testDecoderIsNotQuadratic_reallocs = 0 +private let testDecoderIsNotQuadratic_mallocs = NIOLockedValueBox(0) +private let testDecoderIsNotQuadratic_reallocs = NIOLockedValueBox(0) private func testDecoderIsNotQuadratic_freeHook(_ ptr: UnsafeMutableRawPointer) { free(ptr) } private func testDecoderIsNotQuadratic_mallocHook(_ size: Int) -> UnsafeMutableRawPointer? { - testDecoderIsNotQuadratic_mallocs += 1 + testDecoderIsNotQuadratic_mallocs.withLockedValue { $0 += 1 } return malloc(size) } @@ -33,7 +34,7 @@ private func testDecoderIsNotQuadratic_reallocHook( _ ptr: UnsafeMutableRawPointer?, _ count: Int ) -> UnsafeMutableRawPointer? { - testDecoderIsNotQuadratic_reallocs += 1 + testDecoderIsNotQuadratic_reallocs.withLockedValue { $0 += 1 } return realloc(ptr, count) } @@ -56,7 +57,7 @@ private final class ChannelInactivePromiser: ChannelInboundHandler { } } -public final class ByteToMessageDecoderTest: XCTestCase { +final class ByteToMessageDecoderTest: XCTestCase { private final class ByteToInt32Decoder: ByteToMessageDecoder { typealias InboundIn = ByteBuffer typealias InboundOut = Int32 @@ -168,8 +169,8 @@ public final class ByteToMessageDecoderTest: XCTestCase { XCTAssertNoThrow(try channel.finish()) } let inactivePromiser = ChannelInactivePromiser(channel: channel) - _ = try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(ByteToInt32Decoder())) - _ = try channel.pipeline.addHandler(inactivePromiser).wait() + try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(ByteToInt32Decoder())) + try channel.pipeline.syncOperations.addHandler(inactivePromiser) var buffer = channel.allocator.buffer(capacity: 32) buffer.writeInteger(Int32(1)) @@ -188,8 +189,8 @@ public final class ByteToMessageDecoderTest: XCTestCase { XCTAssertNoThrow(try channel.finish()) } - XCTAssertEqual(testDecoderIsNotQuadratic_mallocs, 0) - XCTAssertEqual(testDecoderIsNotQuadratic_reallocs, 0) + XCTAssertEqual(testDecoderIsNotQuadratic_mallocs.withLockedValue { $0 }, 0) + XCTAssertEqual(testDecoderIsNotQuadratic_reallocs.withLockedValue { $0 }, 0) XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(ForeverDecoder()))) let dummyAllocator = ByteBufferAllocator( @@ -208,8 +209,8 @@ public final class ByteToMessageDecoderTest: XCTestCase { // We get one extra malloc the first time around the loop, when we have aliased the buffer. From then on it's // all reallocs of the underlying buffer. - XCTAssertEqual(testDecoderIsNotQuadratic_mallocs, 2) - XCTAssertEqual(testDecoderIsNotQuadratic_reallocs, 3) + XCTAssertEqual(testDecoderIsNotQuadratic_mallocs.withLockedValue { $0 }, 2) + XCTAssertEqual(testDecoderIsNotQuadratic_reallocs.withLockedValue { $0 }, 3) } func testMemoryIsReclaimedIfMostIsConsumed() { @@ -907,7 +908,9 @@ public final class ByteToMessageDecoderTest: XCTestCase { } } let channel = EmbeddedChannel(handler: ByteToMessageHandler(Take16BytesThenCloseAndPassOnDecoder())) - XCTAssertNoThrow(try channel.pipeline.addHandler(DoNotForwardChannelInactiveHandler(), position: .first).wait()) + XCTAssertNoThrow( + try channel.pipeline.syncOperations.addHandler(DoNotForwardChannelInactiveHandler(), position: .first) + ) var buffer = channel.allocator.buffer(capacity: 16) buffer.writeStaticString("0123456789abcdefQWER") XCTAssertNoThrow(try channel.writeInbound(buffer)) @@ -1283,7 +1286,10 @@ public final class ByteToMessageDecoderTest: XCTestCase { let decoder = Decoder() let channel = EmbeddedChannel(handler: ByteToMessageHandler(decoder)) XCTAssertNoThrow( - try channel.pipeline.addHandler(CheckStateOfDecoderHandler(decoder: decoder), position: .first).wait() + try channel.pipeline.syncOperations.addHandler( + CheckStateOfDecoderHandler(decoder: decoder), + position: .first + ) ) XCTAssertNoThrow(try channel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()) var buffer = channel.allocator.buffer(capacity: 3) @@ -1620,7 +1626,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { let decoder = Decoder() let checker = CheckThingsAreOkayHandler() let channel = EmbeddedChannel(handler: ByteToMessageHandler(decoder)) - XCTAssertNoThrow(try channel.pipeline.addHandler(checker).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(checker)) XCTAssertNoThrow(try channel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()) channel.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed) XCTAssertEqual(1, decoder.decodeLastCalls) @@ -1865,8 +1871,7 @@ public final class ByteToMessageDecoderTest: XCTestCase { } } -public final class MessageToByteEncoderTest: XCTestCase { - +final class MessageToByteEncoderTest: XCTestCase { private struct Int32ToByteEncoder: MessageToByteEncoder { typealias OutboundIn = Int32 @@ -1982,7 +1987,7 @@ private class PairOfBytesDecoder: ByteToMessageDecoder { } } -public final class MessageToByteHandlerTest: XCTestCase { +final class MessageToByteHandlerTest: XCTestCase { private struct ThrowingMessageToByteEncoder: MessageToByteEncoder { private struct HandlerError: Error {} diff --git a/Tests/NIOPosixTests/DatagramChannelTests.swift b/Tests/NIOPosixTests/DatagramChannelTests.swift index 4fdef1d868a..a1af48ed270 100644 --- a/Tests/NIOPosixTests/DatagramChannelTests.swift +++ b/Tests/NIOPosixTests/DatagramChannelTests.swift @@ -66,7 +66,7 @@ extension Channel { /// A class that records datagrams received and forwards them on. /// /// Used extensively in tests to validate messaging expectations. -final class DatagramReadRecorder: ChannelInboundHandler { +final class DatagramReadRecorder: ChannelInboundHandler { typealias InboundIn = AddressedEnvelope typealias InboundOut = AddressedEnvelope @@ -152,7 +152,12 @@ class DatagramChannelTests: XCTestCase { try DatagramBootstrap(group: group) .channelOption(.socketOption(.so_reuseaddr), value: 1) .channelInitializer { channel in - channel.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + channel.pipeline.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: host, port: 0) .wait() @@ -241,24 +246,26 @@ class DatagramChannelTests: XCTestCase { for _ in 0..<4 { // We submit to the loop here to make sure that we synchronously process the writes and checks // on writability. - let writable: Bool = try self.firstChannel.eventLoop.submit { - self.firstChannel.write(writeData, promise: nil) - return self.firstChannel.isWritable + let writable: Bool = try self.firstChannel.eventLoop.submit { [firstChannel] in + firstChannel!.write(writeData, promise: nil) + return firstChannel!.isWritable }.wait() XCTAssertTrue(writable) } let lastWritePromise = self.firstChannel.eventLoop.makePromise(of: Void.self) // The last write will push us over the edge. - var writable: Bool = try self.firstChannel.eventLoop.submit { - self.firstChannel.write(writeData, promise: lastWritePromise) - return self.firstChannel.isWritable + var writable: Bool = try self.firstChannel.eventLoop.submit { [firstChannel] in + firstChannel!.write(writeData, promise: lastWritePromise) + return firstChannel!.isWritable }.wait() XCTAssertFalse(writable) // Now we're going to flush, and check the writability immediately after. self.firstChannel.flush() - writable = try lastWritePromise.futureResult.map { _ in self.firstChannel.isWritable }.wait() + writable = try lastWritePromise.futureResult.map { [firstChannel] _ in + firstChannel!.isWritable + }.wait() XCTAssertTrue(writable) } @@ -300,31 +307,31 @@ class DatagramChannelTests: XCTestCase { } func testSendmmsgLotsOfData() throws { - var datagrams = 0 - - var overall = self.firstChannel.eventLoop.makeSucceededFuture(()) // We defer this work to the background thread because otherwise it incurs an enormous number of context // switches. - try self.firstChannel.eventLoop.submit { - let myPromise = self.firstChannel.eventLoop.makePromise(of: Void.self) + let overall = try self.firstChannel.eventLoop.submit { [firstChannel, secondChannel] in + let myPromise = firstChannel!.eventLoop.makePromise(of: Void.self) // For datagrams this buffer cannot be very large, because if it's larger than the path MTU it // will cause EMSGSIZE. let bufferSize = 1024 * 5 - var buffer = self.firstChannel.allocator.buffer(capacity: bufferSize) + var buffer = firstChannel!.allocator.buffer(capacity: bufferSize) buffer.writeRepeatingByte(4, count: bufferSize) - let envelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) + let envelope = AddressedEnvelope(remoteAddress: secondChannel!.localAddress!, data: buffer) let lotsOfData = Int(Int32.max) var written: Int64 = 0 + var overall = firstChannel!.eventLoop.makeSucceededFuture(()) + var datagrams = 0 while written <= lotsOfData { - self.firstChannel.write(envelope, promise: myPromise) + firstChannel!.write(envelope, promise: myPromise) overall = EventLoopFuture.andAllSucceed( [overall, myPromise.futureResult], - on: self.firstChannel.eventLoop + on: firstChannel!.eventLoop ) written += Int64(bufferSize) datagrams += 1 } + return overall }.wait() self.firstChannel.flush() @@ -415,7 +422,7 @@ class DatagramChannelTests: XCTestCase { } private func assertRecvMsgFails(error: Int32, active: Bool) throws { - final class RecvFromHandler: ChannelInboundHandler { + final class RecvFromHandler: ChannelInboundHandler, Sendable { typealias InboundIn = AddressedEnvelope typealias InboundOut = AddressedEnvelope @@ -790,7 +797,12 @@ class DatagramChannelTests: XCTestCase { .channelOption(.explicitCongestionNotification, value: true) .channelOption(.receivePacketInfo, value: receivePacketInfo) .channelInitializer { channel in - channel.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: address, port: 0) .wait() @@ -887,7 +899,7 @@ class DatagramChannelTests: XCTestCase { } func testWritabilityChangeDuringReentrantFlushNow() throws { - class EnvelopingHandler: ChannelOutboundHandler { + final class EnvelopingHandler: ChannelOutboundHandler, Sendable { typealias OutboundIn = ByteBuffer typealias OutboundOut = AddressedEnvelope @@ -901,10 +913,8 @@ class DatagramChannelTests: XCTestCase { } let loop = self.group.next() - let handler = ReentrantWritabilityChangingHandler( - becameUnwritable: loop.makePromise(), - becameWritable: loop.makePromise() - ) + let becameUnwritable = loop.makePromise(of: Void.self) + let becameWritable = loop.makePromise(of: Void.self) let channel1Future = DatagramBootstrap(group: self.group) .bind(host: "localhost", port: 0) @@ -916,7 +926,13 @@ class DatagramChannelTests: XCTestCase { let channel2Future = DatagramBootstrap(group: self.group) .channelOption(.writeBufferWaterMark, value: ReentrantWritabilityChangingHandler.watermark) .channelInitializer { channel in - channel.pipeline.addHandlers([EnvelopingHandler(), handler]) + channel.eventLoop.makeCompletedFuture { + let handler = ReentrantWritabilityChangingHandler( + becameUnwritable: becameUnwritable, + becameWritable: becameWritable + ) + try channel.pipeline.syncOperations.addHandlers([EnvelopingHandler(), handler]) + } } .bind(host: "localhost", port: 0) let channel2 = try assertNoThrowWithValue(try channel2Future.wait()) @@ -925,8 +941,8 @@ class DatagramChannelTests: XCTestCase { } // Now wait. - XCTAssertNoThrow(try handler.becameUnwritable.futureResult.wait()) - XCTAssertNoThrow(try handler.becameWritable.futureResult.wait()) + XCTAssertNoThrow(try becameUnwritable.futureResult.wait()) + XCTAssertNoThrow(try becameWritable.futureResult.wait()) } func testSetGetPktInfoOption() { @@ -973,7 +989,12 @@ class DatagramChannelTests: XCTestCase { let receiveChannel = try DatagramBootstrap(group: group) .channelOption(.receivePacketInfo, value: true) .channelInitializer { channel in - channel.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: address, port: 0) .wait() @@ -1113,7 +1134,11 @@ class DatagramChannelTests: XCTestCase { let channel = try DatagramBootstrap(group: group) .protocolSubtype(.init(.icmp)) .channelInitializer { channel in - channel.pipeline.addHandler(EchoRequestHandler(completePromise: completePromise)) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + EchoRequestHandler(completePromise: completePromise) + ) + } } .bind(host: "127.0.0.1", port: 0) .wait() @@ -1703,7 +1728,12 @@ class DatagramChannelTests: XCTestCase { .channelOption(.socketOption(.so_sndbuf), value: 16) .channelOption(.socketOption(.so_reuseaddr), value: 1) .channelInitializer { channel in - channel.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", port: 0) .wait() diff --git a/Tests/NIOPosixTests/EventLoopFutureTest.swift b/Tests/NIOPosixTests/EventLoopFutureTest.swift index edd424eee67..ad2f88f2cba 100644 --- a/Tests/NIOPosixTests/EventLoopFutureTest.swift +++ b/Tests/NIOPosixTests/EventLoopFutureTest.swift @@ -437,7 +437,7 @@ class EventLoopFutureTest: XCTestCase { $0.count }.flatMapThrowing { 1 + $0 - }.whenSuccess { + }.assumeIsolated().whenSuccess { ran = true XCTAssertEqual($0, 6) } @@ -460,7 +460,7 @@ class EventLoopFutureTest: XCTestCase { }.map { (x: Int) -> Int in XCTFail("shouldn't have been called") return x - }.whenFailure { + }.assumeIsolated().whenFailure { ran = true XCTAssertEqual(.some(DummyError.dummyError), $0 as? DummyError) } @@ -483,7 +483,7 @@ class EventLoopFutureTest: XCTestCase { }.flatMapErrorThrowing { (_: Error) in XCTFail("shouldn't have been called") return 5 - }.whenSuccess { + }.assumeIsolated().whenSuccess { ran = true XCTAssertEqual($0, 5) } @@ -507,7 +507,7 @@ class EventLoopFutureTest: XCTestCase { }.map { (x: Int) -> Int in XCTFail("shouldn't have been called") return x - }.whenFailure { + }.assumeIsolated().whenFailure { ran = true XCTAssertEqual(.some(DummyError.dummyError2), $0 as? DummyError) } @@ -519,7 +519,7 @@ class EventLoopFutureTest: XCTestCase { let eventLoop = EmbeddedEventLoop() var state = 0 let p: EventLoopPromise = EventLoopPromise(eventLoop: eventLoop, file: #filePath, line: #line) - p.futureResult.map { + p.futureResult.assumeIsolated().map { XCTAssertEqual(state, 0) state += 1 }.map { diff --git a/Tests/NIOPosixTests/FileRegionTest.swift b/Tests/NIOPosixTests/FileRegionTest.swift index 4e06a86ee35..eb44691c61b 100644 --- a/Tests/NIOPosixTests/FileRegionTest.swift +++ b/Tests/NIOPosixTests/FileRegionTest.swift @@ -33,12 +33,20 @@ class FileRegionTest: XCTestCase { } let bytes = Array(content.utf8) - let countingHandler = ByteCountingHandler(numBytes: bytes.count, promise: group.next().makePromise()) - + let promise = group.next().makePromise(of: ByteBuffer.self) let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) - .childChannelInitializer { $0.pipeline.addHandler(countingHandler) } + .childChannelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + ByteCountingHandler( + numBytes: bytes.count, + promise: promise + ) + ) + } + } .bind(host: "127.0.0.1", port: 0) .wait() ) @@ -80,7 +88,7 @@ class FileRegionTest: XCTestCase { var buffer = clientChannel.allocator.buffer(capacity: bytes.count) buffer.writeBytes(bytes) - try countingHandler.assertReceived(buffer: buffer) + XCTAssertEqual(try promise.futureResult.wait(), buffer) } } @@ -90,12 +98,20 @@ class FileRegionTest: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let countingHandler = ByteCountingHandler(numBytes: 0, promise: group.next().makePromise()) - + let promise = group.next().makePromise(of: ByteBuffer.self) let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) - .childChannelInitializer { $0.pipeline.addHandler(countingHandler) } + .childChannelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + ByteCountingHandler( + numBytes: 0, + promise: promise + ) + ) + } + } .bind(host: "127.0.0.1", port: 0) .wait() ) @@ -152,12 +168,17 @@ class FileRegionTest: XCTestCase { } let bytes = Array(content.utf8) - let countingHandler = ByteCountingHandler(numBytes: bytes.count, promise: group.next().makePromise()) - + let promise = group.next().makePromise(of: ByteBuffer.self) let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) - .childChannelInitializer { $0.pipeline.addHandler(countingHandler) } + .childChannelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + ByteCountingHandler(numBytes: bytes.count, promise: promise) + ) + } + } .bind(host: "127.0.0.1", port: 0) .wait() ) @@ -223,7 +244,7 @@ class FileRegionTest: XCTestCase { var buffer = clientChannel.allocator.buffer(capacity: bytes.count) buffer.writeBytes(bytes) - try countingHandler.assertReceived(buffer: buffer) + XCTAssertEqual(try promise.futureResult.wait(), buffer) } } diff --git a/Tests/NIOPosixTests/HappyEyeballsTest.swift b/Tests/NIOPosixTests/HappyEyeballsTest.swift index bebf3d48f09..5dde4aec79d 100644 --- a/Tests/NIOPosixTests/HappyEyeballsTest.swift +++ b/Tests/NIOPosixTests/HappyEyeballsTest.swift @@ -52,7 +52,7 @@ extension Array where Element == Channel { } } -private class DummyError: Error, Equatable { +private final class DummyError: Error, Equatable { // For dummy error equality is identity. static func == (lhs: DummyError, rhs: DummyError) -> Bool { ObjectIdentifier(lhs) == ObjectIdentifier(rhs) @@ -75,7 +75,7 @@ private class ConnectRecorder: ChannelOutboundHandler { public func connect(context: ChannelHandlerContext, to: SocketAddress, promise: EventLoopPromise?) { self.targetHost = to.toString() let connectPromise = promise ?? context.eventLoop.makePromise() - connectPromise.futureResult.whenSuccess { + connectPromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenSuccess { self.state = .connected } context.connect(to: to, promise: connectPromise) @@ -83,7 +83,8 @@ private class ConnectRecorder: ChannelOutboundHandler { public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { let connectPromise = promise ?? context.eventLoop.makePromise() - connectPromise.futureResult.whenComplete { (_: Result) in + connectPromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { + (_: Result) in self.state = .closed } context.close(promise: connectPromise) @@ -94,9 +95,9 @@ private class ConnectionDelayer: ChannelOutboundHandler { typealias OutboundIn = Any typealias OutboundOut = Any - public var connectPromise: EventLoopPromise? + var connectPromise: EventLoopPromise? - public func connect(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise?) { + func connect(context: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise?) { self.connectPromise = promise } } @@ -199,42 +200,49 @@ extension EventLoopFuture { fileprivate func getError() -> Error? { guard self.isFulfilled else { return nil } - var error: Error? = nil - self.whenFailure { error = $0 } - return error! + let errorBox = NIOLockedValueBox(nil) + self.whenFailure { error in + errorBox.withLockedValue { $0 = error } + } + return errorBox.withLockedValue { $0! } } } // A simple resolver that allows control over the DNS resolution process. -private class DummyResolver: Resolver { +private final class DummyResolver: Resolver, Sendable { let v4Promise: EventLoopPromise<[SocketAddress]> let v6Promise: EventLoopPromise<[SocketAddress]> - enum Event { + enum Event: Sendable { case a(host: String, port: Int) case aaaa(host: String, port: Int) case cancel } - var events: [Event] = [] + private let _events: NIOLockedValueBox<[Event]> + + var events: [Event] { + self._events.withLockedValue { $0 } + } init(loop: EventLoop) { + self._events = NIOLockedValueBox([]) self.v4Promise = loop.makePromise() self.v6Promise = loop.makePromise() } func initiateAQuery(host: String, port: Int) -> EventLoopFuture<[SocketAddress]> { - events.append(.a(host: host, port: port)) + self._events.withLockedValue { $0.append(.a(host: host, port: port)) } return self.v4Promise.futureResult } func initiateAAAAQuery(host: String, port: Int) -> EventLoopFuture<[SocketAddress]> { - events.append(.aaaa(host: host, port: port)) + self._events.withLockedValue { $0.append(.aaaa(host: host, port: port)) } return self.v6Promise.futureResult } func cancelQueries() { - events.append(.cancel) + self._events.withLockedValue { $0.append(.cancel) } } } @@ -270,7 +278,7 @@ private func buildEyeballer( return (eyeballer: eyeballer, resolver: resolver, loop: loop) } -public final class HappyEyeballsTest: XCTestCase { +final class HappyEyeballsTest: XCTestCase { func testIPv4OnlyResolution() throws { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80) let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in @@ -598,7 +606,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80, connectTimeout: .hours(1)) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -667,7 +679,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80, connectTimeout: .hours(1)) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -737,7 +753,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80, connectTimeout: .hours(1)) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -828,7 +848,11 @@ public final class HappyEyeballsTest: XCTestCase { ) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -877,7 +901,11 @@ public final class HappyEyeballsTest: XCTestCase { ) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -932,7 +960,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80, connectTimeout: .hours(1)) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -982,7 +1014,11 @@ public final class HappyEyeballsTest: XCTestCase { ) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -1027,7 +1063,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -1077,7 +1117,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -1220,7 +1264,11 @@ public final class HappyEyeballsTest: XCTestCase { ) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -1268,7 +1316,11 @@ public final class HappyEyeballsTest: XCTestCase { ) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture @@ -1338,7 +1390,11 @@ public final class HappyEyeballsTest: XCTestCase { let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80) { let channelFuture = defaultChannelBuilder(loop: $0, family: $1) channelFuture.whenSuccess { channel in - try! channel.pipeline.addHandler(ConnectionDelayer(), name: CONNECT_DELAYER, position: .first).wait() + try! channel.pipeline.syncOperations.addHandler( + ConnectionDelayer(), + name: CONNECT_DELAYER, + position: .first + ) channels.append(channel) } return channelFuture diff --git a/Tests/NIOPosixTests/IdleStateHandlerTest.swift b/Tests/NIOPosixTests/IdleStateHandlerTest.swift index 04b81173cfd..6aeab42cc92 100644 --- a/Tests/NIOPosixTests/IdleStateHandlerTest.swift +++ b/Tests/NIOPosixTests/IdleStateHandlerTest.swift @@ -39,7 +39,7 @@ class IdleStateHandlerTest: XCTestCase { private func testIdle( _ handler: @escaping @Sendable @autoclosure () -> IdleStateHandler, _ writeToChannel: Bool, - _ assertEventFn: @escaping (IdleStateHandler.IdleStateEvent) -> Bool + _ assertEventFn: @escaping @Sendable (IdleStateHandler.IdleStateEvent) -> Bool ) throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { diff --git a/Tests/NIOPosixTests/MulticastTest.swift b/Tests/NIOPosixTests/MulticastTest.swift index e2fe3a8b05a..59df814b7ce 100644 --- a/Tests/NIOPosixTests/MulticastTest.swift +++ b/Tests/NIOPosixTests/MulticastTest.swift @@ -16,7 +16,7 @@ import NIOCore import NIOPosix import XCTest -final class PromiseOnReadHandler: ChannelInboundHandler { +final class PromiseOnReadHandler: ChannelInboundHandler, Sendable { typealias InboundIn = AddressedEnvelope private let promise: EventLoopPromise diff --git a/Tests/NIOPosixTests/NIOThreadPoolTest.swift b/Tests/NIOPosixTests/NIOThreadPoolTest.swift index d88f7c87ece..25094ed2eca 100644 --- a/Tests/NIOPosixTests/NIOThreadPoolTest.swift +++ b/Tests/NIOPosixTests/NIOThreadPoolTest.swift @@ -31,8 +31,7 @@ class NIOThreadPoolTest: XCTestCase { XCTAssertNoThrow(try pool.syncShutdownGracefully()) } - var allThreadNames: Set = [] - let lock = NIOLock() + let allThreadNames = NIOLockedValueBox>([]) let threadNameCollectionSem = DispatchSemaphore(value: 0) let threadBlockingSem = DispatchSemaphore(value: 0) @@ -43,8 +42,8 @@ class NIOThreadPoolTest: XCTestCase { case .cancelled: XCTFail("work item \(i) cancelled") case .active: - lock.withLock { - allThreadNames.formUnion([NIOThread.current.currentName ?? "n/a"]) + allThreadNames.withLockedValue { + $0.formUnion([NIOThread.current.currentName ?? "n/a"]) } threadNameCollectionSem.signal() } @@ -61,7 +60,7 @@ class NIOThreadPoolTest: XCTestCase { threadBlockingSem.signal() } - let localAllThreads = lock.withLock { allThreadNames } + let localAllThreads = allThreadNames.withLockedValue { $0 } for threadNumber in (0..) in + ps[0].futureResult.assumeIsolated().whenComplete { (res: Result) in pwm.failAll(error: ChannelError.inputClosed, close: true) } diff --git a/Tests/NIOPosixTests/PipeChannelTest.swift b/Tests/NIOPosixTests/PipeChannelTest.swift index e2fd023b8ce..b36b79934b1 100644 --- a/Tests/NIOPosixTests/PipeChannelTest.swift +++ b/Tests/NIOPosixTests/PipeChannelTest.swift @@ -74,7 +74,7 @@ final class PipeChannelTest: XCTestCase { } func testBasicIO() throws { - class Handler: ChannelInboundHandler { + final class Handler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -145,7 +145,7 @@ final class PipeChannelTest: XCTestCase { } func testWeWorkFineWithASingleFileDescriptor() throws { - final class EchoHandler: ChannelInboundHandler { + final class EchoHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer typealias OutboundOut = ByteBuffer diff --git a/Tests/NIOPosixTests/SocketChannelTest.swift b/Tests/NIOPosixTests/SocketChannelTest.swift index a42eefb0fcc..fffffc4ace7 100644 --- a/Tests/NIOPosixTests/SocketChannelTest.swift +++ b/Tests/NIOPosixTests/SocketChannelTest.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Atomics +import NIOConcurrencyHelpers import NIOCore import NIOTestUtils import XCTest @@ -30,7 +31,7 @@ extension Array { } } -public final class SocketChannelTest: XCTestCase { +final class SocketChannelTest: XCTestCase { /// Validate that channel options are applied asynchronously. public func testAsyncSetOption() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) @@ -169,7 +170,9 @@ public final class SocketChannelTest: XCTestCase { XCTAssertNoThrow( try serverChannel.eventLoop.flatSubmit { - serverChannel.pipeline.addHandler(AcceptHandler(promise)).flatMap { + serverChannel.eventLoop.makeCompletedFuture { + try serverChannel.pipeline.syncOperations.addHandler(AcceptHandler(promise)) + }.flatMap { serverChannel.register() }.flatMap { serverChannel.bind(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 0)) @@ -275,7 +278,9 @@ public final class SocketChannelTest: XCTestCase { try channel.eventLoop.flatSubmit { // We need to hop to the EventLoop here to make sure that we don't get an ECONNRESET before we manage // to close. - channel.pipeline.addHandler(ActiveVerificationHandler(promise)).flatMap { + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(ActiveVerificationHandler(promise)) + }.flatMap { channel.register() }.flatMap { channel.connect(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 9999)) @@ -493,7 +498,11 @@ public final class SocketChannelTest: XCTestCase { XCTAssertFalse(closePromise.futureResult.isFulfilled) } - XCTAssertNoThrow(try channel.pipeline.addHandler(NotificationOrderHandler()).wait()) + let added = channel.eventLoop.submit { + try channel.pipeline.syncOperations.addHandler(NotificationOrderHandler()) + } + + XCTAssertNoThrow(try added.wait()) // We need to call submit {...} here to ensure then {...} is called while on the EventLoop already to not have // a ECONNRESET sneak in. @@ -551,7 +560,7 @@ public final class SocketChannelTest: XCTestCase { state = .removed let loopBoundContext = context.loopBound - context.channel.closeFuture.whenComplete { (_: Result) in + context.channel.closeFuture.assumeIsolated().whenComplete { (_: Result) in let context = loopBoundContext.value XCTAssertNil(context.localAddress) XCTAssertNil(context.remoteAddress) @@ -564,10 +573,15 @@ public final class SocketChannelTest: XCTestCase { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let handler = AddressVerificationHandler(promise: group.next().makePromise()) + let promise = group.next().makePromise(of: Void.self) let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) - .childChannelInitializer { $0.pipeline.addHandler(handler) } + .childChannelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + let handler = AddressVerificationHandler(promise: promise) + return try channel.pipeline.syncOperations.addHandler(handler) + } + } .bind(host: "127.0.0.1", port: 0) .wait() ) @@ -580,7 +594,7 @@ public final class SocketChannelTest: XCTestCase { ) XCTAssertNoThrow(try clientChannel.close().wait()) - XCTAssertNoThrow(try handler.promise.futureResult.wait()) + XCTAssertNoThrow(try promise.futureResult.wait()) } func testSocketFlagNONBLOCKWorks() throws { @@ -619,7 +633,7 @@ public final class SocketChannelTest: XCTestCase { defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } // Handler that checks for the expected error. - final class ErrorHandler: ChannelInboundHandler { + final class ErrorHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Channel typealias InboundOut = Channel @@ -650,7 +664,9 @@ public final class SocketChannelTest: XCTestCase { .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .serverChannelOption(.backlog, value: 256) .serverChannelOption(.autoRead, value: false) - .serverChannelInitializer { channel in channel.pipeline.addHandler(ErrorHandler(serverPromise)) } + .serverChannelInitializer { + channel in channel.pipeline.addHandler(ErrorHandler(serverPromise)) + } .bind(host: "127.0.0.1", port: 0) .wait() ) @@ -777,7 +793,7 @@ public final class SocketChannelTest: XCTestCase { } } - class CloseAcceptedSocketsHandler: ChannelInboundHandler { + final class CloseAcceptedSocketsHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Channel func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -867,13 +883,13 @@ public final class SocketChannelTest: XCTestCase { channelInactivePromise.futureResult.cascade(to: channelHalfClosedPromise) } let eventCounter = EventCounterHandler() - var numberOfAcceptedChannels = 0 + let numberOfAcceptedChannels = NIOLockedValueBox(0) let server = try assertNoThrowWithValue( ServerBootstrap(group: group) .childChannelOption(.allowRemoteHalfClosure, value: mode == .halfClosureEnabled) .childChannelInitializer { channel in - numberOfAcceptedChannels += 1 - XCTAssertEqual(1, numberOfAcceptedChannels) + numberOfAcceptedChannels.withLockedValue { $0 += 1 } + XCTAssertEqual(1, numberOfAcceptedChannels.withLockedValue { $0 }) let drop = DropAllReadsOnTheFloorHandler( mode: mode, channelInactivePromise: channelInactivePromise, @@ -1015,9 +1031,12 @@ public final class SocketChannelTest: XCTestCase { let serverSocket = try assertNoThrowWithValue(ServerSocket(protocolFamily: .inet)) XCTAssertNoThrow(try serverSocket.bind(to: .init(ipAddress: "127.0.0.1", port: 0))) XCTAssertNoThrow(try serverSocket.listen()) + let serverAddress = try serverSocket.localAddress() let g = DispatchGroup() + // Transfer the socket to the dispatch queue. It's not used on this thread after this point. + let unsafeServerSocket = UnsafeTransfer(serverSocket) DispatchQueue(label: "accept one client").async(group: g) { - if let socket = try! serverSocket.accept() { + if let socket = try! unsafeServerSocket.wrappedValue.accept() { try! socket.close() } } @@ -1026,12 +1045,14 @@ public final class SocketChannelTest: XCTestCase { let client = try assertNoThrowWithValue( ClientBootstrap(group: group) .channelInitializer { channel in - channel.pipeline.addHandlers([ - eventCounter, - WaitForChannelInactiveHandler(channelInactivePromise: channelInactivePromise), - ]) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandlers([ + eventCounter, + WaitForChannelInactiveHandler(channelInactivePromise: channelInactivePromise), + ]) + } } - .connect(to: try serverSocket.localAddress()) + .connect(to: serverAddress) .wait() ) XCTAssertNoThrow( @@ -1089,7 +1110,7 @@ public final class SocketChannelTest: XCTestCase { } -class DropAllReadsOnTheFloorHandler: ChannelDuplexHandler { +final class DropAllReadsOnTheFloorHandler: ChannelDuplexHandler, Sendable { typealias InboundIn = Never typealias OutboundIn = Never typealias OutboundOut = ByteBuffer @@ -1140,7 +1161,7 @@ class DropAllReadsOnTheFloorHandler: ChannelDuplexHandler { // other side has actually fully closed the socket. let promise = self.waitUntilWriteFailedPromise func writeUntilError() { - context.writeAndFlush(Self.wrapOutboundOut(buffer)).map { + context.writeAndFlush(Self.wrapOutboundOut(buffer)).assumeIsolated().map { writeUntilError() }.whenFailure { (_: Error) in promise.succeed(()) diff --git a/Tests/NIOPosixTests/TestUtils.swift b/Tests/NIOPosixTests/TestUtils.swift index cb4d0cbc7b0..1d17d194ec4 100644 --- a/Tests/NIOPosixTests/TestUtils.swift +++ b/Tests/NIOPosixTests/TestUtils.swift @@ -147,7 +147,10 @@ func withTemporaryUnixDomainSocketPathName( return try body(shortEnoughPath) } -func withTemporaryFile(content: String? = nil, _ body: (NIOCore.NIOFileHandle, String) throws -> T) rethrows -> T { +func withTemporaryFile( + content: String? = nil, + _ body: (NIOCore.NIOFileHandle, String) throws -> T +) rethrows -> T { let (fd, path) = openTemporaryFile() let fileHandle = NIOFileHandle(_deprecatedTakingOwnershipOfDescriptor: fd) defer { @@ -178,7 +181,7 @@ func withTemporaryFile(content: String? = nil, _ body: (NIOCore.NIOFileHandle @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) func withTemporaryFile( content: String? = nil, - _ body: @escaping @Sendable (NIOCore.NIOFileHandle, String) async throws -> T + _ body: @escaping (NIOCore.NIOFileHandle, String) async throws -> T ) async rethrows -> T { let (fd, path) = openTemporaryFile() let fileHandle = NIOFileHandle(_deprecatedTakingOwnershipOfDescriptor: fd) diff --git a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift index 9d729c5532a..70ea293b5ef 100644 --- a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift +++ b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import NIOConcurrencyHelpers import NIOCore import NIOEmbedded import NIOPosix @@ -52,10 +53,14 @@ class UniversalBootstrapSupportTest: XCTestCase { } } - final class FishOutChannelHandler: ChannelInboundHandler { + final class FishOutChannelHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Channel - var acceptedChannels: [Channel] = [] + private let _acceptedChannels = NIOLockedValueBox<[Channel]>([]) + + var acceptedChannels: [Channel] { + self._acceptedChannels.withLockedValue { $0 } + } let firstArrived: EventLoopPromise @@ -65,8 +70,11 @@ class UniversalBootstrapSupportTest: XCTestCase { func channelRead(context: ChannelHandlerContext, data: NIOAny) { let channel = Self.unwrapInboundIn(data) - self.acceptedChannels.append(channel) - if self.acceptedChannels.count == 1 { + let count = self._acceptedChannels.withLockedValue { channels in + channels.append(channel) + return channels.count + } + if count == 1 { self.firstArrived.succeed(()) } context.fireChannelRead(data) @@ -133,7 +141,7 @@ class UniversalBootstrapSupportTest: XCTestCase { // let's check that the order is right XCTAssertNoThrow( - try client.eventLoop.submit { + try client.eventLoop.submit { [buffer] in client.pipeline.fireChannelRead(buffer) client.pipeline.fireUserInboundEventTriggered(buffer) }.wait()