From 9ef34bdc19ef5eae6bf4dda5da2df3919c151bbd Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 4 Apr 2025 16:44:55 +0100 Subject: [PATCH] Strict concurrency for NIOPosix ChannelTests --- Tests/NIOPosixTests/ChannelTests.swift | 310 ++++++++++-------- .../NIOPosixTests/DatagramChannelTests.swift | 2 +- 2 files changed, 181 insertions(+), 131 deletions(-) diff --git a/Tests/NIOPosixTests/ChannelTests.swift b/Tests/NIOPosixTests/ChannelTests.swift index 362260ebe09..fd939f43d11 100644 --- a/Tests/NIOPosixTests/ChannelTests.swift +++ b/Tests/NIOPosixTests/ChannelTests.swift @@ -73,28 +73,39 @@ class ChannelLifecycleHandler: ChannelInboundHandler { } } -public final class ChannelTests: XCTestCase { +final class ChannelTests: XCTestCase { func testBasicLifecycle() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let serverAcceptedChannelPromise = group.next().makePromise(of: Channel.self) - let serverLifecycleHandler = ChannelLifecycleHandler() + let serverAcceptedChannelPromise = loop.makePromise(of: Channel.self) + let serverLifecycleHandler = try loop.submit { + NIOLoopBound(ChannelLifecycleHandler(), eventLoop: loop) + }.wait() let serverChannel = try assertNoThrowWithValue( - ServerBootstrap(group: group) + ServerBootstrap(group: loop) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .childChannelInitializer { channel in serverAcceptedChannelPromise.succeed(channel) - return channel.pipeline.addHandler(serverLifecycleHandler) + return channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(serverLifecycleHandler.value) + } }.bind(host: "127.0.0.1", port: 0).wait() ) - let clientLifecycleHandler = ChannelLifecycleHandler() + let clientLifecycleHandler = try loop.submit { + NIOLoopBound(ChannelLifecycleHandler(), eventLoop: loop) + }.wait() let clientChannel = try assertNoThrowWithValue( - ClientBootstrap(group: group) - .channelInitializer({ (channel: Channel) in channel.pipeline.addHandler(clientLifecycleHandler) }) + ClientBootstrap(group: loop) + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(clientLifecycleHandler.value) + } + } .connect(to: serverChannel.localAddress!).wait() ) @@ -114,16 +125,16 @@ public final class ChannelTests: XCTestCase { clientChannel.closeFuture, serverAcceptedChannel.closeFuture, ], - on: group.next() + on: loop ).map { - XCTAssertEqual(clientLifecycleHandler.currentState, .unregistered) - XCTAssertEqual(serverLifecycleHandler.currentState, .unregistered) + XCTAssertEqual(clientLifecycleHandler.value.currentState, .unregistered) + XCTAssertEqual(serverLifecycleHandler.value.currentState, .unregistered) XCTAssertEqual( - clientLifecycleHandler.stateHistory, + clientLifecycleHandler.value.stateHistory, [.unregistered, .registered, .active, .inactive, .unregistered] ) XCTAssertEqual( - serverLifecycleHandler.stateHistory, + serverLifecycleHandler.value.stateHistory, [.unregistered, .registered, .active, .inactive, .unregistered] ) }.wait() @@ -1247,7 +1258,7 @@ public final class ChannelTests: XCTestCase { _ = pwm.add(data: .byteBuffer(buffer), promise: ps[2]) XCTAssertEqual(Int64(buffer.readableBytes * 3), pwm.bufferedBytes) - ps[0].futureResult.whenComplete { (_: Result) in + ps[0].futureResult.assumeIsolated().whenComplete { (_: Result) in pwm.failAll(error: ChannelError.inputClosed, close: true) } @@ -1415,15 +1426,19 @@ public final class ChannelTests: XCTestCase { try server.bind(to: SocketAddress.makeAddressResolvingHost("127.0.0.1", port: 0)) try server.listen() - let byteCountingHandler = ByteCountingHandler(numBytes: 4, promise: group.next().makePromise()) - let verificationHandler = ShutdownVerificationHandler( - shutdownEvent: .output, - promise: group.next().makePromise() - ) + let shutdownPromise = group.next().makePromise(of: Void.self) + let receivedPromise = group.next().makePromise(of: ByteBuffer.self) let future = ClientBootstrap(group: group) .channelInitializer { channel in - channel.pipeline.addHandler(verificationHandler).flatMap { - channel.pipeline.addHandler(byteCountingHandler) + channel.eventLoop.makeCompletedFuture { + let verificationHandler = ShutdownVerificationHandler( + shutdownEvent: .output, + promise: shutdownPromise + ) + try channel.pipeline.syncOperations.addHandler(verificationHandler) + + let byteCountingHandler = ByteCountingHandler(numBytes: 4, promise: receivedPromise) + try channel.pipeline.syncOperations.addHandler(byteCountingHandler) } } .connect(to: try! server.localAddress()) @@ -1443,7 +1458,7 @@ public final class ChannelTests: XCTestCase { try channel.writeAndFlush(buffer).wait() try channel.close(mode: .output).wait() - verificationHandler.waitForEvent() + try shutdownPromise.futureResult.wait() XCTAssertThrowsError(try channel.writeAndFlush(buffer).wait()) { error in XCTAssertEqual(.outputClosed, error as? ChannelError) } @@ -1455,7 +1470,9 @@ public final class ChannelTests: XCTestCase { } else { XCTFail() } - try byteCountingHandler.assertReceived(buffer: buffer) + + let received = try receivedPromise.futureResult.wait() + XCTAssertEqual(received, buffer) } func testCloseInput() throws { @@ -1471,7 +1488,7 @@ public final class ChannelTests: XCTestCase { try server.bind(to: SocketAddress.makeAddressResolvingHost("127.0.0.1", port: 0)) try server.listen() - class VerifyNoReadHandler: ChannelInboundHandler { + final class VerifyNoReadHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer public func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -1479,14 +1496,15 @@ public final class ChannelTests: XCTestCase { } } - let verificationHandler = ShutdownVerificationHandler( - shutdownEvent: .input, - promise: group.next().makePromise() - ) + let promise = group.next().makePromise(of: Void.self) let future = ClientBootstrap(group: group) .channelInitializer { channel in - channel.pipeline.addHandler(VerifyNoReadHandler()).flatMap { - channel.pipeline.addHandler(verificationHandler) + channel.pipeline.addHandler(VerifyNoReadHandler()).flatMapThrowing { + let verificationHandler = ShutdownVerificationHandler( + shutdownEvent: .input, + promise: promise + ) + return try channel.pipeline.syncOperations.addHandler(verificationHandler) } } .channelOption(.allowRemoteHalfClosure, value: true) @@ -1503,7 +1521,7 @@ public final class ChannelTests: XCTestCase { try channel.close(mode: .input).wait() - verificationHandler.waitForEvent() + try promise.futureResult.wait() var buffer = channel.allocator.buffer(capacity: 12) buffer.writeString("1234") @@ -1537,14 +1555,16 @@ public final class ChannelTests: XCTestCase { try server.bind(to: SocketAddress.makeAddressResolvingHost("127.0.0.1", port: 0)) try server.listen() - let verificationHandler = ShutdownVerificationHandler( - shutdownEvent: .input, - promise: group.next().makePromise() - ) - + let shutdownPromise = group.next().makePromise(of: Void.self) let future = ClientBootstrap(group: group) .channelInitializer { channel in - channel.pipeline.addHandler(verificationHandler) + channel.eventLoop.makeCompletedFuture { + let verificationHandler = ShutdownVerificationHandler( + shutdownEvent: .input, + promise: shutdownPromise + ) + try channel.pipeline.syncOperations.addHandler(verificationHandler) + } } .channelOption(.allowRemoteHalfClosure, value: true) .connect(to: try! server.localAddress()) @@ -1560,7 +1580,7 @@ public final class ChannelTests: XCTestCase { try accepted.shutdown(how: .WR) - verificationHandler.waitForEvent() + try shutdownPromise.futureResult.wait() var buffer = channel.allocator.buffer(capacity: 12) buffer.writeString("1234") @@ -1569,7 +1589,7 @@ public final class ChannelTests: XCTestCase { } func testInputAndOutputClosedResultsInFullClosure() throws { - final class PromiseOnChildChannelInitHandler: ChannelInboundHandler { + final class PromiseOnChildChannelInitHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer private let promise: EventLoopPromise @@ -1583,7 +1603,7 @@ public final class ChannelTests: XCTestCase { } } - final class ChannelInactiveHandler: ChannelInboundHandler { + final class ChannelInactiveHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer private let promise: EventLoopPromise @@ -1723,7 +1743,7 @@ public final class ChannelTests: XCTestCase { } func testWeDontCrashIfChannelReleasesBeforePipeline() throws { - final class StuffHandler: ChannelInboundHandler { + final class StuffHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Never let promise: EventLoopPromise @@ -1853,7 +1873,7 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - class AddressVerificationHandler: ChannelInboundHandler { + final class AddressVerificationHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Never public func channelActive(context: ChannelHandlerContext) { @@ -1898,12 +1918,12 @@ public final class ChannelTests: XCTestCase { self.readCountPromise = context.eventLoop.makePromise() } - public func expectRead(loop: EventLoop) -> EventLoopFuture { - loop.submit { + func expectRead(loop: EventLoop) -> EventLoopFuture { + loop.assumeIsolated().submit { self.waitingForReadPromise = loop.makePromise() - }.flatMap { + }.assumeIsolated().flatMap { self.waitingForReadPromise!.futureResult - } + }.nonisolated() } func channelReadComplete(context: ChannelHandlerContext) { @@ -1920,22 +1940,25 @@ public final class ChannelTests: XCTestCase { } } - public func issueDelayedRead() { + func issueDelayedRead() { self.context.read() } } let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readDelayer = ReadDelayer() + let loopBoundDelayer = try loop.next().submit { NIOLoopBound(ReadDelayer(), eventLoop: loop) }.wait() let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) - .childChannelInitializer { - $0.pipeline.addHandler(readDelayer) + .childChannelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandlers(loopBoundDelayer.value) + } } .bind(host: "127.0.0.1", port: 0).wait() ) @@ -1947,23 +1970,27 @@ public final class ChannelTests: XCTestCase { // We send a first write and expect it to arrive. var buffer = clientChannel.allocator.buffer(capacity: 12) - let firstReadPromise = readDelayer.expectRead(loop: serverChannel.eventLoop) + let firstReadFuture = try loop.submit { + loopBoundDelayer.value.expectRead(loop: loop) + }.wait() buffer.writeStaticString("hello, world") XCTAssertNoThrow(try clientChannel.writeAndFlush(buffer).wait()) - XCTAssertNoThrow(try firstReadPromise.wait()) + XCTAssertNoThrow(try firstReadFuture.wait()) // We send a second write. This won't arrive immediately. XCTAssertNoThrow(try clientChannel.writeAndFlush(buffer).wait()) - let readFuture = readDelayer.expectRead(loop: serverChannel.eventLoop) + let readFuture = try loop.submit { + loopBoundDelayer.value.expectRead(loop: loop) + }.wait() try serverChannel.eventLoop.scheduleTask(in: .milliseconds(100)) { XCTAssertFalse(readFuture.isFulfilled) }.futureResult.wait() // Ok, now let it proceed. XCTAssertNoThrow( - try serverChannel.eventLoop.submit { - XCTAssertEqual(readDelayer.reads, 2) - readDelayer.issueDelayedRead() + try loop.submit { + XCTAssertEqual(loopBoundDelayer.value.reads, 2) + loopBoundDelayer.value.issueDelayedRead() }.wait() ) @@ -1973,11 +2000,12 @@ public final class ChannelTests: XCTestCase { func testNoChannelReadBeforeEOFIfNoAutoRead() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - class VerifyNoReadBeforeEOFHandler: ChannelInboundHandler { + final class VerifyNoReadBeforeEOFHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer var expectingData: Bool = false @@ -1992,13 +2020,18 @@ public final class ChannelTests: XCTestCase { } } - let handler = VerifyNoReadBeforeEOFHandler() + let loopBoundVerifyHandler = try loop.submit { + NIOLoopBound(VerifyNoReadBeforeEOFHandler(), eventLoop: loop) + }.wait() + let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .childChannelOption(.autoRead, value: false) .childChannelInitializer { ch in - ch.pipeline.addHandler(handler) + ch.eventLoop.makeCompletedFuture { + try ch.pipeline.syncOperations.addHandler(loopBoundVerifyHandler.value) + } } .bind(host: "127.0.0.1", port: 0).wait() ) @@ -2015,8 +2048,8 @@ public final class ChannelTests: XCTestCase { usleep(100 * 1000) // Now we send close. This should deliver data. - try clientChannel.eventLoop.flatSubmit { () -> EventLoopFuture in - handler.expectingData = true + try loop.flatSubmit { + loopBoundVerifyHandler.value.expectingData = true return clientChannel.close() }.wait() try serverChannel.close().wait() @@ -2028,20 +2061,20 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - class VerifyEOFReadOrderingAndCloseInChannelReadHandler: ChannelInboundHandler { + final class VerifyEOFReadOrderingAndCloseInChannelReadHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer private var seenEOF: Bool = false private var numberOfChannelReads: Int = 0 - public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { if case .some(ChannelEvent.inputClosed) = event as? ChannelEvent { self.seenEOF = true } context.fireUserInboundEventTriggered(event) } - public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + func channelRead(context: ChannelHandlerContext, data: NIOAny) { if self.seenEOF { XCTFail( "Should not be called before seeing the EOF as autoRead is false and we did not call read(), but received \(self.unwrapInboundIn(data))" @@ -2060,7 +2093,9 @@ public final class ChannelTests: XCTestCase { .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .childChannelOption(.autoRead, value: false) .childChannelInitializer { ch in - ch.pipeline.addHandler(VerifyEOFReadOrderingAndCloseInChannelReadHandler()) + ch.eventLoop.makeCompletedFuture { + try ch.pipeline.syncOperations.addHandler(VerifyEOFReadOrderingAndCloseInChannelReadHandler()) + } } .childChannelOption(.maxMessagesPerRead, value: 1) .childChannelOption(.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 8)) @@ -2122,7 +2157,9 @@ public final class ChannelTests: XCTestCase { .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .childChannelOption(.autoRead, value: false) .childChannelInitializer { ch in - ch.pipeline.addHandler(CloseWhenWeGetEOFHandler(allDone: allDone)) + ch.eventLoop.makeCompletedFuture { + try ch.pipeline.syncOperations.addHandler(CloseWhenWeGetEOFHandler(allDone: allDone)) + } } // maxMessagesPerRead is large so that we definitely spin and seen the EOF .childChannelOption(.maxMessagesPerRead, value: 10) @@ -2159,7 +2196,7 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - class ChannelInactiveVerificationHandler: ChannelDuplexHandler { + final class ChannelInactiveVerificationHandler: ChannelDuplexHandler, Sendable { typealias InboundIn = ByteBuffer typealias OutboundIn = ByteBuffer @@ -2203,30 +2240,29 @@ public final class ChannelTests: XCTestCase { } func testAcceptsAfterCloseDontCauseIssues() throws { - class ChannelCollector { - let q = DispatchQueue(label: "q") - var channels: [ObjectIdentifier: Channel] = [:] + final class ChannelCollector: Sendable { + private let channels: NIOLockedValueBox<[ObjectIdentifier: Channel]> = NIOLockedValueBox([:]) deinit { - XCTAssertTrue(channels.isEmpty, "\(channels)") + XCTAssertTrue(self.channels.withLockedValue { $0.isEmpty }) } func add(_ channel: Channel) { - self.q.sync { - assert(self.channels[ObjectIdentifier(channel)] == nil) - channels[ObjectIdentifier(channel)] = channel - } + let key = ObjectIdentifier(channel) + let old = self.channels.withLockedValue { $0.updateValue(channel, forKey: key) } + assert(old == nil) } func remove(_ channel: Channel) { - let removed: Channel? = self.q.sync { - self.channels.removeValue(forKey: ObjectIdentifier(channel)) + let removed = self.channels.withLockedValue { + $0.removeValue(forKey: ObjectIdentifier(channel)) } XCTAssertTrue(removed != nil) } func closeAll() -> [EventLoopFuture] { - q.sync { self.channels.values }.map { channel in + let channels = self.channels.withLockedValue { $0.values } + return channels.map { channel in channel.close() } } @@ -2270,7 +2306,11 @@ public final class ChannelTests: XCTestCase { let collector = ChannelCollector() let serverBoot = ServerBootstrap(group: group) .childChannelInitializer { channel in - channel.pipeline.addHandler(CheckActiveHandler(channelCollector: collector)) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + CheckActiveHandler(channelCollector: collector) + ) + } } let listeningChannel = try serverBoot.bind(host: "127.0.0.1", port: 0).wait() let clientBoot = ClientBootstrap(group: group) @@ -2361,7 +2401,7 @@ public final class ChannelTests: XCTestCase { eventLoop: clientEL as! SelectableEventLoop ) - class WriteImmediatelyHandler: ChannelInboundHandler { + final class WriteImmediatelyHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Any typealias OutboundOut = ByteBuffer @@ -2398,13 +2438,15 @@ public final class ChannelTests: XCTestCase { // again which our special `Socket` subclass will let succeed. _ = try sc.socket.connect(to: bootstrap.localAddress!) try serverWriteHappenedPromise.futureResult.wait() - try sc.pipeline.addHandler( - ReadDoesNotHappen( - hasRegisteredPromise: clientHasRegistered, - hasUnregisteredPromise: clientHasUnregistered, - hasReadPromise: clientHasRead + try sc.eventLoop.submit { + try sc.pipeline.syncOperations.addHandler( + ReadDoesNotHappen( + hasRegisteredPromise: clientHasRegistered, + hasUnregisteredPromise: clientHasUnregistered, + hasReadPromise: clientHasRead + ) ) - ).flatMap { + }.flatMap { // this will succeed and should not cause the socket to be read even though there'll be something // available to be read immediately sc.register() @@ -2599,8 +2641,8 @@ public final class ChannelTests: XCTestCase { // send us `EPOLLHUP`. To have it run synchronously, we need to invoke the `flatMap` on the eventloop that the // `register` will succeed. - sc.register().flatMap { - sc.pipeline.addHandler(VerifyThingsAreRightHandler(allDone: allDone)) + sc.register().flatMapThrowing { + try sc.pipeline.syncOperations.addHandler(VerifyThingsAreRightHandler(allDone: allDone)) }.flatMap { sc.connect(to: serverChannel.localAddress!) } @@ -2645,10 +2687,9 @@ public final class ChannelTests: XCTestCase { let allDone = group.next().makePromise(of: Void.self) let cf = try! sc.eventLoop.submit { - sc.pipeline.addHandler(VerifyConnectionFailureHandler(allDone: allDone)).flatMap { - sc.register().flatMap { - sc.connect(to: serverChannel.localAddress!) - } + try sc.pipeline.syncOperations.addHandler(VerifyConnectionFailureHandler(allDone: allDone)) + return sc.register().flatMap { + sc.connect(to: serverChannel.localAddress!) } }.wait() XCTAssertThrowsError(try cf.wait()) { error in @@ -2688,10 +2729,9 @@ public final class ChannelTests: XCTestCase { let allDone = group.next().makePromise(of: Void.self) try! sc.eventLoop.submit { - let f = sc.pipeline.addHandler(VerifyConnectionFailureHandler(allDone: allDone)).flatMap { - sc.register().flatMap { - sc.connect(to: serverChannel.localAddress!) - } + try sc.pipeline.syncOperations.addHandler(VerifyConnectionFailureHandler(allDone: allDone)) + let f = sc.register().flatMap { + sc.connect(to: serverChannel.localAddress!) } f.whenSuccess { XCTFail("Must not succeed") @@ -2935,7 +2975,7 @@ public final class ChannelTests: XCTestCase { } func testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError() throws { - final class WriteWhenActiveHandler: ChannelInboundHandler { + final class WriteWhenActiveHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer typealias OutboundOut = ByteBuffer @@ -2975,7 +3015,7 @@ public final class ChannelTests: XCTestCase { } } - final class MakeChannelInactiveInReadCausedByWriteErrorHandler: ChannelInboundHandler { + final class MakeChannelInactiveInReadCausedByWriteErrorHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer typealias OutboundOut = ByteBuffer @@ -2990,23 +3030,10 @@ public final class ChannelTests: XCTestCase { self.allDonePromise = allDonePromise } - private func veryNasty_blockUntilReadBufferIsNonEmpty(channel: Channel) throws { - struct ThisIsNotASocketChannelError: Error {} - guard let channel = channel as? SocketChannel else { - throw ThisIsNotASocketChannelError() - } - try channel.socket.withUnsafeHandle { fd in - var pollFd: pollfd = .init(fd: fd, events: Int16(POLLIN), revents: 0) - let nfds = - try NIOBSDSocket.poll(fds: &pollFd, nfds: 1, timeout: -1) - XCTAssertEqual(1, nfds) - } - } - func channelActive(context: ChannelHandlerContext) { XCTAssert(serverChannel.eventLoop === context.eventLoop) let loopBoundContext = context.loopBound - self.serverChannel.whenSuccess { [channel = context.channel] serverChannel in + self.serverChannel.whenSuccess { [channel = context.channel, allDonePromise] serverChannel in // all of the following futures need to complete synchronously for this test to test the correct // thing. Therefore we keep track if we're still on the same stack frame. var inSameStackFrame = true @@ -3017,7 +3044,7 @@ public final class ChannelTests: XCTestCase { XCTAssertTrue(serverChannel.isActive) // we allow auto-read again to make sure that the socket buffer is drained on write error // (cf. https://github.com/apple/swift-nio/issues/593) - channel.setOption(.autoRead, value: true).flatMap { + channel.setOption(.autoRead, value: true).assumeIsolated().flatMap { let context = loopBoundContext.value // let's trigger the write error var buffer = channel.allocator.buffer(capacity: 16) @@ -3029,7 +3056,7 @@ public final class ChannelTests: XCTestCase { // arrived at the time the write fails. So this is a hack that makes sure they do have arrived. // (https://github.com/apple/swift-nio/issues/657) XCTAssertNoThrow( - try self.veryNasty_blockUntilReadBufferIsNonEmpty(channel: channel) + try veryNasty_blockUntilReadBufferIsNonEmpty(channel: channel) ) } workaroundSR487() @@ -3044,7 +3071,7 @@ public final class ChannelTests: XCTestCase { "unexpected error: \(error)" ) XCTAssertTrue(inSameStackFrame) - self.allDonePromise.succeed(()) + allDonePromise.succeed(()) } } } @@ -3106,7 +3133,8 @@ public final class ChannelTests: XCTestCase { defer { XCTAssertNoThrow(try singleThreadedELG.syncShutdownGracefully()) } - var numberOfAcceptedChannel = 0 + + let numberOfAcceptedChannel = NIOLockedValueBox(0) let acceptedChannels: [EventLoopPromise] = [ singleThreadedELG.next().makePromise(), singleThreadedELG.next().makePromise(), @@ -3119,8 +3147,8 @@ public final class ChannelTests: XCTestCase { .childChannelOption(.socketOption(.so_keepalive), value: 1) .childChannelOption(.tcpOption(.tcp_nodelay), value: 0) .childChannelInitializer { channel in - acceptedChannels[numberOfAcceptedChannel].succeed(channel) - numberOfAcceptedChannel += 1 + acceptedChannels[numberOfAcceptedChannel.withLockedValue { $0 }].succeed(channel) + numberOfAcceptedChannel.withLockedValue { $0 += 1 } return channel.eventLoop.makeSucceededFuture(()) } .bind(host: "127.0.0.1", port: 0) @@ -3234,7 +3262,12 @@ public final class ChannelTests: XCTestCase { let channel = try! assertNoThrowWithValue( ServerBootstrap(group: group).serverChannelInitializer { channel in - channel.pipeline.addHandler(MakeFirstCloseFailAndDontActuallyCloseHandler(), position: .first) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + MakeFirstCloseFailAndDontActuallyCloseHandler(), + position: .first + ) + } }.bind(host: "localhost", port: 0).wait() ) defer { @@ -3391,15 +3424,19 @@ public final class ChannelTests: XCTestCase { } let loop = group.next() - let handler = ReentrantWritabilityChangingHandler( - becameUnwritable: loop.makePromise(), - becameWritable: loop.makePromise() - ) + let becameUnwritable = loop.makePromise(of: Void.self) + let becameWritable = loop.makePromise(of: Void.self) let serverFuture = ServerBootstrap(group: group) - .childChannelOption(.writeBufferWaterMark, value: handler.watermark) + .childChannelOption(.writeBufferWaterMark, value: ReentrantWritabilityChangingHandler.watermark) .childChannelInitializer { channel in - channel.pipeline.addHandler(handler) + channel.eventLoop.makeCompletedFuture { + let handler = ReentrantWritabilityChangingHandler( + becameUnwritable: becameUnwritable, + becameWritable: becameWritable + ) + return try channel.pipeline.syncOperations.addHandler(handler) + } } .bind(host: "localhost", port: 0) @@ -3416,8 +3453,8 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try client.close().wait()) } - XCTAssertNoThrow(try handler.becameUnwritable.futureResult.wait()) - XCTAssertNoThrow(try handler.becameWritable.futureResult.wait()) + XCTAssertNoThrow(try becameUnwritable.futureResult.wait()) + XCTAssertNoThrow(try becameWritable.futureResult.wait()) } func testChannelCanReportWritableBufferedBytes() throws { @@ -3483,7 +3520,7 @@ public final class ChannelTests: XCTestCase { } } -private final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler { +private final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler, Sendable { enum RegistrationFailedError: Error { case error } typealias OutboundIn = Never @@ -3545,7 +3582,7 @@ final class ReentrantWritabilityChangingHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer typealias OutboundOut = ByteBuffer - let watermark = ChannelOptions.Types.WriteBufferWaterMark(low: 100, high: 200) + static let watermark = ChannelOptions.Types.WriteBufferWaterMark(low: 100, high: 200) let becameWritable: EventLoopPromise let becameUnwritable: EventLoopPromise @@ -3566,7 +3603,7 @@ final class ReentrantWritabilityChangingHandler: ChannelInboundHandler { // again. let b1 = context.channel.allocator.buffer(repeating: 0, count: 50) let loopBoundContext = context.loopBound - context.write(Self.wrapOutboundOut(b1)).whenSuccess { _ in + context.write(Self.wrapOutboundOut(b1)).assumeIsolated().whenSuccess { _ in let context = loopBoundContext.value // We should still be writable. XCTAssertTrue(context.channel.isWritable) @@ -3599,3 +3636,16 @@ final class ReentrantWritabilityChangingHandler: ChannelInboundHandler { } } } + +private func veryNasty_blockUntilReadBufferIsNonEmpty(channel: Channel) throws { + struct ThisIsNotASocketChannelError: Error {} + guard let channel = channel as? SocketChannel else { + throw ThisIsNotASocketChannelError() + } + try channel.socket.withUnsafeHandle { fd in + var pollFd: pollfd = .init(fd: fd, events: Int16(POLLIN), revents: 0) + let nfds = + try NIOBSDSocket.poll(fds: &pollFd, nfds: 1, timeout: -1) + XCTAssertEqual(1, nfds) + } +} diff --git a/Tests/NIOPosixTests/DatagramChannelTests.swift b/Tests/NIOPosixTests/DatagramChannelTests.swift index 601f5faa26a..4fdef1d868a 100644 --- a/Tests/NIOPosixTests/DatagramChannelTests.swift +++ b/Tests/NIOPosixTests/DatagramChannelTests.swift @@ -914,7 +914,7 @@ class DatagramChannelTests: XCTestCase { } let channel2Future = DatagramBootstrap(group: self.group) - .channelOption(.writeBufferWaterMark, value: handler.watermark) + .channelOption(.writeBufferWaterMark, value: ReentrantWritabilityChangingHandler.watermark) .channelInitializer { channel in channel.pipeline.addHandlers([EnvelopingHandler(), handler]) }